From 775065715d186832ea0498133648f23b5de7826d Mon Sep 17 00:00:00 2001 From: Doug Hoskisson Date: Sat, 29 Nov 2025 16:22:35 -0800 Subject: [PATCH] SNIClient: new `SnesReader` interface (#5155) * SNIClient: new SnesReader interface * fix Python 3.8 compatibility `bisect_right` * move to worlds because we don't have good separation importable modules and entry points * `read` gives object that contains data * remove python 3.10 implementation and update typing * remove obsolete comment * freeze _MemRead and assert type of get parameter * some optimization in `SnesData.get` * pass context to `read` so that we can have a static instance of `SnesReader` * add docstring to `SnesReader` * remove unused import * break big reads into chunks * some minor improvements - `dataclass` instead of `NamedTuple` for `Read` - comprehension in `SnesData.__init__` - `slots` for dataclasses * update chunk size to 2048 --- worlds/AutoSNIClient.py | 130 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 129 insertions(+), 1 deletion(-) diff --git a/worlds/AutoSNIClient.py b/worlds/AutoSNIClient.py index 98243ce3fc..d2558bb98d 100644 --- a/worlds/AutoSNIClient.py +++ b/worlds/AutoSNIClient.py @@ -1,14 +1,26 @@ from __future__ import annotations import abc +from bisect import bisect_right +from dataclasses import dataclass +import enum import logging -from typing import TYPE_CHECKING, ClassVar, Dict, Iterable, Tuple, Any, Optional, Union, TypeGuard +from typing import (TYPE_CHECKING, Any, ClassVar, Dict, Generic, Iterable, + Optional, Sequence, Tuple, TypeGuard, TypeVar, Union) + from worlds.LauncherComponents import Component, SuffixIdentifier, Type, components if TYPE_CHECKING: from SNIClient import SNIContext +SNES_READ_CHUNK_SIZE = 2048 +""" +note: SNI v0.0.101 currently has a bug where reads from +RetroArch >2048 bytes will only return the last ~2048 bytes read. +https://github.com/alttpo/sni/issues/51 +""" + component = Component('SNI Client', 'SNIClient', component_type=Type.CLIENT, file_identifier=SuffixIdentifier(".apsoe"), description="A client for connecting to SNES consoles via Super Nintendo Interface.") components.append(component) @@ -91,3 +103,119 @@ class SNIClient(abc.ABC, metaclass=AutoSNIClientRegister): def on_package(self, ctx: SNIContext, cmd: str, args: Dict[str, Any]) -> None: """ override this with code to handle packages from the server """ pass + + +@dataclass(frozen=True, slots=True, order=True) +class Read: + """ snes memory read - address and size in bytes """ + address: int + size: int + + +@dataclass(frozen=True, slots=True) +class _MemRead: + location: Read + data: bytes + + +_T_Enum = TypeVar("_T_Enum", bound=enum.Enum) + + +class SnesData(Generic[_T_Enum]): + _ranges: Sequence[_MemRead] + """ sorted by address """ + + def __init__(self, ranges: Sequence[tuple[Read, bytes]]) -> None: + self._ranges = [_MemRead(r, d) for r, d in ranges] + + def get(self, read: _T_Enum) -> bytes: + assert isinstance(read.value, Read), read.value + address = read.value.address + index = bisect_right(self._ranges, address, key=lambda r: r.location.address) - 1 + assert index >= 0, (self._ranges, read.value) + mem_read = self._ranges[index] + sub_index = address - mem_read.location.address + return mem_read.data[sub_index:sub_index + read.value.size] + + +class SnesReader(Generic[_T_Enum]): + """ + how to use: + ``` + from enum import Enum + from worlds.AutoSNIClient import Read, SNIClient, SnesReader + + class MyGameMemory(Enum): + game_mode = Read(WRAM_START + 0x0998, 1) + send_queue = Read(SEND_QUEUE_START, 8 * 127) + ... + + snes_reader = SnesReader(MyGameMemory) + + snes_data = await snes_reader.read(ctx) + if snes_data is None: + snes_logger.info("error reading from snes") + return + + game_mode = snes_data.get(MyGameMemory.game_mode) + ``` + """ + _ranges: Sequence[Read] + """ sorted by address """ + + def __init__(self, reads: type[_T_Enum]) -> None: + self._ranges = self._make_ranges(reads) + + @staticmethod + def _make_ranges(reads: type[enum.Enum]) -> Sequence[Read]: + + unprocessed_reads: list[Read] = [] + for e in reads: + assert isinstance(e.value, Read), (reads.__name__, e, e.value) + unprocessed_reads.append(e.value) + unprocessed_reads.sort() + + ranges: list[Read] = [] + for read in unprocessed_reads: + # v end of the previous range + if len(ranges) == 0 or read.address - (ranges[-1].address + ranges[-1].size) > 255: + ranges.append(read) + else: # combine with previous range + chunk_address = ranges[-1].address + assert read.address >= chunk_address, "sort() didn't work? or something" + original_chunk_size = ranges[-1].size + new_size = max((read.address + read.size) - chunk_address, + original_chunk_size) + ranges[-1] = Read(chunk_address, new_size) + logging.debug(f"{len(ranges)=} {max(r.size for r in ranges)=}") + return ranges + + async def read(self, ctx: "SNIContext") -> SnesData[_T_Enum] | None: + """ + returns `None` if reading fails, + otherwise returns the data for the registered `Enum` + """ + from SNIClient import snes_read + + reads: list[tuple[Read, bytes]] = [] + for r in self._ranges: + if r.size < SNES_READ_CHUNK_SIZE: # most common + response = await snes_read(ctx, r.address, r.size) + if response is None: + return None + reads.append((r, response)) + else: # big read + # Problems were reported with big reads, + # so we chunk it into smaller pieces. + read_so_far = 0 + collection: list[bytes] = [] + while read_so_far < r.size: + remaining_size = r.size - read_so_far + chunk_size = min(SNES_READ_CHUNK_SIZE, remaining_size) + response = await snes_read(ctx, r.address + read_so_far, chunk_size) + if response is None: + return None + collection.append(response) + read_so_far += chunk_size + reads.append((r, b"".join(collection))) + return SnesData(reads)