KH2: Deathlink and ingame item popups (#5206)

---------

Co-authored-by: qwint <qwint.42@gmail.com>
Co-authored-by: Delilah <lindsaydiane@gmail.com>
This commit is contained in:
JaredWeakStrike
2026-01-28 01:10:29 -05:00
committed by GitHub
parent 65ef35f1b4
commit a6740e7be3
16 changed files with 1705 additions and 1066 deletions

View File

@@ -0,0 +1,95 @@
from CommonClient import ClientCommandProcessor
from typing import TYPE_CHECKING
# I don't know what is going on here, but it works.
if TYPE_CHECKING:
from . import KH2Context
else:
KH2Context = object
class KH2CommandProcessor(ClientCommandProcessor):
ctx: KH2Context
def _cmd_receive_notif(self, notification_type=""):
"""Change receive notification type.Valid Inputs:Puzzle, Info, Chest and None
Puzzle: Puzzle Piece Popup when you receive an item.
Info: Displays the Information notification when you receive an item.
Chest: Displays the Chest notification when you receive an item.
None: Toggle off any of the receiving notifications.
"""
notification_type = notification_type.lower()
if notification_type in {"puzzle", "info", "chest", "none"}:
temp_client_settings = self.ctx.client_settings["receive_popup_type"]
self.ctx.client_settings["receive_popup_type"] = notification_type
self.output(f"Changed receive notification type from {temp_client_settings} to {self.ctx.client_settings['receive_popup_type']}")
else:
self.output(f"Unknown receive notification type:{notification_type}. Valid Inputs: Puzzle, Info, Chest, None")
def _cmd_send_notif(self, notification_type=""):
"""Change send notification type.Valid Inputs:Puzzle, Info, Chest and None
Puzzle: Puzzle Piece Popup when you send an item.
Info: Displays the Information notification when you send an item.
Chest: Displays the Chest notification when you send an item.
None: Toggle off any of the receiving notifications.
"""
notification_type = notification_type.lower()
if notification_type in {"puzzle", "info", "chest", "none"}:
temp_client_settings = self.ctx.client_settings["send_popup_type"]
self.ctx.client_settings["send_popup_type"] = notification_type
# doing it in this order to make sure it actually changes
self.output(f"Changed send notification type from {temp_client_settings} to {self.ctx.client_settings['send_popup_type']}")
else:
self.output(f"Unknown send notification type:{notification_type}. Valid Inputs: Puzzle, Info, Chest, None")
def _cmd_change_send_truncation_priority(self, priority=""):
"""Change what gets truncated first when using Chest or Puzzle piece send notification. Playername min is 5 and ItemName is 15"""
priority = priority.lower()
if priority in {"playername", "itemname"}:
temp_client_settings = self.ctx.client_settings["send_truncate_first"]
self.ctx.client_settings["send_truncate_first"] = priority
self.output(f"Changed receive notification type truncation from {temp_client_settings} to {self.ctx.client_settings['send_truncate_first']}")
else:
self.output(f"Unknown priority: {priority}. Valid Inputs: PlayerName, ItemName")
def _cmd_change_receive_truncation_priority(self, priority=""):
"""Change what gets truncated first when using Chest or Puzzle piece receive notification. Playername min is 5 and ItemName is 15"""
priority = priority.lower()
if priority in {"playername", "itemname"}:
temp_client_settings = self.ctx.client_settings["receive_truncate_first"]
self.ctx.client_settings["receive_truncate_first"] = priority
self.output(f"Changed receive notification truncation type from {temp_client_settings} to {self.ctx.client_settings['receive_truncate_first']}")
else:
self.output(f"Unknown priority: {priority}. Valid Inputs: PlayerName, ItemName")
def _cmd_deathlink(self):
"""Toggles Deathlink"""
if self.ctx.deathlink_toggle:
# self.ctx.tags.add("DeathLink")
self.ctx.deathlink_toggle = False
self.output(f"Death Link turned off")
else:
self.ctx.deathlink_toggle = True
self.output(f"Death Link turned on")
def _cmd_add_to_blacklist(self, player_name: str = ""):
"""Adds player to deathlink blacklist"""
if player_name not in self.ctx.deathlink_blacklist:
self.ctx.deathlink_blacklist.append(player_name)
def _cmd_remove_from_blacklist(self, player_name: str = ""):
"""Removes player from the deathlink blacklist"""
if player_name in self.ctx.deathlink_blacklist:
self.ctx.deathlink_blacklist.remove(player_name)
#def _cmd_kill(self):
# self.ctx.kh2_write_byte(0x810000, 1)
#def _cmd_chest(self,itemid:int):
# from .RecieveItems import to_khscii
# from .ReadAndWrite import kh2_write_bytes,kh2_write_byte
# displayed_string = to_khscii(self.ctx,"Yessir")
#
# kh2_write_byte(self.ctx, 0x800150, int(itemid))
# kh2_write_bytes(self.ctx, address = 0x800154,value = displayed_string)
# kh2_write_byte(self.ctx, 0x800000, 3)

View File

@@ -0,0 +1,754 @@
from __future__ import annotations
import ModuleUpdate
import Utils
ModuleUpdate.update()
import os
import asyncio
import json
import requests
from pymem import pymem
from worlds.kh2 import item_dictionary_table, exclusion_item_table, CheckDupingItems, all_locations, exclusion_table, \
SupportAbility_Table, ActionAbility_Table, all_weapon_slot
from worlds.kh2.Names import ItemName
from .WorldLocations import *
from NetUtils import ClientStatus, NetworkItem
from CommonClient import gui_enabled, logger, get_base_parser, CommonContext, server_loop
from .CMDProcessor import KH2CommandProcessor
from .SendChecks import finishedGame
class KH2Context(CommonContext):
command_processor = KH2CommandProcessor
game = "Kingdom Hearts 2"
items_handling = 0b111 # Indicates you get items sent from other worlds.
def __init__(self, server_address, password):
super(KH2Context, self).__init__(server_address, password)
self.goofy_ability_to_slot = dict()
self.donald_ability_to_slot = dict()
self.all_weapon_location_id = None
self.sora_ability_to_slot = dict()
self.kh2_seed_save = None
self.kh2_local_items = None
self.growthlevel = None
self.kh2connected = False
self.kh2_finished_game = False
self.serverconnected = False
self.item_name_to_data = {name: data for name, data, in item_dictionary_table.items()}
self.location_name_to_data = {name: data for name, data, in all_locations.items()}
self.kh2_data_package = {}
self.kh2_loc_name_to_id = None
self.kh2_item_name_to_id = None
self.lookup_id_to_item = None
self.lookup_id_to_location = None
self.sora_ability_dict = {k: v.quantity for dic in [SupportAbility_Table, ActionAbility_Table] for k, v in
dic.items()}
self.location_name_to_worlddata = {name: data for name, data, in all_world_locations.items()}
self.slot_name = None
self.disconnect_from_server = False
self.sending = []
# queue for the strings to display on the screen
self.queued_puzzle_popup = []
self.queued_info_popup = []
self.queued_chest_popup = []
# special characters for printing in game
# A dictionary of all the special characters, which
# are hard to convert through a mathematical formula.
self.special_dict = {
' ': 0x01, '\n': 0x02, '-': 0x54, '!': 0x48, '?': 0x49, '%': 0x4A, '/': 0x4B,
'.': 0x4F, ',': 0x50, ';': 0x51, ':': 0x52, '\'': 0x57, '(': 0x5A, ')': 0x5B,
'[': 0x62, ']': 0x63, 'à': 0xB7, 'á': 0xB8, 'â': 0xB9, 'ä': 0xBA, 'è': 0xBB,
'é': 0xBC, 'ê': 0xBD, 'ë': 0xBE, 'ì': 0xBF, 'í': 0xC0, 'î': 0xC1, 'ï': 0xC2,
'ñ': 0xC3, 'ò': 0xC4, 'ó': 0xC5, 'ô': 0xC6, 'ö': 0xC7, 'ù': 0xC8, 'ú': 0xC9,
'û': 0xCA, 'ü': 0xCB, 'ç': 0xE8, 'À': 0xD0, 'Á': 0xD1, 'Â': 0xD2, 'Ä': 0xD3,
'È': 0xD4, 'É': 0xD5, 'Ê': 0xD6, 'Ë': 0xD7, 'Ì': 0xD8, 'Í': 0xD9, 'Î': 0xDA,
'Ï': 0xDB, 'Ñ': 0xDC, 'Ò': 0xDD, 'Ó': 0xDE, 'Ô': 0xDF, 'Ö': 0xE0, 'Ù': 0xE1,
'Ú': 0xE2, 'Û': 0xE3, 'Ü': 0xE4, '¡': 0xE5, '¿': 0xE6, 'Ç': 0xE7
}
# list used to keep track of locations+items player has. Used for disoneccting
self.kh2_seed_save_cache = {
"itemIndex": -1,
# back of soras invo is 0x25E2. Growth should be moved there
# Character: [back of invo, front of invo]
"SoraInvo": [0x25D8, 0x2546],
"DonaldInvo": [0x26F4, 0x2658],
"GoofyInvo": [0x2808, 0x276C],
"AmountInvo": {
"Ability": {},
"Amount": {
"Bounty": 0,
},
"Growth": {
"High Jump": 0, "Quick Run": 0, "Dodge Roll": 0,
"Aerial Dodge": 0, "Glide": 0
},
"Bitmask": [],
"Weapon": {"Sora": [], "Donald": [], "Goofy": []},
"Equipment": {}, # ItemName: Amount
"Magic": {
"Fire Element": 0,
"Blizzard Element": 0,
"Thunder Element": 0,
"Cure Element": 0,
"Magnet Element": 0,
"Reflect Element": 0
},
"StatIncrease": {
ItemName.MaxHPUp: 0,
ItemName.MaxMPUp: 0,
ItemName.DriveGaugeUp: 0,
ItemName.ArmorSlotUp: 0,
ItemName.AccessorySlotUp: 0,
ItemName.ItemSlotUp: 0,
},
},
}
self.kh2seedname = None
self.kh2_seed_save_path_join = None
self.kh2slotdata = None
self.mem_json = None
self.itemamount = {}
self.client_settings = {
"send_truncate_first": "playername", # there is no need to truncate item names for info popup
"receive_truncate_first": "playername", # truncation order. Can be PlayerName or ItemName
"send_popup_type": "chest", # type of popup when you receive an item
"receive_popup_type": "chest", # can be Puzzle, Info, Chest or None
}
if "localappdata" in os.environ:
self.game_communication_path = os.path.expandvars(r"%localappdata%\KH2AP")
self.kh2_client_settings = f"kh2_client_settings.json"
self.kh2_client_settings_join = os.path.join(self.game_communication_path, self.kh2_client_settings)
if not os.path.exists(self.game_communication_path):
os.makedirs(self.game_communication_path)
if not os.path.exists(self.kh2_client_settings_join):
# make the json with the settings
with open(self.kh2_client_settings_join, "wt") as f:
f.close()
elif os.path.exists(self.kh2_client_settings_join):
with open(self.kh2_client_settings_join) as f:
# if the file isnt empty load it
# this is the best I could fine to valid json stuff https://stackoverflow.com/questions/23344948/validate-and-format-json-files
try:
self.kh2_seed_save = json.load(f)
except json.decoder.JSONDecodeError:
pass
# this is what is effectively doing on
# self.client_settings = default
f.close()
self.hitlist_bounties = 0
# hooked object
self.kh2 = None
self.final_xemnas = False
self.worldid_to_locations = {
# 1: {}, # world of darkness (story cutscenes)
2: TT_Checks,
# 3: {}, # destiny island doesn't have checks
4: HB_Checks,
5: BC_Checks,
6: Oc_Checks,
7: AG_Checks,
8: LoD_Checks,
9: HundredAcreChecks,
10: PL_Checks,
11: Atlantica_Checks,
12: DC_Checks,
13: TR_Checks,
14: HT_Checks,
15: HB_Checks, # world map, but you only go to the world map while on the way to goa so checking hb
16: PR_Checks,
17: SP_Checks,
18: TWTNW_Checks,
# 255: {}, # starting screen
}
#Sora,Donald and Goofy are always in your party
self.WorldIDtoParty = {
4: "Beast",
6: "Auron",
7: "Aladdin",
8: "Mulan",
10: "Simba",
14: "Jack Skellington",
16: "Jack Sparrow",
17: "Tron",
18: "Riku"
}
self.last_world_int = -1
# PC Address anchors
# epic .10 addresses
self.Now = 0x0716DF8
self.Save = 0x9A9330
self.Journal = 0x743260
self.Shop = 0x743350
self.Slot1 = 0x2A23018
self.InfoBarPointer = 0xABE2A8
self.isDead = 0x0BEEF28
self.FadeStatus = 0xABAF38
self.PlayerGaugePointer = 0x0ABCCC8
self.kh2_game_version = None # can be egs or steam
self.kh2_seed_save_path = None
self.chest_set = set(exclusion_table["Chests"])
self.keyblade_set = set(CheckDupingItems["Weapons"]["Keyblades"])
self.staff_set = set(CheckDupingItems["Weapons"]["Staffs"])
self.shield_set = set(CheckDupingItems["Weapons"]["Shields"])
self.all_weapons = self.keyblade_set.union(self.staff_set).union(self.shield_set)
self.equipment_categories = CheckDupingItems["Equipment"]
self.armor_set = set(self.equipment_categories["Armor"])
self.accessories_set = set(self.equipment_categories["Accessories"])
self.all_equipment = self.armor_set.union(self.accessories_set)
self.CharacterAnchors = {
"Sora": 0x24F0,
"Donald": 0x2604,
"Goofy": 0x2718,
"Auron": 0x2940,
"Mulan": 0x2A54,
"Aladdin": 0x2B68,
"Jack Sparrow": 0x2C7C,
"Beast": 0x2D90,
"Jack Skellington": 0x2EA4,
"Simba": 0x2FB8,
"Tron": 0x30CC,
"Riku": 0x31E0
}
self.Equipment_Anchor_Dict = {
#Sora, Donald, Goofy in that order
# each slot is a short, Sora Anchor:0x24F0, Donald Anchor: 0x2604, Goofy Anchor: 0x2718
# Each of these has 8 slots that could have them no matter how many slots are unlocked
# If Ability Ring on slot 5 of sora
# ReadShort(Save+CharacterAnchors["Sora"]+Equiptment_Anchor["Accessories][4 (index 5)]) == self.item_name_to_data[item_name].memaddr
"Armor": [0x14, 0x16, 0x18, 0x1A, 0x1C, 0x1E, 0x20, 0x22],
"Accessories": [0x24, 0x26, 0x28, 0x2A, 0x2C, 0x2E, 0x30, 0x32]
}
self.AbilityQuantityDict = {}
self.ability_categories = CheckDupingItems["Abilities"]
self.sora_ability_set = set(self.ability_categories["Sora"])
self.donald_ability_set = set(self.ability_categories["Donald"])
self.goofy_ability_set = set(self.ability_categories["Goofy"])
self.all_abilities = self.sora_ability_set.union(self.donald_ability_set).union(self.goofy_ability_set)
self.stat_increase_set = set(CheckDupingItems["Stat Increases"])
self.AbilityQuantityDict = {item: self.item_name_to_data[item].quantity for item in self.all_abilities}
# Growth:[level 1,level 4,slot]
self.growth_values_dict = {
"High Jump": [0x05E, 0x061, 0x25DA],
"Quick Run": [0x62, 0x65, 0x25DC],
"Dodge Roll": [0x234, 0x237, 0x25DE],
"Aerial Dodge": [0x66, 0x069, 0x25E0],
"Glide": [0x6A, 0x6D, 0x25E2]
}
self.ability_code_list = None
self.master_growth = {"High Jump", "Quick Run", "Dodge Roll", "Aerial Dodge", "Glide"}
self.base_hp = 20
self.base_mp = 100
self.base_drive = 5
self.base_accessory_slots = 1
self.base_armor_slots = 1
self.base_item_slots = 3
self.front_ability_slots = [0x2546, 0x2658, 0x276C, 0x2548, 0x254A, 0x254C, 0x265A, 0x265C, 0x265E, 0x276E,
0x2770, 0x2772]
self.deathlink_toggle = False
self.deathlink_blacklist = []
from .ReadAndWrite import kh2_read_longlong, kh2_read_int, kh2_read_string, kh2_read_byte, kh2_write_bytes, kh2_write_int, kh2_write_short, kh2_write_byte, kh2_read_short, kh2_return_base_address
from .SendChecks import checkWorldLocations, checkSlots, checkLevels, verifyChests, verifyLevel
from .RecieveItems import displayPuzzlePieceTextinGame, displayInfoTextinGame, displayChestTextInGame, verifyItems, give_item, IsInShop, to_khscii
async def server_auth(self, password_requested: bool = False):
if password_requested and not self.password:
await super(KH2Context, self).server_auth(password_requested)
await self.get_username()
# if slot name != first time login or previous name
# and seed name is none or saved seed name
if not self.slot_name and not self.kh2seedname:
await self.send_connect()
elif self.slot_name == self.auth and self.kh2seedname:
await self.send_connect()
else:
logger.info(f"You are trying to connect with data still cached in the client. Close client or connect to the correct slot: {self.slot_name}")
self.serverconnected = False
self.disconnect_from_server = True
# to not softlock the client when you connect to the wrong slot/game
def event_invalid_slot(self):
self.kh2seedname = None
CommonContext.event_invalid_slot(self)
def event_invalid_game(self):
self.kh2seedname = None
CommonContext.event_invalid_slot(self)
async def connection_closed(self):
self.kh2connected = False
self.serverconnected = False
if self.kh2seedname is not None and self.auth is not None:
with open(self.kh2_seed_save_path_join, 'w') as f:
f.write(json.dumps(self.kh2_seed_save, indent=4))
f.close()
await super(KH2Context, self).connection_closed()
async def disconnect(self, allow_autoreconnect: bool = False):
self.kh2connected = False
self.serverconnected = False
self.locations_checked = []
if self.kh2seedname not in {None} and self.auth not in {None}:
with open(self.kh2_seed_save_path_join, 'w') as f:
f.write(json.dumps(self.kh2_seed_save, indent=4))
f.close()
await super(KH2Context, self).disconnect()
@property
def endpoints(self):
if self.server:
return [self.server]
else:
return []
async def shutdown(self):
if self.kh2seedname not in {None} and self.auth not in {None}:
with open(self.kh2_seed_save_path_join, 'w') as f:
f.write(json.dumps(self.kh2_seed_save, indent=4))
f.close()
with open(self.kh2_client_settings_join, 'w') as f2:
f2.write(json.dumps(self.client_settings, indent=4))
f2.close()
await super(KH2Context, self).shutdown()
def on_package(self, cmd: str, args: dict):
if cmd == "RoomInfo":
if not self.kh2seedname:
self.kh2seedname = args['seed_name']
elif self.kh2seedname != args['seed_name']:
self.disconnect_from_server = True
self.serverconnected = False
self.kh2connected = False
logger.info("Connection to the wrong seed, connect to the correct seed or close the client.")
return
self.kh2_seed_save_path = f"kh2save2{self.kh2seedname}{self.auth}.json"
self.kh2_seed_save_path_join = os.path.join(self.game_communication_path, Utils.get_file_safe_name(self.kh2_seed_save_path))
if not os.path.exists(self.kh2_seed_save_path_join):
self.kh2_seed_save = {
"Levels": {
"SoraLevel": 0,
"ValorLevel": 0,
"WisdomLevel": 0,
"LimitLevel": 0,
"MasterLevel": 0,
"FinalLevel": 0,
"SummonLevel": 0,
},
# Item: Amount of them sold
"SoldEquipment": dict(),
}
with open(self.kh2_seed_save_path_join, 'wt') as f:
f.close()
elif os.path.exists(self.kh2_seed_save_path_join):
with open(self.kh2_seed_save_path_join) as f:
try:
self.kh2_seed_save = json.load(f)
except json.decoder.JSONDecodeError:
self.kh2_seed_save = None
if self.kh2_seed_save is None or self.kh2_seed_save == {}:
self.kh2_seed_save = {
"Levels": {
"SoraLevel": 0,
"ValorLevel": 0,
"WisdomLevel": 0,
"LimitLevel": 0,
"MasterLevel": 0,
"FinalLevel": 0,
"SummonLevel": 0,
},
# Item: Amount of them sold
"SoldEquipment": dict(),
}
f.close()
if cmd == "Connected":
self.kh2slotdata = args['slot_data']
self.kh2_data_package = Utils.load_data_package_for_checksum(
"Kingdom Hearts 2", self.checksums["Kingdom Hearts 2"])
if "location_name_to_id" in self.kh2_data_package:
self.data_package_kh2_cache(
self.kh2_data_package["location_name_to_id"], self.kh2_data_package["item_name_to_id"])
self.connect_to_game()
else:
asyncio.create_task(self.send_msgs([{"cmd": "GetDataPackage", "games": ["Kingdom Hearts 2"]}]))
self.locations_checked = set(args["checked_locations"])
if cmd == "ReceivedItems":
# Sora Front of Ability List:0x2546
# Donald Front of Ability List:0x2658
# Goofy Front of Ability List:0x276A
start_index = args["index"]
if start_index == 0:
self.kh2_seed_save_cache = {
"itemIndex": -1,
# back of soras invo is 0x25E2. Growth should be moved there
# Character: [back of invo, front of invo]
"SoraInvo": [0x25D8, 0x2546],
"DonaldInvo": [0x26F4, 0x2658],
"GoofyInvo": [0x2808, 0x276C],
"AmountInvo": {
"Ability": {},
"Amount": {
"Bounty": 0,
},
"Growth": {
"High Jump": 0, "Quick Run": 0, "Dodge Roll": 0,
"Aerial Dodge": 0, "Glide": 0
},
"Bitmask": [],
"Weapon": {"Sora": [], "Donald": [], "Goofy": []},
"Equipment": {}, # ItemName: Amount
"Magic": {
"Fire Element": 0,
"Blizzard Element": 0,
"Thunder Element": 0,
"Cure Element": 0,
"Magnet Element": 0,
"Reflect Element": 0
},
"StatIncrease": {
ItemName.MaxHPUp: 0,
ItemName.MaxMPUp: 0,
ItemName.DriveGaugeUp: 0,
ItemName.ArmorSlotUp: 0,
ItemName.AccessorySlotUp: 0,
ItemName.ItemSlotUp: 0,
},
},
}
if start_index > self.kh2_seed_save_cache["itemIndex"] and self.serverconnected:
self.kh2_seed_save_cache["itemIndex"] = start_index
for item in args['items']:
networkItem = NetworkItem(*item)
# actually give player the item
asyncio.create_task(self.give_item(networkItem.item, networkItem.location))
if cmd == "RoomUpdate":
if "checked_locations" in args:
new_locations = set(args["checked_locations"])
self.locations_checked |= new_locations
if cmd == "DataPackage":
if "Kingdom Hearts 2" in args["data"]["games"]:
self.data_package_kh2_cache(
args["data"]["games"]["Kingdom Hearts 2"]["location_name_to_id"],
args["data"]["games"]["Kingdom Hearts 2"]["item_name_to_id"])
self.connect_to_game()
asyncio.create_task(self.send_msgs([{'cmd': 'Sync'}]))
if cmd == "PrintJSON":
# shamelessly stolen from kh1
if args.get("type") == "ItemSend":
item = args["item"]
networkItem = NetworkItem(*item)
itemId = networkItem.item
receiverID = args["receiving"]
senderID = networkItem.player
receive_popup_type = self.client_settings["receive_popup_type"].lower()
send_popup_type = self.client_settings["send_popup_type"].lower()
receive_truncate_first = self.client_settings["receive_truncate_first"].lower()
send_truncate_first = self.client_settings["send_truncate_first"].lower()
# checking if sender is the kh2 player, and you aren't sending yourself the item
if receiverID == self.slot and senderID != self.slot: # item is sent to you and is not from yourself
itemName = self.item_names.lookup_in_game(itemId)
playerName = self.player_names[networkItem.player] # player that sent you the item
totalLength = len(itemName) + len(playerName)
if receive_popup_type == "info": # no restrictions on size here
temp_length = f"Obtained {itemName} from {playerName}"
if totalLength > 90:
self.queued_info_popup += [temp_length[:90]] # slice it to be 90
else:
self.queued_info_popup += [temp_length]
else: # either chest or puzzle. they are handled the same length wise
totalLength = len(itemName) + len(playerName)
while totalLength > 25:
if receive_truncate_first == "playername":
if len(playerName) > 5:
playerName = playerName[:-1]
else:
itemName = itemName[:-1]
else:
if len(ItemName) > 15:
itemName = itemName[:-1]
else:
playerName = playerName[:-1]
totalLength = len(itemName) + len(playerName)
# from =6. totalLength of the string cant be over 31 or game crash
if receive_popup_type == "puzzle": # sanitize ItemName and receiver name
self.queued_puzzle_popup += [f"{itemName} from {playerName}"]
else:
self.queued_chest_popup += [f"{itemName} from {playerName}"]
if receiverID != self.slot and senderID == self.slot: #item is sent to other players
itemName = self.item_names.lookup_in_slot(itemId, receiverID)
playerName = self.player_names[receiverID]
totalLength = len(itemName) + len(playerName)
if send_popup_type == "info":
if totalLength > 90:
temp_length = f"Sent {itemName} to {playerName}"
self.queued_info_popup += [temp_length[:90]] #slice it to be 90
else:
self.queued_info_popup += [f"Sent {itemName} to {playerName}"]
else: # else chest or puzzle. they are handled the same length wise
while totalLength > 27:
if send_truncate_first == "playername":
if len(playerName) > 5: #limit player name to at least be 5 characters
playerName = playerName[:-1]
else:
itemName = itemName[:-1]
else:
if len(ItemName) > 15: # limit item name to at least be 15 characters
itemName = itemName[:-1]
else:
playerName = playerName[:-1]
totalLength = len(itemName) + len(playerName)
if send_popup_type == "puzzle":
# to = 4 totalLength of the string cant be over 31 or game crash
self.queued_puzzle_popup += [f"{itemName} to {playerName}"]
else:
self.queued_chest_popup += [f"{itemName} to {playerName}"]
def connect_to_game(self):
if "KeybladeAbilities" in self.kh2slotdata.keys():
# sora ability to slot
self.AbilityQuantityDict.update(self.kh2slotdata["KeybladeAbilities"])
# itemid:[slots that are available for that item]
self.AbilityQuantityDict.update(self.kh2slotdata["StaffAbilities"])
self.AbilityQuantityDict.update(self.kh2slotdata["ShieldAbilities"])
self.all_weapon_location_id = {self.kh2_loc_name_to_id[loc] for loc in all_weapon_slot}
try:
if not self.kh2:
self.kh2 = pymem.Pymem(process_name="KINGDOM HEARTS II FINAL MIX")
self.get_addresses()
except Exception as e:
if self.kh2connected:
self.kh2connected = False
logger.info("Game is not open. If it is open run the launcher/client as admin.")
self.serverconnected = True
self.slot_name = self.auth
def data_package_kh2_cache(self, loc_to_id, item_to_id):
self.kh2_loc_name_to_id = loc_to_id
self.lookup_id_to_location = {v: k for k, v in self.kh2_loc_name_to_id.items()}
self.kh2_item_name_to_id = item_to_id
self.lookup_id_to_item = {v: k for k, v in self.kh2_item_name_to_id.items()}
self.ability_code_list = [self.kh2_item_name_to_id[item] for item in exclusion_item_table["Ability"]]
def on_deathlink(self, data: typing.Dict[str, typing.Any]) -> None:
"""Gets dispatched when a new DeathLink is triggered by another linked player."""
if data["source"] not in self.deathlink_blacklist:
self.last_death_link = max(data["time"], self.last_death_link)
text = data.get("cause", "")
if text:
logger.info(f"DeathLink: {text}")
else:
logger.info(f"DeathLink: Received from {data['source']}")
# kills sora by setting flag for the lua to read
self.kh2_write_byte(0x810000, 1)
async def is_dead(self):
# General Death link logic: if hp is 0 and sora has 5 drive gauge and deathlink flag isnt set
# if deathlink is on and script is hasnt killed sora and sora isnt dead
if self.deathlink_toggle and self.kh2_read_byte(0x810000) == 0 and self.kh2_read_byte(0x810001) != 0:
# set deathlink flag so it doesn't send out bunch
# basically making the game think it got its death from a deathlink instead of from the game
self.kh2_write_byte(0x810000, 0)
# 0x810001 is set to 1 when you die via the goa script. This is done because the polling rate for the client can miss a death
# but the lua script runs eveery frame so we cant miss them now
self.kh2_write_byte(0x810001, 0)
#todo: read these from the goa lua instead since the deathlink is after they contiune which means that its just before they would've gotten into the fight
Room = self.kh2_read_byte(0x810002)
Event = self.kh2_read_byte(0x810003)
World = self.kh2_read_byte(0x810004)
if (World, Room, Event) in DeathLinkPair.keys():
logger.info(f"Deathlink: {self.player_names[self.slot]} died to {DeathLinkPair[(World,Room, Event)]}.")
await self.send_death(death_text=f"{self.player_names[self.slot]} died to {DeathLinkPair[(World,Room, Event)]}.")
else:
logger.info(f"Deathlink: {self.player_names[self.slot]} lost their heart to darkness.")
await self.send_death(death_text=f"{self.player_names[self.slot]} lost their heart to darkness.")
def run_gui(self):
"""Import kivy UI system and start running it as self.ui_task."""
from kvui import GameManager
class KH2Manager(GameManager):
logging_pairs = [
("Client", "Archipelago")
]
base_title = "Archipelago KH2 Client"
self.ui = KH2Manager(self)
self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")
def get_addresses(self):
if not self.kh2connected and self.kh2 is not None:
if self.kh2_game_version is None:
# current verions is .10 then runs the get from github stuff
if self.kh2_read_string(0x9A98B0, 4) == "KH2J":
self.kh2_game_version = "STEAM"
self.Now = 0x0717008
self.Save = 0x09A98B0
self.Slot1 = 0x2A23598
self.Journal = 0x7434E0
self.Shop = 0x7435D0
self.InfoBarPointer = 0xABE828
self.isDead = 0x0BEF4A8
self.FadeStatus = 0xABB4B8
self.PlayerGaugePointer = 0x0ABD248
elif self.kh2_read_string(0x9A9330, 4) == "KH2J":
self.kh2_game_version = "EGS"
else:
if self.game_communication_path:
logger.info("Checking with most up to date addresses from the addresses json.")
# if mem addresses file is found then check version and if old get new one
kh2memaddresses_path = os.path.join(self.game_communication_path, "kh2memaddresses.json")
if not os.path.exists(kh2memaddresses_path):
logger.info("File is not found. Downloading json with memory addresses. This might take a moment")
mem_resp = requests.get("https://raw.githubusercontent.com/JaredWeakStrike/KH2APMemoryValues/master/kh2memaddresses.json")
if mem_resp.status_code == 200:
self.mem_json = json.loads(mem_resp.content)
with open(kh2memaddresses_path, 'w') as f:
f.write(json.dumps(self.mem_json, indent=4))
f.close()
else:
with open(kh2memaddresses_path) as f:
self.mem_json = json.load(f)
f.close()
if self.mem_json:
for key in self.mem_json.keys():
if self.kh2_read_string(int(self.mem_json[key]["GameVersionCheck"], 0), 4) == "KH2J":
self.Now = int(self.mem_json[key]["Now"], 0)
self.Save = int(self.mem_json[key]["Save"], 0)
self.Slot1 = int(self.mem_json[key]["Slot1"], 0)
self.Journal = int(self.mem_json[key]["Journal"], 0)
self.Shop = int(self.mem_json[key]["Shop"], 0)
self.InfoBarPointer = int(self.mem_json[key]["InfoBarPointer"], 0)
self.isDead = int(self.mem_json[key]["isDead"], 0)
self.FadeStatus = int(self.mem_json[key]["FadeStatus"], 0)
self.PlayerGaugePointer = int(self.mem_json[key]["PlayerGaugePointer"], 0)
self.kh2_game_version = key
if self.kh2_game_version is not None:
logger.info(f"You are now auto-tracking {self.kh2_game_version}")
self.kh2connected = True
else:
logger.info("Your game version does not match what the client requires. Check in the "
"kingdom-hearts-2-final-mix channel for more information on correcting the game "
"version.")
self.kh2connected = False
async def kh2_watcher(ctx: KH2Context):
while not ctx.exit_event.is_set():
try:
if ctx.kh2connected and ctx.serverconnected:
ctx.sending = []
await asyncio.create_task(ctx.checkWorldLocations())
await asyncio.create_task(ctx.checkLevels())
await asyncio.create_task(ctx.checkSlots())
await asyncio.create_task(ctx.verifyChests())
await asyncio.create_task(ctx.verifyItems())
await asyncio.create_task(ctx.verifyLevel())
await asyncio.create_task(ctx.is_dead())
if (ctx.deathlink_toggle and "DeathLink" not in ctx.tags) or (not ctx.deathlink_toggle and "DeathLink" in ctx.tags):
await ctx.update_death_link(ctx.deathlink_toggle)
if finishedGame(ctx) and not ctx.kh2_finished_game:
await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
ctx.kh2_finished_game = True
if ctx.sending:
message = [{"cmd": 'LocationChecks', "locations": ctx.sending}]
await ctx.send_msgs(message)
if ctx.queued_puzzle_popup:
await asyncio.create_task(ctx.displayPuzzlePieceTextinGame(ctx.queued_puzzle_popup[0])) # send the num 1 index of whats in the queue
if ctx.queued_info_popup:
await asyncio.create_task(ctx.displayInfoTextinGame(ctx.queued_info_popup[0]))
if ctx.queued_chest_popup:
await asyncio.create_task(ctx.displayChestTextInGame(ctx.queued_chest_popup[0]))
elif not ctx.kh2connected and ctx.serverconnected:
logger.info("Game Connection lost. trying to reconnect.")
ctx.kh2 = None
#todo: change this to be an option for the client to auto reconnect with the default being yes
# reason is because the await sleep causes the client to hang if you close the game then the client without disconnecting.
while not ctx.kh2connected and ctx.serverconnected:
try:
ctx.kh2 = pymem.Pymem(process_name="KINGDOM HEARTS II FINAL MIX")
ctx.get_addresses()
logger.info("Game Connection Established.")
except Exception as e:
await asyncio.sleep(5)
if ctx.disconnect_from_server:
ctx.disconnect_from_server = False
await ctx.disconnect()
except Exception as e:
if ctx.kh2connected:
ctx.kh2connected = False
logger.info(e)
logger.info("line 940")
await asyncio.sleep(0.5)
def launch():
async def main(args):
ctx = KH2Context(args.connect, args.password)
ctx.server_task = asyncio.create_task(server_loop(ctx), name="server loop")
if gui_enabled:
ctx.run_gui()
ctx.run_cli()
progression_watcher = asyncio.create_task(
kh2_watcher(ctx), name="KH2ProgressionWatcher")
await ctx.exit_event.wait()
ctx.server_address = None
await progression_watcher
await ctx.shutdown()
import colorama
parser = get_base_parser(description="KH2 Client, for text interfacing.")
args, rest = parser.parse_known_args()
colorama.init()
asyncio.run(main(args))
colorama.deinit()

View File

@@ -0,0 +1,47 @@
# All the write functions return a bool for has written it but there isnt a use case for that I've found
def kh2_read_short(self, address) -> int:
"""Reads 2 bytes"""
return self.kh2.read_short(self.kh2.base_address + address)
def kh2_write_short(self, address, value) -> None:
"""Writes 2 bytes"""
self.kh2.write_short(self.kh2.base_address + address, value)
def kh2_write_byte(self, address, value):
"""Writes 1 byte"""
return self.kh2.write_bytes(self.kh2.base_address + address, value.to_bytes(1, 'big'), 1)
def kh2_read_byte(self, address):
"""Reads 1 byte"""
return int.from_bytes(self.kh2.read_bytes(self.kh2.base_address + address, 1))
def kh2_read_int(self, address):
"""Reads 4 bytes"""
return self.kh2.read_int(self.kh2.base_address + address)
def kh2_write_int(self, address, value):
"""Writes 4 bytes"""
self.kh2.write_int(self.kh2.base_address + address, value)
def kh2_read_longlong(self, address):
"""Reads 8 bytes"""
return self.kh2.read_longlong(self.kh2.base_address + address)
def kh2_read_string(self, address, length):
"""Reads length amount of bytes"""
return self.kh2.read_string(self.kh2.base_address + address, length)
def kh2_write_bytes(self, address, value):
return self.kh2.write_bytes(self.kh2.base_address + address, bytes(value), len(value))
def kh2_return_base_address(self):
return self.kh2.base_address

View File

@@ -0,0 +1,434 @@
from CommonClient import logger
from typing import TYPE_CHECKING
from .WorldLocations import *
from ..Names import ItemName
import re
import asyncio
def to_khscii(self, item_name):
# credit to TopazTK for this.
out_list = []
char_count = 0
# Throughout the text, do:
while char_count < len(item_name):
char = item_name[char_count]
# Simple character conversion through mathematics.
if 'a' <= char <= 'z':
out_list.append(ord(char) + 0x39)
char_count += 1
elif 'A' <= char <= 'Z':
out_list.append(ord(char) - 0x13)
char_count += 1
elif '0' <= char <= '9':
out_list.append(ord(char) + 0x60)
char_count += 1
# If it hits a "{", we will know it's a command, not a character.
elif char == '{':
# A command is 6 characters long, in the format of "{0xTT}",
# with the "TT" being the 2-digit encode for that command.
command = item_name[char_count:char_count + 6]
if re.match(r'^{0x[a-fA-F0-9][a-fA-F0-9]}$', command):
value = command[1:5]
out_list.append(int(value, 16))
char_count += 6
# Should it be anything we do not know, we look through
# the special dictionary.
else:
if char in self.special_dict:
out_list.append(self.special_dict[char])
else:
out_list.append(0x01)
char_count += 1
# When the list ends, we add a terminator and return the string.
out_list.append(0x00)
return out_list
async def give_item(self, item, location):
try:
#sleep so we can get the datapackage and not miss any items that were sent to us while we didnt have our item id dicts
while not self.lookup_id_to_item:
await asyncio.sleep(0.5)
itemname = self.lookup_id_to_item[item]
itemdata = self.item_name_to_data[itemname]
# itemcode = self.kh2_item_name_to_id[itemname]
if itemdata.ability:
if location in self.all_weapon_location_id:
return
if itemname in {"High Jump", "Quick Run", "Dodge Roll", "Aerial Dodge", "Glide"}:
self.kh2_seed_save_cache["AmountInvo"]["Growth"][itemname] += 1
return
if itemname not in self.kh2_seed_save_cache["AmountInvo"]["Ability"]:
self.kh2_seed_save_cache["AmountInvo"]["Ability"][itemname] = []
# appending the slot that the ability should be in
# appending the slot that the ability should be in
# abilities have a limit amount of slots.
# we start from the back going down to not mess with stuff.
# Front of Invo
# Sora: Save+24F0+0x54 : 0x2546
# Donald: Save+2604+0x54 : 0x2658
# Goofy: Save+2718+0x54 : 0x276C
# Back of Invo. Sora has 6 ability slots that are reserved
# Sora: Save+24F0+0x54+0x92 : 0x25D8
# Donald: Save+2604+0x54+0x9C : 0x26F4
# Goofy: Save+2718+0x54+0x9C : 0x2808
# seed has 2 scans in sora's abilities
# recieved second scan
# if len(seed_save(Scan:[ability slot 52]) < (2)amount of that ability they should have from slot data
# ability_slot = back of inventory that isnt taken
# add ability_slot to seed_save(Scan[]) so now its Scan:[ability slot 52,50]
# decrease back of inventory since its ability_slot is already taken
if len(self.kh2_seed_save_cache["AmountInvo"]["Ability"][itemname]) < \
self.AbilityQuantityDict[itemname]:
if itemname in self.sora_ability_set:
ability_slot = self.kh2_seed_save_cache["SoraInvo"][0]
self.kh2_seed_save_cache["AmountInvo"]["Ability"][itemname].append(ability_slot)
self.kh2_seed_save_cache["SoraInvo"][0] -= 2
elif itemname in self.donald_ability_set:
ability_slot = self.kh2_seed_save_cache["DonaldInvo"][0]
self.kh2_seed_save_cache["AmountInvo"]["Ability"][itemname].append(ability_slot)
self.kh2_seed_save_cache["DonaldInvo"][0] -= 2
else:
ability_slot = self.kh2_seed_save_cache["GoofyInvo"][0]
self.kh2_seed_save_cache["AmountInvo"]["Ability"][itemname].append(ability_slot)
self.kh2_seed_save_cache["GoofyInvo"][0] -= 2
if ability_slot in self.front_ability_slots:
self.front_ability_slots.remove(ability_slot)
# if itemdata in {bitmask} all the forms,summons and a few other things are bitmasks
elif itemdata.memaddr in {0x36C4, 0x36C5, 0x36C6, 0x36C0, 0x36CA}:
# if memaddr is in a bitmask location in memory
if itemname not in self.kh2_seed_save_cache["AmountInvo"]["Bitmask"]:
self.kh2_seed_save_cache["AmountInvo"]["Bitmask"].append(itemname)
# if itemdata in {magic}
elif itemdata.memaddr in {0x3594, 0x3595, 0x3596, 0x3597, 0x35CF, 0x35D0}:
# if memaddr is in magic addresses
self.kh2_seed_save_cache["AmountInvo"]["Magic"][itemname] += 1
# equipment is a list instead of dict because you can only have 1 currently
elif itemname in self.all_equipment:
if itemname in self.kh2_seed_save_cache["AmountInvo"]["Equipment"]:
self.kh2_seed_save_cache["AmountInvo"]["Equipment"][itemname] += 1
else:
self.kh2_seed_save_cache["AmountInvo"]["Equipment"][itemname] = 1
# weapons are done differently since you can only have one and has to check it differently
elif itemname in self.all_weapons:
if itemname in self.keyblade_set:
self.kh2_seed_save_cache["AmountInvo"]["Weapon"]["Sora"].append(itemname)
elif itemname in self.staff_set:
self.kh2_seed_save_cache["AmountInvo"]["Weapon"]["Donald"].append(itemname)
else:
self.kh2_seed_save_cache["AmountInvo"]["Weapon"]["Goofy"].append(itemname)
elif itemname in self.stat_increase_set:
self.kh2_seed_save_cache["AmountInvo"]["StatIncrease"][itemname] += 1
else:
# "normal" items. They have a unique byte reserved for how many they have
if itemname in self.kh2_seed_save_cache["AmountInvo"]["Amount"]:
self.kh2_seed_save_cache["AmountInvo"]["Amount"][itemname] += 1
else:
self.kh2_seed_save_cache["AmountInvo"]["Amount"][itemname] = 1
except Exception as e:
if self.kh2connected:
self.kh2connected = False
logger.info(e)
logger.info("line 582")
async def IsInShop(self, sellable):
# journal = 0x741230 shop = 0x741320
# if journal=-1 and shop = 5 then in shop
# if journal !=-1 and shop = 10 then journal
journal = self.kh2_read_short(self.Journal)
shop = self.kh2_read_short(self.Shop)
if (journal == -1 and shop == 5) or (journal != -1 and shop == 10):
# print("your in the shop")
sellable_dict = {}
# for item that the player has received that can be sold
# get amount of sellable items PRE selling them
# basically a snapshot of what the amounts of these items are pre shopping
for itemName in sellable:
itemdata = self.item_name_to_data[itemName]
amount = self.kh2_read_byte(self.Save + itemdata.memaddr)
sellable_dict[itemName] = amount
# wait until user exits the shop
while (journal == -1 and shop == 5) or (journal != -1 and shop == 10):
journal = self.kh2_read_short(self.Journal)
shop = self.kh2_read_short(self.Shop)
await asyncio.sleep(0.5)
# for item and amount pre shop
for item, amount in sellable_dict.items():
itemdata = self.item_name_to_data[item]
afterShop = self.kh2_read_byte(self.Save + itemdata.memaddr)
alreadySold = 0
if item in self.kh2_seed_save["SoldEquipment"]:
alreadySold = self.kh2_seed_save["SoldEquipment"][item]
# if afterShop is < Amount post shop i.e if someone sold something
if afterShop < amount:
self.kh2_seed_save["SoldEquipment"][item] = (amount - afterShop) + alreadySold
async def verifyItems(self):
try:
# All these sets include items that the player has recieved
# set of all the items that have an
master_amount = list(self.kh2_seed_save_cache["AmountInvo"]["Amount"].keys())
# set of all the items that have are abilities
master_ability = list(self.kh2_seed_save_cache["AmountInvo"]["Ability"].keys())
# set of all the items that are bitmasks
master_bitmask = list(self.kh2_seed_save_cache["AmountInvo"]["Bitmask"])
# sets of all the weapons. These are different because have to check inventory and equipped
master_keyblade = list(self.kh2_seed_save_cache["AmountInvo"]["Weapon"]["Sora"])
master_staff = list(self.kh2_seed_save_cache["AmountInvo"]["Weapon"]["Donald"])
master_shield = list(self.kh2_seed_save_cache["AmountInvo"]["Weapon"]["Goofy"])
# Same with weapons but a lot more slots
master_equipment = list(self.kh2_seed_save_cache["AmountInvo"]["Equipment"].keys())
# Set of magic, Can only be given when the player is paused due to crashing in loadzones
master_magic = list(self.kh2_seed_save_cache["AmountInvo"]["Magic"].keys())
# Have to apply them to the slot that sora is in and a lot more dynamic than other amount items
master_stat = list(self.kh2_seed_save_cache["AmountInvo"]["StatIncrease"].keys())
# Set of all things that could be sold
master_sell = master_equipment + master_staff + master_shield
await asyncio.create_task(self.IsInShop(master_sell))
# print(self.kh2_seed_save_cache["AmountInvo"]["Ability"])
for item_name in master_amount:
item_data = self.item_name_to_data[item_name]
amount_of_items = 0
amount_of_items += self.kh2_seed_save_cache["AmountInvo"]["Amount"][item_name]
if item_name == "Torn Page":
# Torn Pages are handled differently because they can be consumed.
# Will check the progression in 100 acre and - the amount of visits
# amountofitems-amount of visits done
for location, data in tornPageLocks.items():
if self.kh2_read_byte(self.Save + data.addrObtained) & 0x1 << data.bitIndex > 0:
amount_of_items -= 1
# 255 is the max limit for a byte
if amount_of_items > 255:
amount_of_items = 255
if self.kh2_read_byte(self.Save + item_data.memaddr) != amount_of_items and amount_of_items >= 0:
self.kh2_write_byte(self.Save + item_data.memaddr, amount_of_items)
for item_name in master_keyblade:
item_data = self.item_name_to_data[item_name]
# if the inventory slot for that keyblade is less than the amount they should have,
# and they are not in stt
if self.kh2_read_byte(self.Save + item_data.memaddr) != 1 and self.kh2_read_byte(
self.Save + 0x1CFF) != 13:
# Checking form anchors for the keyblade to remove extra keyblades
# Checking Normal Sora,Valor Form,Master Form and Final Forms keyblades
if self.kh2_read_short(self.Save + 0x24F0) == item_data.kh2id \
or self.kh2_read_short(self.Save + 0x32F4) == item_data.kh2id \
or self.kh2_read_short(self.Save + 0x339C) == item_data.kh2id \
or self.kh2_read_short(self.Save + 0x33D4) == item_data.kh2id:
self.kh2_write_byte(self.Save + item_data.memaddr, 0)
else:
self.kh2_write_byte(self.Save + item_data.memaddr, 1)
for item_name in master_staff:
item_data = self.item_name_to_data[item_name]
if self.kh2_read_byte(self.Save + item_data.memaddr) != 1 \
and self.kh2_read_short(self.Save + 0x2604) != item_data.kh2id \
and item_name not in self.kh2_seed_save["SoldEquipment"]:
self.kh2_write_byte(self.Save + item_data.memaddr, 1)
for item_name in master_shield:
item_data = self.item_name_to_data[item_name]
if self.kh2_read_byte(self.Save + item_data.memaddr) != 1 \
and self.kh2_read_short(self.Save + 0x2718) != item_data.kh2id \
and item_name not in self.kh2_seed_save["SoldEquipment"]:
self.kh2_write_byte(self.Save + item_data.memaddr, 1)
for item_name in master_ability:
item_data = self.item_name_to_data[item_name]
ability_slot = []
ability_slot += self.kh2_seed_save_cache["AmountInvo"]["Ability"][item_name]
for slot in ability_slot:
current = self.kh2_read_short(self.Save + slot)
ability = current & 0x0FFF
if ability | 0x8000 != (0x8000 + item_data.memaddr):
if current - 0x8000 > 0:
self.kh2_write_short(self.Save + slot, 0x8000 + item_data.memaddr)
else:
self.kh2_write_short(self.Save + slot, item_data.memaddr)
# removes the duped ability if client gave faster than the game.
for ability in self.front_ability_slots:
if self.kh2_read_short(self.Save + ability) != 0:
print(f"removed {self.Save + ability} from {ability}")
self.kh2_write_short(self.Save + ability, 0)
# remove the dummy level 1 growths if they are in these invo slots.
for inventorySlot in {0x25CE, 0x25D0, 0x25D2, 0x25D4, 0x25D6, 0x25D8}:
current = self.kh2_read_short(self.Save + inventorySlot)
ability = current & 0x0FFF
if 0x05E <= ability <= 0x06D:
self.kh2_write_short(self.Save + inventorySlot, 0)
for item_name in self.master_growth:
growthLevel = self.kh2_seed_save_cache["AmountInvo"]["Growth"][item_name]
if growthLevel > 0:
slot = self.growth_values_dict[item_name][2]
min_growth = self.growth_values_dict[item_name][0]
max_growth = self.growth_values_dict[item_name][1]
if growthLevel > 4:
growthLevel = 4
current_growth_level = self.kh2_read_short(self.Save + slot)
ability = current_growth_level & 0x0FFF
# if the player should be getting a growth ability
if ability | 0x8000 != 0x8000 + min_growth - 1 + growthLevel:
# if it should be level one of that growth
if 0x8000 + min_growth - 1 + growthLevel <= 0x8000 + min_growth or ability < min_growth:
self.kh2_write_short(self.Save + slot, min_growth)
# if it is already in the inventory
elif ability | 0x8000 < (0x8000 + max_growth):
self.kh2_write_short(self.Save + slot, current_growth_level + 1)
for item_name in master_bitmask:
item_data = self.item_name_to_data[item_name]
itemMemory = self.kh2_read_byte(self.Save + item_data.memaddr)
if self.kh2_read_byte(self.Save + item_data.memaddr) & 0x1 << item_data.bitmask == 0:
# when getting a form anti points should be reset to 0 but bit-shift doesn't trigger the game.
if item_name in {"Valor Form", "Wisdom Form", "Limit Form", "Master Form", "Final Form"}:
self.kh2_write_byte(self.Save + 0x3410, 0)
self.kh2_write_byte(self.Save + item_data.memaddr, itemMemory | 0x01 << item_data.bitmask)
for item_name in master_equipment:
item_data = self.item_name_to_data[item_name]
amount_found_in_slots = 0
if item_name in self.accessories_set:
Equipment_Anchor_List = self.Equipment_Anchor_Dict["Accessories"]
else:
Equipment_Anchor_List = self.Equipment_Anchor_Dict["Armor"]
# Checking form anchors for the equipment
for partyMember in ["Sora", "Donald", "Goofy"]:
for SlotOffset in Equipment_Anchor_List:
if self.kh2_read_short(self.Save + self.CharacterAnchors[partyMember] + SlotOffset) == item_data.kh2id:
amount_found_in_slots += 1
if item_name in self.kh2_seed_save["SoldEquipment"]:
amount_found_in_slots += self.kh2_seed_save["SoldEquipment"][item_name]
inInventory = self.kh2_seed_save_cache["AmountInvo"]["Equipment"][item_name] - amount_found_in_slots
if inInventory != self.kh2_read_byte(self.Save + item_data.memaddr):
self.kh2_write_byte(self.Save + item_data.memaddr, inInventory)
for item_name in master_magic:
item_data = self.item_name_to_data[item_name]
amount_of_items = 0
amount_of_items += self.kh2_seed_save_cache["AmountInvo"]["Magic"][item_name]
# - base address because it reads a pointer then in stead of reading what it points to its pointer+baseaddress which offsets
if self.kh2_read_byte(self.Save + item_data.memaddr) != amount_of_items and self.kh2_read_byte(self.FadeStatus) == 0 \
and self.kh2_read_longlong(self.PlayerGaugePointer) != 0 \
and self.kh2_read_int(self.kh2_read_longlong(self.PlayerGaugePointer) + 0x88 - self.kh2_return_base_address()) != 0:
self.kh2_write_byte(self.Save + item_data.memaddr, amount_of_items)
for item_name in master_stat:
amount_of_items = 0
amount_of_items += self.kh2_seed_save_cache["AmountInvo"]["StatIncrease"][item_name]
# checking if they talked to the computer to give them these
if self.kh2_read_byte(self.Slot1 + 0x1B2) >= 5 and (self.kh2_read_byte(self.Save + 0x1D27) & 0x1 << 3) > 0:
if item_name == ItemName.MaxHPUp:
if self.kh2_read_byte(self.Save + 0x2498) < 3: # Non-Critical
Bonus = 5
else: # Critical
Bonus = 2
if self.kh2_read_int(self.Slot1 + 0x004) != self.base_hp + (Bonus * amount_of_items):
self.kh2_write_int(self.Slot1 + 0x004, self.base_hp + (Bonus * amount_of_items))
elif item_name == ItemName.MaxMPUp:
if self.kh2_read_byte(self.Save + 0x2498) < 3: # Non-Critical
Bonus = 10
else: # Critical
Bonus = 5
if self.kh2_read_int(self.Slot1 + 0x184) != self.base_mp + (Bonus * amount_of_items):
self.kh2_write_int(self.Slot1 + 0x184, self.base_mp + (Bonus * amount_of_items))
elif item_name == ItemName.DriveGaugeUp:
current_max_drive = self.kh2_read_byte(self.Slot1 + 0x1B2)
# change when max drive is changed from 6 to 4
# drive is maxed at 9 and base_drive is always 5 so if amount is higher set to 4 which is the max it should be
amount_of_items = min(amount_of_items, 4)
if current_max_drive < 9 and current_max_drive != self.base_drive + amount_of_items:
self.kh2_write_byte(self.Slot1 + 0x1B2, self.base_drive + amount_of_items)
# need to do these differently when the amount is dynamic
elif item_name == ItemName.AccessorySlotUp:
current_accessory = self.kh2_read_byte(self.Save + 0x2501)
if current_accessory != self.base_accessory_slots + amount_of_items:
if 4 > current_accessory < self.base_accessory_slots + amount_of_items:
self.kh2_write_byte(self.Save + 0x2501, current_accessory + 1)
elif self.base_accessory_slots + amount_of_items < 4:
self.kh2_write_byte(self.Save + 0x2501, self.base_accessory_slots + amount_of_items)
elif item_name == ItemName.ArmorSlotUp:
current_armor_slots = self.kh2_read_byte(self.Save + 0x2500)
if current_armor_slots != self.base_armor_slots + amount_of_items:
if 4 > current_armor_slots < self.base_armor_slots + amount_of_items:
self.kh2_write_byte(self.Save + 0x2500, current_armor_slots + 1)
elif self.base_armor_slots + amount_of_items < 4:
self.kh2_write_byte(self.Save + 0x2500, self.base_armor_slots + amount_of_items)
elif item_name == ItemName.ItemSlotUp:
current_item_slots = self.kh2_read_byte(self.Save + 0x2502)
if current_item_slots != self.base_item_slots + amount_of_items:
if 8 > current_item_slots < self.base_item_slots + amount_of_items:
self.kh2_write_byte(self.Save + 0x2502, current_item_slots + 1)
elif self.base_item_slots + amount_of_items < 8:
self.kh2_write_byte(self.Save + 0x2502, self.base_item_slots + amount_of_items)
# if self.kh2_read_byte(self.Save + item_data.memaddr) != amount_of_items \
# and self.kh2_read_byte(self.Slot1 + 0x1B2) >= 5 and \
# self.kh2_read_byte(self.Save + 0x23DF) & 0x1 << 3 > 0 and self.kh2_read_byte(0x741320) in {10, 8}:
# self.kh2_write_byte(self.Save + item_data.memaddr, amount_of_items)
if "PoptrackerVersionCheck" in self.kh2slotdata:
if self.kh2slotdata["PoptrackerVersionCheck"] > 4.2 and self.kh2_read_byte(
self.Save + 0x3607) != 1: # telling the goa they are on version 4.3
self.kh2_write_byte(self.Save + 0x3607, 1)
except Exception as e:
if self.kh2connected:
self.kh2connected = False
logger.info(e)
logger.info("line 840")
async def displayInfoTextinGame(self, string_to_display):
infoBarPointerRef = self.kh2_read_longlong(self.InfoBarPointer)
if self.kh2_read_byte(0x800000) == 0 and infoBarPointerRef != 0 and self.kh2.read_int(infoBarPointerRef + 0x48) == 0:
self.kh2_write_byte(0x800000, 1) # displaying info bar popup
displayed_string = self.to_khscii(string_to_display)
self.kh2_write_bytes(0x800004, displayed_string)
self.queued_info_popup.remove(string_to_display)
await asyncio.sleep(0.5)
async def displayPuzzlePieceTextinGame(self, string_to_display):
if self.kh2_read_byte(0x800000) == 0:
displayed_string = self.to_khscii(string_to_display)
self.kh2_write_bytes(0x800104, displayed_string)
self.kh2_write_byte(0x800000, 2) # displaying puzzle piece popup
self.queued_puzzle_popup.remove(string_to_display)
await asyncio.sleep(0.5)
async def displayChestTextInGame(self, string_to_display):
if self.kh2_read_byte(0x800000) == 0:
displayed_string = self.to_khscii(string_to_display)
print(f"made display string {displayed_string} from {string_to_display}")
self.kh2_write_byte(0x800150, 0) # item picture. this will change from 0 when I can input icons from the items
print("wrote item picture")
self.kh2_write_bytes(0x800154, displayed_string) # text
print("wrote text")
await asyncio.sleep(1)
self.kh2_write_byte(0x800000, 3) # displaying chest popup
print("called chest popup")
self.queued_chest_popup.remove(string_to_display)
await asyncio.sleep(0.5)

View File

@@ -0,0 +1,187 @@
from CommonClient import logger
from .WorldLocations import *
from typing import TYPE_CHECKING
# I don't know what is going on here, but it works.
if TYPE_CHECKING:
from . import KH2Context
else:
KH2Context = object
def finishedGame(ctx: KH2Context):
if ctx.kh2slotdata['FinalXemnas'] == 1:
if not ctx.final_xemnas and ctx.kh2_read_byte(
ctx.Save + all_world_locations[LocationName.FinalXemnas].addrObtained) \
& 0x1 << all_world_locations[LocationName.FinalXemnas].bitIndex > 0:
ctx.final_xemnas = True
# three proofs
if ctx.kh2slotdata['Goal'] == 0:
if ctx.kh2_read_byte(ctx.Save + 0x36B2) > 0 \
and ctx.kh2_read_byte(ctx.Save + 0x36B3) > 0 \
and ctx.kh2_read_byte(ctx.Save + 0x36B4) > 0:
if ctx.kh2slotdata['FinalXemnas'] == 1:
if ctx.final_xemnas:
return True
return False
return True
return False
elif ctx.kh2slotdata['Goal'] == 1:
if ctx.kh2_read_byte(ctx.Save + 0x3641) >= ctx.kh2slotdata['LuckyEmblemsRequired']:
if ctx.kh2_read_byte(ctx.Save + 0x36B3) < 1:
ctx.kh2_write_byte(ctx.Save + 0x36B2, 1)
ctx.kh2_write_byte(ctx.Save + 0x36B3, 1)
ctx.kh2_write_byte(ctx.Save + 0x36B4, 1)
logger.info("The Final Door is now Open")
if ctx.kh2slotdata['FinalXemnas'] == 1:
if ctx.final_xemnas:
return True
return False
return True
return False
elif ctx.kh2slotdata['Goal'] == 2:
# for backwards compat
if "hitlist" in ctx.kh2slotdata:
locations = ctx.sending
for boss in ctx.kh2slotdata["hitlist"]:
if boss in locations:
ctx.hitlist_bounties += 1
if ctx.hitlist_bounties >= ctx.kh2slotdata["BountyRequired"] or ctx.kh2_seed_save_cache["AmountInvo"]["Amount"][
"Bounty"] >= ctx.kh2slotdata["BountyRequired"]:
if ctx.kh2_read_byte(ctx.Save + 0x36B3) < 1:
ctx.kh2_write_byte(ctx.Save + 0x36B2, 1)
ctx.kh2_write_byte(ctx.Save + 0x36B3, 1)
ctx.kh2_write_byte(ctx.Save + 0x36B4, 1)
logger.info("The Final Door is now Open")
if ctx.kh2slotdata['FinalXemnas'] == 1:
if ctx.final_xemnas:
return True
return False
return True
return False
elif ctx.kh2slotdata["Goal"] == 3:
if ctx.kh2_seed_save_cache["AmountInvo"]["Amount"]["Bounty"] >= ctx.kh2slotdata["BountyRequired"] and \
ctx.kh2_read_byte(ctx.Save + 0x3641) >= ctx.kh2slotdata['LuckyEmblemsRequired']:
if ctx.kh2_read_byte(ctx.Save + 0x36B3) < 1:
ctx.kh2_write_byte(ctx.Save + 0x36B2, 1)
ctx.kh2_write_byte(ctx.Save + 0x36B3, 1)
ctx.kh2_write_byte(ctx.Save + 0x36B4, 1)
logger.info("The Final Door is now Open")
if ctx.kh2slotdata['FinalXemnas'] == 1:
if ctx.final_xemnas:
return True
return False
return True
return False
async def checkWorldLocations(self):
try:
currentworldint = self.kh2_read_byte(self.Now)
if self.last_world_int != currentworldint:
self.last_world_int = currentworldint
await self.send_msgs([{
"cmd": "Set", "key": "Slot: " + str(self.slot) + " :CurrentWorld",
"default": 0, "want_reply": False, "operations": [{
"operation": "replace",
"value": currentworldint
}]
}])
if currentworldint in self.worldid_to_locations:
curworldid = self.worldid_to_locations[currentworldint]
for location, data in curworldid.items():
if location in self.kh2_loc_name_to_id.keys():
locationId = self.kh2_loc_name_to_id[location]
if locationId not in self.locations_checked \
and self.kh2_read_byte(self.Save + data.addrObtained) & 0x1 << data.bitIndex > 0:
self.sending = self.sending + [(int(locationId))]
except Exception as e:
if self.kh2connected:
self.kh2connected = False
logger.info(e)
logger.info("line 425")
async def checkLevels(self):
try:
for location, data in SoraLevels.items():
currentLevel = self.kh2_read_byte(self.Save + 0x24FF)
locationId = self.kh2_loc_name_to_id[location]
if locationId not in self.locations_checked \
and currentLevel >= data.bitIndex:
if self.kh2_seed_save["Levels"]["SoraLevel"] < currentLevel:
self.kh2_seed_save["Levels"]["SoraLevel"] = currentLevel
self.sending = self.sending + [(int(locationId))]
formDict = {
0: ["ValorLevel", ValorLevels], 1: ["WisdomLevel", WisdomLevels], 2: ["LimitLevel", LimitLevels],
3: ["MasterLevel", MasterLevels], 4: ["FinalLevel", FinalLevels], 5: ["SummonLevel", SummonLevels]
}
for i in range(6):
for location, data in formDict[i][1].items():
formlevel = self.kh2_read_byte(self.Save + data.addrObtained)
if location in self.kh2_loc_name_to_id.keys():
# if current form level is above other form level
locationId = self.kh2_loc_name_to_id[location]
if locationId not in self.locations_checked \
and formlevel >= data.bitIndex:
if formlevel > self.kh2_seed_save["Levels"][formDict[i][0]]:
self.kh2_seed_save["Levels"][formDict[i][0]] = formlevel
self.sending = self.sending + [(int(locationId))]
except Exception as e:
if self.kh2connected:
self.kh2connected = False
logger.info(e)
logger.info("line 456")
async def checkSlots(self):
try:
for location, data in weaponSlots.items():
locationId = self.kh2_loc_name_to_id[location]
if locationId not in self.locations_checked:
if self.kh2_read_byte(self.Save + data.addrObtained) > 0:
self.sending = self.sending + [(int(locationId))]
for location, data in formSlots.items():
locationId = self.kh2_loc_name_to_id[location]
if locationId not in self.locations_checked and self.kh2_read_byte(self.Save + 0x06B2) == 0:
if self.kh2_read_byte(self.Save + data.addrObtained) & 0x1 << data.bitIndex > 0:
self.sending = self.sending + [(int(locationId))]
except Exception as e:
if self.kh2connected:
self.kh2connected = False
logger.info(e)
logger.info("line 475")
async def verifyChests(self):
try:
for location in self.locations_checked:
locationName = self.lookup_id_to_location[location]
if locationName in self.chest_set:
if locationName in self.location_name_to_worlddata.keys():
locationData = self.location_name_to_worlddata[locationName]
if self.kh2_read_byte(
self.Save + locationData.addrObtained) & 0x1 << locationData.bitIndex == 0:
roomData = self.kh2_read_byte(self.Save + locationData.addrObtained)
self.kh2_write_byte(self.Save + locationData.addrObtained,
roomData | 0x01 << locationData.bitIndex)
except Exception as e:
if self.kh2connected:
self.kh2connected = False
logger.info(e)
logger.info("line 491")
async def verifyLevel(self):
for leveltype, anchor in {
"SoraLevel": 0x24FF,
"ValorLevel": 0x32F6,
"WisdomLevel": 0x332E,
"LimitLevel": 0x3366,
"MasterLevel": 0x339E,
"FinalLevel": 0x33D6
}.items():
if self.kh2_read_byte(self.Save + anchor) < self.kh2_seed_save["Levels"][leveltype]:
self.kh2_write_byte(self.Save + anchor, self.kh2_seed_save["Levels"][leveltype])

File diff suppressed because it is too large Load Diff

View File