Files
dockipelago/worlds/sonic1/client.py
Jonathan Tinney 7971961166
Some checks failed
Analyze modified files / flake8 (push) Failing after 2m28s
Build / build-win (push) Has been cancelled
Build / build-ubuntu2204 (push) Has been cancelled
ctest / Test C++ ubuntu-latest (push) Has been cancelled
ctest / Test C++ windows-latest (push) Has been cancelled
Analyze modified files / mypy (push) Has been cancelled
Build and Publish Docker Images / Push Docker image to Docker Hub (push) Successful in 5m4s
Native Code Static Analysis / scan-build (push) Failing after 5m2s
type check / pyright (push) Successful in 1m7s
unittests / Test Python 3.11.2 ubuntu-latest (push) Failing after 16m23s
unittests / Test Python 3.12 ubuntu-latest (push) Failing after 28m19s
unittests / Test Python 3.13 ubuntu-latest (push) Failing after 14m49s
unittests / Test hosting with 3.13 on ubuntu-latest (push) Successful in 5m0s
unittests / Test Python 3.13 macos-latest (push) Has been cancelled
unittests / Test Python 3.11 windows-latest (push) Has been cancelled
unittests / Test Python 3.13 windows-latest (push) Has been cancelled
add schedule I, sonic 1/frontiers/heroes, spirit island
2026-04-02 23:46:36 -07:00

354 lines
18 KiB
Python

