Files
dockipelago/worlds/sonic1/sram.py
Jonathan Tinney 7971961166
Some checks failed
Analyze modified files / flake8 (push) Failing after 2m28s
Build / build-win (push) Has been cancelled
Build / build-ubuntu2204 (push) Has been cancelled
ctest / Test C++ ubuntu-latest (push) Has been cancelled
ctest / Test C++ windows-latest (push) Has been cancelled
Analyze modified files / mypy (push) Has been cancelled
Build and Publish Docker Images / Push Docker image to Docker Hub (push) Successful in 5m4s
Native Code Static Analysis / scan-build (push) Failing after 5m2s
type check / pyright (push) Successful in 1m7s
unittests / Test Python 3.11.2 ubuntu-latest (push) Failing after 16m23s
unittests / Test Python 3.12 ubuntu-latest (push) Failing after 28m19s
unittests / Test Python 3.13 ubuntu-latest (push) Failing after 14m49s
unittests / Test hosting with 3.13 on ubuntu-latest (push) Successful in 5m0s
unittests / Test Python 3.13 macos-latest (push) Has been cancelled
unittests / Test Python 3.11 windows-latest (push) Has been cancelled
unittests / Test Python 3.13 windows-latest (push) Has been cancelled
add schedule I, sonic 1/frontiers/heroes, spirit island
2026-04-02 23:46:36 -07:00

277 lines
11 KiB
Python

# struct byte-order prefix characters (first character of a format string):
# Char | Byte order    | Size     | Alignment
#  @   | native        | native   | native
#  =   | native        | standard | none
#  <   | little-endian | standard | none
#  >   | big-endian    | standard | none
#  !   | network       | standard | none
from collections import namedtuple
import logging
import struct
import typing
# Module-wide logger. It is looked up under the fixed name "Client" rather than
# this module's own name.
logger = logging.getLogger("Client")
class ParseField(object):
    """Declaration of one struct field: a type code plus a repeat count.

    ``format_str`` is either a bare one-character code (``'B'``), in which
    case ``count`` supplies the repeat count, or a code preceded by a decimal
    number (``'16s'``), in which case the number is the count and the
    ``count`` argument is ignored.
    """

    # Class-level defaults, overwritten per instance by __init__.
    count = 1
    field_type = 'B'

    def __init__(self, format_str, count=1):
        if len(format_str) > 1:
            # "<digits><code>": the last character is the type code and
            # everything before it must parse as an integer.
            prefix, code = format_str[:-1], format_str[-1]
            self.field_type = code
            self.count = int(prefix)
            return
        self.field_type = format_str
        self.count = count
# One row of the layout table that the SegaSRAM ``fields`` setter builds for
# every ParseField attribute found on a layout class.
FieldRecord = namedtuple(
    'FieldRecord',
    (
        'name',    # attribute name taken from the layout class
        'type',    # Python type looked up from the struct type code
        'index',   # position of the record in the field table
        'count',   # number of unpacked values (1 for 's'/'p' fields)
        'length',  # byte length for 's'/'p' fields, otherwise 1
        'offset',  # index of the field's first value in the unpacked tuple
    ),
)
class _array_proxy(object):
    """Index-based view over one repeated field of a SegaSRAM.

    Reads come straight from the SRAM object's unpacked ``_fields`` tuple;
    writes are not applied immediately but handed to ``sram.stage``.

    Indexing follows normal sequence rules: negative keys count from the end
    and out-of-range keys raise ``IndexError`` (which also makes the proxy
    iterable through the ``__getitem__`` protocol).
    """

    _sram: 'SegaSRAM'
    _fr: 'FieldRecord'

    def __init__(self, sram, fr):
        super().__setattr__("_sram", sram)
        super().__setattr__("_fr", fr)

    def _slot(self, key) -> int:
        """Translate an element index into an index into ``sram._fields``.

        Raises:
            IndexError: if ``key`` falls outside ``0 <= key < count`` after
                negative-index normalisation.
        """
        count = self._fr.count
        position = key + count if key < 0 else key
        # An explicit check instead of assert: asserts disappear under -O and
        # previously let negative keys address a neighbouring field.
        if not 0 <= position < count:
            raise IndexError(
                f"index {key} out of range for field "
                f"{self._fr.name!r} with {count} element(s)"
            )
        return self._fr.offset + position

    def __getitem__(self, key) -> typing.Any:
        """Return the currently unpacked value of element ``key``."""
        return self._sram._fields[self._slot(key)]

    def __setitem__(self, key, value):
        """Stage ``value`` for element ``key``."""
        self._sram.stage(self._slot(key), [value])
