Files
dockipelago/worlds/tloz_ph/DSZeldaClient/subclasses.py
Jonathan Tinney 7971961166
Some checks failed
Analyze modified files / flake8 (push) Failing after 2m28s
Build / build-win (push) Has been cancelled
Build / build-ubuntu2204 (push) Has been cancelled
ctest / Test C++ ubuntu-latest (push) Has been cancelled
ctest / Test C++ windows-latest (push) Has been cancelled
Analyze modified files / mypy (push) Has been cancelled
Build and Publish Docker Images / Push Docker image to Docker Hub (push) Successful in 5m4s
Native Code Static Analysis / scan-build (push) Failing after 5m2s
type check / pyright (push) Successful in 1m7s
unittests / Test Python 3.11.2 ubuntu-latest (push) Failing after 16m23s
unittests / Test Python 3.12 ubuntu-latest (push) Failing after 28m19s
unittests / Test Python 3.13 ubuntu-latest (push) Failing after 14m49s
unittests / Test hosting with 3.13 on ubuntu-latest (push) Successful in 5m0s
unittests / Test Python 3.13 macos-latest (push) Has been cancelled
unittests / Test Python 3.11 windows-latest (push) Has been cancelled
unittests / Test Python 3.13 windows-latest (push) Has been cancelled
add schedule I, sonic 1/frontiers/heroes, spirit island
2026-04-02 23:46:36 -07:00

400 lines
14 KiB
Python

