import json
import random
import struct
from dataclasses import dataclass
from typing import ByteString, List, Callable

import pymem
from pymem import pattern
from pymem.exception import ProcessNotFound, ProcessError, MemoryReadError, WinAPIError

from CommonClient import logger
from ..locs import (OrbLocations as Orbs,
                    CellLocations as Cells,
                    ScoutLocations as Flies,
                    SpecialLocations as Specials,
                    OrbCacheLocations as Caches)


# Field widths, in bytes, used both to lay out the struct and to size memory reads.
sizeof_uint64 = 8
sizeof_uint32 = 4
sizeof_uint8 = 1
sizeof_float = 4


# IMPORTANT: OpenGOAL memory structures are particular about the alignment, in memory, of member elements.
# This factory places every field at an offset that is divisible by the field's own size in bytes, so use it
# to define the memory offsets of important values in the struct and let it do the byte alignment for you.
# See https://opengoal.dev/docs/reference/type_system/#arrays
@dataclass
class OffsetFactory:
    """Hands out consecutive, size-aligned byte offsets for the fields of a struct."""

    # Offset of the first byte that has not been assigned to a field yet.
    current_offset: int = 0

    def define(self, size: int, length: int = 1) -> int:
        """Reserve room for `length` elements of `size` bytes each and return where they start.

        :param size: width of one element in bytes; the returned offset is a multiple of it.
        :param length: number of consecutive elements (1 for a scalar field).
        :return: the aligned offset of the first element.
        """
        # Distance past the previous multiple of `size`; zero means we are already aligned.
        overshoot = self.current_offset % size
        if overshoot:
            start = self.current_offset + (size - overshoot)
        else:
            start = self.current_offset

        # Move the cursor past this field so the next definition starts after it.
        self.current_offset = start + (size * length)
        return start


# Start defining important memory address offsets here. They must be in the same order, have the same sizes, and have
# the same lengths, as defined in `ap-info-jak1`.
offsets = OffsetFactory()

# Cell, Buzzer, and Special information.
# read_memory treats each next_*_index as the number of entries to read from the matching *_checked table.
next_cell_index_offset = offsets.define(size=sizeof_uint64)
next_buzzer_index_offset = offsets.define(size=sizeof_uint64)
next_special_index_offset = offsets.define(size=sizeof_uint64)

# Tables of uint32 IDs that have been checked in game.
cells_checked_offset = offsets.define(size=sizeof_uint32, length=101)
buzzers_checked_offset = offsets.define(size=sizeof_uint32, length=112)
specials_checked_offset = offsets.define(size=sizeof_uint32, length=32)

# One byte per entry for items received.
buzzers_received_offset = offsets.define(size=sizeof_uint8, length=16)
specials_received_offset = offsets.define(size=sizeof_uint8, length=32)

# Deathlink information.
death_count_offset = offsets.define(size=sizeof_uint32)
death_cause_offset = offsets.define(size=sizeof_uint8)
deathlink_enabled_offset = offsets.define(size=sizeof_uint8)

# Move Rando information.
next_orb_cache_index_offset = offsets.define(size=sizeof_uint64)
orb_caches_checked_offset = offsets.define(size=sizeof_uint32, length=16)
moves_received_offset = offsets.define(size=sizeof_uint8, length=16)
moverando_enabled_offset = offsets.define(size=sizeof_uint8)

# Orbsanity information.
orbsanity_option_offset = offsets.define(size=sizeof_uint8)
orbsanity_bundle_offset = offsets.define(size=sizeof_uint32)
# 17 slots: read_memory reads indices 0-15 per level and index 16 for the global count.
collected_bundle_offset = offsets.define(size=sizeof_uint32, length=17)

# Progression and Completion information.
fire_canyon_unlock_offset = offsets.define(size=sizeof_float)
mountain_pass_unlock_offset = offsets.define(size=sizeof_float)
lava_tube_unlock_offset = offsets.define(size=sizeof_float)
citizen_orb_amount_offset = offsets.define(size=sizeof_float)
oracle_orb_amount_offset = offsets.define(size=sizeof_float)
completion_goal_offset = offsets.define(size=sizeof_uint8)
completed_offset = offsets.define(size=sizeof_uint8)

# Text to display in the HUD (32 char max per string).
their_item_name_offset = offsets.define(size=sizeof_uint8, length=32)
their_item_owner_offset = offsets.define(size=sizeof_uint8, length=32)
my_item_name_offset = offsets.define(size=sizeof_uint8, length=32)
my_item_finder_offset = offsets.define(size=sizeof_uint8, length=32)

# The End.
end_marker_offset = offsets.define(size=sizeof_uint8, length=4)


