forked from mirror/Archipelago
Some checks failed
Analyze modified files / flake8 (push) Failing after 2m28s
Build / build-win (push) Has been cancelled
Build / build-ubuntu2204 (push) Has been cancelled
ctest / Test C++ ubuntu-latest (push) Has been cancelled
ctest / Test C++ windows-latest (push) Has been cancelled
Analyze modified files / mypy (push) Has been cancelled
Build and Publish Docker Images / Push Docker image to Docker Hub (push) Successful in 5m4s
Native Code Static Analysis / scan-build (push) Failing after 5m2s
type check / pyright (push) Successful in 1m7s
unittests / Test Python 3.11.2 ubuntu-latest (push) Failing after 16m23s
unittests / Test Python 3.12 ubuntu-latest (push) Failing after 28m19s
unittests / Test Python 3.13 ubuntu-latest (push) Failing after 14m49s
unittests / Test hosting with 3.13 on ubuntu-latest (push) Successful in 5m0s
unittests / Test Python 3.13 macos-latest (push) Has been cancelled
unittests / Test Python 3.11 windows-latest (push) Has been cancelled
unittests / Test Python 3.13 windows-latest (push) Has been cancelled
254 lines
9.1 KiB
Python
254 lines
9.1 KiB
Python
from __future__ import annotations

import sys
import asyncio

from datetime import datetime, timedelta
from typing import Dict

# ModuleUpdate.update() is invoked before the remaining project imports below;
# keep this ordering (presumably it checks dependencies first -- confirm).
import ModuleUpdate
ModuleUpdate.update()

import Utils
item_num = 1

from .Socket import KHDDDSocket, SlotDataType, DDDCommand

# Logging is only initialised here when the module is executed as a script.
if __name__ == "__main__":
    Utils.init_logging("KHDDDClient", exception_logger="Client")

from NetUtils import NetworkItem, ClientStatus
from CommonClient import (
    gui_enabled, logger, get_base_parser, ClientCommandProcessor,
    CommonContext, server_loop,
)
|
|
|
|
def check_stdin() -> None:
    """Print a warning about console input when on Windows with a stdin present."""
    if not Utils.is_windows:
        return
    if sys.stdin:
        print("WARNING: Console input is not routed reliably on Windows, use the GUI instead.")
|
|
|
|
class KHDDDClientCommandProcessor(ClientCommandProcessor):
    """Console commands for the KHDDD client.

    Every ``_cmd_*`` method forwards a request to the game through
    ``self.ctx.socket.send_client_cmd`` and echoes a short confirmation.
    NOTE(review): the ``_cmd_*`` docstrings are presumably surfaced as help
    text by the base class, so they are kept verbatim.
    """

    def __init__(self, ctx):
        super().__init__(ctx)

    def _cmd_drop(self):
        """Instantly drops the player."""
        self.ctx.socket.send_client_cmd(DDDCommand.DROP, "")
        self.output("Dropping player.")

    def _cmd_unstuck(self):
        """Sends the inactive character to the World Map."""
        self.ctx.socket.send_client_cmd(DDDCommand.UNSTUCK, "")
        self.output("Sending inactive character to the World Map.")

    def _cmd_deathlink(self):
        """Toggles Deathlink"""
        # Capture the new state once. The completion callback runs later, and
        # reading ctx.death_link at that point would report whatever a second
        # toggle had set in the meantime.
        enabled = not self.ctx.death_link
        self.ctx.death_link = enabled

        def report(_task) -> None:
            self.output(f"Death Link turned {'on' if enabled else 'off'}")

        asyncio.create_task(self.ctx.update_death_link(enabled)).add_done_callback(report)
        self.ctx.socket.send_client_cmd(DDDCommand.DEATH_LINK, str(enabled))
