forked from mirror/Archipelago
Some checks failed
Analyze modified files / flake8 (push) Failing after 2m28s
Build / build-win (push) Has been cancelled
Build / build-ubuntu2204 (push) Has been cancelled
ctest / Test C++ ubuntu-latest (push) Has been cancelled
ctest / Test C++ windows-latest (push) Has been cancelled
Analyze modified files / mypy (push) Has been cancelled
Build and Publish Docker Images / Push Docker image to Docker Hub (push) Successful in 5m4s
Native Code Static Analysis / scan-build (push) Failing after 5m2s
type check / pyright (push) Successful in 1m7s
unittests / Test Python 3.11.2 ubuntu-latest (push) Failing after 16m23s
unittests / Test Python 3.12 ubuntu-latest (push) Failing after 28m19s
unittests / Test Python 3.13 ubuntu-latest (push) Failing after 14m49s
unittests / Test hosting with 3.13 on ubuntu-latest (push) Successful in 5m0s
unittests / Test Python 3.13 macos-latest (push) Has been cancelled
unittests / Test Python 3.11 windows-latest (push) Has been cancelled
unittests / Test Python 3.13 windows-latest (push) Has been cancelled
1605 lines
66 KiB
Python
1605 lines
66 KiB
Python
|
|
import time
|
|
import logging
|
|
from typing import TYPE_CHECKING, Set, Dict, Any, Iterable
|
|
|
|
from NetUtils import ClientStatus
|
|
import worlds._bizhawk as bizhawk
|
|
from worlds._bizhawk.client import BizHawkClient
|
|
from ..data.Constants import *
|
|
from ..Util import *
|
|
from .subclasses import read_multiple, write_multiple, storage_key, get_stored_data
|
|
|
|
from ..data.Addresses import *
|
|
|
|
if TYPE_CHECKING:
|
|
from worlds._bizhawk.context import BizHawkClientContext
|
|
from ..Subclasses import DSTransition
|
|
from .ItemClass import DSItem
|
|
from .subclasses import Address
|
|
|
|
logger = logging.getLogger("Client")
|
|
|
|
class DSZeldaClient(BizHawkClient):
|
|
local_checked_locations: Set[int]
|
|
local_scouted_locations: Set[int]
|
|
local_tracker: Dict[str, Any]
|
|
item_id_to_name: Dict[int, str]
|
|
location_name_to_id: Dict[str, int]
|
|
location_area_to_watches: Dict[int, dict[str, dict]]
|
|
watches: Dict[str, tuple[int, int, str]]
|
|
item_data: dict[str, "DSItem"]
|
|
|
|
addr_game_state: "Address"
|
|
addr_slot_id: "Address"
|
|
addr_received_item_index: "Address"
|
|
addr_stage: "Address"
|
|
addr_room: "Address"
|
|
addr_entrance: "Address"
|
|
save_spam_protection: bool
|
|
stage_flag_address: "Address" # Stage flag address
|
|
health_address: "Address"
|
|
|
|
treasure_tracker: dict["Address" or str, int]
|
|
|
|
starting_flags: list
|
|
dungeon_key_data: dict
|
|
|
|
starting_entrance: tuple # stage_id, room_id, entrance_id
|
|
scene_addr: tuple # stage, room, floor, entrance
|
|
exit_coords_addr: tuple # x, y, z. what coords to spawn link at when entering a continuous transition
|
|
|
|
dynamic_entrances_by_scene: dict
|
|
|
|
stage_flag_offset: int
|
|
er_y_offest: int # In ph i use coords who's y is 164 off the entrance y
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
# all grabbed from util
|
|
self.item_id_to_name = build_item_id_to_name_dict()
|
|
self.location_name_to_id = build_location_name_to_id_dict()
|
|
self.location_area_to_watches = build_location_room_to_watches()
|
|
self.scene_to_dynamic_flag = build_scene_to_dynamic_flag()
|
|
self.hint_scene_to_watches = build_hint_scene_to_watches()
|
|
self.entrance_id_to_entrance = build_entrance_id_to_data()
|
|
|
|
self.entrances = {}
|
|
self.hint_data = {}
|
|
|
|
self.local_checked_locations = set()
|
|
self.local_scouted_locations = set()
|
|
self.local_tracker = {}
|
|
|
|
self._set_deathlink = False # Check for toggling death link setting
|
|
self.last_deathlink = None
|
|
self.was_alive_last_frame = False
|
|
self.is_expecting_received_death = False
|
|
self.is_dead = False # Read from read_result
|
|
|
|
self.save_slot = 0
|
|
self.version_offset = 0
|
|
|
|
self.last_scene = None
|
|
self.locations_in_scene = {}
|
|
self.watches = {}
|
|
self.receiving_location = False
|
|
self.last_vanilla_item: list[str | list[tuple[str, int]]] = []
|
|
self.delay_reset = False
|
|
self.getting_location = False
|
|
|
|
self._previous_game_state = False # Updated every successful cycle
|
|
self._just_entered_game = False # Set when disconnected or on menu, unset after one full cycle of fully loaded
|
|
self._loaded_menu_read_list = False #
|
|
self._from_menu = True # Last scene was menu
|
|
self._dynamic_flags_to_reset = []
|
|
|
|
self.main_read_list: list["Address"] = []
|
|
self.read_result = {}
|
|
self.current_stage = 0xB
|
|
self.current_scene = None
|
|
self.current_room = None
|
|
self.last_stage = None
|
|
self.entering_from = None
|
|
self.entering_dungeon = None
|
|
self.current_entrance = None
|
|
|
|
self.new_stage_loading = None
|
|
self.getting_location_type = None
|
|
|
|
self._entered_entrance = False
|
|
self._loading_scene = False
|
|
self._backup_coord_read = None
|
|
self.prev_rupee_count = 0
|
|
self._log_received_items = False
|
|
|
|
self.warp_to_start_flag = False
|
|
self.er_map: dict[int, dict["DSTransition", "DSTransition"]] = {}
|
|
self.er_in_scene: dict["DSTransition", "DSTransition"] | None = None
|
|
self.er_messages = dict() # ER-message to send on certain entrances
|
|
self.er_exit_coord_writes: list | None = None
|
|
self.visited_scenes = set()
|
|
|
|
self.delay_pickup = None
|
|
self.last_key_count = 0
|
|
self.key_address: "Address" = addr_null
|
|
|
|
self.last_dungeon_warp_target = None
|
|
self.tried_short_cs = False
|
|
|
|
self.precision_mode = None
|
|
self.precision_operation = None
|
|
self.heal_on_load = False
|
|
self.precision_delay_flags = False
|
|
|
|
self.lss_retry_attempts = 4
|
|
self.last_saved_scene = None
|
|
|
|
def item_count(self, ctx, item_name, items_received=-1) -> int:
|
|
return self.item_data[item_name].get_count(ctx, items_received)
|
|
|
|
async def validate_rom(self, ctx: "BizHawkClientContext") -> bool:
|
|
try:
|
|
if not await self.check_game_version(ctx):
|
|
logger.error("Invalid rom")
|
|
return False
|
|
except bizhawk.RequestFailedError:
|
|
logger.error("Invalid rom")
|
|
return False
|
|
except UnicodeDecodeError:
|
|
logger.error("You are using Bizhawk version 2.9.x, please use version 2.10.x")
|
|
return False
|
|
|
|
ctx.game = self.game
|
|
ctx.items_handling = 0b111
|
|
ctx.want_slot_data = True
|
|
ctx.watcher_timeout = 0.4
|
|
print(f"validation: {ctx.game}, {ctx.items_handling}")
|
|
return True
|
|
|
|
async def check_game_version(self, ctx: "BizHawkClientContext") -> bool:
|
|
"""
|
|
DSZeldaClient calls validate rom, this is for detecting game version and setting game specific variables
|
|
:param ctx:
|
|
:return: valid rom
|
|
"""
|
|
return False
|
|
|
|
def on_package(self, ctx, cmd, args):
|
|
if cmd == 'Connected':
|
|
if 'death_link' in args['slot_data'] and args['slot_data']['death_link']:
|
|
self._set_deathlink = True
|
|
self.last_deathlink = time.time()
|
|
super().on_package(ctx, cmd, args)
|
|
|
|
def get_coord_address(self, at_sea=None, multi=False) -> dict[str, tuple[int, int, str]]:
|
|
"""
|
|
get a dictionary for link/ship/boat coordinate read data of the current scene
|
|
:param at_sea: guess this still exists, for switching between vehicular and human coords
|
|
:param multi: for when you want to return all possible coord addresses. used as a backup load detector
|
|
:return: dict of link_coord to write_data
|
|
"""
|
|
pass
|
|
|
|
async def get_coords(self, ctx, multi=False) -> dict:
|
|
"""
|
|
organize the coords in a neat dictionary
|
|
:param ctx:
|
|
:param multi: gives all coords
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
async def full_heal(self, ctx, bonus=0):
|
|
"""
|
|
full heals the player. Called when getting heart containers, but can be called elsewhere too
|
|
:param ctx:
|
|
:param bonus:
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
async def refill_ammo(self, ctx, text=""):
|
|
"""
|
|
full heals the player. Called when getting heart containers, but can be called elsewhere too
|
|
:param ctx:
|
|
:param text: change what text gets displayed on context
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
async def watched_intro_cs(self, ctx):
|
|
"""
|
|
you know how it's random whether niko talks or not at the beginning?
|
|
this tries to fix that. make it read a memory value or something
|
|
:param ctx:
|
|
:return:
|
|
"""
|
|
return False
|
|
|
|
async def scout_location(self, ctx: "BizHawkClientContext", locations):
|
|
"""
|
|
sends a hint for the requested locations
|
|
:param ctx:
|
|
:param locations:
|
|
:return:
|
|
"""
|
|
local_scouted_locations = set(ctx.locations_scouted)
|
|
for loc in locations:
|
|
local_scouted_locations.add(LOCATIONS_DATA[loc]["id"])
|
|
|
|
if self.local_scouted_locations != local_scouted_locations:
|
|
self.local_scouted_locations = local_scouted_locations
|
|
await ctx.send_msgs([{
|
|
"cmd": "LocationScouts",
|
|
"locations": list(self.local_scouted_locations),
|
|
"create_as_hint": int(2)
|
|
}])
|
|
|
|
async def get_small_key_address(self, ctx) -> "Address":
|
|
"""
|
|
in ph small keys are tied to map data, in st there is a consistent address for them
|
|
:param ctx:
|
|
:return:
|
|
"""
|
|
return addr_null
|
|
|
|
def process_loading_variable(self, read_result) -> bool:
|
|
"""
|
|
Loading variable can vary whether it should be one or 0
|
|
this should fix that
|
|
:param read_result: dict of all the read data
|
|
:return: is loading
|
|
"""
|
|
return False
|
|
|
|
async def process_in_menu(self, ctx, read_result):
|
|
"""
|
|
Called while in menu
|
|
:param read_result:
|
|
:param ctx:
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
async def precision_backup(self, ctx, precision_read):
|
|
"""
|
|
Check for false cases after triggering a precision read
|
|
:param precision_read:
|
|
:param ctx:
|
|
:return: True to cancel precision
|
|
"""
|
|
|
|
def clear_variables(self):
|
|
"""
|
|
Called if not connected to the server. For clearing variables when switching slots
|
|
:return:
|
|
"""
|
|
|
|
async def on_connect(self, ctx):
|
|
"""
|
|
Called on connecting
|
|
"""
|
|
|
|
async def game_watcher(self, ctx: "BizHawkClientContext") -> None:
|
|
if not ctx.server or not ctx.server.socket.open or ctx.server.socket.closed or ctx.slot is None or ctx.slot == 0:
|
|
self._just_entered_game = True
|
|
self._loaded_menu_read_list = False
|
|
self.last_scene = None
|
|
self._from_menu = True
|
|
self.er_in_scene = None
|
|
self.lss_retry_attempts = 4
|
|
self.last_saved_scene = None
|
|
self.clear_variables()
|
|
ctx.watcher_timeout = 0.4
|
|
return
|
|
|
|
# Precision mode handling
|
|
if self.precision_mode:
|
|
await bizhawk.lock(ctx.bizhawk_ctx)
|
|
precision_read = await self.precision_mode[0].read(ctx, silent=True)
|
|
print(f"Precision read {precision_read} == {self.precision_mode[1]} mode {self.precision_mode}")
|
|
if precision_read == self.precision_mode[1]:
|
|
print(f"Precision read, not yet")
|
|
ctx.watcher_timeout = 0.01
|
|
await bizhawk.unlock(ctx.bizhawk_ctx)
|
|
await bizhawk.lock(ctx.bizhawk_ctx)
|
|
return
|
|
print(f"Trigger activated!")
|
|
if await self.precision_backup(ctx, precision_read):
|
|
await bizhawk.unlock(ctx.bizhawk_ctx)
|
|
else:
|
|
self.precision_operation = self.precision_mode[2:] if len(self.precision_mode) > 2 else "wts"
|
|
print(f"precision operation: {self.precision_operation}")
|
|
self.precision_mode = None
|
|
|
|
# Enable "DeathLink" tag if option was enabled
|
|
if self._set_deathlink:
|
|
self._set_deathlink = False
|
|
await ctx.update_death_link(True)
|
|
|
|
# Get main read list before entering loop
|
|
if not self._loaded_menu_read_list:
|
|
await self.update_main_read_list(ctx, self.current_stage, in_game=False)
|
|
await self.on_connect(ctx)
|
|
self._loaded_menu_read_list = True
|
|
|
|
try:
|
|
|
|
# Read main read list
|
|
self.read_result = read_result = await read_multiple(ctx, self.main_read_list)
|
|
|
|
in_game = read_result[self.addr_game_state]
|
|
slot_memory = read_result[self.addr_slot_id]
|
|
self.current_stage = current_stage = read_result[self.addr_stage]
|
|
|
|
# Loading variables
|
|
loading_scene = self.process_loading_variable(read_result)
|
|
loading = loading_scene or self._entered_entrance
|
|
|
|
# If player is on title screen, don't do anything else
|
|
if not in_game or current_stage not in STAGES:
|
|
self._previous_game_state = False
|
|
self._from_menu = True
|
|
await self.process_in_menu(ctx, read_result)
|
|
ctx.watcher_timeout = 0.4
|
|
print("NOT IN GAME")
|
|
# Finished game?
|
|
if not ctx.finished_game:
|
|
await self._process_game_completion(ctx)
|
|
if not self.precision_operation:
|
|
return
|
|
|
|
# While game from main menu
|
|
if in_game and not self._previous_game_state:
|
|
if not self.precision_operation and not await self.watched_intro_cs(ctx):
|
|
print("In Intro CS")
|
|
return
|
|
self._just_entered_game = True
|
|
self.last_stage = None
|
|
self.last_scene = None
|
|
|
|
# Single call just entered from menu methods
|
|
if in_game and self._from_menu:
|
|
self._generate_er_map(ctx)
|
|
self._from_menu = False
|
|
ctx.watcher_timeout = 0.1 # 9 frame interval to catch 11 frame ER windows (old)
|
|
# 6 frame intervals to catch bounce timings
|
|
await self.enter_game(ctx)
|
|
print(f"Started Game")
|
|
|
|
self.is_dead = not read_result.get(self.health_address, 12)
|
|
|
|
# Get current scene
|
|
current_room = read_result.get(self.addr_room, None)
|
|
current_room = 0 if current_room == 0xFF and current_stage != 0x29 else current_room # Resetting in a dungeon sets a special value
|
|
current_room = 3 if current_room == 0xFF else current_room
|
|
self.current_room = current_room
|
|
self.current_scene = current_scene = current_stage * 0x100 + current_room
|
|
current_entrance = read_result.get(self.addr_entrance, 0)
|
|
num_received_items = read_result.get(self.addr_received_item_index, None)
|
|
|
|
await self.process_read_list(ctx, read_result)
|
|
|
|
# Process on new room. As soon as it's triggered, changing the scene variable changes entrance destination
|
|
if (current_scene != self.last_scene and not self._entered_entrance and not self._loading_scene) or self.precision_operation:
|
|
print(f"") # New Scene, line space
|
|
# Trigger a different entrance to vanilla
|
|
current_stage, current_room, current_entrance = await self._entrance_warp(ctx, self.current_scene, current_entrance)
|
|
current_scene = current_stage * 0x100 + current_room
|
|
self.current_entrance = current_entrance
|
|
self.current_scene = current_scene
|
|
self.current_stage = current_stage
|
|
self.current_room = current_room
|
|
|
|
# Backup in case of missing loading
|
|
self._backup_coord_read = await self.get_coords(ctx, multi=True)
|
|
|
|
# Send data to tracker
|
|
await self.ut_bounce_scene(ctx, current_scene)
|
|
|
|
# Set dynamic flags on scene
|
|
if not self.precision_delay_flags:
|
|
await self._reset_dynamic_flags(ctx)
|
|
await self._set_dynamic_flags(ctx, current_scene)
|
|
|
|
self._entered_entrance = time.time() # Triggered first part of loading - setting new room
|
|
self.entering_dungeon = None
|
|
if self.delay_reset:
|
|
self.delay_reset = 0
|
|
await self._remove_vanilla_item(ctx, num_received_items)
|
|
|
|
await self.detected_new_scene(ctx)
|
|
|
|
# Nothing happens while loading
|
|
if ctx.server and not loading and not self._loading_scene and not self._entered_entrance:
|
|
# If new file, set up starting flags
|
|
if slot_memory == 0:
|
|
if await self.watched_intro_cs(ctx): # Check if watched intro cs
|
|
await self._set_starting_flags(ctx)
|
|
|
|
# Read for checks on specific global flags
|
|
if len(self.watches) > 0:
|
|
triggered_watches = []
|
|
watch_result = await read_multiple(ctx, self.watches.values(), keys=self.watches.keys())
|
|
for loc_name, prev_value in watch_result.items():
|
|
loc_data = LOCATIONS_DATA[loc_name]
|
|
# print(f"Watch data: {loc_name} {prev_value} {loc_data['value']}")
|
|
if prev_value & loc_data["value"]:
|
|
print(f"Got read item {loc_name} from address {loc_data['address']} "
|
|
f"looking at bit {loc_data['value']}")
|
|
|
|
force_remove = False
|
|
await self._process_checked_locations(ctx, loc_name, force_remove)
|
|
self.receiving_location = True
|
|
triggered_watches.append(loc_name)
|
|
self.watches.pop(loc_name)
|
|
|
|
|
|
# Check if link is getting location
|
|
if self.getting_location and not self.receiving_location and self.locations_in_scene is not None:
|
|
self.receiving_location = True
|
|
print("Receiving Location")
|
|
if self.delay_reset > 1:
|
|
self.delay_reset = 0
|
|
await self._process_checked_locations(ctx, None, detection_type=self.getting_location_type)
|
|
|
|
# Process received items
|
|
if num_received_items is not None:
|
|
if num_received_items < len(ctx.items_received):
|
|
print(f"Received items: {num_received_items}")
|
|
if self._just_entered_game:
|
|
self._log_received_items = True
|
|
await self._process_received_items(ctx, num_received_items, self._log_received_items)
|
|
else:
|
|
self._log_received_items = False
|
|
|
|
if num_received_items > len(ctx.items_received):
|
|
await self.addr_received_item_index.overwrite(ctx, len(ctx.items_received))
|
|
logger.info(f"Save file has more items than Multiworld. Probable cause: loaded wrong save file. \n"
|
|
f"Reset item count to Multiworld's. If this is the wrong save file, you can safely quit without saving.")
|
|
|
|
# Exit location received cs
|
|
if self.receiving_location and not self.getting_location:
|
|
self.receiving_location = False
|
|
|
|
# Increment delay reset, probably haven't received item yet
|
|
if self.delay_reset == 1:
|
|
self.delay_reset += 1
|
|
print(f"Delay Reset still active, {self.delay_reset}")
|
|
|
|
# Check for delayed pickup first!
|
|
elif self.delay_pickup is not None:
|
|
print(f"Delay pickup {self.delay_pickup}")
|
|
fallback, pickups = self.delay_pickup
|
|
need_fallback = True
|
|
for location, item, value in pickups:
|
|
new_item_read = await self.get_item_read(ctx, item)
|
|
if "Rupee" in item or "Rupoor" in item:
|
|
if new_item_read - value == self.item_data[item].value:
|
|
print(f"delay pickup rupee: {new_item_read - value} == {self.item_data[item].value}")
|
|
await self._process_checked_locations(ctx, location, True, item=item)
|
|
need_fallback = False
|
|
elif new_item_read != value:
|
|
await self._process_checked_locations(ctx, location, True, item=item)
|
|
need_fallback = False
|
|
|
|
if need_fallback:
|
|
vanilla_item = LOCATIONS_DATA[fallback]["vanilla_item"]
|
|
await self._process_checked_locations(ctx, fallback, True, item=vanilla_item)
|
|
|
|
self.delay_pickup = None
|
|
self.last_key_count = 0
|
|
if self.last_vanilla_item:
|
|
print("Delay Pickup is removing vanilla item")
|
|
await self._remove_vanilla_item(ctx, num_received_items)
|
|
|
|
# Remove vanilla item
|
|
elif self.last_vanilla_item:
|
|
print("Item Received Successfully")
|
|
await self._remove_vanilla_item(ctx, num_received_items)
|
|
|
|
await self.process_post_receive(ctx)
|
|
|
|
|
|
await self.detect_warp_to_start(ctx, read_result)
|
|
await self.process_in_game(ctx, read_result)
|
|
|
|
self._just_entered_game = False
|
|
|
|
# Finished game?
|
|
if not ctx.finished_game:
|
|
await self._process_game_completion(ctx)
|
|
|
|
# Process Deathlink
|
|
if "DeathLink" in ctx.tags:
|
|
await self.process_deathlink(ctx, self.is_dead, self.current_stage, read_result)
|
|
|
|
# Started actual scene loading
|
|
if self._entered_entrance and loading_scene:
|
|
self._loading_scene = True # Second phase of loading room
|
|
self._entered_entrance = False
|
|
print(f"Loading Scene {current_scene}, setting coords {self.er_exit_coord_writes}")
|
|
await self._set_er_coords(ctx)
|
|
|
|
# Fully loaded room
|
|
if self._loading_scene and not loading:
|
|
print("Fully Loaded Room", current_scene)
|
|
self._loading_scene = False
|
|
self._backup_coord_read = None
|
|
self.save_spam_protection = False
|
|
|
|
# Set dynamic flags now if precision loading
|
|
if self.precision_delay_flags:
|
|
await self._reset_dynamic_flags(ctx)
|
|
await self._set_dynamic_flags(ctx, current_scene)
|
|
self.precision_delay_flags = False
|
|
|
|
# Load potential entrance warp destinations, and dynamic entrances
|
|
self.er_in_scene = self.er_map.get(current_scene, dict())
|
|
await self._set_dynamic_entrances(ctx, current_scene)
|
|
|
|
print(f"Entered new scene {hex(current_scene)} with ER:")
|
|
for i, v in self.er_in_scene.items():
|
|
print(f"\t{i} => {v} {i.exit}")
|
|
|
|
await self.process_on_room_load(ctx, current_scene, read_result)
|
|
await self._load_local_locations(ctx, self.current_scene)
|
|
await self._process_scouted_locations(ctx, current_scene)
|
|
|
|
# Check if entering dungeon
|
|
if current_stage in self.dungeon_key_data and self.last_stage != current_stage:
|
|
self.entering_dungeon = current_stage
|
|
self.entering_from = self.last_scene
|
|
else:
|
|
self.entering_from = current_scene # stage and room
|
|
|
|
# Run entering stage code
|
|
if self.last_stage != current_stage:
|
|
print("Fully Loaded Stage")
|
|
await self._enter_stage(ctx, current_stage, current_scene)
|
|
await self.update_main_read_list(ctx, current_stage)
|
|
if self.heal_on_load:
|
|
await self.refill_ammo(ctx)
|
|
self.heal_on_load = False
|
|
|
|
# Hard coded room stuff
|
|
await self.process_hard_coded_rooms(ctx, current_scene)
|
|
|
|
self.last_stage = current_stage
|
|
self.last_scene = current_scene
|
|
print(f"Updated last scene!")
|
|
|
|
self._previous_game_state = in_game
|
|
|
|
# In case of a short load being missed, have a backup check on coords (they stay the same during transitions)
|
|
if self._entered_entrance and self._backup_coord_read:
|
|
if time.time() - self._entered_entrance > 1:
|
|
if not loading_scene:
|
|
self._loading_scene = True # Second phase of loading room
|
|
self._entered_entrance = False
|
|
print("Missed loading read, using backup")
|
|
|
|
if self.precision_operation:
|
|
await bizhawk.unlock(ctx.bizhawk_ctx)
|
|
self.precision_operation = None
|
|
|
|
except bizhawk.RequestFailedError:
|
|
# Exit handler and return to main loop to reconnect
|
|
print("Couldn't read data")
|
|
|
|
async def detected_new_scene(self, ctx: "BizHawkClientContext"):
|
|
"""
|
|
Called on having detected a new scene, after updating entrance warp and setting dynaflags etc.
|
|
"""
|
|
pass
|
|
|
|
async def update_main_read_list(self, ctx: "BizHawkClientContext", stage: int, in_game=True):
|
|
"""
|
|
called with in_game=False when connecting for the first time,
|
|
and then called with in_game=True on entering a new stage.
|
|
decide what addresses to read each client cycle. needs to set self.main_read_list
|
|
:param ctx:
|
|
:param stage: useful to read different flags when in vehicle
|
|
:param in_game: for setting flags to read before in game, to be able to detect when in game
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
def _generate_er_map(self, ctx):
|
|
# Creates a map from scene to dict of entrance dataclass to exit dataclass
|
|
if ctx.slot_data.get("er_pairings", None):
|
|
res = {}
|
|
pairings = {int(k): v for k, v in ctx.slot_data["er_pairings"].items()}
|
|
|
|
# Loop through entrance data, format data
|
|
for data in self.entrances.values():
|
|
|
|
# Figure pair from generation
|
|
if data.id in pairings:
|
|
exit_id = pairings[data.id]
|
|
exit_data = self.entrance_id_to_entrance[exit_id]
|
|
|
|
# Create map from scene to entrance dataclass
|
|
res.setdefault(data.scene, {})
|
|
res[data.scene][data] = exit_data
|
|
# print(f"Creating scene data {hex(data.scene)}: {data} => {exit_data}")
|
|
res = self.add_special_er_data(ctx, res, data.scene, data, exit_data)
|
|
|
|
self.er_map = res
|
|
# print(f"ER Map:")
|
|
# for scene, data in self.er_map.items():
|
|
# print(f"\t{hex(scene)}")
|
|
# for d2, d3 in data.items():
|
|
# print(f"\t\t{d2} => {d3}")
|
|
|
|
|
|
|
|
def add_special_er_data(self, ctx, er_map, scene, detect_data, exit_data):
|
|
"""
|
|
for adding special bonus data to ER dictionary.
|
|
Used in ph for making sure Isle of Ruins entrances work no matter the water level.
|
|
:param ctx:
|
|
:param er_map:
|
|
:param scene:
|
|
:param detect_data:
|
|
:param exit_data:
|
|
:return:
|
|
"""
|
|
return er_map
|
|
|
|
async def enter_game(self, ctx):
|
|
"""
|
|
called once on entering game from menu
|
|
:param ctx:
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
async def _set_starting_flags(self, ctx: "BizHawkClientContext") -> None:
|
|
write_list = self.addr_slot_id.get_write_list(ctx.slot)
|
|
print(f"New game, setting starting flags for slot {ctx.slot}")
|
|
for adr, _value in STARTING_FLAGS:
|
|
write_list += adr.get_write_list(_value)
|
|
print(f"normal flags wl: {write_list}")
|
|
write_list += await self.set_special_starting_flags(ctx)
|
|
await bizhawk.write(ctx.bizhawk_ctx, write_list)
|
|
|
|
async def set_special_starting_flags(self, ctx: "BizHawkClientContext") -> list[tuple[int, list, str]]:
|
|
"""
|
|
gets called on entering a new save file. flags defined in `self.starting_flags` are set automatically. intended
|
|
to be used for conditional flags.
|
|
:param ctx:
|
|
:return: write_list
|
|
"""
|
|
write_list = []
|
|
return write_list
|
|
|
|
async def process_read_list(self, ctx: "BizHawkClientContext", read_result: dict):
|
|
"""
|
|
called every cycle in game, even while loading
|
|
Game watcher just read self.main_read_list. process data to set up key variables
|
|
:param ctx: BizHawkClientContext
|
|
:param read_result: dict of address name to read value
|
|
:return:
|
|
"""
|
|
|
|
async def _entrance_warp(self, ctx, going_to, entrance=0):
|
|
e_write_list = []
|
|
res = ((going_to & 0xFF00) >> 8, going_to & 0xFF, entrance)
|
|
defer_entrance = None
|
|
|
|
def write_entrance(s, r, e):
|
|
return [a.get_inner_write_list(v) for a, v in zip(self.scene_addr, [s, r, 0, e])]
|
|
|
|
def write_er(exit_d: "DSTransition"):
|
|
if exit_d.entrance[2] == 0xFA:
|
|
# Special condition for exiting ships at sea
|
|
new_entrance = tuple(list(exit_d.entrance[:2]) + [exit_d.extra_data["ship_exit"]])
|
|
write_res = write_entrance(*new_entrance)
|
|
else:
|
|
write_res = write_entrance(*exit_d.entrance)
|
|
|
|
if exit_d.entrance[2] > 0xFA:
|
|
self.er_exit_coord_writes = [addr.get_inner_write_list(coord) for addr, coord in zip(self.exit_coords_addr, exit_d.coords)]
|
|
write_res += self.write_respawn_entrance(exit_d)
|
|
|
|
return write_res
|
|
|
|
def post_process(d):
|
|
new_entrance = d.entrance
|
|
# Ship exits are weird
|
|
if new_entrance[2] == 0xFA:
|
|
new_entrance = tuple(list(new_entrance[:2]) + [d.extra_data["ship_exit"]])
|
|
d.debug_print()
|
|
return write_er(d), new_entrance
|
|
|
|
# Precision Warp
|
|
if self.precision_operation:
|
|
if self.precision_operation == "wts" or "wts" in self.precision_operation:
|
|
print(f"Precision Warp to start")
|
|
self.warp_to_start_flag = True
|
|
self.precision_delay_flags = True
|
|
elif isinstance(self.precision_operation, list):
|
|
if self.precision_operation[0] == "warp":
|
|
e_write_list, res = post_process(self.precision_operation[1])
|
|
self.precision_delay_flags = True
|
|
|
|
# Warp to start
|
|
if self.warp_to_start_flag:
|
|
self.warp_to_start_flag = False
|
|
home = self.starting_entrance[0]*0x100 + self.starting_entrance[1]
|
|
if home != self.last_scene or self.precision_operation == "wts":
|
|
e_write_list += write_entrance(*self.starting_entrance)
|
|
res = self.starting_entrance
|
|
self.current_stage = self.starting_entrance[0]
|
|
self.heal_on_load = True
|
|
logger.info("Warping to Start and Refilling Ammo")
|
|
else:
|
|
logger.info("Warp to start failed, warping from home scene")
|
|
|
|
# Map warp
|
|
elif getattr(self, "map_warp", None):
|
|
logger.info(f"Map warping to {self.map_warp.name}")
|
|
e_write_list, res = post_process(self.map_warp)
|
|
self.map_warp = None
|
|
|
|
|
|
elif self.er_in_scene:
|
|
|
|
# Determine Entrance Warp
|
|
coords = await self.get_coords(ctx)
|
|
for detect_data, exit_data in self.er_in_scene.items():
|
|
# print(f"trying to detect ER {res} {detect_data.entrance} {detect_data.detect_exit(going_to, entrance, coords, self.er_y_offest)}")
|
|
if detect_data.detect_exit(going_to, entrance, coords, self.er_y_offest):
|
|
if await self.conditional_er(ctx, exit_data):
|
|
print(f"Detected entrance: {detect_data} => {exit_data}")
|
|
e_write_list, res = post_process(exit_data)
|
|
defer_entrance = "traverse"
|
|
if detect_data in self.er_messages:
|
|
logger.info(self.er_messages[detect_data])
|
|
else:
|
|
e_write_list, res = post_process(detect_data)
|
|
if ctx.slot_data.get("ut_blocked_entrances_behaviour", 0) in [0, 2]:
|
|
defer_entrance = "check"
|
|
break
|
|
|
|
# Unrandomized entrances can still have bounce conditions
|
|
if not e_write_list:
|
|
bounce_entrance = await self.conditional_bounce(ctx, going_to, entrance)
|
|
print(f"Trying bounce: {bounce_entrance}")
|
|
if bounce_entrance:
|
|
e_write_list, res = post_process(bounce_entrance)
|
|
|
|
|
|
if e_write_list:
|
|
print(f"Writing entrance warp {e_write_list}")
|
|
await bizhawk.write(ctx.bizhawk_ctx, e_write_list)
|
|
if defer_entrance:
|
|
await self.store_visited_entrances(ctx, detect_data, exit_data, defer_entrance)
|
|
|
|
return res
|
|
|
|
def write_respawn_entrance(self, exit_data):
|
|
"""
|
|
when at sea in ph with island shuffle on, the respawn point is not tied to the exit and must be set manually.
|
|
:param exit_data:
|
|
:return: list of write data
|
|
"""
|
|
return []
|
|
|
|
async def conditional_bounce(self, cxt, scene, entrance) -> "Entrance" or None:
|
|
"""
|
|
checks for bounce conditions if entrance is not affected by ER.
|
|
returns the entrance to return to
|
|
:param cxt:
|
|
:param scene:
|
|
:param entrance:
|
|
:return:
|
|
"""
|
|
|
|
async def process_post_receive(self, ctx):
|
|
"""
|
|
Called after finished receiving item, after _remove_vanilla_item and delay_pickup
|
|
"""
|
|
|
|
async def store_visited_entrances(self, ctx, detect_data, exit_data, interaction=None):
|
|
"""
|
|
store visited entrances as a set of ints to datastorage
|
|
:param ctx:
|
|
:param detect_data:
|
|
:param exit_data:
|
|
:param interaction: allows for different stuff, ph uses it for traverse/check data
|
|
:return:
|
|
"""
|
|
|
|
async def conditional_er(self, ctx, exit_data, silent=False) -> bool:
|
|
"""
|
|
for handling custom conditional ER statements.
|
|
If return false, ER will pop you back out the entrance you came from
|
|
:param ctx:
|
|
:param exit_data:
|
|
:return:
|
|
"""
|
|
return True
|
|
|
|
async def _reset_dynamic_flags(self, ctx):
|
|
print(f"resetting flags {self._dynamic_flags_to_reset}")
|
|
reset_data = [DYNAMIC_FLAGS[n] for n in self._dynamic_flags_to_reset]
|
|
res = await self._process_dynamic_flags(ctx, reset_data)
|
|
self._dynamic_flags_to_reset.clear()
|
|
return res
|
|
|
|
async def _set_dynamic_flags(self, ctx, scene):
|
|
# Loop dynamic flags in scene
|
|
if scene in self.scene_to_dynamic_flag:
|
|
print(f"Flags on Scene: {[i['name'] for i in self.scene_to_dynamic_flag[scene]]}")
|
|
return await self._process_dynamic_flags(ctx, self.scene_to_dynamic_flag[scene], True)
|
|
return []
|
|
# Main Loop
|
|
|
|
async def _process_dynamic_flags(self, ctx, flag_list, reset=False):
|
|
read_addr = set()
|
|
set_bits, unset_bits = {}, {}
|
|
for data in flag_list:
|
|
|
|
# Items, locations, slot data
|
|
if not await self._has_dynamic_requirements(ctx, data):
|
|
continue
|
|
|
|
# Create read/write lists
|
|
for a, v in data.get("set_if_true", []):
|
|
read_addr.add(a)
|
|
# You can add an item name as a value, and it will set the value to it's count
|
|
if type(v) is str:
|
|
v = self.item_count(ctx, v)
|
|
set_bits[a] = set_bits.get(a, 0) | v
|
|
print(f"\tsetting bit for {data['name']}")
|
|
for a, v in data.get("unset_if_true", []):
|
|
read_addr.add(a)
|
|
unset_bits[a] = unset_bits.get(a, 0) | v
|
|
print(f"\tunsetting bit for {data['name']}")
|
|
for a, v in data.get("overwrite_if_true", []):
|
|
read_addr.add(a)
|
|
if type(v) is str:
|
|
v = self.item_count(ctx, v)
|
|
set_bits[a] = v
|
|
unset_bits[a] = ~v
|
|
print(f"\toverwriting bit for {data['name']}")
|
|
|
|
# Special full heal condition
|
|
if "full_heal" in data:
|
|
await self.full_heal(ctx)
|
|
|
|
# Create list of flags to reset
|
|
if reset:
|
|
self._dynamic_flags_to_reset += data.get("reset_flags", [])
|
|
|
|
# Write dynamic flags to memory
|
|
read_list = read_addr
|
|
prev = await read_multiple(ctx, read_list)
|
|
print(f"prevs: {[[a, hex(v)] for a, v in prev.items()]}")
|
|
|
|
# Calculate values to write
|
|
for a, v in set_bits.items():
|
|
prev[a] = prev[a] | v
|
|
for a, v in unset_bits.items():
|
|
prev[a] = prev[a] & (~v)
|
|
|
|
# Write
|
|
write_list = [a.get_inner_write_list(v) for a, v in prev.items()]
|
|
await bizhawk.write(ctx.bizhawk_ctx, write_list)
|
|
return write_list
|
|
|
|
async def _set_dynamic_entrances(self, ctx, scene):
|
|
print(f"Setting dynamic Entrances on {hex(scene)}:")
|
|
for data in self.dynamic_entrances_by_scene.get(scene, dict()).values():
|
|
|
|
# Check requirements
|
|
if not await self._has_dynamic_requirements(ctx, data):
|
|
continue
|
|
|
|
# Overwrite er_in_scene with dynamic entrance
|
|
detect_data = data["detect_data"]
|
|
if data["exit_data"] is None:
|
|
if data["destination"] == "_connected_dungeon_entrance":
|
|
dung_entr = self.update_boss_warp(ctx, self.current_stage, scene)
|
|
if dung_entr is not None:
|
|
self.er_in_scene[detect_data] = dung_entr
|
|
else:
|
|
self.er_in_scene[detect_data] = data["exit_data"]
|
|
if "message" in data:
|
|
self.er_messages[detect_data] = data.get("message", None)
|
|
print(f"\t{detect_data} => {data['exit_data']}")
|
|
|
|
async def _has_dynamic_requirements(self, ctx, data) -> bool:
|
|
def check_items(d):
|
|
if "has_items" in d:
|
|
counter = [0] * len(d["has_items"])
|
|
label = "has_items"
|
|
elif "not_has_all_items" in d:
|
|
counter = [0] * len(d["not_has_all_items"])
|
|
label = "not_has_all_items"
|
|
else:
|
|
return True
|
|
|
|
for i, want_item in enumerate(d[label]):
|
|
counter[i] = self.item_count(ctx, want_item[0])
|
|
# print(f"Item Counter {d['name']}: {counter}")
|
|
|
|
for item, count_have in zip(d.get("has_items", []), counter):
|
|
item, count_want, *operation = item
|
|
if not operation:
|
|
if (count_want == 0 and count_have != 0) or (count_want > 0 and count_have < count_want):
|
|
return False
|
|
elif operation[0] == "has_exact":
|
|
if count_want != count_have:
|
|
return False
|
|
elif operation[0] == "not_has":
|
|
if count_have >= count_want:
|
|
return False
|
|
|
|
not_have_counter = 0
|
|
for item, count_have in zip(d.get("not_has_all_items", []), counter):
|
|
item, count_want, *operation = item
|
|
# print(f"count have {count_have} >= {count_want}")
|
|
if count_have >= count_want:
|
|
not_have_counter += 1
|
|
if not_have_counter == len(counter):
|
|
return False
|
|
|
|
return True
|
|
|
|
# Check location conditions
|
|
def check_locations(d):
|
|
for loc in d.get("has_locations", []):
|
|
if self.location_name_to_id[loc] not in ctx.checked_locations:
|
|
return False
|
|
for loc in d.get("not_has_locations", []):
|
|
if self.location_name_to_id[loc] in ctx.checked_locations:
|
|
return False
|
|
if "any_not_has_locations" in d:
|
|
for loc in d.get("any_not_has_locations", []):
|
|
if self.location_name_to_id[loc] not in ctx.checked_locations:
|
|
return True
|
|
return False
|
|
if "any_has_locations" in d:
|
|
for loc in d.get("any_has_locations", []):
|
|
if self.location_name_to_id[loc] in ctx.checked_locations:
|
|
return True
|
|
return False
|
|
return True
|
|
|
|
def check_slot_data(d):
|
|
if "has_slot_data" in d:
|
|
for slot, value in d["has_slot_data"]:
|
|
if type(value) is list:
|
|
if ctx.slot_data.get(slot, None) not in value:
|
|
return False
|
|
else:
|
|
if ctx.slot_data.get(slot, None) != value:
|
|
return False
|
|
return True
|
|
|
|
# Came from particular location
|
|
def check_last_room(d):
|
|
# print(f"checking last scene {self.last_scene} {d.get('last_scenes', [])}")
|
|
for i in d.get("not_last_scenes", []):
|
|
if self.last_scene == i:
|
|
return False
|
|
for i in d.get("last_scenes", []):
|
|
if self.last_scene != i:
|
|
return False
|
|
return True
|
|
|
|
# Read a dict of addresses to see if they match value
|
|
async def check_bits(d):
|
|
if "check_bits" in d:
|
|
r_list = [addr for addr, *_ in d["check_bits"]]
|
|
v_lookup = {addr: v for addr, v, *args in d["check_bits"]}
|
|
arg_lookup = {addr: args for addr, v, *args in d["check_bits"] if args}
|
|
values = await read_multiple(ctx, r_list)
|
|
for addr, p in values.items():
|
|
if not arg_lookup.get(addr, False):
|
|
if not (p & v_lookup[addr]):
|
|
return False
|
|
elif "not" in arg_lookup.get(addr, ""):
|
|
if p & v_lookup[addr]:
|
|
return False
|
|
return True
|
|
|
|
def has_entrance(d):
|
|
if "not_on_entrance" in d:
|
|
if self.current_entrance in d["not_on_entrance"]:
|
|
return False
|
|
if "on_entrance" in d:
|
|
if self.current_entrance not in d["on_entrance"]:
|
|
return False
|
|
return True
|
|
|
|
if not check_items(data):
|
|
print(f"\t{data['name']} does not have item reqs")
|
|
return False
|
|
if not check_locations(data):
|
|
print(f"\t{data['name']} does not have location reqs")
|
|
return False
|
|
if not check_slot_data(data):
|
|
print(f"\t{data['name']} does not have slot data reqs")
|
|
return False
|
|
if not check_last_room(data):
|
|
print(f"\t{data['name']} came from wrong room {hex(self.last_scene)}")
|
|
return False
|
|
if not await check_bits(data):
|
|
print(f"\t{data['name']} is missing bits")
|
|
return False
|
|
if not await self.has_special_dynamic_requirements(ctx, data):
|
|
return False
|
|
if not has_entrance(data):
|
|
return False
|
|
|
|
return True
|
|
|
|
async def has_special_dynamic_requirements(self, ctx, data) -> bool:
|
|
"""
|
|
for adding game specific dynamic parameters
|
|
ph uses this for beedle points and metal counters
|
|
:param ctx:
|
|
:param data:
|
|
:return:
|
|
"""
|
|
return True
|
|
|
|
async def _process_checked_locations(self, ctx: "BizHawkClientContext", pre_process: str = None, r=False,
|
|
detection_type=None, item: str | None = None):
|
|
local_checked_locations = set()
|
|
all_checked_locations = ctx.checked_locations
|
|
location = None
|
|
|
|
# If sent with a pre-proces kwarg
|
|
if pre_process is not None:
|
|
self.receiving_location = True
|
|
loc_id = self.location_name_to_id[pre_process]
|
|
location = LOCATIONS_DATA[pre_process]
|
|
if r or (loc_id not in all_checked_locations):
|
|
await self._set_vanilla_item(ctx, location, item)
|
|
local_checked_locations.add(loc_id)
|
|
print(f"pre-processed {pre_process}, vanill {self.last_vanilla_item}")
|
|
else:
|
|
# Get link's coords
|
|
link_coords = await self.get_coords(ctx)
|
|
|
|
# Certain checks use their detection method to differentiate them, like frogs and salvage
|
|
locations_in_scene = self.locations_in_scene.copy()
|
|
|
|
# Figure out what check was just gotten
|
|
for i, loc in enumerate(locations_in_scene.items()):
|
|
loc_name, location = loc
|
|
loc_bytes = self.location_name_to_id[loc_name]
|
|
|
|
if "address" in location or self.cancel_location_read(location):
|
|
location = None
|
|
continue
|
|
|
|
print(f"Processing locs {loc_name}")
|
|
print(
|
|
f"\tx: {location.get('x_max', 0x8FFFFFFF)} > {link_coords['x']} > {location.get('x_min', -0x8FFFFFFF)}")
|
|
print(
|
|
f"\ty: {location.get('y', link_coords['y']) + 1000} > {link_coords['y']} >= {location.get('y', link_coords['y'])}")
|
|
print(
|
|
f"\tz: {location.get('z_max', 0x8FFFFFFF)} > {link_coords['z']} > {location.get('z_min', -0x8FFFFFFF)}")
|
|
|
|
|
|
if (location.get("x_max", 0x8FFFFFFF) > link_coords["x"] > location.get("x_min", -0x8FFFFFFF) and
|
|
location.get("z_max", 0x8FFFFFFF) > link_coords["z"] > location.get("z_min", -0x8FFFFFFF) and
|
|
location.get("y", link_coords["y"]) + 1000 > link_coords["y"] >= location.get("y", link_coords["y"])):
|
|
# For rooms with checks that move or are close, check what you got first
|
|
if "delay_pickup" in location:
|
|
if len(self.locations_in_scene) > i + 1:
|
|
await self._set_delay_pickup(ctx, loc_name, location)
|
|
break
|
|
|
|
local_checked_locations.add(loc_bytes)
|
|
await self._set_vanilla_item(ctx, location)
|
|
print(f"Got location {loc_name}! with vanilla {self.last_vanilla_item} id {loc_bytes}")
|
|
self.locations_in_scene.pop(loc_name) # Remove location for overlapping purposes
|
|
break
|
|
location = None
|
|
|
|
if location is not None:
|
|
if "set_bit" in location:
|
|
for addr, bit in location["set_bit"]:
|
|
print(f"Setting bit {bit} for location vanil {location['vanilla_item']}")
|
|
await addr.set_bits(ctx, bit)
|
|
|
|
# Delay reset of vanilla item from certain address reads
|
|
if "delay_reset" in location:
|
|
self.delay_reset = 1
|
|
print(f"Started Delay Reset for {self.last_vanilla_item}")
|
|
|
|
# Send locations
|
|
# print(f"Local locations: {local_checked_locations} in \n{all_checked_locations}")
|
|
if any([i not in all_checked_locations for i in local_checked_locations]):
|
|
print(f"Sending Locations: {local_checked_locations}")
|
|
await ctx.send_msgs([{
|
|
"cmd": "LocationChecks",
|
|
"locations": list(local_checked_locations)
|
|
}])
|
|
|
|
await self.check_location_post_processing(ctx, location)
|
|
|
|
def cancel_location_read(self, location) -> bool:
|
|
"""
|
|
called on the main path of _process_checked_location.
|
|
used to cancel special reads that should only happen on special reads
|
|
used in st for stamp book locations
|
|
:param location:
|
|
:return:
|
|
"""
|
|
return False
|
|
|
|
async def get_item_read(self, ctx, item_name: str) -> int:
|
|
if "Small Key" in item_name:
|
|
return await self.key_address.read(ctx)
|
|
item = self.item_data[item_name]
|
|
return await item.address.read(ctx)
|
|
|
|
async def _set_delay_pickup(self, ctx, loc_name, location):
|
|
delay_locations = []
|
|
delay_pickup = location["delay_pickup"]
|
|
if type(delay_pickup) is str:
|
|
delay_locations.append(delay_pickup)
|
|
elif type(delay_pickup) is list:
|
|
delay_locations += delay_pickup
|
|
|
|
self.delay_pickup = [loc_name, []]
|
|
for loc in delay_locations:
|
|
delay_item_check: str | list[str] = LOCATIONS_DATA[loc]["vanilla_item"]
|
|
if isinstance(delay_item_check, str):
|
|
delay_item_check = [delay_item_check]
|
|
for item in delay_item_check:
|
|
self.delay_pickup[1].append([loc, item, await self.get_item_read(ctx, item)])
|
|
if "Potion" in item:
|
|
overflow_item = self.item_data[item].overflow_item
|
|
self.delay_pickup[1].append([loc, overflow_item, await self.get_item_read(ctx, overflow_item)])
|
|
print(f"Delay pickup {self.delay_pickup}")
|
|
# Processes events defined in data\dynamic_flags.py
|
|
|
|
async def _set_vanilla_item(self, ctx, location, vanilla_item: str | None = None):
|
|
item: str | list[str] = vanilla_item or location.get("vanilla_item", None)
|
|
if item is None:
|
|
return
|
|
if isinstance(item, str):
|
|
item_data = self.item_data[item]
|
|
print(f"Setting vanilla for {item_data}")
|
|
if item is not None and not hasattr(item_data, "dummy"):
|
|
if ("incremental" in item_data.tags
|
|
or hasattr(item_data, "progressive")
|
|
or item_data.id not in [i.item for i in ctx.items_received]
|
|
or "always_process" in item_data.tags
|
|
or "monotone_incremental" in item_data.tags):
|
|
self.last_vanilla_item.append(item)
|
|
|
|
await self.unset_special_vanilla_items(ctx, location, item)
|
|
|
|
# If there are multiple items possible at this location, store all of them with current counts for later
|
|
else:
|
|
self.last_vanilla_item.append([(_item, await self.get_item_read(ctx, _item)) for _item in item])
|
|
|
|
async def unset_special_vanilla_items(self, ctx, location, item):
|
|
"""
|
|
called after _set_vanilla_item if it was successful.
|
|
self.last_vanilla_item.pop() any item/location combinations you don't want to remove
|
|
used for farmable rupee spots, or overlap between progressive and non progressive variants of the same item
|
|
:param ctx:
|
|
:param location:
|
|
:param item:
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
async def check_location_post_processing(self, ctx, location: dict):
|
|
"""
|
|
for running code on specific locations
|
|
in st, this is used for sending goal on location
|
|
:param ctx:
|
|
:param location:
|
|
:return:
|
|
"""
|
|
return
|
|
|
|
async def _process_received_items(self, ctx: "BizHawkClientContext", num_received_items: int, log_items=False) -> None:
|
|
next_item_id = ctx.items_received[num_received_items].item
|
|
item_name = self.item_id_to_name[next_item_id]
|
|
item_data = self.item_data[item_name]
|
|
|
|
if log_items:
|
|
logger.info(f"Received Backlogged Item: {item_name}")
|
|
|
|
# Increment in-game items received count
|
|
write_list = self.addr_received_item_index.get_write_list(num_received_items+1)
|
|
print(f"Vanilla item: {self.last_vanilla_item} for {item_name}")
|
|
|
|
# If same as vanilla item don't remove
|
|
if self.last_vanilla_item and item_name == self.last_vanilla_item[-1] and "always_process" not in item_data.tags:
|
|
self.last_vanilla_item.pop()
|
|
print(f"oops it's vanilla or dummy! {self.last_vanilla_item}")
|
|
else:
|
|
write_list += await item_data.receive_item(self, ctx, num_received_items)
|
|
|
|
# Write the new item to memory!
|
|
print("Write list:")
|
|
for addr, v, domain in write_list:
|
|
print(f" {hex(addr)}: {v} ({domain})")
|
|
# print(f"Write list: {write_list}")
|
|
await bizhawk.write(ctx.bizhawk_ctx, write_list)
|
|
|
|
await self.receive_item_post_processing(ctx, item_name, item_data)
|
|
# Called when a stage has fully loaded
|
|
|
|
async def receive_key_in_own_dungeon(self, ctx, item_name: str, write_keys_to_storage) -> list:
|
|
"""
|
|
called in `_process_received_items` if you receive a key in it's own dungeon.
|
|
for giving the player the key directly
|
|
:param ctx:
|
|
:param item_name:
|
|
:param write_keys_to_storage: inner function that writes keys to storage based on key data
|
|
:return: write data
|
|
"""
|
|
return []
|
|
|
|
async def received_special_small_keys(self, ctx, item_name, write_keys_to_storage) -> list:
|
|
"""
|
|
called in `_process_received_items` if you got a small key. for doing special key stuff.
|
|
in ph, this is used for saving giving midway keys
|
|
:param ctx:
|
|
:param item_name:
|
|
:param write_keys_to_storage: inner function that writes keys to storage based on key data
|
|
:return: write data
|
|
"""
|
|
return []
|
|
|
|
async def received_special_incremental(self, ctx, item_data) -> int:
|
|
"""
|
|
processes incremental item values that are strings for special data, ofter defined by slot data
|
|
:param ctx:
|
|
:param item_data: item data
|
|
:return: value to increment by
|
|
"""
|
|
return 0
|
|
# Called when checking location!
|
|
|
|
async def receive_special_items(self, ctx, item_name, item_data) -> list[tuple[int, list, str]]:
|
|
"""
|
|
called in `_process_received_items` for adding custom item cases
|
|
:param ctx:
|
|
:param item_name:
|
|
:param item_data:
|
|
:return: write list
|
|
"""
|
|
return []
|
|
|
|
async def receive_item_post_processing(self, ctx, item_name, item_data):
|
|
"""
|
|
called at the end of `_process_received_items`. for calling other functions on getting items.
|
|
ph uses it for giving all the ship parts as one item, for resetting the treasure tracker
|
|
and for sending hints on getting treasure maps
|
|
:param ctx:
|
|
:param item_name:
|
|
:param item_data:
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
async def _remove_vanilla_item(self, ctx: "BizHawkClientContext", num_received_items):
|
|
print(f"Removing vanilla items {self.last_vanilla_item}")
|
|
for item in self.last_vanilla_item:
|
|
if isinstance(item, str):
|
|
item_object = self.item_data[item]
|
|
write_list = await item_object.remove_vanilla(self, ctx, num_received_items)
|
|
await bizhawk.write(ctx.bizhawk_ctx, write_list)
|
|
else:
|
|
# If item is a list of items, we instead want to check which one Link got and loop that back into this process
|
|
for _item, _count in item:
|
|
new_item_read = await self.get_item_read(ctx, _item)
|
|
if "Rupee" in _item or "Rupoor" in _item:
|
|
if new_item_read - _count == self.item_data[_item].value:
|
|
self.last_vanilla_item.append(_item)
|
|
break
|
|
elif new_item_read != _count:
|
|
self.last_vanilla_item.append(_item)
|
|
break
|
|
self.last_vanilla_item.clear()
|
|
|
|
async def detect_warp_to_start(self, ctx, read_result: dict):
|
|
"""
|
|
called every cycle in game. detect warp to start, and cancel any nasty conflicts
|
|
:param ctx:
|
|
:param read_result:
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
async def process_in_game(self, ctx, read_result: dict):
|
|
"""
|
|
called every cycle in game, not while loading
|
|
:param ctx:
|
|
:param read_result:
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
async def process_game_completion(self, ctx: "BizHawkClientContext"):
|
|
"""
|
|
Process if player has reached goal
|
|
:param ctx: BizHawkClientContext
|
|
:return: sends game completion to server if return true
|
|
"""
|
|
return False
|
|
|
|
async def process_on_room_load(self, ctx, current_scene, read_result: dict):
|
|
"""
|
|
called once when room is fully loaded, early in the sequence
|
|
:param ctx:
|
|
:param current_scene:
|
|
:param read_result:
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
async def process_hard_coded_rooms(self, ctx, current_scene):
|
|
"""
|
|
called when room has fully loaded, after most of the obligate methods
|
|
for running specific code for specific rooms, that can't be handled by dynamic flags
|
|
:param ctx:
|
|
:param current_scene:
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
async def _set_er_coords(self, ctx):
|
|
if self.er_exit_coord_writes:
|
|
await bizhawk.write(ctx.bizhawk_ctx, self.er_exit_coord_writes)
|
|
self.er_exit_coord_writes = None
|
|
|
|
async def enter_special_key_room(self, ctx, stage, scene_id) -> bool:
|
|
"""
|
|
called on entering a new stage, to set small keys in a different way to default
|
|
used in ph to give totok keys without resetting the counter
|
|
:param ctx:
|
|
:param stage:
|
|
:param scene_id:
|
|
:return: true if did special operation, false if not and want to do normal operation
|
|
"""
|
|
return False
|
|
|
|
async def set_stage_flags(self, ctx, stage):
|
|
"""
|
|
called on entering a new stage. sets stage flags. ST doesn't do this yet
|
|
:param ctx:
|
|
:param stage:
|
|
:return:
|
|
"""
|
|
pass
|
|
|
|
async def _enter_stage(self, ctx, stage, scene_id):
|
|
await self.set_stage_flags(ctx, stage)
|
|
# Give dungeon keys
|
|
if stage in self.dungeon_key_data:
|
|
if not await self.enter_special_key_room(ctx, stage, scene_id):
|
|
await self.update_key_count(ctx, stage)
|
|
self.entering_from = scene_id
|
|
|
|
def update_boss_warp(self, ctx, stage, scene_id):
|
|
"""
|
|
method for setting self.last_dungeon_warp_target for redirecting warps after bosses in entrance rando
|
|
:param ctx:
|
|
:param stage:
|
|
:param scene_id:
|
|
:return: PHTransition for the location
|
|
"""
|
|
return None
|
|
|
|
async def _load_local_locations(self, ctx, scene):
|
|
# Load locations in room into loop
|
|
self.locations_in_scene = self.location_area_to_watches.get(scene, {}).copy()
|
|
print(f"Locations in scene {hex(scene)}: {self.locations_in_scene.keys()}")
|
|
self.watches = {}
|
|
sram_read_list = set()
|
|
active_srams = []
|
|
locations_found = ctx.checked_locations
|
|
if self.locations_in_scene is not None:
|
|
# Create memory watches for checks triggerd by flags, and make list for checking sram
|
|
for loc_name, location in self.locations_in_scene.items():
|
|
loc_id = self.location_name_to_id[loc_name]
|
|
if loc_id in locations_found and "address" in location:
|
|
read = await location["address"].read(ctx)
|
|
if read & location["value"]:
|
|
print(f"Location {loc_name} has already been found and triggered")
|
|
continue
|
|
else:
|
|
if "sram_addr" in location and location["sram_addr"] is not None:
|
|
active_srams.append((loc_name, location["sram_addr"], location["sram_value"]))
|
|
sram_read_list.add(location["sram_addr"])
|
|
print(f"\tCreated sram read for location {loc_name}")
|
|
|
|
if "address" in location:
|
|
self.watches[loc_name] = location["address"]
|
|
|
|
# Read and set locations missed when bizhawk was disconnected
|
|
if self.save_slot == 0 and len(sram_read_list) > 0:
|
|
sram_reads = await read_multiple(ctx, sram_read_list)
|
|
for loc_name, addr, _value in active_srams:
|
|
if _value & sram_reads[addr]:
|
|
await self._process_checked_locations(ctx, loc_name)
|
|
|
|
async def update_special_key_count(self, ctx, current_stage: int, new_keys:int, key_data: dict, key_values: dict, key_address: int) -> tuple[int, bool]:
|
|
"""
|
|
called on enter stage if you want to change the number of keys written based on a parameter.
|
|
used in ph for removing a totok key after opening the door on 1f
|
|
:param ctx:
|
|
:param current_stage:
|
|
:param new_keys: number of keys in memory
|
|
:param key_data:
|
|
:param key_values: previous key read values, dict
|
|
:param key_address: location of key counter in heap
|
|
:return: number of keys to write, reset key storage
|
|
"""
|
|
return new_keys, True
|
|
|
|
async def update_key_count(self, ctx, current_stage: int) -> None:
|
|
"""
|
|
Called when entering a dungeon. Updates key count based on a tracker counter in memory,
|
|
specified in self.dungeon_key_data
|
|
:param ctx:
|
|
:param current_stage:
|
|
:return:
|
|
"""
|
|
key_address = self.key_address = await self.get_small_key_address(ctx)
|
|
key_data = self.dungeon_key_data.get(current_stage, {})
|
|
tracker = key_data["address"]
|
|
read_list = [key_address, tracker]
|
|
key_values = await read_multiple(ctx, read_list)
|
|
|
|
new_keys = (((key_values[tracker] & key_data["filter"]) // key_data["value"])
|
|
+ key_values[key_address])
|
|
|
|
# Create write list, reset key tracker
|
|
if new_keys != 0:
|
|
new_keys = 7 if new_keys >= 7 else new_keys
|
|
new_keys, reset_key_count = await self.update_special_key_count(ctx, current_stage, new_keys, key_data, key_values, key_address)
|
|
new_keys = 0 if new_keys < 0 else new_keys
|
|
write_list = key_address.get_write_list(new_keys)
|
|
if reset_key_count:
|
|
reset_tracker = (~key_data["filter"]) & key_values[tracker]
|
|
write_list += tracker.get_write_list(reset_tracker)
|
|
|
|
print(f"Finally writing keys to memory {key_address} with value {hex(new_keys)}")
|
|
await bizhawk.write(ctx.bizhawk_ctx, write_list)
|
|
|
|
async def _process_scouted_locations(self, ctx: "BizHawkClientContext", scene):
|
|
def check_items(d):
|
|
for item in d.get("has_items", []):
|
|
if self.item_data[item].id not in [i.item for i in ctx.items_received]:
|
|
return False
|
|
return True
|
|
|
|
def check_slot_data(d):
|
|
for args in d.get("slot_data", []):
|
|
if type(args) is str:
|
|
option, _value = args, [True]
|
|
else:
|
|
option, _value = args
|
|
_value = [_value] if type(_value) is int else _value # Support lists of values
|
|
if ctx.slot_data.get(option, "unknown_slot_data") not in _value:
|
|
return False
|
|
return True
|
|
|
|
local_scouted_locations = set(ctx.locations_scouted)
|
|
if self.hint_scene_to_watches.get(scene, []):
|
|
print(f"hints {self.hint_scene_to_watches.get(scene, [])}")
|
|
for hint_name in self.hint_scene_to_watches.get(scene, []):
|
|
hint_data = self.hint_data[hint_name]
|
|
# Check requirements
|
|
if not check_items(hint_data):
|
|
continue
|
|
if not check_slot_data(hint_data):
|
|
continue
|
|
|
|
# Figure out locations to hint
|
|
if "locations" in hint_data:
|
|
# Hint required dungeons
|
|
if "Dungeon Hints" in hint_data["locations"]:
|
|
local_scouted_locations.update(self.dungeon_hints(ctx))
|
|
else:
|
|
locations_checked = ctx.locations_scouted
|
|
for loc in hint_data["locations"]:
|
|
loc_id = self.location_name_to_id[loc]
|
|
if loc_id in locations_checked:
|
|
continue
|
|
local_scouted_locations.add(loc_id)
|
|
else:
|
|
local_scouted_locations.add(self.location_name_to_id[hint_name])
|
|
|
|
# Send hints
|
|
if self.local_scouted_locations != local_scouted_locations:
|
|
self.local_scouted_locations = local_scouted_locations
|
|
await ctx.send_msgs([{
|
|
"cmd": "LocationScouts",
|
|
"locations": list(self.local_scouted_locations),
|
|
"create_as_hint": int(2)
|
|
}])
|
|
|
|
async def _process_game_completion(self, ctx: "BizHawkClientContext"):
|
|
if await self.process_game_completion(ctx):
|
|
await ctx.send_msgs([{
|
|
"cmd": "StatusUpdate",
|
|
"status": ClientStatus.CLIENT_GOAL
|
|
}])
|
|
|
|
async def process_deathlink(self, ctx: "BizHawkClientContext", is_dead, stage, read_result):
|
|
"""
|
|
process deathlink, both sending and receiving.
|
|
:param ctx:
|
|
:param is_dead:
|
|
:param stage:
|
|
:param read_result:
|
|
:return:
|
|
"""
|
|
|
|
@staticmethod
|
|
async def store_data(ctx: "BizHawkClientContext", key, data, operation="update", default=None):
|
|
default = list() if default is None else default
|
|
data = list(data) if isinstance(data, set) else data
|
|
print(f"Storing data: {key} {operation} {data} {default}")
|
|
await ctx.send_msgs([{
|
|
"cmd": "Set",
|
|
"key": key,
|
|
"default": default,
|
|
"operations": [{"operation": operation, "value": data}]
|
|
}])
|
|
|
|
async def ut_bounce_scene(self, ctx, scene):
|
|
if ctx.slot_data.get("shuffle_overworld_transitions", False):
|
|
scene |= 1 << 16
|
|
print(f"Storing new scene for UT {hex(scene)}")
|
|
await ctx.send_msgs([{
|
|
"cmd": "Set",
|
|
"key": f"{ctx.slot}_{ctx.team}_UT_MAP",
|
|
"default": 0,
|
|
"operations": [{"operation": "replace", "value": scene}]
|
|
}])
|
|
|
|
def dungeon_hints(self, ctx):
|
|
"""
|
|
Write out dungeon hints depending on settings
|
|
:param ctx:
|
|
:return: list of location to scout
|
|
"""
|
|
return []
|
|
|
|
async def save_scene(self, ctx, read_result, save_addr, save_key, save_comp: "Iterable"):
|
|
"""
|
|
Save the current scene to memory. Used in ph for precision warps and st for weird scene stuff from menu
|
|
"""
|
|
if read_result.get(save_addr, False) in save_comp and not self.save_spam_protection:
|
|
print(f"Saving scene {hex(self.current_scene)}")
|
|
self.last_saved_scene = self.current_scene
|
|
await self.store_data(ctx, storage_key(ctx, save_key), self.last_saved_scene, "replace", default=0)
|
|
self.save_spam_protection = True
|
|
return True
|
|
return False
|
|
|
|
async def get_saved_scene(self, ctx, save_key):
|
|
"""
|
|
Get the last saved scene from datastorage. call from menu
|
|
"""
|
|
if not self.last_saved_scene:
|
|
key = storage_key(ctx, save_key)
|
|
await ctx.send_msgs([{
|
|
"cmd": "Get",
|
|
"keys": [key]
|
|
}])
|
|
last_saved_scene = get_stored_data(ctx, save_key)
|
|
print(f"fetched last saved scene: {last_saved_scene}")
|
|
self.last_saved_scene = last_saved_scene if self.lss_retry_attempts >= 0 else 0 # if last_saved_scene is not None else False
|
|
self.lss_retry_attempts -= 1 |