# Can't believe this is easier to do in GOAL than Python but that's how it be sometimes.
def as_float(value: int) -> int:
    """Reinterpret a raw 32-bit pattern as a single-precision float and truncate it to an int.

    `read_goal_address` returns every field as a little-endian unsigned integer, so float fields arrive as
    their bit pattern and have to be decoded before use.

    :param value: unsigned 32-bit integer holding the bits of a float.
    :return: the decoded float, truncated toward zero.
    """
    # Both formats are explicitly little-endian so the result does not depend on the host's native byte order.
    return int(struct.unpack("<f", struct.pack("<I", value))[0])


# Causes 1 through 4 share a pool of generic messages; one is picked at random.
_GENERIC_DEATH_MESSAGES = (
    "Jak said goodnight.",
    "Jak stepped into the light.",
    "Jak gave Daxter his insect collection.",
    "Jak did not follow Step 1.",
)

# Every other known cause code has one fixed message.
_DEATH_MESSAGES = {
    5: "Jak fell into an endless pit.",
    6: "Jak drowned in the spicy water.",
    7: "Jak tried to tackle a Lurker Shark.",
    8: "Jak hit 500 degrees.",
    9: "Jak took a bath in a pool of dark eco.",
    10: "Jak got bombarded with flaming 30-ton boulders.",
    11: "Jak hit 800 degrees.",
    12: "Jak ceased to be.",
    13: "Jak got eaten by the dark eco plant.",
    14: "Jak burned up.",
    15: "Jak hit the ground hard.",
    16: "Jak crashed the zoomer.",
    17: "Jak got Flut Flut hurt.",
    18: "Jak poisoned the whole darn catch.",
    19: "Jak collided with too many obstacles.",
}


# "Jak" to be replaced by player name in the Client.
def autopsy(cause: int) -> str:
    """Translate the game's death cause code into a human-readable message.

    :param cause: cause code read from the game's memory.
    :return: a message for that cause, or "Jak died." for a code that is not recognized.
    """
    if cause in (1, 2, 3, 4):
        return random.choice(_GENERIC_DEATH_MESSAGES)
    return _DEATH_MESSAGES.get(cause, "Jak died.")