|
|
|
|
|
|
|
|
class KHDDDContext(CommonContext):
    """Client context that relays between the Archipelago server and the game socket.

    Server packages arrive through ``on_package`` / ``on_deathlink`` and are
    forwarded to the game via ``self.socket`` (a ``KHDDDSocket``).
    """

    command_processor = KHDDDClientCommandProcessor
    game = "Kingdom Hearts Dream Drop Distance"
    items_handling = 0b111  # Attempt full remote
    death_link: bool = False

    sent_notifications = 0

    # Vars for socket
    socket: KHDDDSocket = None
    # Both containers are created per instance in __init__ so that separate
    # contexts never share one mutable list/dict.
    check_location_IDs: list
    slot_data_info: Dict[str, str]
    _connectedToAp: bool = False
    _connectedToDDD: bool = False

    _get_items_running = False

    def __init__(self, server_address, password):
        """Create the context, its game socket, and start the socket server task.

        Must be called while an asyncio event loop is running, because the
        socket server is scheduled with ``asyncio.create_task``.
        """
        super().__init__(server_address, password)
        self.check_location_IDs = []
        self.slot_data_info = {}

        # Socket
        self.socket = KHDDDSocket(self)
        # Keep a reference: the event loop only holds weak references to tasks.
        self.socket_task = asyncio.create_task(self.socket.start_server(), name="KHDDDSocketServer")

    async def server_auth(self, password_requested: bool = False):
        """Ask for a password if one is required and missing, then connect."""
        if password_requested and not self.password:
            await super().server_auth(password_requested)
        await self.get_username()
        await self.send_connect()

    async def connection_closed(self):
        """Reset the Archipelago connection flag and drop cached slot data."""
        await super().connection_closed()
        self._connectedToAp = False
        self.slot_data_info = {}

    @property
    def connectedToAp(self) -> bool:
        """True after a "Connected" package, False again once the connection closes."""
        return self._connectedToAp

    @connectedToAp.setter
    def connectedToAp(self, value: bool):
        self._connectedToAp = value

    @property
    def connectedToDDD(self) -> bool:
        """Game-side connection flag.

        Nothing in this class sets it to True; presumably the socket maintains
        it -- confirm against ``KHDDDSocket``.
        """
        return self._connectedToDDD

    @connectedToDDD.setter
    def connectedToDDD(self, value: bool):
        self._connectedToDDD = value

    @property
    def endpoints(self):
        """Return a one-element list with the server endpoint, or an empty list."""
        return [self.server] if self.server else []

    async def shutdown(self):
        """Shut down the base context, then notify the game and stop the socket server."""
        await super().shutdown()
        self.socket.send(20, ["Closing"])
        self.socket.shutdown_server()

    def on_package(self, cmd: str, args: dict):
        """Forward relevant server packages to the game.

        Handles "Connected" (stores slot data and schedules sending it),
        "ReceivedItems" (forwards the items) and "PrintJSON" of type
        "ItemSend" (notifies the game about items this slot sent to others).
        """
        if cmd == "Connected":
            self.connectedToAp = True
            self.slot_data_info = args['slot_data']
            # Utils.async_start is the fire-and-forget helper this file already
            # uses; a bare create_task would leave the task unreferenced.
            Utils.async_start(self.send_slot_data(), name="KHDDDSendSlotData")

        if cmd == "ReceivedItems":
            items = args["items"]
            # An empty list used to fall through to `items[0]` and raise
            # IndexError. Every non-empty list already went through
            # send_multipleItems, so that behaviour is unchanged.
            # NOTE(review): send_singleItem was never reached for a real item;
            # confirm whether single-item packets were meant to use it.
            if items:
                self.socket.send_multipleItems(items, len(self.items_received))

        # Send item notifications to game
        if cmd == "PrintJSON" and args.get("type") == "ItemSend":
            networkItem = NetworkItem(*args["item"])
            receiverID = args["receiving"]
            senderID = networkItem.player
            # Only items this slot sent to a different slot are shown in game.
            if senderID == self.slot and receiverID != senderID:
                itemName = self.item_names.lookup_in_slot(networkItem.item, receiverID)[:20]
                itemCategory = networkItem.flags
                receiverName = self.player_names[receiverID][:20]
                self.socket.item_msg(str(itemName), str(receiverName), str(itemCategory))

    def on_deathlink(self, data: dict[str, object]):
        """Log an incoming death link and pass its timestamp on to the game."""
        self.last_death_link = max(data["time"], self.last_death_link)
        text = data.get("cause", "")
        if text:
            logger.info(f"Deathlink: {text}")
        else:
            logger.info(f"Deathlink: Received from {data['source']}")
        # Send to the game
        # NOTE(review): source indentation was lost; this is assumed to run for
        # every death link, not only in the branch without a cause -- confirm.
        self.socket.send(8, [str(int(data["time"]))])

    def run_gui(self):
        """Import kivy UI system and start running it as self.ui_task"""
        from kvui import GameManager

        class KHDDDManager(GameManager):
            logging_pairs = [
                ("Client", "Archipelago")
            ]
            base_title = "Archipelago KHDDD Client"

        self.ui = KHDDDManager(self)
        self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")

    def get_items(self):
        """Send all received items to the game client. This can't be async
        because of message handler, so putting it into an internal async function
        and running it with the Utils fire and forget function."""
        async def async_get_items(ctx: KHDDDContext):
            try:
                # Wait until the Archipelago connection is up (or we are exiting).
                while not ctx.exit_event.is_set() and not ctx.connectedToAp:
                    await asyncio.sleep(5)
                if ctx.connectedToAp:  # should catch drop traps somehow in multi-sends to prevent drop on connect
                    ctx.socket.send_multipleItems(ctx.items_received, len(ctx.items_received))
            finally:
                ctx._get_items_running = False

        # The flag keeps at most one resend task in flight.
        if not self._get_items_running:
            self._get_items_running = True
            Utils.async_start(async_get_items(self), name="KHDDDGetItems")

    def get_slot_data(self):
        """Schedule ``send_slot_data`` as a fire-and-forget task."""
        Utils.async_start(self.send_slot_data(), name="KHDDDGetSlotData")

    async def send_slot_data(self):
        """Send known slot-data entries to the game once it is ready.

        Polls every 5 seconds while the game is not connected or an item
        resend is running, then sends each entry whose key names a
        ``SlotDataType`` member and returns.
        """
        while not self.exit_event.is_set():
            if not self.connectedToDDD or self._get_items_running:
                await asyncio.sleep(5)
                continue
            for key, value in self.slot_data_info.items():
                if key in SlotDataType.__members__:
                    self.socket.send_slot_data(SlotDataType[key], str(value))
            # NOTE(review): source indentation was lost; the loop is assumed to
            # end after one send pass (any other reading would spin) -- confirm.
            break
