Files
dockipelago/worlds/tloz_ph/Subclasses.py
Jonathan Tinney 7971961166
Some checks failed
Analyze modified files / flake8 (push) Failing after 2m28s
Build / build-win (push) Has been cancelled
Build / build-ubuntu2204 (push) Has been cancelled
ctest / Test C++ ubuntu-latest (push) Has been cancelled
ctest / Test C++ windows-latest (push) Has been cancelled
Analyze modified files / mypy (push) Has been cancelled
Build and Publish Docker Images / Push Docker image to Docker Hub (push) Successful in 5m4s
Native Code Static Analysis / scan-build (push) Failing after 5m2s
type check / pyright (push) Successful in 1m7s
unittests / Test Python 3.11.2 ubuntu-latest (push) Failing after 16m23s
unittests / Test Python 3.12 ubuntu-latest (push) Failing after 28m19s
unittests / Test Python 3.13 ubuntu-latest (push) Failing after 14m49s
unittests / Test hosting with 3.13 on ubuntu-latest (push) Successful in 5m0s
unittests / Test Python 3.13 macos-latest (push) Has been cancelled
unittests / Test Python 3.11 windows-latest (push) Has been cancelled
unittests / Test Python 3.13 windows-latest (push) Has been cancelled
add schedule I, sonic 1/frontiers/heroes, spirit island
2026-04-02 23:46:36 -07:00

451 lines
18 KiB
Python

from typing import TYPE_CHECKING
from BaseClasses import Entrance, Region
from enum import IntEnum
from .DSZeldaClient.subclasses import DSTransition, split_bits, AddrFromPointer
from .DSZeldaClient.ItemClass import DSItem, remove_vanilla_normal
from .data.SwitchLogic import *
from .data.Constants import EQUIPPED_SHIP_PARTS_ADDR, BOSS_DOOR_DATA, ITEM_GROUPS
from .data.Addresses import PHAddr
if TYPE_CHECKING:
from entrance_rando import ERPlacementState
from .Client import PhantomHourglassClient
from worlds._bizhawk.context import BizHawkClientContext
async def receive_ship(client: "PhantomHourglassClient", ctx: "BizHawkClientContext", item: "PHItem", _):
    """Build the memory writes for a received ship part.

    Nothing is written when bit mask ``2`` of ``PHAddr.custom_storage`` is set.
    Otherwise ``item.ship`` is written to every address listed in
    ``EQUIPPED_SHIP_PARTS_ADDR``.

    Returns:
        The accumulated write list (empty when the flag bit is set).
    """
    writes = []
    storage_flags = await PHAddr.custom_storage.read(ctx)
    if storage_flags & 2:
        return writes
    for part_addr in EQUIPPED_SHIP_PARTS_ADDR:
        writes += part_addr.get_write_list(item.ship)
    return writes
async def receive_boss_key(client: "PhantomHourglassClient", ctx: "BizHawkClientContext", item: "PHItem", _):
    """Build the write that applies a received boss key to the current stage's boss door.

    A write is produced only when all of the following hold:
      * the ``boss_key_behaviour`` slot option is truthy (treated as True when absent),
      * ``client.current_stage`` has an entry in ``BOSS_DOOR_DATA``,
      * that entry's ``"name"`` occurs in the item's name.

    The entry's ``"value"`` is OR-ed into the value currently stored at its ``"address"``.

    Returns:
        The write list, or an empty list when any condition fails.
    """
    writes = []
    if not ctx.slot_data.get("boss_key_behaviour", True):
        return writes
    if client.current_stage not in BOSS_DOOR_DATA:
        return writes
    # TODO: Add boss door data to boss key items?
    if BOSS_DOOR_DATA[client.current_stage]["name"] not in item.name:
        return writes
    door = BOSS_DOOR_DATA[client.current_stage]
    door_addr = door["address"]
    stored = await door_addr.read(ctx)
    writes += door_addr.get_write_list(stored | door["value"])
    return writes