class _fields_proxy(object):
_sram: 'SegaSRAM'
def __init__(self, sram):
super().__setattr__("_sram", sram)
def __getattribute__(self, name) -> typing.Any|list:
sram = object.__getattribute__(self, '_sram')
if name == '_sram':
return sram
if name in sram.field_map:
k = sram.field_map[name]
if k.count == 1:
return sram._fields[k.offset]
else:
return _array_proxy(sram, k)
else:
return object.__getattribute__(self, name)
def __setattr__(self, name, value):
k = self._sram.field_map[name]
assert(k.count == 1)
self._sram.stage(k.offset, [value])
def __getitem__(self, key):
if isinstance(key,int):
return self.__getattribute__(self._sram.field_table[key].name)
else:
return self.__getattribute__(self._sram.field_map[key].name)
def __setitem__(self, key, value):
if isinstance(key,int):
return self.__setattr__(self._sram.field_table[key].name, value)
else:
return self.__setattr__(self._sram.field_map[key].name, value)
class DataOrder(_fields_proxy):
    """Base class for SRAM layout declarations.

    The SegaSRAM ``fields`` setter requires its argument to be a subclass of
    this class, scans that class's own attributes for ParseField instances,
    and then instantiates it as the field proxy.
    """
    pass
class BigEndian(DataOrder):
    """Layout base associated with the ``>`` (big-endian) struct prefix."""
    pass
class LittleEndian(DataOrder):
    """Layout base associated with the ``<`` (little-endian) struct prefix."""
    pass
class NetworkEndian(DataOrder):
    """Layout base associated with the ``!`` (network order) struct prefix."""
    pass