|
|
|
|
|
|
async def game_watcher(ctx: "KHDDDContext"):
    """Poll the game socket state and relay it to the server until exit.

    Each pass, while the game is connected:
      * sends a death link if the socket reported a death within the last
        20 seconds and death link is enabled,
      * reports the goal once when the socket says the game was completed,
      * sends the current ``check_location_IDs`` as ``LocationChecks``.

    Any exception is logged and marks the game as disconnected, so the next
    pass goes back to the 5-second idle wait.
    """
    while not ctx.exit_event.is_set():
        try:
            if not ctx.connectedToDDD:
                await asyncio.sleep(5)
                continue

            if ctx.socket.deathTime != "" and ctx.death_link:
                # New death detected, parse deathTime as local datetime.
                # Clear the field before parsing: a malformed timestamp would
                # otherwise stay set and fail again on every later pass.
                raw_death_time = ctx.socket.deathTime
                ctx.socket.deathTime = ""
                death_time = datetime.strptime(raw_death_time, '%Y%m%d%H%M%S')
                time_window = timedelta(seconds=20)
                if death_time + time_window >= datetime.now():
                    logger.info("Sending deathlink...")
                    await ctx.send_death(death_text=f"{ctx.username} fell to a nightmare")

            if ctx.socket.goaled and not ctx.finished_game:
                await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
                ctx.finished_game = True

            # NOTE(review): this aliases the list rather than copying it;
            # confirm the base context does not expect a set here.
            ctx.locations_checked = ctx.check_location_IDs
            message = [{"cmd": 'LocationChecks', "locations": ctx.check_location_IDs}]
            await ctx.send_msgs(message)
            await asyncio.sleep(0.5)

        except Exception as e:
            logger.error(f"Error in game watcher: {e}")
            ctx.connectedToDDD = False
|
|
|
|
def launch():
    """Parse command-line arguments and run the KHDDD client until it exits."""
    import colorama

    async def run_client(options):
        # Build the context and start the server loop, UI/CLI and game watcher.
        context = KHDDDContext(options.connect, options.password)
        context.server_task = asyncio.create_task(server_loop(context), name="server loop")
        if gui_enabled:
            context.run_gui()
        context.run_cli()
        watcher_task = asyncio.create_task(game_watcher(context), name="KHDDDProgressionWatcher")

        # Block until exit is requested, then wind everything down in order.
        await context.exit_event.wait()
        context.server_address = None
        await watcher_task
        await context.shutdown()

    arg_parser = get_base_parser(description="KHDDD Client, for text interfacing.")
    known_args, _unparsed = arg_parser.parse_known_args()

    colorama.just_fix_windows_console()
    asyncio.run(run_client(known_args))
    colorama.deinit()