from enum import IntEnum
from typing import TYPE_CHECKING
import worlds._bizhawk as bizhawk
if TYPE_CHECKING:
try:
from ..Client import PhantomHourglassClient
except ImportError:
pass
async def read_multiple(ctx, addresses, signed=False, keys=None) -> dict["Address", int] or dict[str, int]:
reads = await bizhawk.read(ctx.bizhawk_ctx, [a.get_inner_read_list() for a in addresses])
reads = [int.from_bytes(r, "little", signed=signed) for r in reads]
if keys:
return {k: r for k, r in zip(keys, reads)}
return {a: r for a, r in zip(addresses, reads)}
async def write_multiple(ctx, addresses: list["Address"], values: list[int]):
writes = [a.get_inner_write_list(v) for a, v in zip(addresses, values)]
await bizhawk.write(ctx.bizhawk_ctx, writes)
# Get address from pointer
async def get_address_from_heap(ctx, pointer, offset=0, size=4) -> "Address":
"""
Reads a pointer, and follows that pointer with an offset
:param size: how many bytes
:param ctx:
:param pointer:
:param offset:
:return:
"""
m_course = 0
while m_course == 0:
m_course = await pointer.read(ctx)
m_course = Address.from_pointer(m_course - 0x02000000, size=4)
read = await m_course.read(ctx)
print(f"Got map address @ {hex(read + offset - 0x02000000)}")
return Address.from_pointer(read + offset - 0x02000000, size=size)
def storage_key(ctx, key: str):
return f"{key}_{ctx.slot}_{ctx.team}"
def get_stored_data(ctx, key, default=None):
store = ctx.stored_data.get(storage_key(ctx, key), default)
store = store if store is not None else default
return store
# Split up large values to write into smaller chunks
def split_bits(value, size):
ret = []
f = 0xFFFFFFFFFFFFFF00
for _ in range(size):
ret.append(value & 0xFF)
value = (value & f) >> 8
return ret
all_addresses = []
class Address:
addr_eu: int
addr_us: int
addr: int
current_region: int
domain: str
size: int
offset: int
name: str
all_addresses: list = all_addresses
def __init__(self, addr_eu, addr_us=None, size=1, domain="Main RAM", name=""):
if domain == "Main RAM":
assert addr_eu < 0x400000
self.addr_eu = addr_eu
self.addr_us = addr_us if addr_us else None
self.addr_lookup = [self.addr_eu, self.addr_us]
self.addr = self.addr_eu
self.current_region = 0
self.domain = domain
self.size = size
self.name = name
self.all_addresses.append(self)
def set_region(self, region: str or int):
self.current_region = self._region_int(region)
self.addr = self.addr_lookup[self.current_region]
@staticmethod
def _region_int(region: str or int):
if isinstance(region, str):
assert region.lower() in ["eu", "us"]
region = ["eu", "us"].index(region.lower())
assert region in [0, 1]
return region
def get_address(self, region=None):
if region is not None:
region = self._region_int(region)
return self.addr_lookup[region]
return self.addr
def get_read_list(self):
return [self.get_inner_read_list()]
def get_inner_read_list(self) -> tuple:
return self.addr, self.size, self.domain
def get_write_list(self, value:int or list):
return [self.get_inner_write_list(value)]
def get_inner_write_list(self, value:int or list):
if isinstance(value, int):
value = split_bits(value, self.size)
return self.addr, value[:self.size], self.domain
async def read(self, ctx, signed=False, silent=False):
read_result = await self.read_bytes(ctx)
res = sum([int.from_bytes(b, "little", signed=signed)<<(8*i) for i, b in enumerate(read_result)])
if not silent:
print(f"\tReading address {self}, got value {res}")
return res
async def read_bytes(self, ctx):
return await bizhawk.read(ctx.bizhawk_ctx, [(self.addr, self.size, self.domain)])
async def overwrite(self, ctx, value, silent=False, offset=0):
if isinstance(value, int):
value = split_bits(value, self.size)
if not silent:
print(f"\tWriting to address {self} with value {value}")
return await bizhawk.write(ctx.bizhawk_ctx, [(self.addr+offset, value, self.domain)])
async def add(self, ctx, value: int, silent=False, offset=0):
silent=False
prev = await self.read(ctx, silent=silent)
return await self.overwrite(ctx, prev + value, silent=silent, offset=offset)
async def set_bits(self, ctx, value: int or list, silent=False, offset=0):
if isinstance(value, int):
value = split_bits(value, self.size)
prev = split_bits(await self.read(ctx, silent=silent), self.size)
# print(f"Setting bits {self} {prev} {value} {[p | v for p, v in zip(prev, value)]}")
return await self.overwrite(ctx, [p | v for p, v in zip(prev, value)], silent=silent, offset=offset)
async def unset_bits(self, ctx, value: int or list, silent=False, offset=0):
if isinstance(value, int):
value = split_bits(value, self.size)
prev = split_bits(await self.read(ctx, silent=silent), self.size)
# print(f"Setting bits {self} {prev} {value} {[p | v for p, v in zip(prev, value)]}")
return await self.overwrite(ctx, [p & (~v) for p, v in zip(prev, value)], silent=silent, offset=offset)
def __repr__(self, region="eu"):
return f"Address Object {hex(self.get_address(region))} {self.name}"
def __str__(self):
name = f"{self.name}: " if self.name else ""
return f"{name}{hex(self.get_address())}"
def __add__(self, other):
return self.addr + other
def __sub__(self, other):
if isinstance(other, Address):
return self.addr - other.addr
return self.addr - other
def __eq__(self, other):
return self.addr == other
def __ne__(self, other):
return self.addr != other
def __bool__(self):
return bool(self.addr)
def __hash__(self):
return self.addr
def __gt__(self, other):
return self.addr > other
def __lt__(self, other):
return self.addr < other
def __ge__(self, other):
return self.addr >= other
def __le__(self, other):
return self.addr <= other
@classmethod
def pointer(cls, addr, name=""):
"""Pointer from Data TCM"""
return cls(addr, addr, 4, "Data TCM", name)
@classmethod
def from_pointer(cls, addr, size=1, domain="Main RAM", name=""):
"""When addresses are grabbed from pointers, the address is the same in all versions"""
return cls(addr, addr, size, domain, name)
class Pointer(Address):
"""
Pointer from Data TCM
work towards depreciating, it should have been a classmethod from the start
"""
def __init__(self, addr, name=""):
super().__init__(addr, addr, 4, "Data TCM", name)
class AddrFromPointer(Address):
"""
When addresses are grabbed from pointers, version doesn't matter.
work towards depreciating, it should have been a classmethod from the start
"""
def __init__(self, addr, size=1, domain="Main RAM", name=""):
super().__init__(addr, addr, size, domain, name)
class SRAM(Address):
"""
Saveram also has slot data to care about.
"""
def __init__(self, addr_eu_1, addr_eu_2=None, addr_us_1=None, addr_us_2=None, name=""):
super().__init__(addr_eu_1, addr_us_1, size=1, domain="SRAM", name=name)
self.slot = 0
self.addr_lookup = [(addr_eu_1, addr_eu_2), (addr_us_1, addr_us_2)]
async def read(self, ctx, signed=False, silent=False, slot=0):
addr = self.addr_lookup[self.current_region][self.slot]
read_result = await bizhawk.read(ctx.bizhawk_ctx, [(addr, self.size, self.domain)])
res = int.from_bytes(read_result[0], "little", signed=signed)
if not silent:
print(f"\tReading address {self}, got value {hex(res)}")
return res
class DSTransition:
"""
Datastructures for dealing with Transitions on the client side.
Not to be confused with PHEntrances, that deals with entrance objects during ER placement.
"""
entrance_groups: IntEnum | None = None # set these in game instance or
opposite_entrance_groups: dict[IntEnum, IntEnum] | None = None
def __init__(self, name, data):
self.data = data
self.name: str = name
self.id: int = data.get("id", None)
assert self.id is not None
self.entrance: tuple = data.get("entrance", None)
self.exit: tuple = data.get("exit", None)
self.entrance_region: str = data["entrance_region"]
self.exit_region: str = data["exit_region"]
self.two_way: bool = data.get("two_way", True)
self.category_group = data["type"]
self.direction = data["direction"]
self.island = data.get("island", self.entrance_groups.NONE if self.entrance_groups else None)
self.coords: tuple | None = data.get("coords", None)
self.extra_data: dict = data.get("extra_data", {})
self.stage, self.room, _ = self.entrance if self.entrance else (None, None, None)
self.scene: int = self.get_scene()
self.exit_scene: int = self.get_exit_scene()
self.exit_stage = self.exit[0] if self.exit else None
self.y = self.coords[1] if self.coords else None
self.vanilla_reciprocal: DSTransition | None = None # Paired location
self.copy_number = 0
def get_scene(self):
if self.room:
return self.stage * 0x100 + self.room
else:
return self.stage << 8
def get_exit_scene(self):
if self.exit:
return self.exit[0] * 0x100 + self.exit[1]
else:
return None
def is_pairing(self, r1, r2) -> bool:
return r1 == self.entrance_region and r2 == self.exit_region
def get_y(self):
return self.coords[1] if self.coords else None
def detect_exit_simple(self, stage, room, entrance):
return self.exit == (stage, room, entrance)
def detect_exit_scene(self, scene, entrance):
return self.exit_scene == scene and entrance == self.exit[2]
def detect_exit(self, scene, entrance, coords, y_offest):
if self.detect_exit_scene(scene, entrance):
if entrance < 0xF0:
return True
# Continuous entrance check
x_max = self.extra_data.get("x_max", 0x8FFFFFFF)
x_min = self.extra_data.get("x_min", -0x8FFFFFFF)
z_max = self.extra_data.get("z_max", 0x8FFFFFFF)
z_min = self.extra_data.get("z_min", -0x8FFFFFFF)
y = self.coords[1] if self.coords else coords["y"] - y_offest
# print(f"Checking entrance {self.name}: x {x_max} > {coords['x']} > {x_min}")
# print(f"\ty: {y + 1000} > {y} > {coords['y'] - y_offest}")
# print(f"\tz: {z_max} > {coords['z']} > {z_min}")
if y + 2000 > coords["y"] - y_offest >= y and x_max > coords["x"] > x_min and z_max > coords["z"] > z_min:
return True
return False
def set_stage(self, new_stage):
self.stage = new_stage
self.scene = self.get_scene()
self.entrance = tuple([new_stage] + list(self.entrance[1:]))
def set_exit_stage(self, new_stage):
self.exit = tuple([new_stage] + list(self.exit[1:]))
self.exit_scene = self.get_exit_scene()
self.exit_stage = self.exit[0]
def set_exit_room(self, new_room):
self.exit = tuple([self.exit[0], new_room, self.exit[2]])
self.exit_scene = self.get_exit_scene()
def copy(self):
res = DSTransition(f"{self.name}{self.copy_number+1}", self.data)
res.copy_number = self.copy_number + 1
return res
def __str__(self):
return self.name
def debug_print(self):
print(f"Debug print for entrance {self.name}")
print(f"\tentrance {self.entrance}")
print(f"\texit {self.exit}")
print(f"\tcoords {self.coords}")
print(f"\textra_data {self.extra_data}")
@classmethod
def from_data(cls, entrance_data):
res = dict()
counter = {}
ident = 0
for name, data in entrance_data.items():
data["id"] = ident
res[name] = cls(name, data)
# print(f"{i} {ENTRANCES[name].entrance_region} -> {ENTRANCES[name].exit_region}")
ident += 1
point = data["entrance_region"] + "<=>" + data["exit_region"]
counter.setdefault(point, 0)
counter[point] += 1
if "one_way_data" in data:
res[name].extra_data |= data["one_way_data"]
if data.get("two_way", True):
two_way = True
else:
two_way = False
reverse_name = data.get("return_name", f"Unnamed Entrance {ident}")
reverse_data = {
"entrance_region": data.get("reverse_exit_region", data["exit_region"]),
"exit_region": data.get("reverse_entrance_region", data["entrance_region"]),
"id": ident,
"entrance": data.get("exit", data.get("entrance", None)),
"exit": data["entrance"],
"two_way": two_way,
"type": data["type"],
"island": data.get("return_island", data.get("island", cls.entrance_groups.NONE)),
"direction": cls.opposite_entrance_groups[data["direction"]],
"coords": data.get("coords", None),
}
if "extra_data" in data:
reverse_data["extra_data"] = data["extra_data"]
if "reverse_one_way_data" in data:
reverse_data.setdefault("extra_data", {})
reverse_data["extra_data"] = data["reverse_one_way_data"]
if reverse_name in res:
print(f"DUPLICATE ENTRANCE!!! {reverse_name}")
res[reverse_name] = cls(reverse_name, reverse_data)
res[name].vanilla_reciprocal = res[reverse_name]
res[reverse_name].vanilla_reciprocal = res[name]
# print(f"{i} {ENTRANCES[reverse_name].entrance_region} -> {ENTRANCES[reverse_name].exit_region}")
ident += 1
point: str = reverse_data["entrance_region"] + "<=>" + reverse_data["exit_region"]
counter.setdefault(point, 0)
counter[point] += 1
return res