class SegaSRAM(object):
    """Struct-typed, staged access to an "SRAM" memory domain.

    All I/O goes through two awaitable callables supplied by the caller:
    ``read_callable(ctx.bizhawk_ctx, [(address, size, "SRAM"), ...])`` returning
    one bytes-like object per request, and
    ``write_callable(ctx.bizhawk_ctx, [(address, data, "SRAM"), ...])``.

    ``ram_type`` describes how the logical ("clean") bytes sit in the raw
    domain:

    * ``0``  - data in the even raw bytes, every odd byte is dead
    * ``1``  - data in the odd raw bytes, every even byte is dead
    * ``2``  - every raw byte is data, in order
    * ``3``  - every raw byte is data, but each pair of bytes is swapped
    * ``-1`` - set by ``detect_type`` when no arrangement matched

    Typical flow: assign a DataOrder subclass to ``fields``, ``read_bytes``,
    modify values through ``fields`` (which only stages them), then
    ``commit`` to write the changed bytes.
    """

    _raw: bytes
    clean_data: bytes
    byte_count: int
    ram_type = 0
    format: str = ""
    _fields = ()
    _fieldproxy: 'DataOrder'
    staged: typing.List[typing.Tuple]
    extra_addresses: typing.List[typing.Tuple[int, int, str]]
    extra_data: typing.List[bytes]
    field_table: 'list[FieldRecord]'
    field_map: 'dict[str, FieldRecord]'
    _read_callable: typing.Callable
    _write_callable: typing.Callable
    # struct type code -> Python type produced by struct.unpack for that code.
    # 'x' (pad byte) produces no value at all; 'c' produces a length-1 bytes.
    _struct_type_map = {
        'x': None, 'c': bytes, 'b': int, 'B': int, '?': bool,
        'h': int, 'H': int, 'i': int, 'I': int, 'l': int, 'L': int,
        'q': int, 'Q': int, 'n': int, 'N': int,
        'e': float, 'f': float, 'd': float,
        's': bytes, 'p': bytes, 'P': int,
    }

    def __init__(self, read_callable, write_callable, ram_type=0):
        '''Ram_type 0 for even addresses (FF__), 1 for odd addresses (__FF), 2 for both (FFFF)'''
        self._raw = b""
        self.clean_data = b""
        self.byte_count = 0
        self.ram_type = ram_type
        self.staged = []
        self.extra_addresses = []
        self.extra_data = []
        # Start with an empty layout so the proxy answers unknown names with a
        # normal AttributeError before any layout has been assigned.
        self.field_table = []
        self.field_map = {}
        self._fieldproxy = _fields_proxy(self)
        self._read_callable = read_callable
        self._write_callable = write_callable

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _swap_pairs(data) -> bytes:
        """Return ``data`` with every pair of adjacent bytes exchanged.

        Raises:
            ValueError: if ``data`` has an odd length (the last byte would
                have no partner).
        """
        if len(data) % 2:
            raise ValueError("byte-swapped SRAM data must have an even length")
        swapped = bytearray(data)
        swapped[0::2], swapped[1::2] = swapped[1::2], swapped[0::2]
        return bytes(swapped)

    def _raw_size(self) -> int:
        """Number of raw bytes the current format occupies for ``ram_type``."""
        packed = struct.calcsize(self.format)
        # Only the full-width arrangements (2, 3) have no dead bytes.
        return packed if self.ram_type in (2, 3) else packed * 2

    def _to_clean(self, raw) -> bytes:
        """Convert raw domain bytes into contiguous data bytes.

        Raises:
            ValueError: for a ``ram_type`` outside 0-3.
        """
        if self.ram_type == 0:
            return bytes(raw[0::2])
        if self.ram_type == 1:
            return bytes(raw[1::2])
        if self.ram_type == 2:
            return bytes(raw)
        if self.ram_type == 3:
            return self._swap_pairs(raw)
        raise ValueError(f"unsupported ram_type {self.ram_type!r}")

    def _to_raw(self, clean) -> bytes:
        """Convert contiguous data bytes into raw domain bytes.

        Dead bytes are written as zero.

        Raises:
            ValueError: for a ``ram_type`` outside 0-3.
        """
        if self.ram_type in (0, 1):
            raw = bytearray(len(clean) * 2)
            raw[self.ram_type::2] = clean
            return bytes(raw)
        if self.ram_type == 2:
            return bytes(clean)
        if self.ram_type == 3:
            return self._swap_pairs(clean)
        raise ValueError(f"unsupported ram_type {self.ram_type!r}")

    # ------------------------------------------------------------------- layout

    @property
    def fields(self):
        """The proxy object through which fields are read and staged."""
        return self._fieldproxy

    @fields.setter
    def fields(self, layout: 'type[DataOrder]'):
        """Install a layout class and derive the struct format from it.

        Every ParseField found in the class's own ``__dict__`` (in definition
        order) becomes one entry of ``field_table`` / ``field_map`` and one
        item of the struct format string.

        Raises:
            TypeError: if ``layout`` is not a DataOrder subclass.
            KeyError: if a ParseField uses an unknown type code.
        """
        logger.info("Setting format=%r", layout)
        if not (isinstance(layout, type) and issubclass(layout, DataOrder)):
            raise TypeError(f"layout must be a DataOrder subclass, got {layout!r}")

        # ``layout`` is a class, so the prefix is chosen with issubclass.
        # A plain DataOrder subclass keeps struct's default (native) mode.
        if issubclass(layout, BigEndian):
            pieces = [">"]
        elif issubclass(layout, LittleEndian):
            pieces = ["<"]
        elif issubclass(layout, NetworkEndian):
            pieces = ["!"]
        else:
            pieces = []

        table = []
        offset = 0  # index into the tuple returned by struct.unpack
        for name, spec in layout.__dict__.items():
            if not isinstance(spec, ParseField):
                continue
            code = spec.field_type
            py_type = self._struct_type_map[code]
            index = len(table)
            if code in ('s', 'p'):
                # A string field is a single value; count is its byte length.
                record = FieldRecord(name, py_type, index, 1, spec.count, offset)
                offset += 1
            elif code == 'x':
                # Pad bytes take space in the format but yield no value, so
                # they must not shift the offsets of the following fields.
                record = FieldRecord(name, py_type, index, 0, 1, offset)
            else:
                record = FieldRecord(name, py_type, index, spec.count, 1, offset)
                offset += spec.count
            pieces.append(f"{'' if spec.count == 1 else spec.count}{code}")
            table.append(record)

        self.format = "".join(pieces)
        self.layout = layout
        self._fieldproxy = layout(self)
        self.field_table = table
        self.field_map = {record.name: record for record in table}
        self.byte_count = self._raw_size()

    # ---------------------------------------------------------------- detection

    async def detect_type(self, ctx, magic: bytes, address=0x0):
        '''Magic is the sequence of bytes to look for at the address

        Reads three windows and checks each possible arrangement of ``magic``
        in them. ``ram_type`` ends up as the arrangement that matched (the
        last one in check order if several do) or -1 if none did, and
        ``byte_count`` is recomputed for the result.
        '''
        address_odd = address % 2
        aligned = address - address_odd
        padded_len = len(magic) * 2  # Double length for dead bytes
        # 16-bit window: must cover the magic from an even start and be an
        # even number of bytes long so it can be pair-swapped.
        word_span = len(magic) + address_odd
        word_span += word_span % 2

        requests = [
            # Address is console side, i.e. it already counts the dead bytes.
            (aligned, padded_len, "SRAM"),
            # Address excludes the dead bytes.
            (address * 2, padded_len, "SRAM"),
            # Maybe it's 16-bit?
            (aligned, word_span, "SRAM"),
        ]
        data = await self._read_callable(ctx.bizhawk_ctx, requests)
        with_dead, without_dead, words = data[0], data[1], data[2]

        lo, hi = address_odd, address_odd + len(magic)
        # Tolerate a short read: drop an unpaired trailing byte before swapping.
        even_words = words[:len(words) - len(words) % 2]
        candidates = [
            (bytes(with_dead[0::2]), 0),     # even bytes, include dead
            (bytes(with_dead[1::2]), 1),     # odd bytes, include dead
            (bytes(without_dead[0::2]), 0),  # even bytes, exclude dead
            (bytes(without_dead[1::2]), 1),  # odd bytes, exclude dead
            (bytes(words[lo:hi]), 2),        # both bytes, correct order
            (self._swap_pairs(even_words)[lo:hi], 3),  # both bytes, byte swapped
        ]

        self.ram_type = -1  # So we can test detection failures
        for candidate, kind in candidates:
            if candidate == magic:
                self.ram_type = kind
        self.byte_count = self._raw_size()

    # ---------------------------------------------------------------------- I/O

    async def read_bytes(self, ctx, clear_stage=True):
        """Read the SRAM block (plus ``extra_addresses``) and unpack it.

        Updates ``_raw``, ``extra_data``, ``clean_data`` and the unpacked
        field values. Pending staged writes are dropped unless
        ``clear_stage`` is false.

        Raises:
            ValueError: if ``ram_type`` is not one of 0-3.
            struct.error: if the data does not fit ``format``.
        """
        if clear_stage:
            self.staged = []
        requests = [(0x0, self.byte_count, "SRAM")] + self.extra_addresses
        data = await self._read_callable(ctx.bizhawk_ctx, requests)
        self._raw = data[0]
        self.extra_data = data[1:]
        self.clean_data = self._to_clean(data[0])
        self._fields = struct.unpack(self.format, self.clean_data)

    async def full_write(self, ctx, data, clear_stage=True):
        """Pack ``data`` (one value per struct item) and write the whole block.

        State is only updated once packing and encoding have succeeded.

        Raises:
            ValueError: if ``ram_type`` is not one of 0-3.
            struct.error: if ``data`` does not fit ``format``.
        """
        if clear_stage:
            self.staged = []
        # Materialise once so iterators/generators are not consumed twice.
        values = tuple(data)
        clean = struct.pack(self.format, *values)
        raw = self._to_raw(clean)
        self._fields = values
        self.clean_data = clean
        self._raw = raw
        await self._write_callable(ctx.bizhawk_ctx, [(0, raw, "SRAM")])

    def stage(self, offset, data):
        """Queue ``data[i]`` for value slot ``offset + i``.

        A value equal to the committed one is not queued; instead any write
        still pending for that slot is cancelled, so setting a field back to
        its committed value really undoes the earlier change.
        """
        for i, value in enumerate(data):
            slot = offset + i
            if self._fields[slot] != value:
                self.staged.append((slot, value))
            elif any(entry[0] == slot for entry in self.staged):
                self.staged = [entry for entry in self.staged if entry[0] != slot]

    async def commit(self, ctx):
        """Apply staged values and write only the raw bytes that changed.

        Does nothing when nothing is staged. Each changed byte becomes its
        own ``[index, [byte], "SRAM"]`` patch; raw bytes beyond the end of the
        last known ``_raw`` are always written.

        Raises:
            ValueError: if ``ram_type`` is not one of 0-3.
            struct.error: if a staged value does not fit ``format``.
        """
        if not self.staged:
            return
        pending, self.staged = self.staged, []
        values = list(self._fields)
        for slot, value in pending:
            values[slot] = value

        # Pack and encode before touching state so a bad value leaves the
        # previously committed fields intact.
        clean = struct.pack(self.format, *values)
        raw = self._to_raw(clean)

        previous = self._raw
        known = len(previous)
        patches = []
        for index, new in enumerate(raw):
            if index < known:
                old = previous[index]
                if old == new:
                    continue
                logger.debug("i=%r bs=%r", index, (old, new))
            patches.append([index, [new], "SRAM"])

        self._fields = tuple(values)
        self.clean_data = clean
        self._raw = raw
        await self._write_callable(ctx.bizhawk_ctx, patches)