async def receive_potion(client: "PhantomHourglassClient", ctx: "BizHawkClientContext", item: "PHItem", _):
    """Build the writes that place a received potion in the first empty potion slot.

    The client's potion tracker is refreshed first. The left slot is tried
    before the right one. For the first slot whose tracked value is falsy,
    ``item.value`` is written to that slot's address, ``0x6`` is OR-ed into
    ``PHAddr.inventory_2`` and ``client.last_potions`` is updated.

    Returns:
        The write list, or an empty list when neither slot is free.
    """
    writes = []
    await client.update_potion_tracker(ctx)
    print(f"Potion data: {client.last_potions} {item.value}")
    slot_addrs = [PHAddr.potion_left, PHAddr.potion_right]
    for index, (held, slot_addr) in enumerate(zip(client.last_potions, slot_addrs)):
        if held:
            continue
        writes += slot_addr.get_write_list(item.value)
        inventory_bits = await PHAddr.inventory_2.read(ctx, silent=True)
        writes += PHAddr.inventory_2.get_write_list(inventory_bits | 0x6)
        # Keep the tracker in sync so a second potion goes to the other slot
        client.last_potions[index] = item.value
        break
    return writes
async def receive_dummy(*args):
    """No-op receive handler: accept any arguments and return an empty write list."""
    return []
async def receive_full_heal(client, ctx, item, rii):
    """Receive handler that delegates to ``client.full_heal`` and queues no writes.

    ``item`` and ``rii`` are accepted for handler-signature compatibility and are unused.
    """
    await client.full_heal(ctx)
    no_writes = []
    return no_writes
async def remove_vanilla_treasure(client: "PhantomHourglassClient", ctx: "BizHawkClientContext", item: "PHItem", _):
    """Return a single write of the client's last recorded treasure values.

    ``client.last_treasures`` is passed through ``split_bits(..., 8)`` and
    written to address ``0x1BA5AC`` in ``item.domain``. Presumably this undoes
    the treasure the vanilla game just granted; verify against the caller.
    """
    recorded_treasures = split_bits(client.last_treasures, 8)
    restore_write = (0x1BA5AC, recorded_treasures, item.domain)
    return [restore_write]
async def remove_vanilla_ship_part(client: "PhantomHourglassClient", ctx: "BizHawkClientContext", item: "PHItem", _):
    """Remove a vanilla-granted ship part through the client helpers.

    Always calls ``client.remove_ship_parts``. ``client.edit_ship`` is called
    as well when the client's last scene is ``0xB0D``. All work is done by
    those helpers, so the returned write list is empty.
    """
    await client.remove_ship_parts(ctx)
    needs_ship_edit = client.last_scene == 0xB0D
    if needs_ship_edit:
        await client.edit_ship(ctx)
    return []
async def remove_vanilla_potion(client: "PhantomHourglassClient", ctx: "BizHawkClientContext", item: "PHItem", _):
    """Build the write that undoes a vanilla potion pickup.

    If the tracked potion state (``client.last_potions``) has an empty slot,
    the address of the first such slot is zeroed. Otherwise the value of the
    item's ``overflow_item`` (looked up in ``client.item_data``) is subtracted
    from the rupee count, clamped at 0.

    Returns:
        The write list for whichever of the two cases applied.
    """
    print(f"Pots {client.last_potions}")
    first_empty = next(
        (index for index, held in enumerate(client.last_potions) if not held),
        None,
    )
    if first_empty is not None:
        slot_addr = [PHAddr.potion_left, PHAddr.potion_right][first_empty]
        return slot_addr.get_write_list(0)

    # No free slot was tracked: take back the overflow item's rupees instead
    overflow = client.item_data[item.overflow_item]
    print("Removing potion rupees")
    current_rupees = await PHAddr.rupee_count.read(ctx)
    remaining = max(current_rupees - overflow.value, 0)
    return PHAddr.rupee_count.get_write_list(remaining)
async def remove_vanilla_oshus_sword(client: "PhantomHourglassClient", ctx: "BizHawkClientContext", item: "PHItem", _):
    """Build the writes that remove a vanilla-granted Oshus' Sword.

    Writes 0 to ``item.ammo_address`` and then appends whatever
    ``remove_vanilla_normal`` returns for the same item.
    """
    writes = item.ammo_address.get_write_list(0)
    normal_removal = await remove_vanilla_normal(client, ctx, item, _)
    writes += normal_removal
    return writes