class JakAndDaxterMemoryReader:
    """Polls the memory of the running game (gk.exe) and turns what it finds into client callbacks.

    Connecting is requested by setting `initiated_connect` to True; the next `main_tick` performs it.
    """

    marker: ByteString
    goal_address = None
    connected: bool = False
    initiated_connect: bool = False

    # The memory reader just needs the game running.
    gk_process: "pymem.Pymem" = None

    # Created per instance in __init__; a class-level list would be shared by every reader.
    location_outbox: List[int]
    outbox_index: int = 0
    finished_game: bool = False

    # Deathlink handling
    deathlink_enabled: bool = False
    send_deathlink: bool = False
    cause_of_death: str = ""
    death_count: int = 0

    # Orbsanity handling
    orbsanity_enabled: bool = False
    orbs_paid: int = 0

    # Power cell IDs whose check pays out the citizen or the oracle orb amount (the traders and oracles).
    _CITIZEN_TRADE_CELLS = frozenset({11, 12, 31, 32, 33, 96, 97, 98, 99})
    _ORACLE_TRADE_CELLS = frozenset({13, 14, 34, 35, 100, 101})

    def __init__(self, marker: ByteString = b'UnLiStEdStRaTs_JaK1\x00'):
        """Create a reader that is not connected yet.

        :param marker: byte pattern that is searched for in the game's first module to locate the state struct.
        """
        self.marker = marker
        self.location_outbox = []
        # NOTE: connect() is a coroutine, so it cannot be started from this synchronous constructor; calling it
        # here only created a coroutine object that was never awaited. Set `initiated_connect` instead and let
        # main_tick await it.

    async def main_tick(self,
                        location_callback: Callable,
                        finish_callback: Callable,
                        deathlink_callback: Callable,
                        deathlink_toggle: Callable,
                        paid_orbs_callback: Callable):
        """Run one polling cycle: connect if requested, read the game state, then fire the callbacks.

        :param location_callback: called with the whole outbox while it holds entries not yet accounted for.
        :param finish_callback: called with no arguments on every tick once the game is finished.
        :param deathlink_callback: called with no arguments on every tick while `send_deathlink` is set.
            This class never clears that flag; presumably the caller does.
        :param deathlink_toggle: called with no arguments when the in-game DeathLink setting changed this tick.
        :param paid_orbs_callback: called with the number of orbs traded since the last call, when above zero.
        """
        if self.initiated_connect:
            await self.connect()
            self.initiated_connect = False

        if not self.connected:
            return

        try:
            self.gk_process.read_bool(self.gk_process.base_address)  # Ping to see if it's alive.
        except (ProcessError, MemoryReadError, WinAPIError):
            logger.error("The gk process has died. Restart the game and run \"/memr connect\" again.")
            self.connected = False
            # Nothing can be read from a dead process, and trying would only log the same error again.
            return

        # Save some state variables temporarily.
        old_deathlink_enabled = self.deathlink_enabled

        # Read the memory address to check the state of the game.
        self.read_memory()

        # Checked Locations in game. Handle the entire outbox every tick until we're up to speed.
        if len(self.location_outbox) > self.outbox_index:
            location_callback(self.location_outbox)
            self.save_data()
            self.outbox_index += 1

        if self.finished_game:
            finish_callback()

        if old_deathlink_enabled != self.deathlink_enabled:
            deathlink_toggle()
            logger.debug("Toggled DeathLink " + ("ON" if self.deathlink_enabled else "OFF"))

        if self.send_deathlink:
            deathlink_callback()

        if self.orbs_paid > 0:
            paid_orbs_callback(self.orbs_paid)
            self.orbs_paid = 0

    async def connect(self):
        """Attach to gk.exe and locate the address of the game state struct.

        Sets `connected` to True only when both the process and the marker were found.
        """
        try:
            self.gk_process = pymem.Pymem("gk.exe")  # The GOAL Kernel
            logger.info("Found the gk process: " + str(self.gk_process.process_id))
        except ProcessNotFound:
            logger.error("Could not find the gk process.")
            self.connected = False
            return

        # If we don't find the marker in the first loaded module, we've failed.
        modules = list(self.gk_process.list_modules())
        marker_address = pattern.pattern_scan_module(self.gk_process.process_handle, modules[0], self.marker)
        if marker_address:
            # At this address is another address that contains the struct we're looking for: the game's state.
            # From here we need to add the length in bytes for the marker and 4 bytes of padding,
            # and the struct address is 8 bytes long (it's an uint64).
            goal_pointer = marker_address + len(self.marker) + 4
            self.goal_address = int.from_bytes(self.gk_process.read_bytes(goal_pointer, sizeof_uint64),
                                               byteorder="little",
                                               signed=False)
            logger.info("Found the archipelago memory address: " + str(self.goal_address))
            self.connected = True
        else:
            logger.error("Could not find the archipelago memory address.")
            self.connected = False

        if self.connected:
            logger.info("The Memory Reader is ready!")

    def print_status(self):
        """Log the process ID, the struct address, and the last location taken from the outbox."""
        logger.info("Memory Reader Status:")
        logger.info(" Game process ID: " + (str(self.gk_process.process_id) if self.gk_process else "None"))
        logger.info(" Game state memory address: " + str(self.goal_address))
        logger.info(" Last location checked: " + (str(self.location_outbox[self.outbox_index - 1])
                                                  if self.outbox_index else "None"))

    def read_memory(self) -> List[int]:
        """Read the game state struct and update this reader's state from it.

        Newly checked locations are appended to `location_outbox`; orb trades, deaths, the DeathLink and
        orbsanity settings, and game completion are recorded on the instance. If the process cannot be read,
        the error is logged and `connected` is cleared.

        :return: the location outbox (the same list object held by the instance).
        """
        try:
            # Float fields come back as raw bit patterns. Keep the raw bits and decode them at each trade:
            # overwriting these with the decoded value would make a second trade in the same pass decode an
            # already-decoded number into garbage.
            citizen_orbs_raw = self.read_goal_address(citizen_orb_amount_offset, sizeof_float)
            oracle_orbs_raw = self.read_goal_address(oracle_orb_amount_offset, sizeof_float)

            for cell in self._checked_ids(next_cell_index_offset, cells_checked_offset):
                if not self._record_check(Cells.to_ap_id(cell), "Checked power cell: ", cell):
                    continue

                # If this newly checked cell is one of the traders or oracles, add their amount to the
                # running total that main_tick hands to the paid-orbs callback.
                if cell in self._CITIZEN_TRADE_CELLS:
                    citizen_orb_amount = as_float(citizen_orbs_raw)
                    self.orbs_paid += citizen_orb_amount
                    logger.debug(f"Traded {citizen_orb_amount} orbs!")

                if cell in self._ORACLE_TRADE_CELLS:
                    oracle_orb_amount = as_float(oracle_orbs_raw)
                    self.orbs_paid += oracle_orb_amount
                    logger.debug(f"Traded {oracle_orb_amount} orbs!")

            for buzzer in self._checked_ids(next_buzzer_index_offset, buzzers_checked_offset):
                self._record_check(Flies.to_ap_id(buzzer), "Checked scout fly: ", buzzer)

            for special in self._checked_ids(next_special_index_offset, specials_checked_offset):
                self._record_check(Specials.to_ap_id(special), "Checked special: ", special)

            death_count = self.read_goal_address(death_count_offset, sizeof_uint32)
            death_cause = self.read_goal_address(death_cause_offset, sizeof_uint8)
            if death_count > self.death_count:
                self.cause_of_death = autopsy(death_cause)  # The way he names his variables? Wack!
                self.send_deathlink = True
                # Only one step per read, so a larger gap is caught up over the following reads.
                self.death_count += 1

            # Listen for any changes to this setting.
            deathlink_flag = self.read_goal_address(deathlink_enabled_offset, sizeof_uint8)
            self.deathlink_enabled = bool(deathlink_flag)

            for cache in self._checked_ids(next_orb_cache_index_offset, orb_caches_checked_offset):
                self._record_check(Caches.to_ap_id(cache), "Checked orb cache: ", cache)

            # Listen for any changes to this setting.
            # moverando_flag = self.read_goal_address(moverando_enabled_offset, sizeof_uint8)
            # self.moverando_enabled = bool(moverando_flag)

            orbsanity_option = self.read_goal_address(orbsanity_option_offset, sizeof_uint8)
            bundle_size = self.read_goal_address(orbsanity_bundle_offset, sizeof_uint32)
            self.orbsanity_enabled = orbsanity_option > 0

            # Per Level Orbsanity option. Only need to do this loop if we chose this setting.
            if orbsanity_option == 1:
                for level in range(16):
                    self._read_orb_bundles(level, bundle_size)

            # Global Orbsanity option. Index 16 refers to all orbs found regardless of level.
            if orbsanity_option == 2:
                self._read_orb_bundles(16, bundle_size)

            completed = self.read_goal_address(completed_offset, sizeof_uint8)
            if completed > 0 and not self.finished_game:
                self.finished_game = True
                logger.info("Congratulations! You finished the game!")

        except (ProcessError, MemoryReadError, WinAPIError):
            logger.error("The gk process has died. Restart the game and run \"/memr connect\" again.")
            self.connected = False

        return self.location_outbox

    def _checked_ids(self, index_offset: int, table_offset: int):
        """Yield the raw uint32 IDs stored in one of the struct's checked tables, reading them one at a time."""
        entry_count = self.read_goal_address(index_offset, sizeof_uint64)
        for k in range(entry_count):
            yield self.read_goal_address(table_offset + (k * sizeof_uint32), sizeof_uint32)

    def _record_check(self, ap_id: int, label: str, logged_id: int) -> bool:
        """Append `ap_id` to the outbox unless it is already there; return True when it was newly added."""
        if ap_id in self.location_outbox:
            return False
        self.location_outbox.append(ap_id)
        logger.debug(label + str(logged_id))
        return True

    def _read_orb_bundles(self, level: int, bundle_size: int) -> None:
        """Record every orb bundle collected so far for one slot of the collected-bundle table."""
        collected_bundles = self.read_goal_address(collected_bundle_offset + (level * sizeof_uint32),
                                                   sizeof_uint32)

        # A zero bundle size would make range() raise ValueError; there is nothing to count in that case.
        if collected_bundles <= 0 or bundle_size <= 0:
            return

        # Count up from the first bundle, by bundle size, until you reach the latest collected bundle.
        # e.g. {25, 50, 75, 100, 125...}
        for bundle in range(bundle_size,
                            bundle_size + collected_bundles,  # Range max is non-inclusive.
                            bundle_size):
            bundle_ap_id = Orbs.to_ap_id(Orbs.find_address(level, bundle, bundle_size))
            self._record_check(bundle_ap_id, "Checked orb bundle: ", bundle_ap_id)

    def read_goal_address(self, offset: int, length: int) -> int:
        """Read `length` bytes at `offset` inside the game state struct as a little-endian unsigned integer."""
        return int.from_bytes(
            self.gk_process.read_bytes(self.goal_address + offset, length),
            byteorder="little",
            signed=False)

    def save_data(self):
        """Write the outbox and its index to jakanddaxter_location_outbox.json in the working directory."""
        dump = {
            "outbox_index": self.outbox_index,
            "location_outbox": self.location_outbox
        }
        with open("jakanddaxter_location_outbox.json", "w", encoding="utf-8") as f:
            json.dump(dump, f, indent=4)

    def load_data(self):
        """Restore the outbox and its index from jakanddaxter_location_outbox.json, if it is usable.

        A missing file is ignored silently. An unreadable file is logged and leaves the current state untouched.
        """
        try:
            with open("jakanddaxter_location_outbox.json", "r", encoding="utf-8") as f:
                load = json.load(f)
            # Pull out both values before assigning either, so a partial file cannot half-apply.
            outbox_index = load["outbox_index"]
            location_outbox = load["location_outbox"]
        except FileNotFoundError:
            return
        except (json.JSONDecodeError, KeyError, TypeError):
            logger.error("Ignoring unreadable jakanddaxter_location_outbox.json.")
            return

        self.outbox_index = outbox_index
        self.location_outbox = location_outbox