import logging
from NetUtils import ClientStatus, RawJSONtoTextParser
from worlds._bizhawk.client import BizHawkClient
from worlds._bizhawk import display_message, read, set_message_interval, write
from . import constants
from .sram import SegaSRAM
logger = logging.getLogger("Client")
MAGIC_BROKEN = 0x0
MAGIC_UNBROKEN = 0x1F
MAGIC_EMPTY_SEED = ' '*20
class S1Client(BizHawkClient):
system = ("GEN",)
patch_suffix = (".aps1",)
game = "Sonic the Hedgehog 1"
sram_abstraction: SegaSRAM
async def validate_rom(self, ctx):
# loaded_hash = await get_hash(ctx.bizhawk_ctx)
print(ctx.rom_hash)
if ctx.rom_hash in ["EE5D0A76A515111B589B2E523B3AC685C20E37AB", # AP-070
"A40C5AA20DB7F7B5C4FD25C0552E7EFA8B70A5E9"]: # AP-071 - compatible
ctx.game = self.game
ctx.items_handling = 0b111
ctx.finished_game = False
if "server_seed_name" not in ctx.__dict__:
# Probably running on 0.5.1?
ctx.server_seed_name = None
if ctx.server_seed_name:
ctx.remote_seed_name = f"{ctx.server_seed_name[-20:]:20}"
if len(ctx.locations_checked) != 0:
# This is in the hopes of avoiding sending reused data
ctx.locations_checked.clear()
ctx.locations_scouted.clear()
ctx.stored_data_notification_keys.clear()
ctx.checked_locations.clear()
else:
ctx.remote_seed_name = MAGIC_EMPTY_SEED
ctx.rom_seed_name = MAGIC_EMPTY_SEED
await set_message_interval(ctx.bizhawk_ctx, 0)
self.sram_abstraction = SegaSRAM(read, write)
self.sram_abstraction.fields = constants.S1Layout
# This will try to work out what byte order is in use. Important because bizhawk has a byte swap issue
await self.sram_abstraction.detect_type(ctx, magic=b'AS10')
logger.info(f"Using sram type {self.sram_abstraction.ram_type}")
self.sram_abstraction.extra_addresses.append((0x0F600, 1, "68K RAM")) # Game mode
self.sram_abstraction.extra_addresses.append((0x0FE10, 7, "68K RAM")) # Zone and Act... v_lastspecial is 0xFE16
ctx.curr_map = None
ctx.my_stored_data = {}
ctx.previous_deathlinks = set()
ctx.complained_about_seed = ""
ctx.messages = []
return True
return False
def on_package(self, ctx, cmd, args):
if cmd == 'RoomInfo':
logger.debug(f"{args['seed_name']=} ?= {ctx.rom_seed_name=}")
ctx.remote_seed_name = f"{args['seed_name'][-20:]:20}"
if ctx.rom_seed_name != ctx.remote_seed_name:
if ctx.rom_seed_name != MAGIC_EMPTY_SEED:
# CommonClient's on_package displays an error to the user in this case, but connection is not cancelled.
self.game_state = False
self.disconnect_pending = True
if len(ctx.locations_checked) != 0:
# This is in the hopes of avoiding sending reused data
ctx.locations_checked.clear()
ctx.locations_scouted.clear()
ctx.stored_data_notification_keys.clear()
ctx.checked_locations.clear()
ctx.my_stored_data = {}
elif cmd == 'Bounced':
#logger.info(f"{cmd=} -> {args=}")
if "DeathLink" in args.get("tags",{}) and args["data"]["time"] not in ctx.previous_deathlinks:
ctx.previous_deathlinks.add(args["data"]["time"])
ctx.my_stored_data[f"{ctx.slot}_{ctx.team}_sonic1_deathl_in"] += 1
elif cmd == 'Connected':
ctx.my_stored_data = {}
elif cmd == "Retrieved":
#logger.info(f"{cmd=} -> {args=}")
for (k,v) in args["keys"].items():
ctx.my_stored_data[k] = v if v else 0
elif cmd == "PrintJSON":
ctx.messages.append(RawJSONtoTextParser(ctx)(args["data"]))
#logger.info(f"{cmd=} -> {args=}... {s=}")
super().on_package(ctx, cmd, args)
async def game_watcher(self, ctx):
assert isinstance(self.sram_abstraction, SegaSRAM)
assert isinstance(self.sram_abstraction.fields, constants.S1Layout)
if self.sram_abstraction.ram_type == -1: # Detection failure!
# This will try to work out what byte order is in use. Important because bizhawk has a byte swap issue
await self.sram_abstraction.detect_type(ctx, magic=b'AS10')
logger.info(f"Using sram type {self.sram_abstraction.ram_type}")
if self.sram_abstraction.ram_type == -1: # Double Detection failure!
return
await self.sram_abstraction.read_bytes(ctx)
if self.sram_abstraction.fields.SR_Head != b'AS10' or b'\xff' in self.sram_abstraction.fields.SR_Seed:
return # This means we're not initialised
seed_name = str(self.sram_abstraction.fields.SR_Seed,'ascii')
# We're only caring about the seed in the start.
ctx.rom_seed_name = seed_name
slot_id = self.sram_abstraction.fields.SR_Slot
ctx.rom_slot = slot_id
if (not ctx.server or not ctx.server.socket.open or ctx.server.socket.closed
or ctx.remote_seed_name == MAGIC_EMPTY_SEED
or getattr(ctx,"slot_data",None) is None):
return
if ctx.messages:
s = ctx.messages.pop(0)
await display_message(ctx.bizhawk_ctx, s)
map_code = ctx.curr_map
if self.sram_abstraction.extra_data[0] == b"\x0C": # Level mode
# This fixes the oddity of the game switching to GHZ1 for special stage conclusion:
if ctx.curr_map not in range(19,25):
map_code = constants.level_bytes.get(self.sram_abstraction.extra_data[1][:2],0)
else:
map_code = ctx.curr_map
elif self.sram_abstraction.extra_data[0] == b"\x10": # Special zone
map_code = int(self.sram_abstraction.extra_data[1][6])+19
elif self.sram_abstraction.extra_data[0] == b"\x04": # Title
# Really we should only go back to the menu when we go back to the menu. So, look for Title game mode.
map_code = 0
if ctx.curr_map != map_code:
ctx.curr_map = map_code
await ctx.send_msgs([{
"cmd": "Set",
"key": f"{ctx.slot}_{ctx.team}_sonic1_area",
"default": 0,
"want_reply": True,
"operations": [{
"operation": "replace",
"value": map_code,
}],
}])
cleanslate = False
missing = []
for key in ["invinc_out", "shield_out", "speeds_out", "deathl_in", "deathl_out", "deaths", "bosses"]:
key = f"{ctx.slot}_{ctx.team}_sonic1_{key}"
if key not in ctx.my_stored_data:
ctx.my_stored_data[key] = -1
missing.append(key)
elif ctx.my_stored_data[key] == -1:
return
if len(missing):
await ctx.send_msgs([{"cmd": "Get", "keys": missing}, {"cmd": "SetNotify", "keys": missing}])
await ctx.update_death_link(ctx.slot_data.get("recv_death", False))
return
if self.sram_abstraction.fields.SR_Head == b'AS10':
# So, this should be valid save data...
if seed_name == MAGIC_EMPTY_SEED:
# Only, there's a blank save, so we should write the full save.
output = [b'AS10']
#logger.info(f"{ctx.locations_checked=} {ctx.checked_locations=}")
output += bytes([MAGIC_BROKEN if constants.id_base+i in ctx.checked_locations else MAGIC_UNBROKEN
for i in range(1,len(constants.monitor_by_id)+1)])
output.extend([0x0,0x0,0x0, # SR_Specials, SR_Emeralds, SR_Bosses
0x0,0x0,0x0, # SR_BuffGoals, SR_BuffDisR, SR_RingsFound
0x0,0x0, # SR_LevelGate, SR_SSGate
0x0,0x0, 0x0,0x0, 0x0,0x0, 0x0,0x0, # (Invinc|Shield|SpeedS|DeathL)(in|out)
0x0]) # SR_Deaths
output.append(ctx.remote_seed_name.encode())
output.append(ctx.slot)
await self.sram_abstraction.full_write(ctx, output)
await self.sram_abstraction.read_bytes(ctx)
seed_name = ctx.remote_seed_name
ctx.rom_seed_name = seed_name
slot_id = ctx.rom_slot = ctx.slot
cleanslate = True
'''
move.w #0,(a0)+ ; Special zone bitfield
move.w #0,(a0)+ ; Emerald bitfield, 0x00 (none), 0x01 (first em), upto 0x3F (all 6)
; Boss's alive bitfield: FZ Star3 Lab3 Spring3 Marb3 GH3
move.w #0,(a0)+ ; Boss bitfield, 0x3F (none), 0x3E (GH3), upto 0x00 (all 6 alive)
move.w #0,(a0)+ ; Buff: Disable goal blocks. 0x00 (off), 0x01 (on)
move.w #0,(a0)+ ; Buff: Disable R. 0x00 (off), 0x01 (on)
move.w #0,(a0)+ ; Number of rings found.
move.w #0,(a0)+ ; Level gate bitmask.
move.w #0,(a0)+ ; Specials gate bitmask.
'''
if seed_name == ctx.remote_seed_name and slot_id == ctx.slot:
dirty = False
mons = self.sram_abstraction.fields.SR_Monitors
for i in range(1,len(constants.monitor_by_id)+1):
broken = (mons[i-1] == MAGIC_BROKEN)
checked = constants.id_base+i in ctx.checked_locations
if broken != checked:
#logger.info(f"{constants.monitor_by_idx[i]} {broken} != {checked}")
ctx.locations_checked.add(constants.id_base+i) # Do I need to do this?
dirty = True
self.sram_abstraction.fields.SR_Monitors[i-1] = MAGIC_BROKEN
# GH3, MZ3, SY3, LZ3, SL3, FZ
bosses = self.sram_abstraction.fields.SR_Bosses
prev_bosses = ctx.my_stored_data.get("bosses",0)
for bit, idx in [[1,211], [2,212], [4,213], [8,214], [16,215], [32,216]]:
if constants.id_base+idx not in ctx.checked_locations and bosses&bit != 0:
ctx.locations_checked.add(constants.id_base+idx) # Do I need to do this?
dirty = True
if cleanslate:
bosses = prev_bosses|bosses
self.sram_abstraction.fields.SR_Bosses = prev_bosses|bosses
#self.sram_abstraction.stage(basis+2, [boss_build])
if prev_bosses != bosses:
ctx.my_stored_data["bosses"] = bosses
await ctx.send_msgs([{"cmd": "Set", "key": f"{ctx.slot}_{ctx.team}_sonic1_bosses",
"default": 0, "want_reply": True,
"operations": [{"operation": "replace", "value": bosses}]}])
#logger.info(f"Data... {clean_data[basis:]=}")
#logger.info(f"Data... {ctx.items_received=}")
#ctx.items_received=[NetworkItem(item=3141501088, location=3141501221, player=2, flags=2)]
ringcount = 0
emeraldsset = 0
buffs = [0,0]
levelkeys = 0
sskeys = 0
invinc = 0
shield = 0
speeds = 0
has_fz_key = False
for it in ctx.items_received:
idx = it.item - constants.id_base
#logger.info(["Emerald 1", "Emerald 2", "Emerald 3", "Emerald 4", "Emerald 5", "Emerald 6", "Disable GOAL blocks", "Disable R blocks"][idx-1])
if idx <= 6:
emeraldsset |= [1,2,4,8,16,32][idx-1]
elif idx == 7:
buffs[0] = 1
elif idx == 8:
buffs[1] = 1
elif idx in range(9,15):
levelkeys |= [1,2,4,8,16,32,64,128][idx-9]
elif idx == 15:
has_fz_key = True
elif idx == 16:
sskeys += 1
elif idx in [23,24]:
ringcount += 1
elif idx == 25:
invinc += 1
elif idx == 26:
shield += 1
elif idx in [27,28]:
speeds += 1
elif idx >= constants.filler_base-1:
# Junk item... do nothing
pass
else:
logger.info(f"Received item {idx} and I don't know what it is.")
levelkeys |= 128 # bit to enable special stages
sskeys = [0,1,3,7,15,31,63,127,255][sskeys]
specials = self.sram_abstraction.fields.SR_Specials
special_build = 0
for bit, idx in [[1,221], [2,222], [4,223], [8,224], [16,225], [32,226]]:
if constants.id_base+idx in ctx.checked_locations:
special_build |= bit
else:
if specials&bit != 0:
ctx.locations_checked.add(constants.id_base+idx) # Do I need to do this?
dirty = True
if cleanslate:
self.sram_abstraction.fields.SR_Specials = special_build&sskeys
#self.sram_abstraction.stage(basis, [special_build])
fzl = ctx.slot_data.get("final_zone_last",0)
show_fz_key: bool = (fzl == 0)
if ctx.finished_game:
has_fz_key = True
show_fz_key = True
finish_game = False
if not ctx.finished_game and (
specials.bit_count() >= ctx.slot_data.get("specials_goal",6) # Special stags goal from yaml
and emeraldsset.bit_count() >= ctx.slot_data.get("emerald_goal", 6) # Emerald goal from yaml
and ringcount >= ctx.slot_data.get("ring_goal", 100) # Ring goal from yaml.
):
bg = ctx.slot_data.get("boss_goal", 6) # Boss goal from yaml
if bosses.bit_count() >= bg and (fzl in [0,1] or (fzl == 2 and bosses & 32)):
finish_game = True
has_fz_key = True
show_fz_key = True
sskeys |= 64 # bit to show victory
elif bosses.bit_count() >= bg - 1 and has_fz_key:
show_fz_key = True
if show_fz_key:
levelkeys |= 64 # bit for FZ
if ctx.slot_data.get("hard_mode", 0):
ringcount = 0
self.sram_abstraction.fields.SR_Emeralds = emeraldsset
self.sram_abstraction.fields.SR_BuffGoals = buffs[0]
self.sram_abstraction.fields.SR_BuffDisR = buffs[1]
self.sram_abstraction.fields.SR_RingsFound = ringcount
self.sram_abstraction.fields.SR_LevelGate = levelkeys
self.sram_abstraction.fields.SR_SSGate = sskeys
self.sram_abstraction.fields.SR_Invinc_in = invinc
self.sram_abstraction.fields.SR_Shield_in = shield
self.sram_abstraction.fields.SR_SpeedS_in = speeds
keys_of_interest = ['Invinc_out', 'Shield_out', 'SpeedS_out']
if ctx.slot_data['recv_death']:
keys_of_interest.extend(['DeathL_in', 'DeathL_out'])
if ctx.slot_data['send_death']:
keys_of_interest.append('Deaths')
for bk in keys_of_interest:
vn = f"SR_{bk}"
k = f"{ctx.slot}_{ctx.team}_sonic1_{bk.lower()}"
i_out = ctx.my_stored_data[k]
if not i_out:
i_out = 0
if i_out > self.sram_abstraction.fields[vn]:
self.sram_abstraction.fields[vn] = i_out
elif i_out < self.sram_abstraction.fields[vn]:
ctx.my_stored_data[k] = i_out = self.sram_abstraction.fields[vn]
await ctx.send_msgs([{"cmd": "Set", "key": k,
"default": 0, "want_reply": True,
"operations": [{"operation": "replace", "value": i_out}]}])
if k == f"{ctx.slot}_{ctx.team}_sonic1_deaths":
await ctx.send_death("Sonic died")
ctx.previous_deathlinks.add(ctx.last_death_link)
await self.sram_abstraction.commit(ctx)
if dirty:
await ctx.send_msgs([{"cmd": "LocationChecks", "locations": list(ctx.locations_checked)}]) # Or this?
#logger.info(f"{ctx.locations_checked=}")
if finish_game:
await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
else:
seed_complaint = f"{seed_name}/{slot_id} =?= {ctx.remote_seed_name}/{ctx.slot}"
if ctx.complained_about_seed != seed_complaint:
logger.info(seed_complaint)
ctx.complained_about_seed = seed_complaint
#logger.info(f"{ctx.username=}")