async def remove_vanilla_sea_charts(client: "PhantomHourglassClient", ctx: "BizHawkClientContext", item: "PHItem", _):
    """Remove a vanilla sea chart unless the ``map_warp_options`` slot option is truthy.

    Returns:
        An empty list when ``map_warp_options`` is set, otherwise the result of
        ``remove_vanilla_normal``.
    """
    keep_chart = ctx.slot_data.get("map_warp_options", 0)
    if not keep_chart:
        return await remove_vanilla_normal(client, ctx, item, _)
    return []
async def remove_vanilla_throwable_keys(client: "PhantomHourglassClient", ctx: "BizHawkClientContext", item: "PHItem", _):
    """Delete the vanilla throwable key actor (boss key, crystal or force gem) from the actor table.

    The actor id stored at Link's held-item address is read, the actor table is
    scanned for an actor with that id, and the matching table entry is
    overwritten with 0. The write is performed directly with ``overwrite``.

    Returns:
        Always an empty write list.
    """
    # Don't do anything if vanilla bk behaviour.
    # ``.get`` with default True mirrors ``receive_boss_key`` and avoids a KeyError
    # when the slot data does not contain the option.
    if "Boss Key" in item.name and not ctx.slot_data.get("boss_key_behaviour", True):
        return []
    # Don't do anything if vanilla pedestal item behaviour
    if ("Crystal" in item.name or "Force Gem" in item.name) and not ctx.slot_data.get("randomize_pedestal_items", 0):
        return []
    # Read actor id in link's held item address. For some reason it's somewhere else in GT
    if client.current_stage == 0x20:
        bk_id = await PHAddr.link_held_item_goron.read(ctx, silent=True)
    elif client.current_stage == 0x25:
        bk_id = await PHAddr.link_held_item_2.read(ctx, silent=True)
    else:
        bk_id = await PHAddr.link_held_item.read(ctx, silent=True)
    # Get the actor table
    actor_table_addr = AddrFromPointer(await PHAddr.actor_table_pointer.read(ctx, silent=True) - 0x2000000, size=250)
    actor_table = hex(await actor_table_addr.read(ctx, silent=True))
    # Strip the "0x" prefix and prepend one "0"; presumably this restores the leading
    # zero of the first pointer so entries align on 8 hex digits - TODO confirm
    actor_table = "0" + actor_table[2:]
    print(f"Removing throwable key {item.name} with bk_id {bk_id}")
    # Loop through the actor table checking if each actor has the bk_id.
    for _i in range(len(actor_table) // 8):
        actor_data = actor_table[_i * 8:(_i + 1) * 8]
        if actor_data[1] == "0":  # filter out empty slots
            continue
        # The actor id is read as 4 bytes located 8 bytes past the actor pointer
        actor_id_addr = AddrFromPointer(int(actor_data, 16) + 8 - 0x2000000, size=4)
        actor_id = await actor_id_addr.read(ctx, silent=True)
        # If you find the boss key, delete its pointer
        if actor_id == bk_id:
            # Chunk _i is counted from the start of the hex string; the entry address is
            # computed from the END of the table (len // 2 bytes), presumably because the
            # table was read as one little-endian integer - verify against AddrFromPointer.read
            little_endian_lol = AddrFromPointer(actor_table_addr + len(actor_table) // 2 - (_i + 1) * 4, size=4)
            print(f"Found bk pointer: {actor_table_addr} at index {_i}")
            await little_endian_lol.overwrite(ctx, 0, silent=True)
            break
    return []
class PHItem(DSItem):
    """``DSItem`` that selects the receive / remove-vanilla handlers defined in this module."""

    def __init__(self, name, data, all_items):
        super().__init__(name, data, all_items)

    def get_receive_function(self):
        """Return the coroutine function used when this item is received.

        A handler supplied by the base class wins. Otherwise one is picked from
        the item's attributes and name, falling back to ``receive_dummy``.
        """
        inherited = super().get_receive_function()
        if inherited is not None:
            return inherited
        if hasattr(self, "ship"):
            return receive_ship
        if self.name == "Refill: Health":
            return receive_full_heal
        # Substring matches, checked in this order
        by_fragment = (
            ("Boss Key", receive_boss_key),
            ("Potion", receive_potion),
        )
        for fragment, handler in by_fragment:
            if fragment in self.name:
                return handler
        return receive_dummy

    def get_remove_vanilla_function(self):
        """Return the coroutine function used to undo the vanilla pickup of this item.

        Exact names are checked first, then substrings, then membership in the
        ``"Throwable Keys"`` item group. Anything else defers to the base class.
        """
        by_exact_name = {
            "Treasure": remove_vanilla_treasure,
            "Ship Part": remove_vanilla_ship_part,
        }
        if self.name in by_exact_name:
            return by_exact_name[self.name]
        by_fragment = (
            ("Potion", remove_vanilla_potion),
            ("Oshus' Sword", remove_vanilla_oshus_sword),
            ("Sea Chart", remove_vanilla_sea_charts),
        )
        for fragment, handler in by_fragment:
            if fragment in self.name:
                return handler
        if self.name in ITEM_GROUPS["Throwable Keys"]:
            return remove_vanilla_throwable_keys
        return super().get_remove_vanilla_function()
class PHEntrance(Entrance):
    """Entrance used during Phantom Hourglass entrance randomization.

    Adds switch-state bookkeeping on top of ``Entrance`` and restricts which
    targets an exit may be paired with.
    """
    # Switch-state bitmask per dungeon; keys are compared with the first word of an entrance name.
    # NOTE(review): this dict lives on the class, so instances share it until one is assigned its own.
    switch_state = {"TotOK": 0b1, "ToF": 0b1, "ToC": 0b1, "GT": 0b1, "ToI": 0b1}
    # Bitmask consulted instead of ``switch_state`` when ``er_state.switch_state_option == 2``.
    global_switch_state = 0b1

    def is_valid_source_transition(self, er_state: "ERPlacementState") -> bool:
        """Return whether this entrance is reachable in the placement's collection state."""
        return self.can_reach(er_state.collection_state)

    def can_connect_to(self, other: Entrance, dead_end: bool, er_state: "ERPlacementState") -> bool:
        """Decide whether this exit may be paired with the target ``other``.

        Checks, in order: matching randomization type (and no self-loop when
        coupled), no same-scene pairing for overworld/island entrances, and a
        compatible switch state for switch-sensitive targets. The remaining
        steps only create bookkeeping attributes on ``er_state``.

        Returns:
            False when one of the checks rejects the pairing, otherwise True.
        """
        # the implementation of coupled causes issues for self-loops since the reverse entrance will be the
        # same as the forward entrance. In decoupled they are ok.
        # Vanilla GER Check first, cause the less resource intensive
        if self.randomization_type != other.randomization_type:
            return False
        if er_state.coupled and self.name == other.name:
            return False

        # Don't connect to the same scene if using an entrance type that doesn't like it
        from .data.Entrances import ENTRANCES
        own_scene = ENTRANCES[self.name].scene
        target_scene = ENTRANCES[other.name].scene
        if own_scene == target_scene:
            scene_sensitive = [EntranceGroups.OVERWORLD, EntranceGroups.ISLAND]
            if (self.randomization_group & EntranceGroups.AREA_MASK in scene_sensitive
                    or other.randomization_group & EntranceGroups.AREA_MASK in scene_sensitive):
                return False

        # Check if you have a valid switch state for the transition you are trying
        if hasattr(er_state, "switch_state_option") and other.name in switch_sensitive_entrances:
            required_state = switch_sensitive_entrances[other.name]
            if er_state.switch_state_option == 2:
                if not self.global_switch_state & required_state:
                    print(f"\t{self.name} could not connect to {other.name} cause switch state 2")
                    return False
            else:
                dungeon = other.name.split(None, 1)[0]
                if dungeon in self.switch_state and not self.switch_state[dungeon] & required_state:
                    print(f"\t{self.name} could not connect to {other.name} cause switch state 1/0")
                    print(f"\t{self.switch_state[dungeon]} & {switch_sensitive_entrances[other.name]}")
                    return False

        # Target group lookup is passed in through on_connect cause cursed.
        # That means it's not in here until the first entrance has been connected
        if not hasattr(er_state, "target_group_lookup"):
            return True

        # Check if there are enough valid entrances to go around for the dead ends
        if not hasattr(er_state, "dead_end_counter"):
            self.make_dead_end_counter(er_state)

        # In stage 2 it is allowed to finish off groups with dead ends in them
        if dead_end and not hasattr(er_state, "stage_2"):
            er_state.stage_2 = True

        # When in phase 3, ignore?
        """ This wasn't working, ignore
        if dead_end or not hasattr(er_state, "dead_end_2"):
            # print(f"Trying to connect {self.name} => {other.name}")
            for counter in er_state.dead_end_counter.values():
                # print(f"\t{decode_entrance_groups(counter.group)}: {counter.others}")
                if self.name in counter.others or other.name in counter.others:
                    for counter2 in er_state.dead_end_counter.values():
                        # print(f"\t\tChecking dead ends {counter2.dead_ends} for group {decode_entrance_groups(counter2.group)}")
                        # print(f"\t\tChecking others {counter2.others}")
                        sub, sub_d = 0, 0
                        if self.name in counter2.others:
                            sub += 1
                        if other.name in counter2.others:
                            sub += 1
                        if self.name in counter2.dead_ends:
                            sub_d += 1
                        if other.name in counter2.dead_ends:
                            sub_d += 1
                        # print(f"\tFound {sub} entrances in others and {sub_d} entrances in dead_ends")
                        # print(f"\tde {len(counter2.dead_ends) - sub_d} > {len(counter2.others) - sub}")
                        if len(counter2.dead_ends) - sub_d > len(counter2.others) - sub:
                            print(f"\tFailed {self.name} => {other.name} "
                                  f"for group {decode_entrance_groups(counter2.group)} "
                                  f"from group {decode_entrance_groups(counter.group)}")
                            # return False
        """
        return True

    def make_dead_end_counter(self, er_state: "ERPlacementState"):
        """Create ``er_state.dead_end_counter``.

        The result maps each randomization group that has dead ends to a
        counter holding the names of those dead ends and the names of the
        non-dead-end entrances whose group is listed in
        ``er_state.target_group_lookup`` for it.
        """
        class DECounter:
            def __init__(self, entrance_group):
                self.group = entrance_group
                self.dead_ends = []
                self.others = []

        # One counter per group that contains at least one dead end
        counters = {}
        for dead_end in er_state.entrance_lookup.dead_ends:
            dead_end_group = dead_end.randomization_group
            if dead_end_group not in counters:
                counters[dead_end_group] = DECounter(dead_end_group)
            counters[dead_end_group].dead_ends.append(dead_end.name)

        # Add potential connected entrances
        target_group_lookup = er_state.target_group_lookup
        for group, counter in counters.items():
            counter.others.extend(
                candidate.name
                for candidate in er_state.entrance_lookup.others
                if candidate.randomization_group in target_group_lookup[group]
            )
        er_state.dead_end_counter = counters
class PHRegion(Region):
    """``Region`` subclass whose ``entrance_type`` is ``PHEntrance``."""

    entrance_type = PHEntrance
# Names for the three fields of a packed entrance-group value, keyed by the
# field's unshifted number. Used by decode_entrance_groups.
island_lookup = {
    0: "sea",
    1: "mercay",
    2: "cannon",
    3: "ember",
    4: "molida",
    5: "spirit",
    6: "gust",
    7: "bannan",
    8: "uncharted",
    9: "zauz",
    10: "ghost",
    11: "goron",
    12: "frost",
    13: "dead",
    14: "ruins",
}
direction_lookup = {
    0: "none",
    1: "left",
    2: "right",
    3: "up",
    4: "down",
    5: "enter",
    6: "exit",
}
type_lookup = {
    0: "none",
    1: "house",
    2: "cave",
    3: "port",
    4: "overworld",
    5: "dungeon",
    6: "boss",
    7: "dungeon_room",
    8: "warp",
    9: "stairs",
    10: "holes",
    # EntranceGroups.EVENT is 11 << 3; without this entry decoding it raised KeyError
    11: "event",
}
# Print EntranceGroups as human readable string
def decode_entrance_groups(group):
    """Return ``group`` formatted as ``<direction>_<type>_<island>``.

    The three fields are extracted with the ``EntranceGroups`` masks and shifts
    (direction unshifted, type shifted by 3, island shifted by 7) and
    translated through the module-level lookup tables. A field value with no
    table entry is rendered as ``unknown(<n>)`` instead of raising ``KeyError``,
    so the function is safe to use from ``__str__`` and debug output.
    """
    direction = group & EntranceGroups.DIRECTION_MASK
    area = (group & EntranceGroups.AREA_MASK) >> 3
    island = (group & EntranceGroups.ISLAND_MASK) >> 7
    direction_name = direction_lookup.get(direction, f"unknown({direction})")
    type_name = type_lookup.get(area, f"unknown({area})")
    island_name = island_lookup.get(island, f"unknown({island})")
    return f"{direction_name}_{type_name}_{island_name}"
class EntranceGroups(IntEnum):
    """Bit-packed entrance group values.

    A group combines a direction (below ``1 << 3``), an entrance type
    (multiples of ``1 << 3``) and an island (multiples of ``1 << 7``).
    ``SEA`` has the value 0 and is therefore an alias of ``NONE``.
    """
    NONE = 0

    # Directions
    LEFT = 1
    RIGHT = 2
    UP = 3
    DOWN = 4
    INSIDE = 5
    OUTSIDE = 6

    # Types
    HOUSE = 1 << 3
    CAVE = 2 << 3
    ISLAND = 3 << 3
    OVERWORLD = 4 << 3
    DUNGEON_ENTRANCE = 5 << 3
    BOSS = 6 << 3
    DUNGEON_ROOM = 7 << 3
    WARP_PORTAL = 8 << 3
    STAIRS = 9 << 3
    HOLES = 10 << 3
    EVENT = 11 << 3

    # Island mask
    SEA = 0 << 7
    MERCAY = 1 << 7
    CANNON = 2 << 7
    EMBER = 3 << 7
    MOLIDA = 4 << 7
    SPIRIT = 5 << 7
    GUST = 6 << 7
    BANNAN = 7 << 7
    UNCHARTED = 8 << 7
    ZAUZ = 9 << 7
    GHOST = 10 << 7
    GORON = 11 << 7
    FROST = 12 << 7
    DEAD = 13 << 7
    RUINS = 14 << 7

    # Bitmasks
    DIRECTION_MASK = 0b111          # everything below HOUSE
    AREA_MASK = 0b1111 << 3         # from HOUSE up to, not including, MERCAY
    ISLAND_MASK = -1 << 7           # every bit from MERCAY upwards

    def __str__(self):
        return decode_entrance_groups(self.value)

    def island(self):
        """Return only the island bits of this value."""
        return self & EntranceGroups.ISLAND_MASK

    def entrance_type(self):
        """Return only the entrance-type bits of this value."""
        return self & EntranceGroups.AREA_MASK

    @staticmethod
    def area_shift(area):
        """Move an entrance-type number into its bit position."""
        return area << 3

    @staticmethod
    def area_unshift(area):
        """Inverse of ``area_shift``."""
        return area >> 3

    @staticmethod
    def island_shift(island):
        """Move an island number into its bit position."""
        return island << 7

    @staticmethod
    def island_unshift(island):
        """Inverse of ``island_shift``."""
        return island >> 7
# Maps a direction to the direction paired with it in this table.
OPPOSITE_ENTRANCE_GROUPS = {
    EntranceGroups.RIGHT: EntranceGroups.LEFT,
    EntranceGroups.LEFT: EntranceGroups.RIGHT,
    EntranceGroups.UP: EntranceGroups.DOWN,
    EntranceGroups.DOWN: EntranceGroups.UP,
    # NONE equals (and hashes like) the int 0, so a lookup with plain 0 finds this
    # entry too; the former separate ``0: 0`` entry was the same key and was overwritten.
    EntranceGroups.NONE: EntranceGroups.NONE,
    EntranceGroups.INSIDE: EntranceGroups.OUTSIDE,
    EntranceGroups.OUTSIDE: EntranceGroups.INSIDE,
}
class PHTransition(DSTransition):
    """
    Client-side data structure for transitions.

    This is separate from PHEntrance, which represents entrance objects while
    entrance randomization is placing them.
    """

    # Group enum and opposite-direction table made available on the class
    entrance_groups = EntranceGroups
    opposite_entrance_groups = OPPOSITE_ENTRANCE_GROUPS
# Index switch_logic by the first element of each entry, keeping every entry
# that shares that first element in a list (in original order).
switch_logic_lookup = {}
for i in switch_logic:
    switch_logic_lookup.setdefault(i[0], []).append(i)
# print(f"SLL: {switch_logic_lookup}")
# Called in on_connect. updates the switch states one can reach an exit with, based on switch_logic
def update_switch_logic(old_ex: "PHEntrance", entr: "PHEntrance", er_state, logic_option, switch_option, new_exits):
    """Propagate switch states to exits after ``old_ex`` was connected to ``entr``.

    Args:
        old_ex: The exit that was just connected; its switch states are copied to ``new_exits``.
        entr: The entrance it was connected to; its name is looked up in ``switch_logic_lookup``.
        er_state: Placement state; its ``entrance_lookup._usable_exits`` is searched by name.
        logic_option: Index into the per-entry logic values, clamped to the last available one.
        switch_option: 0 restricts processing to same-dungeon connections; 2 stores the
            result in ``global_switch_state``; any other value stores it per dungeon in
            ``switch_state``.
        new_exits: Exits that receive ``old_ex``'s switch states.

    Returns:
        None. All effects are attribute assignments on entrance objects.
    """
    # Get the entrance object for an exit to set its logical switch states
    def find_exit(exit_name):
        # Linear search by name; None when no usable exit has that name
        for e in er_state.entrance_lookup._usable_exits:
            if e.name == exit_name:
                return e
        return None
    # Don't process if vanilla behaviour and the connection doesn't connect rooms in the same dungeon
    if switch_option == 0:
        dungeon_connections = [EntranceGroups.DUNGEON_ROOM, EntranceGroups.WARP_PORTAL, EntranceGroups.DUNGEON_ENTRANCE]
        # Both sides must be a dungeon-type entrance and share the same island bits
        if not (entr.randomization_group & EntranceGroups.AREA_MASK in dungeon_connections
                and old_ex.randomization_group & EntranceGroups.AREA_MASK in dungeon_connections
                and old_ex.randomization_group & EntranceGroups.ISLAND_MASK == entr.randomization_group & EntranceGroups.ISLAND_MASK):
            print(f"Switch logic canceled due to entrance pairing being in different dungeons on vanilla setting")
            return
    # Lookup switch logic and propagate it to the newly revealed exits
    # print(f"\tAttempting SLL {entr.name}")
    if entr.name in switch_logic_lookup:
        # print(f"\tsuccess found {switch_logic_lookup[entr.name]}")
        # Each entry is (entrance name, exit name, *logic values)
        for _, ex, *logic in switch_logic_lookup[entr.name]:
            logic_state = min(logic_option, len(logic)-1)
            ex_object = find_exit(ex)
            print(f"\tpropagating switch logic for {ex} with state {logic[logic_state]} from {entr.name}")
            if ex_object:
                if switch_option == 2:
                    ex_object.global_switch_state = logic[logic_state]
                else:
                    # The dungeon key is the first whitespace-separated word of the entrance name
                    dungeon = entr.name.split(None, 1)[0]
                    # NOTE(review): this assigns the same dict object, so the item assignment
                    # below is also visible through entr.switch_state (and through the
                    # class-level PHEntrance.switch_state if entr never got its own dict).
                    # Confirm the sharing is intended before replacing it with a copy.
                    ex_object.switch_state = entr.switch_state
                    ex_object.switch_state[dungeon] = logic[logic_state]
    # if not in switch logic, propagate the previous exit's logic
    # NOTE(review): no ``else`` precedes this loop, so it is assumed to run on every call
    # that reaches this point - confirm against the original indentation.
    for ex in new_exits:
        print(f"\tupdating switch logic for {ex.name} to from {old_ex.name} to {old_ex.global_switch_state}")
        ex.global_switch_state = old_ex.global_switch_state
        ex.switch_state = old_ex.switch_state