Files
dockipelago/worlds/tloz_ph/Client.py
Jonathan Tinney 7971961166
Some checks failed
Analyze modified files / flake8 (push) Failing after 2m28s
Build / build-win (push) Has been cancelled
Build / build-ubuntu2204 (push) Has been cancelled
ctest / Test C++ ubuntu-latest (push) Has been cancelled
ctest / Test C++ windows-latest (push) Has been cancelled
Analyze modified files / mypy (push) Has been cancelled
Build and Publish Docker Images / Push Docker image to Docker Hub (push) Successful in 5m4s
Native Code Static Analysis / scan-build (push) Failing after 5m2s
type check / pyright (push) Successful in 1m7s
unittests / Test Python 3.11.2 ubuntu-latest (push) Failing after 16m23s
unittests / Test Python 3.12 ubuntu-latest (push) Failing after 28m19s
unittests / Test Python 3.13 ubuntu-latest (push) Failing after 14m49s
unittests / Test hosting with 3.13 on ubuntu-latest (push) Successful in 5m0s
unittests / Test Python 3.13 macos-latest (push) Has been cancelled
unittests / Test Python 3.11 windows-latest (push) Has been cancelled
unittests / Test Python 3.13 windows-latest (push) Has been cancelled
add schedule I, sonic 1/frontiers/heroes, spirit island
2026-04-02 23:46:36 -07:00

1253 lines
59 KiB
Python

from random import randint
from .DSZeldaClient.DSZeldaClient import *
from .DSZeldaClient.subclasses import (get_address_from_heap, storage_key, get_stored_data, AddrFromPointer)
from .data.Items import ITEMS
from .MapWarp import map_mode
from .data.Entrances import entrance_id_to_entrance
from .data.DynamicEntrances import DYNAMIC_ENTRANCES_BY_SCENE
from typing import Literal
from settings import get_settings
if TYPE_CHECKING:
from worlds._bizhawk.context import BizHawkClientContext, BizHawkClientCommandProcessor
from .Subclasses import PHTransition
from . import PhantomHourglassSettings
default_boat_speed = 0x10A
def get_client_as_command_processor(self: "BizHawkClientCommandProcessor"):
ctx = self.ctx
from worlds._bizhawk.context import BizHawkClientContext
assert isinstance(ctx, BizHawkClientContext)
client = ctx.client_handler
assert isinstance(client, PhantomHourglassClient)
return client
def cmd_boat_option(self: "BizHawkClientCommandProcessor",
option: Literal["snap_speed", "speed", "options"] = "options",
*args: str):
"""
Change various train options. Currently implemented:
- speed <speed: int | "default" | "reset" | "list"> <gear>
- snap_speed (True): instantly switch to new speeds after charting or starting the engine.
- options: lists current option values
"""
# Thanks to Silvris's mm2 implementation for help with bizhawk command processing
valid_options = ["snap_speed", "speed", "options"]
option = option.lower()
if option not in valid_options:
self.output(f" \"{option}\" is not a valid option! {valid_options}")
return False
if option == "speed":
return cmd_boat_speed(self, *args)
value = args[0].lower() if args else "true"
valid_bool_values = {"0": False, "1": True, "false": False, "true": True, "default": True, "reset": True}
value_bool = valid_bool_values.get(value, None)
if value_bool is None:
self.output(f" \"{value}\" is not a valid boolean!")
return False
client = get_client_as_command_processor(self)
if option == "options":
self.output(f" Current boat options:")
self.output(f" speed: {client.boat_speed}")
self.output(f" snap_speed: {client.boat_snap_speed}")
return True
setattr(client, f"boat_{option}", value_bool)
host_settings: PhantomHourglassSettings = get_settings().get('tloz_ph_options')
host_settings.update({f"boat_{option}": value_bool})
self.output(f" Set option {option} to {value_bool}")
return True
def cmd_boat_speed(self: "BizHawkClientCommandProcessor",
speed: int or str = "list"):
def set_speed(speed: int):
client.boat_speed = speed
client.update_boat_speed = True
self.output(f" Setting boat speed: {speed}")
host_settings: PhantomHourglassSettings = get_settings().get('tloz_ph_options')
host_settings.update({f"boat_speed": speed})
client = get_client_as_command_processor(self)
special_speeds = ["list", "default", "reset"]
if speed in special_speeds:
if speed == "list":
self.output(f" Current boat speed: {client.boat_speed}")
return True
elif speed in ["default", "reset"]:
set_speed(default_boat_speed)
return True
try:
speed = min(int(speed), 9999)
speed = max(speed, -9999) # soft cap of 9999
except ValueError:
self.output(f" \"{speed}\" is not a valid speed, must be an int or in {special_speeds}")
return False
client.train_speed = speed
set_speed(client.train_speed)
return True
EQUIP_TIMER_OFFSET = 0x20
# gMapManager -> mCourse -> mSmallKeys
SMALL_KEY_OFFSET = 0x260
STAGE_FLAGS_OFFSET = 0x268
# Addresses to read each cycle
read_keys_always = [PHAddr.game_state, PHAddr.in_cutscene, PHAddr.loading_room,
PHAddr.received_item_index, PHAddr.slot_id,
PHAddr.stage, PHAddr.room, PHAddr.entrance,
PHAddr.in_short_cs, PHAddr.opened_clog, PHAddr.saving
]
read_keys_deathlink = []
read_keys_land = [PHAddr.getting_location, PHAddr.getting_ship_part, PHAddr.in_map]
read_keys_sea = [PHAddr.shot_frog, PHAddr.boat_health, PHAddr.drawing_sea_route, PHAddr.boat_gear]
read_keys_salvage = [PHAddr.salvage_health]
# datastore_keys
checked_key = "ph_checked_entrances"
disconnect_key = "ph_disconnect_entrances"
traversal_key = "ph_traversed_entrances"
ut_events_key = "ph_ut_events"
ut_exclude_key = "ph_keylocking"
save_scene_key = "ph_save_scene"
visited_scenes_key = "ph_visited_scenes"
class PhantomHourglassClient(DSZeldaClient):
game = "The Legend of Zelda - Phantom Hourglass"
system = "NDS"
def __init__(self) -> None:
super().__init__()
# Required variables from inherit
self.starting_flags = STARTING_FLAGS
self.dungeon_key_data = DUNGEON_KEY_DATA
self.starting_entrance = (11, 3, 5) # stage, room, entrance
self.scene_addr = (PHAddr.stage, PHAddr.room, PHAddr.floor, PHAddr.entrance) # Stage, room, floor, entrance
self.exit_coords_addr = (PHAddr.transition_x, PHAddr.transition_y, PHAddr.transition_z) # x, y, z. what coords to spawn link at when entering a
self.dynamic_entrances_by_scene = DYNAMIC_ENTRANCES_BY_SCENE
# continuous transition
self.er_y_offest = 164 # In ph i use coords who's y is 164 off the entrance y
self.stage_flag_offset = STAGE_FLAGS_OFFSET
self.hint_data = HINT_DATA
self.entrances = ENTRANCES
self.item_data = ITEMS
# Ph variables
self.goal_room = 0x3600
self.goal_event_connect = None
self.sent_goal = False
self.last_treasures = 0
self.last_potions = [0, 0]
self.last_ship_parts = []
self.at_sea = False
self.lowered_water = False
self.visited_entrances = set()
self.redisconnected_entrances = set()
self.checked_entrances = set()
self.boss_warp_entrance = None
self.last_warp_stage = None
self.item_location_combo = None
self.metal_count = 0
self.sent_event = False
self.event_reads = []
self.event_data = []
self.last_saved_scene = None
self.lss_retry_attempts = 4
self.death_check = False
self.death_precision = None
self.health_address: "Address" = PHAddr.null
self.last_health_pointer = 0
self.save_spam_protection = False
self.death_warning_spam_protect = False
# Map warp vars
self.map_mode: bool = False # if in warp menu
self.map_warp: "PHTransition" or None = None # destination entrance
self.map_warp_reselector: bool = True # Spam prevention
self.pen_mode_pointer = None
self.last_pen_mode = 0x18
self.addr_game_state = PHAddr.game_state
self.addr_slot_id = PHAddr.slot_id
self.addr_stage = PHAddr.stage
self.addr_room = PHAddr.room
self.addr_entrance = PHAddr.entrance
self.addr_received_item_index = PHAddr.received_item_index
self.boat_speed = default_boat_speed
self.boat_snap_speed = True
self.update_boat_speed = True
self.last_gear = True
async def check_game_version(self, ctx: "BizHawkClientContext") -> bool:
rom_name_bytes = (await PHAddr.game_identifier.read_bytes(ctx))[0]
print(f"{rom_name_bytes}")
rom_name = bytes([byte for byte in rom_name_bytes if byte != 0]).decode("ascii")
print(f"Rom Name: {rom_name}")
if rom_name != "ZELDA_DS:PHAZEP": # EU
if rom_name == "ZELDA_DS:PHAZEE": # US
logger.error("You are using a US rom that is not supported yet. sorry!")
self.version_offset = -64
return False
# Set commands
if "boat" not in ctx.command_processor.commands:
ctx.command_processor.commands["boat"] = cmd_boat_option
return True
async def on_connect(self, ctx):
# Get train settings from host.yaml
host_settings: PhantomHourglassSettings = get_settings().get('tloz_ph_options')
print(f"SETTINGS: {host_settings.get('boat_speed', self.boat_speed)}")
self.boat_speed = host_settings.get("boat_speed", self.boat_speed)
self.boat_snap_speed = host_settings.get("boat_snap_speed", self.boat_snap_speed)
async def set_special_starting_flags(self, ctx: "BizHawkClientContext") -> list[tuple[int, list, str]]:
"""
Game specific starting flag logic.
Flags defined in STARTING_FLAGS are set automatically
:param ctx: BizhawkClientContext
:return: write_list
"""
# Reset save slot
write_list = PHAddr.received_item_index.get_write_list(0)
# Reset starting time for PH
write_list += PHAddr.phantom_hourglass_max.get_write_list(0)
# Set Frog flags if not randomizing frogs
if ctx.slot_data["randomize_frogs"] == 1:
write_list += [a.get_inner_write_list(v) for a, v in STARTING_FROG_FLAGS]
# Set Fog Flags
fog_bits = FOG_SETTINGS_FLAGS[ctx.slot_data["fog_settings"]]
if len(fog_bits) > 0:
write_list += [a.get_inner_write_list(v) for a, v in fog_bits]
if ctx.slot_data["skip_ocean_fights"] == 1:
write_list += PHAddr.adv_flags_22.get_write_list(0x84)
# Ban player from harrow if not randomized
if ctx.slot_data["randomize_harrow"] == 0:
write_list += PHAddr.adv_flags_30.get_write_list(0x18)
# Print starting hints
if ctx.slot_data["dungeon_hint_location"] == 0:
self.dungeon_hints(ctx)
# Start with sea maps if map warping
if ctx.slot_data["map_warp_options"]:
write_list += PHAddr.inventory_5.get_write_list(0x1F)
print(f"ssf write list: {write_list}")
return write_list
async def get_coords(self, ctx, multi=False):
coords = await read_multiple(ctx, self.get_coord_address(multi=multi), signed=True)
if not multi:
return {
"x": coords.get(PHAddr.link_x, coords.get(PHAddr.boat_x)),
"y": coords.get(PHAddr.link_y, 0),
"z": coords.get(PHAddr.link_z, coords.get(PHAddr.boat_z, 0))
}
return coords
def update_metal_count(self, ctx):
metal_ids = [self.item_data[i].id for i in ITEM_GROUPS["Metals"]]
self.metal_count = sum(1 for i in ctx.items_received if i.item in metal_ids)
async def update_treasure_tracker(self, ctx):
self.last_treasures = await PHAddr.all_treasure_count.read(ctx)
# print(f"Treasure Tracker! {split_bits(self.last_treasures, 8)}")
async def give_random_treasure(self, ctx):
address = AddrFromPointer(PHAddr.pink_coral_count + randint(0, 7))
await address.add(ctx, 1)
await self.update_treasure_tracker(ctx)
logger.info(f"Got random treasure from farmable location.")
async def update_potion_tracker(self, ctx):
reads = await read_multiple(ctx, [PHAddr.potion_left, PHAddr.potion_right])
self.last_potions = list(reads.values())
def get_coord_address(self, at_sea=None, multi=False) -> list["Address"]:
if not multi:
at_sea = self.at_sea if at_sea is None else at_sea
if at_sea:
return [PHAddr.boat_x, PHAddr.boat_z]
elif not at_sea:
return [PHAddr.link_x, PHAddr.link_y, PHAddr.link_z]
return [PHAddr.link_x, PHAddr.link_y, PHAddr.link_z, PHAddr.boat_x, PHAddr.boat_z]
async def update_main_read_list(self, ctx, stage, in_game=True):
read_keys = read_keys_always.copy()
death_link_pointer = None
if stage is not None:
if stage == 0:
read_keys += read_keys_sea
self.health_address = PHAddr.boat_health
self.at_sea = True
elif stage == 3:
# Add separate reads for instant-repairs
read_keys += read_keys_salvage
self.health_address = PHAddr.salvage_health
else:
read_keys += read_keys_land
if in_game:
death_link_pointer = (PHAddr.gPlayer, 0xa)
self.at_sea = False
if death_link_pointer:
addr, offset = death_link_pointer
pointer_1 = await addr.read(ctx)
self.health_address = AddrFromPointer(pointer_1 + offset - 0x2000000, size=2, name="link_health")
self.last_health_pointer = pointer_1
read_keys.append(self.health_address)
print(f"Health Address = {self.health_address}")
self.main_read_list = read_keys
else:
self.at_sea = None
return self.main_read_list
async def full_heal(self, ctx, bonus=0):
if not self.at_sea:
hearts = self.item_count(ctx, "Heart Container") + 3 + bonus
print(f"Sent full heal hearts {hearts} addr {self.health_address}")
await self.health_address.overwrite(ctx, hearts * 4)
async def refill_ammo(self, ctx, text=""):
items = [i + " (Progressive)" for i in ["Bombs", "Bombchus", "Bow"]]
# Count upgrades
counts = {self.item_data[i].id: 0 for i in items}
for i in ctx.items_received:
for k in counts:
if k == i.item:
counts[k] += 1
# Write Upgrades
write_list = []
for i, count in enumerate(counts.values()):
data = self.item_data[items[i]]
write_list += data.ammo_address.get_write_list(data.give_ammo[count - 1])
await bizhawk.write(ctx.bizhawk_ctx, write_list)
await self.full_heal(ctx)
if text == "milk_bar":
logger.info(f"You drink a glass of milk. You feel refreshed, and your ammo has been refilled.")
def get_progress(self, ctx, scene=0):
# Count current metals
self.update_metal_count(ctx)
# Figure out totals
if ctx.slot_data["goal_requirements"] < 2:
total = ctx.slot_data["dungeons_required"]
required = total
elif ctx.slot_data["goal_requirements"] == 2:
total = ctx.slot_data["metal_hunt_total"]
required = ctx.slot_data["metal_hunt_required"]
else:
return True
if scene == 0xB0A:
# Oshus Text
bellum_texts = ["spawns the phantoms in TotOK B13.",
"opens the staircase to Bellum at the bottom of TotOK.",
"opens the blue warp to Bellum in TotOK.",
"spawns the ruins of the Ghost Ship in the SW Quadrant.",
"wins the game."]
logger.info(f"You have {self.metal_count} out of {required} rare metals. There are {total} metals in total.\n"
f"Finding the metals {bellum_texts[ctx.slot_data['bellum_access']]}")
elif scene == 0x160A:
zauz_required = ctx.slot_data["zauz_required_metals"]
logger.info(f"Zauz needs {zauz_required} rare metals to give an item. You have {self.metal_count}/{total} metals.")
def process_loading_variable(self, read_result) -> bool:
return read_result[PHAddr.loading_room] == 0xEE
async def process_read_list(self, ctx: "BizHawkClientContext", read_result: dict):
# This go true when link gets item
if self.at_sea:
self.getting_location = read_result.get(PHAddr.shot_frog, False)
else:
self.getting_location = (read_result.get(PHAddr.getting_location, 0) & 0x20
or read_result.get(PHAddr.getting_ship_part, False))
async def process_on_room_load(self, ctx, current_scene, read_result: dict):
self.prev_rupee_count = await PHAddr.rupee_count.read(ctx)
await self.update_potion_tracker(ctx)
await self.update_treasure_tracker(ctx)
async def process_in_game(self, ctx: "BizHawkClientContext", read_result: dict):
# Detect lowering of water and update ER Map
if not self.lowered_water and self.current_stage == 0x24:
await self.lower_water(ctx, True)
await self.detect_ut_event(ctx, self.current_scene)
if self.current_stage == 3 and read_result.get(PHAddr.salvage_health, 5) <= 1:
await self.instant_repair_salvage_arm(ctx)
await self.save_scene(ctx, read_result, PHAddr.saving, save_scene_key, [0x46])
# Map warp entrypoint
if read_result.get(PHAddr.in_map, 0):
await map_mode(self, ctx, read_result)
elif self.map_mode:
self.map_mode = False
self.map_warp = None
self.map_warp_reselector = True
logger.info(f"Illegal map menu exit, canceling all map warps")
if self.warp_to_start_flag and self.map_warp:
self.map_warp = None
logger.info(f"Canceled map warp due to starting a warp to start")
if self.map_warp and self.is_dead:
self.map_warp = None
logger.info(f"Map warp canceled due to death")
if self.is_dead and ctx.slot_data["shuffle_bosses"] and self.current_scene in BOSS_WARP_SCENE_LOOKUP and not self.death_warning_spam_protect:
if read_result[PHAddr.in_cutscene]:
logger.info(f"WARNING! Clicking continue in a boss room will put you out of logic. Please save and quit before continuing.")
self.death_warning_spam_protect = True
elif not self.is_dead:
self.death_warning_spam_protect = False
if self.current_stage == 0:
if read_result[PHAddr.drawing_sea_route]:
self.update_boat_speed = True
if self.last_gear != read_result[PHAddr.boat_gear]:
self.update_boat_speed = True
if self.update_boat_speed:
self.update_boat_speed = False
await PHAddr.boat_max_speed.overwrite(ctx, self.boat_speed, silent=True)
if self.boat_snap_speed:
await PHAddr.boat_speed.overwrite(ctx, self.boat_speed * read_result[PHAddr.boat_gear]//2, silent=True)
self.last_gear = read_result[PHAddr.boat_gear]
async def detect_warp_to_start(self, ctx, read_result: dict):
# Opened clog warp to start check
if read_result.get(PHAddr.opened_clog, False):
if await PHAddr.flipped_clog.read(ctx, silent=True) & 1:
if not self.warp_to_start_flag:
logger.info(f"Primed a warp to start. Enter a transition to warp to {STAGES[0xB]}.")
self.warp_to_start_flag = True
else:
if self.warp_to_start_flag:
logger.info("Canceled warp to start.")
self.warp_to_start_flag = False
# Cancel warp to start if in a dangerous situation
if self.warp_to_start_flag:
# Cyclone slate warp to start crashes, prevent that from working
if self.at_sea:
if await PHAddr.using_cyclone_slate.read(ctx, silent=True) == 1: # is 0x65 if never used cyclone slate
self.warp_to_start_flag = False
logger.info("Canceled warp to start, Cyclone Slate is not a valid warp method")
if self.is_dead:
self.warp_to_start_flag = False
logger.info("Canceled warp to start, death is not a valid warp method")
if self.starting_entrance[:2] == (self.current_stage, read_result[PHAddr.room]):
logger.info(f"In starting scene, canceling warp to start")
self.warp_to_start_flag = False
async def enter_game(self, ctx):
self.save_slot = await PHAddr.save_slot.read(ctx, silent=True)
self.update_metal_count(ctx)
self.set_ending_room(ctx)
await self.lower_water(ctx)
await PHAddr.text_speed.overwrite(ctx, 2) # Set text speed to fast, no matter settings
# Set treasure prices so they match seed (save file resets it on menu)
await PHAddr.treasure_price_index.overwrite(ctx, ctx.slot_data.get("treasure_price_index", 0))
await self.update_stored_entrances(ctx)
# Set warp to start location
if ctx.slot_data["shuffle_overworld_transitions"]:
self.starting_entrance = (11, 0, 0)
async def watched_intro_cs(self, ctx):
watched_intro = await PHAddr.watched_intro.read(ctx, silent=True) & 2
return watched_intro
async def process_hard_coded_rooms(self, ctx, current_scene):
self.sent_event = False # Reset per-room UT events
self.event_data = []
self.event_reads = []
self.save_spam_protection = False # Reset save spam protection
# Yellow warp in TotOK saves keys
# TODO: allow this to work with ER
if self.last_scene is not None:
if current_scene == 0x2509 and self.last_scene == 0x2507:
await self.write_totok_midway_keys(ctx)
# Repair salvage arm in certain rooms
if current_scene in [0x130A, 0x500]:
await self.repair_salvage_arm(ctx, current_scene)
# Milk bar refills all ammo
if current_scene in [0xb0C]:
await self.refill_ammo(ctx, "milk_bar")
# Oshus gives metal info
if current_scene in [0xB0A, 0x160A]:
self.get_progress(ctx, current_scene)
# Shipyard gives ship parts
if current_scene in [0xB0D]:
await self.edit_ship(ctx)
if current_scene in [0xB03]:
await self.remove_ship_parts(ctx)
if current_scene == 0x1401: # Bannan chest needs to happen after load
if await PHAddr.adv_flags_22.read(ctx) & 0x8:
await PHAddr.wayfarer_chest.set_bits(ctx, 0x80)
# Open pedestal doors. sucks that you can't trigger it with dynaflags. slow code but game is slower
if ctx.slot_data.get("randomize_pedestal_items", 0) > 0:
# === TotOK ===
if current_scene == 0x2503: # B3
if self.item_count(ctx, "Force Gem (B3)") >= 3 or self.item_count(ctx, "Force Gems"):
await PHAddr.totok_b3_state.set_bits(ctx, [0xFE, 0x0F])
elif current_scene == 0x250B: # B8
if (self.item_count(ctx, "Round Crystal (Temple of the Ocean King)")
or self.item_count(ctx, "Round Pedestal B8 (Temple of the Ocean King)")
or self.item_count(ctx, "Round Crystals")):
await PHAddr.totok_b8_state.set_bits(ctx, 0x2)
if (self.item_count(ctx, "Triangle Crystal (Temple of the Ocean King)")
or self.item_count(ctx, "Triangle Crystals")
or self.item_count(ctx, "Triangle Pedestal B8 (Temple of the Ocean King)")):
await PHAddr.totok_b8_state.set_bits(ctx, 0x4)
elif current_scene == 0x250C: # B9
if (self.item_count(ctx, "Round Crystal (Temple of the Ocean King)")
or self.item_count(ctx, "Round Pedestal B9 (Temple of the Ocean King)")
or self.item_count(ctx, "Round Crystals")):
await PHAddr.totok_b9_state.set_bits(ctx, 0x4)
if (self.item_count(ctx, "Triangle Crystal (Temple of the Ocean King)")
or self.item_count(ctx, "Triangle Pedestal B9 (Temple of the Ocean King)")
or self.item_count(ctx, "Triangle Crystals")):
await PHAddr.totok_b9_state.set_bits(ctx, 0x8)
if (self.item_count(ctx, "Square Crystal (Temple of the Ocean King)")
or self.item_count(ctx, "Square Crystals")):
await PHAddr.totok_b9_state.set_bits(ctx, 0x22)
if self.item_count(ctx, "Square Pedestal West (Temple of the Ocean King)"):
await PHAddr.totok_b9_state.set_bits(ctx, 0x20)
if self.item_count(ctx, "Square Pedestal Center (Temple of the Ocean King)"):
await PHAddr.totok_b9_state.set_bits(ctx, 0x2)
elif current_scene == 0x2510: # B12
gem_count = self.item_count(ctx, "Force Gem (B12)") | self.item_count(ctx, "Force Gems")*3
if gem_count >= 3:
await PHAddr.totok_b12_state.set_bits(ctx, [0xFE, 0x0F])
elif gem_count == 2:
await PHAddr.totok_b12_state.set_bits(ctx, 0xC)
elif gem_count == 1:
await PHAddr.totok_b12_state.set_bits(ctx, 0x8)
# Remove ability to place force gems on southern pedestals
await PHAddr.totok_b12_pedestal_left.overwrite(ctx, 0x9)
await PHAddr.totok_b12_pedestal_right.overwrite(ctx, 0x9)
# === Temple of Courage ===
elif current_scene == 0x1E00:
if (self.item_count(ctx, "Square Pedestal North (Temple of Courage)")
or self.item_count(ctx, "Square Crystal (Temple of Courage)")
or self.item_count(ctx, "Square Crystals")):
await PHAddr.toc_crystal_state.set_bits(ctx, 0x10)
if (self.item_count(ctx, "Square Pedestal South (Temple of Courage)")
or self.item_count(ctx, "Square Crystal (Temple of Courage)")
or self.item_count(ctx, "Square Crystals")):
await self.stage_flag_address.set_bits(ctx, 0x80)
# === Ghost Ship ===
elif current_scene == 0x2900:
if (self.item_count(ctx, "Triangle Crystal (Ghost Ship)")
or self.item_count(ctx, "Triangle Crystals")):
await self.stage_flag_address.set_bits(ctx, 0x8, offset=1)
if (self.item_count(ctx, "Round Crystal (Ghost Ship)")
or self.item_count(ctx, "Round Crystals")):
await self.stage_flag_address.set_bits(ctx, 0x2, offset=3)
async def write_totok_midway_keys(self, ctx):
data = DUNGEON_KEY_DATA[372]
keys = await self.key_address.read(ctx)
keys = keys * data["value"]
keys = data["filter"] if keys > data["filter"] else keys
await PHAddr.small_key_storage_2.set_bits(ctx, keys)
await PHAddr.custom_storage.set_bits(ctx, 0x1) # Set bit to write future TotOK keys to post midway
@staticmethod
async def repair_salvage_arm(ctx, scene=0x500):
prev = await read_multiple(ctx, [PHAddr.global_salvage_health, PHAddr.rupee_count, PHAddr.custom_storage])
repair_kits = (prev[PHAddr.custom_storage] & 0xE0) >> 5
print(f"Repair kits: {repair_kits}")
if prev[PHAddr.global_salvage_health] <= 2:
write_list = []
text = f"Repaired Salvage Arm for "
if repair_kits > 0:
write_list += PHAddr.custom_storage.get_write_list(prev[PHAddr.custom_storage] - 0x20)
text += f"1 Salvage Repair Kit. You have {prev[PHAddr.custom_storage]} remaining."
else:
# Repair cost, doesn't care if you're out of rupees out of qol
cost = 100 if prev[PHAddr.global_salvage_health] == 0 else (6 - prev[PHAddr.global_salvage_health]) * 10
rupees = 0 if prev[PHAddr.rupee_count] - cost <= 0 else prev[PHAddr.rupee_count] - cost
write_list += PHAddr.rupee_count.get_write_list(rupees)
text += f"{cost} rupees."
write_list += PHAddr.global_salvage_health.get_write_list(5)
await bizhawk.write(ctx.bizhawk_ctx, write_list)
else:
text = f"This room automatically repairs your Salvage Arm, for a cost or a kit, when at 2 health or below."
# Send a client message about the repair
logger.info(text)
@staticmethod
async def instant_repair_salvage_arm(ctx):
salvage_data = await PHAddr.custom_storage.read(ctx, silent=True)
salvage_kits = (salvage_data & 0xE0) >> 5
if salvage_kits > 0:
write_list = (PHAddr.custom_storage.get_write_list(salvage_data - 0x20) +
PHAddr.salvage_health.get_write_list(5) +
PHAddr.global_salvage_health.get_write_list(5)) # Global salvage health
await bizhawk.write(ctx.bizhawk_ctx, write_list)
logger.info(f"Salvage Arm instant-repaired. You have {salvage_kits - 1} Salvage Repair Kits remaining.")
@staticmethod
async def remove_ship_parts(ctx):
ship_write_list = ([1] + [0] * 8) * 8
await PHAddr.ship_part_counts.overwrite(ctx, ship_write_list)
async def edit_ship(self, ctx):
# Figure out what ships player has
ships = [1] + [0]*8
for i in ctx.items_received:
item_id = i.item
item_name = self.item_id_to_name[item_id]
if "Ship:" in item_name:
item_data = self.item_data[item_name]
ships[item_data.ship] = 1
# Give ship parts
ship_write_list = [] + ships * 8
print(ships, ship_write_list)
await bizhawk.write(ctx.bizhawk_ctx, [(PHAddr.ship_part_counts.addr, ship_write_list, "Main RAM")])
await PHAddr.custom_storage.set_bits(ctx, 2)
# Dynamic flags/ Entrances
async def has_special_dynamic_requirements(self, ctx, data) -> bool:
# Special case of metals
def check_metals(d):
if "zauz_metals" in d or "goal_requirement" in d:
self.update_metal_count(ctx)
# Zauz Check
if "zauz_metals" in d:
print(f"Metal check: {self.metal_count} metals out of {ctx.slot_data['zauz_required_metals']}")
if self.metal_count < ctx.slot_data["zauz_required_metals"]:
if d["zauz_metals"]:
return False
else:
if not d["zauz_metals"]:
return False
# Goal Check
if "goal_requirement" in d:
print(f"Metal check: {self.metal_count} metals out of {ctx.slot_data['required_metals']}")
return self.metal_count >= ctx.slot_data["required_metals"]
return True
# Beedle points
def check_beedle_points(d):
if not d.get("beedle_points", False):
return True
reference = {"Beedle Points (10)": 10,
"Beedle Points (20)": 20,
"Beedle Points (50)": 50}
# Count points
reference = {self.item_data[k].id: c for k, c in reference.items()}
points = 0
for i in ctx.items_received:
if i.item in reference:
points += reference[i.item]
print(f"Beedle points {d.get('beedle_points')} >= {points}")
return points >= d.get('beedle_points', 300)
def count_spirit_gems(d):
if "count_gems" in d:
pack_size = ctx.slot_data["spirit_gem_packs"]
gem_count = self.item_count(ctx, f"{d['count_gems']} Gem Pack")
count = pack_size * gem_count
print(count, d["count_gems"])
if count < 20:
return False
return True
# Checks
if not check_metals(data):
print(f"\t{data['name']} does not have enough metals")
return False
if not check_beedle_points(data):
return False
if not count_spirit_gems(data):
print(f"\t{data['name']} does not have enough spirit packs")
return False
if data.get("has_lowered_water", False):
if not self.lowered_water:
print(f"\t{data['name']} has not lowered water")
return False
return True
async def set_stage_flags(self, ctx, stage):
print(f"Setting stage flags")
self.stage_flag_address = await get_address_from_heap(ctx, PHAddr.gMapManager, STAGE_FLAGS_OFFSET)
self.key_address = AddrFromPointer(self.stage_flag_address + SMALL_KEY_OFFSET)
if stage in STAGE_FLAGS:
flags = STAGE_FLAGS[stage]
# Change certain stage flags based on options
if stage == 0 and ctx.slot_data["skip_ocean_fights"] == 1:
flags = SKIP_OCEAN_FIGHTS_FLAGS
if stage == 41 and ctx.slot_data["logic"] >= 1:
flags = SPAWN_B3_REAPLING_FLAGS
print(f"\tSetting Stage flags for {STAGES[stage]}, "
f"adr: {self.stage_flag_address}")
await self.stage_flag_address.set_bits(ctx, flags)
# Unlock boss door if have bk
data = BOSS_DOOR_DATA.get(stage, False)
if data and ctx.slot_data.get("boss_key_behaviour", True):
if self.item_count(ctx, f"Boss Key ({data['name']})"):
await data["address"].set_bits(ctx, data["value"])
# Enter stage
async def enter_special_key_room(self, ctx, stage, scene_id) -> bool:
if stage != 0x25:
return False
if scene_id in [0x2509, 0x250E]:
await self.update_key_count(ctx, 372)
elif scene_id in [0x2500, 0x2504]:
return False # Do normal enter TotOK operation, see update_special_key_count for key calc
return True
async def update_special_key_count(self, ctx, current_stage: int, new_keys, key_data: dict, key_values, key_address: "Address") -> tuple[int, bool]:
if current_stage == 0x25:
if self.location_name_to_id["TotOK 1F Sea Chart Chest"] in ctx.checked_locations:
new_keys -= 1 # Opening the SW sea chart door uses a key permanently! No savescums!
if self.current_scene == 0x2504: # Set B3.5 key count
new_keys -= 2
if not self.item_count(ctx, "Grappling Hook") and ctx.slot_data["randomize_pedestal_items"] == 0:
new_keys -= 1
return new_keys, False
elif current_stage == 372:
return new_keys, False
return new_keys, True
async def get_small_key_address(self, ctx) -> "Address":
return await get_address_from_heap(ctx, PHAddr.gMapManager, SMALL_KEY_OFFSET)
# Called during location processing to determine what vanilla item to remove
async def unset_special_vanilla_items(self, ctx, location, item):
# Multiple sword items don't detect each other by default
if item in ["Oshus' Sword", "Phantom Sword"] and self.item_count(ctx, "Sword (Progressive)"):
self.last_vanilla_item.pop()
# Don't remove heart containers if already at max
if item == "Heart Container" and self.item_count(ctx, item) >= 13:
self.last_vanilla_item.pop()
# Farmable locations don't remove vanilla
if "farmable" in location and location["id"] in ctx.checked_locations:
if item == "Ship Part":
await self.give_random_treasure(ctx)
else:
self.last_vanilla_item.pop()
logger.info(f"Got farmable location")
async def receive_key_in_own_dungeon(self, ctx, item_name: str, write_keys_to_storage):
# TotOK - adds to key increment if you get it in the dungeon, otherwise do as usual
if "Temple of the Ocean King" in item_name:
return [await write_keys_to_storage(37)]
return []
async def received_special_small_keys(self, ctx, item_name, write_keys_to_storage):
# TotOK Midway special data
res = []
if item_name == "Small Key (Temple of the Ocean King)":
res.append(await write_keys_to_storage(37))
if await PHAddr.custom_storage.read(ctx) & 0x1:
res.append(await write_keys_to_storage(372))
return res
async def received_special_incremental(self, ctx, item_data) -> int:
# Sand of hours check
_value = 0
if "Sand" in item_data.value:
if ctx.slot_data["ph_required"] and not self.item_count(ctx, "Phantom Hourglass"):
return 0
sand_lookup = {
"Phantom Hourglass": ctx.slot_data["ph_starting_time"] * 60,
"Sand of Hours": ctx.slot_data["ph_time_increment"] * 60,
"Sand of Hours (Small)": 3600,
"Sand of Hours (Boss)": 7200
}
_value = sum([self.item_count(ctx, i)*v for i, v in sand_lookup.items()])
if _value > 359940:
_value = 359940
print(f"Sand stage {self.current_stage} {_value}")
if self.current_stage == 0x25:
add_value = sand_lookup[item_data.name]
await PHAddr.phantom_hourglass_current.add(ctx, add_value)
elif item_data.value == "pack_size":
_value = ctx.slot_data["spirit_gem_packs"] * self.item_count(ctx, item_data.name)
_value += self.item_count(ctx, item_data.name.removesuffix(" Pack"))
else:
raise ValueError(f"Special item value {item_data.value} is not supported")
return _value
async def receive_item_post_processing(self, ctx, item_name, item_data):
# If treasure, update treasure tracker
if hasattr(item_data, "inventory_id"):
await self.enable_items(ctx, item_data.inventory_id)
if "treasure" in item_data.tags:
await self.update_treasure_tracker(ctx)
if "Potion" in item_name:
await self.update_potion_tracker(ctx)
# If hint on receive, send hint (currently only treasure maps)
if hasattr(item_data, "hint_on_receive"):
if ctx.slot_data["randomize_salvage"] == 1:
await self.scout_location(ctx, item_data.hint_on_receive)
# Increment metal count
if item_name in ITEM_GROUPS["Metals"]:
self.metal_count += 1
await self.process_game_completion(ctx)
exclude_key = storage_key(ctx, ut_exclude_key)
# Exclude forced vanilla items on not needing them any more
if item_name == "Grappling Hook" and ctx.slot_data.get("randomize_pedestal_items", 0) in [0, 1]:
print(f"TotOK B3 has no more useful force gems")
data = [self.location_name_to_id[i] for i in LOCATION_GROUPS["Grappling Hook Excludes"]]
await self.store_data(ctx, exclude_key, data)
# Run code if you got a certain item from a certain location
if self.item_location_combo:
if "Mountain Passage" in self.item_location_combo["name"]:
if ctx.slot_data["keysanity"] < 2 and "Small Key" not in item_name and ctx.slot_data["shuffle_caves"] == 0:
print(f"Mountain Passage has no more useful items")
data = [self.location_name_to_id[i] for i in LOCATION_GROUPS["Mountain Passage"]]
await self.store_data(ctx, exclude_key, data)
self.item_location_combo = None
if hasattr(item_data, "set_bit_in_room") and ctx.slot_data.get("randomize_pedestal_items", 0):
print(f"Trying to set bit in room, room {hex(self.current_scene)}")
if self.current_scene in item_data.set_bit_in_room:
for addr, _value, *args in item_data.set_bit_in_room[self.current_scene]:
print(f"args {args}")
if addr == "stage_flag":
addr = self.stage_flag_address
print(f"Stage address: {addr}")
if args and "count" in args[0]:
if self.item_count(ctx, item_name) < args[0]["count"]:
continue
if isinstance(_value, int):
_value = [_value]
await addr.set_bits(ctx, _value)
# disconnect port entrances
if ctx.slot_data.get("ut_blocked_entrances_behaviour", 0) == 2 and ctx.slot_data["boat_requires_sea_chart"] and hasattr(item_data, "disconnect_entrances"):
disconnects_ids = [ENTRANCES[e].id for e in item_data.disconnect_entrances if str(ENTRANCES[e].id) in ctx.slot_data["er_pairings"]]
await self.redisconnect(ctx, disconnects_ids)
async def redisconnect(self, ctx, data):
reciprocals = [ctx.slot_data["er_pairings"][str(i)] for i in data if
str(i) in ctx.slot_data["er_pairings"]]
all_ids = set(data + reciprocals) if not ctx.slot_data["decouple_entrances"] else set(reciprocals)
# Don't disconnect still blocked entrances
for i in reciprocals:
if not await self.conditional_er(ctx, entrance_id_to_entrance[i], silent=True):
print(f"not redisconnecting blocked entrance {entrance_id_to_entrance[i].name}")
all_ids.remove(i)
if not ctx.slot_data["decouple_entrances"]:
print(f"\treciprocal{entrance_id_to_entrance[ctx.slot_data['er_pairings'][str(i)]].name}")
all_ids.remove(ctx.slot_data["er_pairings"][str(i)])
# Don't disconnect undiscovered entrances
self.checked_entrances |= set(get_stored_data(ctx, checked_key, set()))
for i in all_ids.copy():
if i not in self.checked_entrances:
print(f"not redisconnecting unfound entrance: {entrance_id_to_entrance[i].name}")
all_ids.remove(i)
print(f"Redisconnecting {[self.entrance_id_to_entrance[i].name for i in all_ids]}")
# store redisconnects
key = storage_key(ctx, disconnect_key)
self.redisconnected_entrances |= set(get_stored_data(ctx, disconnect_key, set()))
await self.store_data(ctx, key, all_ids)
self.redisconnected_entrances.update(all_ids)
@staticmethod
async def enable_items(ctx: "BizHawkClientContext", inventory_id: int):
equipped_item_pointer = AddrFromPointer(await PHAddr.gItemManager.read(ctx)-0x02000000, size=4)
equipped_item = await equipped_item_pointer.read(ctx, silent=True)
if equipped_item == 0xffffffff:
print(f"Items menu not visible, enabling: {hex(equipped_item_pointer + EQUIP_TIMER_OFFSET)}")
# Enable items menu
equipped_item_timer = AddrFromPointer(equipped_item_pointer + EQUIP_TIMER_OFFSET, size=2)
await equipped_item_timer.overwrite(ctx, 20)
await equipped_item_pointer.overwrite(ctx, inventory_id)
def set_ending_room(self, ctx):
if ctx.slot_data["goal_requirements"] == 0:
self.goal_room = 0x2509
if ctx.slot_data["ut_events"] > 0:
self.goal_event_connect = ENTRANCES["GOAL: Triforce Door"]
elif ctx.slot_data["bellum_access"] < 4:
self.goal_room = 0x3600
if ctx.slot_data["ut_events"] > 0:
self.goal_event_connect = ENTRANCES["GOAL: Bellumbeck"]
async def process_game_completion(self, ctx: "BizHawkClientContext"):
current_scene = self.read_result[PHAddr.stage] * 0x100 + self.read_result[PHAddr.room]
game_clear = False
current_scene = current_scene * 0x100 if current_scene < 0x100 else current_scene # ???
if ctx.slot_data["bellum_access"] == 4:
game_clear = self.metal_count >= ctx.slot_data["required_metals"]
if game_clear and not self.sent_goal:
await self.store_visited_entrances(ctx, ENTRANCES["GOAL"], ENTRANCES["GOAL"].vanilla_reciprocal)
self.sent_goal = True
else:
game_clear = current_scene == self.goal_room # Enter End Credits
if game_clear and self.goal_event_connect and not self.sent_goal:
await self.store_visited_entrances(ctx, self.goal_event_connect, self.goal_event_connect.vanilla_reciprocal)
self.sent_goal = True
return game_clear
async def process_deathlink(self, ctx: "BizHawkClientContext", is_dead, stage, read_result):
if (not read_result.get(PHAddr.drawing_sea_route, False) and read_result[PHAddr.in_cutscene]
and self.current_scene not in [0x1701]):
if ctx.last_death_link > self.last_deathlink and not is_dead:
# A death was received from another player, make our player die as well
await self.health_address.overwrite(ctx, 0)
self.is_expecting_received_death = True
self.last_deathlink = ctx.last_death_link
if not self.was_alive_last_frame and not is_dead:
# We revived from any kind of death
self.was_alive_last_frame = True
elif self.was_alive_last_frame and is_dead:
# Our player just died...
if stage not in [0, 3]:
health_pointer = await PHAddr.gPlayer.read(ctx)
if self.last_health_pointer != health_pointer:
print(f"Deathlink triggered with wrong health pointer. Updating main read list")
await self.update_main_read_list(ctx, stage, True)
return
self.was_alive_last_frame = False
print(f"health address: {self.health_address}")
if self.is_expecting_received_death:
# ...because of a received deathlink, so let's not make a circular chain of deaths please
self.is_expecting_received_death = False
else:
# ...because of their own incompetence, so let's make their mates pay for that
await ctx.send_death(ctx.player_names[ctx.slot] + " may have disappointed the Ocean King.")
self.last_deathlink = ctx.last_death_link
def add_special_er_data(self, ctx, er_map, scene, detect_data, exit_data):
# all lowered water scenes on ruins need to account for funny scene detections
if scene & 0xFF00 == 0x1100:
high_scene = 0x1200 + (scene & 0xFF)
er_map.setdefault(high_scene, {})
# detecting 11s in scene 12s
print(f"\tnew home scene: {high_scene}")
er_map[high_scene][detect_data] = exit_data
if detect_data.exit_stage == 0x11:
new_detect = detect_data.copy()
new_detect.set_exit_stage(0x12)
er_map[high_scene][new_detect] = exit_data
if detect_data.exit_stage == 0x11:
new_detect = detect_data.copy()
new_detect.set_exit_stage(0x12)
print(f"\tnew detect scene: {new_detect} {new_detect.entrance} {new_detect.exit}")
# detect scene turns to 12
er_map[scene][new_detect] = exit_data
# Leaving a travelling ship can make your detect entrance any quadrant
if detect_data.exit[2] == 0xFA:
for i in range(4):
new_detect = detect_data.copy()
new_detect.set_exit_room(i)
print(f"\tnew detect scene: {new_detect} {new_detect.entrance} {new_detect.exit}")
er_map[scene][new_detect] = exit_data
return er_map
async def lower_water(self, ctx, allow_redisconnect=False):
if await PHAddr.lower_water.read(ctx, silent=True) & 0x4:
print(f"Water has been lowered...")
for scene, data in self.er_map.items():
for detect_data, exit_data in data.items():
if exit_data.stage == 0x11:
exit_data.set_stage(0x12)
self.er_map[scene][detect_data] = exit_data
if allow_redisconnect and not self.lowered_water and ctx.slot_data.get("ut_blocked_entrances_behaviour", 0) == 2:
print(f"Allowing redisconnect")
water_entrances = [i.id for i in ENTRANCES.values() if "ruins_water" in i.extra_data.get("conditional", [])]
await self.redisconnect(ctx, water_entrances)
self.lowered_water = True
async def detect_ut_event(self, ctx, scene):
"""
Send UT event locations on certain flags being set in certain scenes.
"""
if scene in UT_EVENT_DATA and not self.sent_event:
if not self.event_reads:
data = UT_EVENT_DATA[scene]
data = [data] if isinstance(data, dict) else data
self.event_data = data
for i, event in enumerate(data):
address = AddrFromPointer(self.stage_flag_address + event.get("offset", 0), size=event.get("size", 1)) if event["address"] == "stage_flags" else event["address"]
print(f"event data {self.event_data}")
self.event_data[i]["address"] = address
print(f"event data {self.event_data}")
self.event_reads.append(address)
read_results = await read_multiple(ctx, self.event_reads)
for event, res in zip(self.event_data, read_results.values()):
if event["value"] & res:
if "entrance" in event:
print(f"Event detection Success!, {event['entrance']}")
entrance = ENTRANCES[event["entrance"]]
await self.store_visited_entrances(ctx, entrance, entrance.vanilla_reciprocal)
elif "event" in event:
print(f"Event detection Success!, {event['event']}")
key = storage_key(ctx, ut_events_key)
await self.store_data(ctx, key, [event["event"]])
self.event_reads.remove(event["address"])
self.event_data.remove(event)
if not self.event_data:
print(f"All events sent!")
self.sent_event = True
else:
self.sent_event = True
async def conditional_er(self, ctx, exit_data, silent=False) -> bool:
print(f"\tcond. {exit_data.name} {exit_data.extra_data} lowered water: {self.lowered_water}")
if "conditional" in exit_data.extra_data:
# Bounce back if the entrance connects to a lower room
if "ruins_water" in exit_data.extra_data["conditional"] and not self.lowered_water:
if not silent: logger.info(f"This entrance is flooded (Isle of Ruins)")
return False
# Can't enter the sea without the correct chart
print(f"{exit_data.extra_data['conditional']}, {exit_data.stage}, {ctx.slot_data['boat_requires_sea_chart']}")
if "need_sea_chart" in exit_data.extra_data["conditional"] and exit_data.stage == 0 and ctx.slot_data["boat_requires_sea_chart"]:
quadrant = exit_data.room
chart = SEA_CHARTS[quadrant]
print(f"chart: {chart} {self.item_count(ctx, chart)}")
if not self.item_count(ctx, chart):
if not silent: logger.info(f"Missing correct sea chart ({chart})")
return False
return True
async def conditional_bounce(self, ctx, scene, entrance) -> "PHTransition" or None:
if scene in [0, 1, 2, 3] and ctx.slot_data["boat_requires_sea_chart"]:
chart = SEA_CHARTS[scene]
if not self.item_count(ctx, chart):
for e in self.entrances.values():
if e.detect_exit_scene(scene, entrance):
logger.info(f"Missing correct sea chart ({chart})")
return e
return None
async def update_stored_entrances(self, ctx: "BizHawkClientContext"):
self.visited_entrances.clear()
self.redisconnected_entrances.clear()
self.visited_scenes.clear()
self.checked_entrances.clear()
await ctx.send_msgs([{
"cmd": "Get",
"keys": [storage_key(ctx, disconnect_key),
storage_key(ctx, traversal_key),
storage_key(ctx, visited_scenes_key),
storage_key(ctx, checked_key)],
}])
# UT store entrances to remove
async def store_visited_entrances(self, ctx: "BizHawkClientContext", detect_data, exit_data, interaction="traverse"):
self.visited_entrances |= set(get_stored_data(ctx, traversal_key, set()))
old_visited_entrances = self.visited_entrances.copy()
new_data = {detect_data.id, exit_data.id} if not ctx.slot_data["decouple_entrances"] and detect_data.two_way else {detect_data.id}
print(f"New Storage Data: {new_data} {ctx.slot_data['decouple_entrances']}")
if interaction == "traverse" or ctx.slot_data.get("ut_blocked_entrances_behaviour", 1) == 0:
key = storage_key(ctx, traversal_key)
self.visited_entrances.update(new_data)
new_data = self.visited_entrances-old_visited_entrances
elif interaction == "check":
key = storage_key(ctx, checked_key)
self.checked_entrances.update(new_data)
else:
raise ValueError(f"store_visited_entrances() had an unhandled interaction value {interaction}")
if new_data:
await self.store_data(ctx, key, new_data)
def write_respawn_entrance(self, exit_data: "PHTransition"):
# If ER:ing to sea, set respawn entrance to where you came from cause that doesn't change by itself when warping
if exit_data.stage == 0:
return PHAddr.boat_respawn.get_write_list([exit_data.room, exit_data.entrance[2]])
return []
# fixes conflict with bizhawk_UT
async def game_watcher(self, ctx: "BizHawkClientContext") -> None:
await super().game_watcher(ctx)
def update_boss_warp(self, ctx, stage, scene_id):
if scene_id in BOSS_WARP_SCENE_LOOKUP: # Boss rooms
reverse_exit = BOSS_WARP_SCENE_LOOKUP[scene_id]
reverse_exit_id = self.entrances[reverse_exit].id
pair = ctx.slot_data["er_pairings"].get(f"{reverse_exit_id}", None)
if pair is None:
print(f"Boss Entrance not Randomized")
return None
self.boss_warp_entrance = self.entrance_id_to_entrance[pair]
# If last room was a dungeon, warp to dungeon entrance
dungeon_exit = BOSS_WARP_LOOKUP.get(self.boss_warp_entrance.stage, None)
if dungeon_exit:
self.boss_warp_entrance = self.entrances[dungeon_exit]
print(f"Warp Stage: {stage}, last: {self.last_warp_stage}, current warp {self.boss_warp_entrance}")
return self.boss_warp_entrance
def dungeon_hints(self, ctx):
res = []
print(f"testing for dungeon hints")
# Send boss reward hints
if ctx.slot_data["dungeon_hint_type"] == 2:
print(f"Boss reward locations: {ctx.slot_data.get('required_dungeon_locations', [])}")
for loc in ctx.slot_data.get("required_dungeon_locations", []):
res.append(self.location_name_to_id[loc])
elif ctx.slot_data["dungeon_hint_type"] == 1:
dungeons = ctx.slot_data["required_dungeons"]
if dungeons:
logger.info(f"Your required dungeons are:")
for d in dungeons:
logger.info(f" {d}")
else:
logger.info(f"You have no required dungeons.")
# Send excluded dungeon hints
if ctx.slot_data["excluded_dungeon_hints"]:
dungeons = ctx.slot_data["required_dungeons"]
excluded = [d for d in DUNGEON_NAMES[2:] if d not in dungeons]
if excluded:
logger.info(f"Your excluded dungeons are:")
for d in excluded:
logger.info(f" {d}")
else:
logger.info(f"You have no excluded dungeons.")
return res
async def check_location_post_processing(self, ctx, location):
if location is not None and "do_special" in location:
if location["do_special"] == "keylock":
print(f"Got item in Mountain passage: {ctx.items_received[-1]}")
self.item_location_combo = location
if location["do_special"] == "ut_event":
key = storage_key(ctx, ut_events_key)
print(f"got ut_event location for key {key} loc {location['name']}")
if location["name"] == "TotOK 1F Sea Chart Chest":
await self.store_data(ctx, key, ["1f"])
if isinstance(location["do_special"], dict):
event_type = location["do_special"].get("event_type", None)
if event_type == "ut_connect":
event_name = location["do_special"]["event_name"]
entr = ENTRANCES[event_name]
await self.store_visited_entrances(ctx, entr, entr.vanilla_reciprocal)
async def ut_bounce_scene(self, ctx, scene):
if not ctx.slot_data["shuffle_houses"] and map_type_lookup.get(scene) == "house":
print(f"Not map switching due to house: {hex(scene)}")
return
if not ctx.slot_data["shuffle_caves"] and map_type_lookup.get(scene) == "cave":
print(f"Not map switching due to cave: {hex(scene)}")
return
if map_type_lookup.get(scene) == "ship":
return
if scene in range(4) or scene in [0x300]: # Sea overview if port shuffle
tab_scene = 1 if ctx.slot_data["shuffle_ports"] else 0
else:
tab_scene = scene | (1 << 16) if ctx.slot_data.get("shuffle_overworld_transitions", False) else scene
print(f"Storing new scene for UT {hex(tab_scene)}")
await ctx.send_msgs([{
"cmd": "Set",
"key": f"{ctx.slot}_{ctx.team}_UT_MAP",
"default": 0,
"operations": [{"operation": "replace", "value": tab_scene}]
}])
# Save visited scenes
if ctx.slot_data.get("map_warp_options", 0):
self.visited_scenes.add(scene)
await self.store_data(ctx, storage_key(ctx, visited_scenes_key), [scene])
async def process_in_menu(self, ctx: "BizHawkClientContext", read_result):
self.death_precision = None
if (not read_result.get(PHAddr.in_map, 0) and self.map_mode) or self.map_warp:
self.map_mode = False
self.map_warp = None
self.map_warp_reselector = True
logger.info(f"Illegal map menu exit, canceling all map warps")
await self.get_saved_scene(ctx, save_scene_key)
if self.current_stage & 0xFF == 0x6E:
started_save_file = await PHAddr.started_save_file.read(ctx, silent=True)
if started_save_file:
print(f"Started save file with saved scene {hex(self.last_saved_scene)}")
if self.warp_to_start_flag:
print(f"Started save file with warp to start active, warping to start")
self.warp_to_start_flag = False
self.precision_mode = [PHAddr.stage_small, 0x6E, "wts"]
ctx.watcher_timeout = 0.1
elif self.last_saved_scene in BOSS_WARP_SCENE_LOOKUP:
print(f"Problem entrance detected")
warp_exit = self.update_boss_warp(ctx, self.current_stage, self.last_saved_scene)
if warp_exit is not None:
self.precision_mode = [PHAddr.stage_small, 0x6E, "warp", warp_exit]
ctx.watcher_timeout = 0.1
async def precision_backup(self, ctx, precision_read):
if len(self.precision_mode) > 2 and self.precision_mode[2] == "warp":
if precision_read == 0x34:
print(f"New file, cancel precision")
return True
return False