from __future__ import annotations import collections import copy import logging import asyncio import urllib.parse import sys import typing import time import functools import ModuleUpdate ModuleUpdate.update() import websockets import Utils if __name__ == "__main__": Utils.init_logging("TextClient", exception_logger="Client") from MultiServer import CommandProcessor, mark_raw from NetUtils import (Endpoint, decode, NetworkItem, JSONtoTextParser, ClientStatus, Permission, NetworkSlot, RawJSONtoTextParser, add_json_text, add_json_location, add_json_item, JSONTypes, HintStatus, SlotType, NetworkPlayer, encode_to_bytes) from Utils import Version, stream_input, async_start from worlds import network_data_package import os import ssl if typing.TYPE_CHECKING: import kvui import argparse logger = logging.getLogger("Client") # without terminal, we have to use gui mode gui_enabled = not sys.stdout or "--nogui" not in sys.argv @Utils.cache_argsless def get_ssl_context(): import certifi return ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=certifi.where()) class ClientCommandProcessor(CommandProcessor): """ The Command Processor will parse every method of the class that starts with "_cmd_" as a command to be called when parsing user input, i.e. _cmd_exit will be called when the user sends the command "/exit". The decorator @mark_raw can be imported from MultiServer and tells the parser to only split on the first space after the command i.e. "/exit one two three" will be passed in as method("one two three") with mark_raw and method("one", "two", "three") without. In addition all docstrings for command methods will be displayed to the user on launch and when using "/help" """ def __init__(self, ctx: CommonContext): self.ctx = ctx def output(self, text: str): """Helper function to abstract logging to the CommonClient UI""" logger.info(text) def _cmd_exit(self) -> bool: """Close connections and client""" self.ctx.exit_event.set() return True def _cmd_connect(self, address: str = "") -> bool: """Connect to a MultiWorld Server""" if address: self.ctx.server_address = None self.ctx.username = None self.ctx.password = None elif not self.ctx.server_address: self.output("Please specify an address.") return False async_start(self.ctx.connect(address if address else None), name="connecting") return True def _cmd_disconnect(self) -> bool: """Disconnect from a MultiWorld Server""" async_start(self.ctx.disconnect(), name="disconnecting") return True def _cmd_received(self) -> bool: """List all received items""" item: NetworkItem self.output(f'{len(self.ctx.items_received)} received items, sorted by time:') for index, item in enumerate(self.ctx.items_received, 1): parts = [] add_json_item(parts, item.item, self.ctx.slot, item.flags) add_json_text(parts, " from ") add_json_location(parts, item.location, item.player) add_json_text(parts, " by ") add_json_text(parts, item.player, type=JSONTypes.player_id) self.ctx.on_print_json({"data": parts, "cmd": "PrintJSON"}) return True def get_current_datapackage(self) -> dict[str, typing.Any]: """ Return datapackage for current game if known. :return: The datapackage for the currently registered game. If not found, an empty dictionary will be returned. """ if not self.ctx.game: return {} checksum = self.ctx.checksums[self.ctx.game] return Utils.load_data_package_for_checksum(self.ctx.game, checksum) def _cmd_missing(self, filter_text = "") -> bool: """List all missing location checks, from your local game state. Can be given text, which will be used as filter.""" if not self.ctx.game: self.output("No game set, cannot determine missing checks.") return False count = 0 checked_count = 0 lookup = self.get_current_datapackage().get("location_name_to_id", {}) for location, location_id in lookup.items(): if filter_text and filter_text not in location: continue if location_id < 0: continue if location_id not in self.ctx.locations_checked: if location_id in self.ctx.missing_locations: self.output('Missing: ' + location) count += 1 elif location_id in self.ctx.checked_locations: self.output('Checked: ' + location) count += 1 checked_count += 1 if count: self.output( f"Found {count} missing location checks{f'. {checked_count} location checks previously visited.' if checked_count else ''}") else: self.output("No missing location checks found.") return True def output_datapackage_part(self, key: str, name: str) -> bool: """ Helper to digest a specific section of this game's datapackage. :param key: The dictionary key in the datapackage. :param name: Printed to the user as context for the part. :return: Whether the process was successful. """ if not self.ctx.game: self.output(f"No game set, cannot determine {name}.") return False lookup = self.get_current_datapackage().get(key) if lookup is None: self.output("datapackage not yet loaded, try again") return False self.output(f"{name} for {self.ctx.game}") for key in lookup: self.output(key) return True def _cmd_items(self) -> bool: """List all item names for the currently running game.""" return self.output_datapackage_part("item_name_to_id", "Item Names") def _cmd_locations(self) -> bool: """List all location names for the currently running game.""" return self.output_datapackage_part("location_name_to_id", "Location Names") def output_group_part(self, group_key: typing.Literal["item_name_groups", "location_name_groups"], filter_key: str, name: str) -> bool: """ Logs an item or location group from the player's game's datapackage. :param group_key: Either Item or Location group to be processed. :param filter_key: Which group key to filter to. If an empty string is passed will log all item/location groups. :param name: Printed to the user as context for the part. :return: Whether the process was successful. """ if not self.ctx.game: self.output(f"No game set, cannot determine existing {name} Groups.") return False lookup = Utils.persistent_load().get("groups_by_checksum", {}).get(self.ctx.checksums[self.ctx.game], {})\ .get(self.ctx.game, {}).get(group_key, {}) if lookup is None: self.output("datapackage not yet loaded, try again") return False if filter_key: if filter_key not in lookup: self.output(f"Unknown {name} Group {filter_key}") return False self.output(f"{name}s for {name} Group \"{filter_key}\"") for entry in lookup[filter_key]: self.output(entry) else: self.output(f"{name} Groups for {self.ctx.game}") for group in lookup: self.output(group) return True @mark_raw def _cmd_item_groups(self, key: str = "") -> bool: """ List all item group names for the currently running game. :param key: Which item group to filter to. Will log all groups if empty. """ return self.output_group_part("item_name_groups", key, "Item") @mark_raw def _cmd_location_groups(self, key: str = "") -> bool: """ List all location group names for the currently running game. :param key: Which item group to filter to. Will log all groups if empty. """ return self.output_group_part("location_name_groups", key, "Location") def _cmd_ready(self) -> bool: """Send ready status to server.""" self.ctx.ready = not self.ctx.ready if self.ctx.ready: state = ClientStatus.CLIENT_READY self.output("Readied up.") else: state = ClientStatus.CLIENT_CONNECTED self.output("Unreadied.") async_start(self.ctx.send_msgs([{"cmd": "StatusUpdate", "status": state}]), name="send StatusUpdate") return True def default(self, raw: str): """The default message parser to be used when parsing any messages that do not match a command""" raw = self.ctx.on_user_say(raw) if raw: async_start(self.ctx.send_msgs([{"cmd": "Say", "text": raw}]), name="send Say") class CommonContext: # The following attributes are used to Connect and should be adjusted as needed in subclasses tags: typing.Set[str] = {"AP"} game: typing.Optional[str] = None items_handling: typing.Optional[int] = None want_slot_data: bool = True # should slot_data be retrieved via Connect class NameLookupDict: """A specialized dict, with helper methods, for id -> name item/location data package lookups by game.""" def __init__(self, ctx: CommonContext, lookup_type: typing.Literal["item", "location"]): self.ctx: CommonContext = ctx self.lookup_type: typing.Literal["item", "location"] = lookup_type self._unknown_item: typing.Callable[[int], str] = lambda key: f"Unknown {lookup_type} (ID: {key})" self._archipelago_lookup: typing.Dict[int, str] = {} self._game_store: typing.Dict[str, typing.ChainMap[int, str]] = collections.defaultdict( lambda: collections.ChainMap(self._archipelago_lookup, Utils.KeyedDefaultDict(self._unknown_item))) # noinspection PyTypeChecker def __getitem__(self, key: str) -> typing.Mapping[int, str]: assert isinstance(key, str), f"ctx.{self.lookup_type}_names used with an id, use the lookup_in_ helpers instead" return self._game_store[key] def __len__(self) -> int: return len(self._game_store) def __iter__(self) -> typing.Iterator[str]: return iter(self._game_store) def __repr__(self) -> str: return repr(self._game_store) def lookup_in_game(self, code: int, game_name: typing.Optional[str] = None) -> str: """Returns the name for an item/location id in the context of a specific game or own game if `game` is omitted. """ if game_name is None: game_name = self.ctx.game assert game_name is not None, f"Attempted to lookup {self.lookup_type} with no game name available." return self._game_store[game_name][code] def lookup_in_slot(self, code: int, slot: typing.Optional[int] = None) -> str: """Returns the name for an item/location id in the context of a specific slot or own slot if `slot` is omitted. Use of `lookup_in_slot` should not be used when not connected to a server. If looking in own game, set `ctx.game` and use `lookup_in_game` method instead. """ if slot is None: slot = self.ctx.slot assert slot is not None, f"Attempted to lookup {self.lookup_type} with no slot info available." return self.lookup_in_game(code, self.ctx.slot_info[slot].game) def update_game(self, game: str, name_to_id_lookup_table: typing.Dict[str, int]) -> None: """Overrides existing lookup tables for a particular game.""" id_to_name_lookup_table = Utils.KeyedDefaultDict(self._unknown_item) id_to_name_lookup_table.update({code: name for name, code in name_to_id_lookup_table.items()}) self._game_store[game] = collections.ChainMap(self._archipelago_lookup, id_to_name_lookup_table) if game == "Archipelago": # Keep track of the Archipelago data package separately so if it gets updated in a custom datapackage, # it updates in all chain maps automatically. self._archipelago_lookup.clear() self._archipelago_lookup.update(id_to_name_lookup_table) # defaults starting_reconnect_delay: int = 5 current_reconnect_delay: int = starting_reconnect_delay command_processor: typing.Type[CommandProcessor] = ClientCommandProcessor ui: typing.Optional["kvui.GameManager"] = None ui_task: typing.Optional["asyncio.Task[None]"] = None input_task: typing.Optional["asyncio.Task[None]"] = None keep_alive_task: typing.Optional["asyncio.Task[None]"] = None server_task: typing.Optional["asyncio.Task[None]"] = None autoreconnect_task: typing.Optional["asyncio.Task[None]"] = None disconnected_intentionally: bool = False server: typing.Optional[Endpoint] = None server_version: Version = Version(0, 0, 0) generator_version: Version = Version(0, 0, 0) current_energy_link_value: typing.Optional[int] = None # to display in UI, gets set by server max_size: int = 16*1024*1024 # 16 MB of max incoming packet size last_death_link: float = time.time() # last send/received death link on AP layer # remaining type info slot_info: dict[int, NetworkSlot] """Slot Info from the server for the current connection""" server_address: str | None """Autoconnect address provided by the ctx constructor""" password: str | None """Password used for Connecting, expected by server_auth""" hint_cost: int | None """Current Hint Cost per Hint from the server""" hint_points: int | None """Current avaliable Hint Points from the server""" player_names: dict[int, str] """Current lookup of slot number to player display name from server (includes aliases)""" finished_game: bool """ Bool to signal that status should be updated to Goal after reconnecting to be used to ensure that a StatusUpdate packet does not get lost when disconnected """ ready: bool """Bool to keep track of state for the /ready command""" team: int | None """Team number of currently connected slot""" slot: int | None """Slot number of currently connected slot""" auth: str | None """Name used in Connect packet""" seed_name: str | None """Seed name that will be validated on opening a socket if present""" # locations locations_checked: set[int] """ Local container of location ids checked to signal that LocationChecks should be resent after reconnecting to be used to ensure that a LocationChecks packet does not get lost when disconnected """ locations_scouted: set[int] """ Local container of location ids scouted to signal that LocationScouts should be resent after reconnecting to be used to ensure that a LocationScouts packet does not get lost when disconnected """ items_received: list[NetworkItem] """List of NetworkItems recieved from the server""" missing_locations: set[int] """Container of Locations that are unchecked per server state""" checked_locations: set[int] """Container of Locations that are checked per server state""" server_locations: set[int] """Container of Locations that exist per server state; a combination between missing and checked locations""" locations_info: dict[int, NetworkItem] """Dict of location id: NetworkItem info from LocationScouts request""" # data storage stored_data: dict[str, typing.Any] """ Data Storage values by key that were retrieved from the server any keys subscribed to with SetNotify will be kept up to date """ stored_data_notification_keys: set[str] """Current container of watched Data Storage keys, managed by ctx.set_notify""" # internals _messagebox: typing.Optional["kvui.MessageBox"] = None """Current message box through kvui""" _messagebox_connection_loss: typing.Optional["kvui.MessageBox"] = None """Message box reporting a loss of connection""" def __init__(self, server_address: typing.Optional[str] = None, password: typing.Optional[str] = None) -> None: # server state self.server_address = server_address self.username = None self.password = password self.hint_cost = None self.slot_info = {} self.permissions = { "release": "disabled", "collect": "disabled", "remaining": "disabled", } # own state self.finished_game = False self.ready = False self.team = None self.slot = None self.auth = None self.seed_name = None self.locations_checked = set() # local state self.locations_scouted = set() self.items_received = [] self.missing_locations = set() # server state self.checked_locations = set() # server state self.server_locations = set() # all locations the server knows of, missing_location | checked_locations self.locations_info = {} self.stored_data = {} self.stored_data_notification_keys = set() self.input_queue = asyncio.Queue() self.input_requests = 0 # game state self.player_names = {0: "Archipelago"} self.exit_event = asyncio.Event() self.watcher_event = asyncio.Event() self.item_names = self.NameLookupDict(self, "item") self.location_names = self.NameLookupDict(self, "location") self.checksums = {} self.jsontotextparser = JSONtoTextParser(self) self.rawjsontotextparser = RawJSONtoTextParser(self) if self.game: self.checksums[self.game] = network_data_package["games"][self.game]["checksum"] self.update_data_package(network_data_package) # execution self.keep_alive_task = asyncio.create_task(keep_alive(self), name="Bouncy") @property def suggested_address(self) -> str: if self.server_address: return self.server_address return Utils.persistent_load().get("client", {}).get("last_server_address", "") @functools.cached_property def raw_text_parser(self) -> RawJSONtoTextParser: return RawJSONtoTextParser(self) @property def total_locations(self) -> typing.Optional[int]: """Will return None until connected.""" if self.checked_locations or self.missing_locations: return len(self.checked_locations | self.missing_locations) async def connection_closed(self): if self.server and self.server.socket is not None: await self.server.socket.close() self.reset_server_state() def reset_server_state(self): self.auth = None self.slot = None self.team = None self.items_received = [] self.locations_info = {} self.server_version = Version(0, 0, 0) self.generator_version = Version(0, 0, 0) self.server = None self.server_task = None self.hint_cost = None self.permissions = { "release": "disabled", "collect": "disabled", "remaining": "disabled", } async def disconnect(self, allow_autoreconnect: bool = False): if not allow_autoreconnect: self.disconnected_intentionally = True if self.cancel_autoreconnect(): logger.info("Cancelled auto-reconnect.") if self.server and not self.server.socket.closed: await self.server.socket.close() if self.server_task is not None: await self.server_task if self.ui: self.ui.update_hints() async def send_msgs(self, msgs: typing.List[typing.Any]) -> None: """ `msgs` JSON serializable """ if not self.server or not self.server.socket.open or self.server.socket.closed: return await self.server.socket.send(encode_to_bytes(msgs)) def consume_players_package(self, package: typing.List[NetworkPlayer]): self.player_names = {network_player.slot: network_player.name for network_player in package if self.team == network_player.team} self.player_names[0] = "Archipelago" def event_invalid_slot(self): raise Exception('Invalid Slot; please verify that you have connected to the correct world.') def event_invalid_game(self): raise Exception('Invalid Game; please verify that you connected with the right game to the correct world.') async def server_auth(self, password_requested: bool = False) -> typing.Optional[str]: if password_requested and not self.password: logger.info('Enter the password required to join this game:') self.password = await self.console_input() return self.password return None async def get_username(self): if not self.auth: self.auth = self.username if not self.auth: logger.info('Enter slot name:') self.auth = await self.console_input() async def send_connect(self, **kwargs: typing.Any) -> None: """ Send a `Connect` packet to log in to the server, additional keyword args can override any value in the connection packet """ payload = { 'cmd': 'Connect', 'password': self.password, 'name': self.auth, 'version': Utils.version_tuple, 'tags': self.tags, 'items_handling': self.items_handling, 'uuid': Utils.get_unique_identifier(), 'game': self.game, "slot_data": self.want_slot_data, } if kwargs: payload.update(kwargs) await self.send_msgs([payload]) await self.send_msgs([{"cmd": "Get", "keys": ["_read_race_mode"]}]) async def check_locations(self, locations: typing.Collection[int]) -> set[int]: """Send new location checks to the server. Returns the set of actually new locations that were sent.""" locations = set(locations) & self.missing_locations if locations: await self.send_msgs([{"cmd": 'LocationChecks', "locations": tuple(locations)}]) return locations async def console_input(self) -> str: if self.ui: self.ui.focus_textinput() self.input_requests += 1 return await self.input_queue.get() async def connect(self, address: typing.Optional[str] = None) -> None: """ disconnect any previous connection, and open new connection to the server """ await self.disconnect() self.server_task = asyncio.create_task(server_loop(self, address), name="server loop") def cancel_autoreconnect(self) -> bool: if self.autoreconnect_task: self.autoreconnect_task.cancel() self.autoreconnect_task = None return True return False def slot_concerns_self(self, slot) -> bool: """Helper function to abstract player groups, should be used instead of checking slot == self.slot directly.""" if slot == self.slot: return True if slot in self.slot_info: return self.slot in self.slot_info[slot].group_members return False def is_echoed_chat(self, print_json_packet: dict) -> bool: """Helper function for filtering out messages sent by self.""" return print_json_packet.get("type", "") == "Chat" \ and print_json_packet.get("team", None) == self.team \ and print_json_packet.get("slot", None) == self.slot def is_uninteresting_item_send(self, print_json_packet: dict) -> bool: """Helper function for filtering out ItemSend prints that do not concern the local player.""" return print_json_packet.get("type", "") == "ItemSend" \ and not self.slot_concerns_self(print_json_packet["receiving"]) \ and not self.slot_concerns_self(print_json_packet["item"].player) def on_print(self, args: dict): logger.info(args["text"]) def on_print_json(self, args: dict): if self.ui: # send copy to UI self.ui.print_json(copy.deepcopy(args["data"])) logging.getLogger("FileLog").info(self.rawjsontotextparser(copy.deepcopy(args["data"])), extra={"NoStream": True}) logging.getLogger("StreamLog").info(self.jsontotextparser(copy.deepcopy(args["data"])), extra={"NoFile": True}) def on_package(self, cmd: str, args: dict): """For custom package handling in subclasses.""" pass def on_user_say(self, text: str) -> typing.Optional[str]: """Gets called before sending a Say to the server from the user. Returned text is sent, or sending is aborted if None is returned.""" return text def on_ui_command(self, text: str) -> None: """Gets called by kivy when the user executes a command starting with `/` or `!`. The command processor is still called; this is just intended for command echoing.""" self.ui.print_json([{"text": text, "type": "color", "color": "orange"}]) def update_permissions(self, permissions: typing.Dict[str, int]): """Internal method to parse and save server permissions from RoomInfo""" for permission_name, permission_flag in permissions.items(): try: flag = Permission(permission_flag) logger.info(f"{permission_name.capitalize()} permission: {flag.name}") self.permissions[permission_name] = flag.name except Exception as e: # safeguard against permissions that may be implemented in the future logger.exception(e) async def shutdown(self): self.server_address = "" self.username = None self.password = None self.cancel_autoreconnect() if self.server and not self.server.socket.closed: await self.server.socket.close() if self.server_task: await self.server_task while self.input_requests > 0: self.input_queue.put_nowait(None) self.input_requests -= 1 self.keep_alive_task.cancel() if self.ui_task: await self.ui_task if self.input_task: self.input_task.cancel() # Hints def update_hint(self, location: int, finding_player: int, status: typing.Optional[HintStatus]) -> None: msg = {"cmd": "UpdateHint", "location": location, "player": finding_player} if status is not None: msg["status"] = status async_start(self.send_msgs([msg]), name="update_hint") # DataPackage async def prepare_data_package(self, relevant_games: typing.Set[str], remote_data_package_checksums: typing.Dict[str, str]): """Validate that all data is present for the current multiworld. Download, assimilate and cache missing data from the server.""" # by documentation any game can use Archipelago locations/items -> always relevant relevant_games.add("Archipelago") needed_updates: typing.Set[str] = set() for game in relevant_games: if game not in remote_data_package_checksums: continue remote_checksum: typing.Optional[str] = remote_data_package_checksums.get(game) if not remote_checksum: # custom data package and no checksum for this game needed_updates.add(game) continue cached_checksum: typing.Optional[str] = self.checksums.get(game) # no action required if cached version is new enough if remote_checksum != cached_checksum: local_checksum: typing.Optional[str] = network_data_package["games"].get(game, {}).get("checksum") if remote_checksum == local_checksum: self.update_game(network_data_package["games"][game], game) else: cached_game = Utils.load_data_package_for_checksum(game, remote_checksum) cache_checksum: typing.Optional[str] = cached_game.get("checksum") # download remote version if cache is not new enough if remote_checksum != cache_checksum: needed_updates.add(game) else: self.update_game(cached_game, game) if needed_updates: await self.send_msgs([{"cmd": "GetDataPackage", "games": [game_name]} for game_name in needed_updates]) def update_game(self, game_package: dict, game: str): self.item_names.update_game(game, game_package["item_name_to_id"]) self.location_names.update_game(game, game_package["location_name_to_id"]) self.checksums[game] = game_package.get("checksum") def update_data_package(self, data_package: dict): for game, game_data in data_package["games"].items(): self.update_game(game_data, game) def consume_network_data_package(self, data_package: dict): self.update_data_package(data_package) logger.info(f"Got new ID/Name DataPackage for {', '.join(data_package['games'])}") for game, game_data in data_package["games"].items(): Utils.store_data_package_for_checksum(game, game_data) def consume_network_item_groups(self): data = {"item_name_groups": self.stored_data[f"_read_item_name_groups_{self.game}"]} current_cache = Utils.persistent_load().get("groups_by_checksum", {}).get(self.checksums[self.game], {}) if self.game in current_cache: current_cache[self.game].update(data) else: current_cache[self.game] = data Utils.persistent_store("groups_by_checksum", self.checksums[self.game], current_cache) def consume_network_location_groups(self): data = {"location_name_groups": self.stored_data[f"_read_location_name_groups_{self.game}"]} current_cache = Utils.persistent_load().get("groups_by_checksum", {}).get(self.checksums[self.game], {}) if self.game in current_cache: current_cache[self.game].update(data) else: current_cache[self.game] = data Utils.persistent_store("groups_by_checksum", self.checksums[self.game], current_cache) # data storage def set_notify(self, *keys: str) -> None: """Subscribe to be notified of changes to selected data storage keys. The values can be accessed via the "stored_data" attribute of this context, which is a dictionary mapping the names of the data storage keys to the latest values received from the server. """ if new_keys := (set(keys) - self.stored_data_notification_keys): self.stored_data_notification_keys.update(new_keys) async_start(self.send_msgs([{"cmd": "Get", "keys": list(new_keys)}, {"cmd": "SetNotify", "keys": list(new_keys)}])) # DeathLink hooks def on_deathlink(self, data: typing.Dict[str, typing.Any]) -> None: """Gets dispatched when a new DeathLink is triggered by another linked player.""" self.last_death_link = max(data["time"], self.last_death_link) text = data.get("cause", "") if text: logger.info(f"DeathLink: {text}") else: logger.info(f"DeathLink: Received from {data['source']}") async def send_death(self, death_text: str = ""): """Helper function to send a deathlink using death_text as the unique death cause string.""" if self.server and self.server.socket: logger.info("DeathLink: Sending death to your friends...") self.last_death_link = time.time() await self.send_msgs([{ "cmd": "Bounce", "tags": ["DeathLink"], "data": { "time": self.last_death_link, "source": self.player_names[self.slot], "cause": death_text } }]) async def update_death_link(self, death_link: bool): """Helper function to set Death Link connection tag on/off and update the connection if already connected.""" old_tags = self.tags.copy() if death_link: self.tags.add("DeathLink") else: self.tags -= {"DeathLink"} if old_tags != self.tags and self.server and not self.server.socket.closed: await self.send_msgs([{"cmd": "ConnectUpdate", "tags": self.tags}]) def gui_error(self, title: str, text: typing.Union[Exception, str]) -> typing.Optional["kvui.MessageBox"]: """Displays an error messagebox in the loaded Kivy UI. Override if using a different UI framework""" if not self.ui: return None title = title or "Error" from kvui import MessageBox if self._messagebox: self._messagebox.dismiss() # make "Multiple exceptions" look nice text = str(text).replace('[Errno', '\n[Errno').strip() # split long messages into title and text parts = title.split('. ', 1) if len(parts) == 1: parts = title.split(', ', 1) if len(parts) > 1: text = parts[1] + '\n\n' + text title = parts[0] # display error self._messagebox = MessageBox(title, text, error=True) self._messagebox.open() return self._messagebox def handle_connection_loss(self, msg: str) -> None: """Helper for logging and displaying a loss of connection. Must be called from an except block.""" exc_info = sys.exc_info() logger.exception(msg, exc_info=exc_info, extra={'compact_gui': True}) self._messagebox_connection_loss = self.gui_error(msg, exc_info[1]) def make_gui(self) -> "type[kvui.GameManager]": """ To return the Kivy `App` class needed for `run_gui` so it can be overridden before being built Common changes are changing `base_title` to update the window title of the client and updating `logging_pairs` to automatically make new tabs that can be filled with their respective logger. ex. `logging_pairs.append(("Foo", "Bar"))` will add a "Bar" tab which follows the logger returned from `logging.getLogger("Foo")` """ from kvui import GameManager class TextManager(GameManager): base_title = "Archipelago Text Client" return TextManager def run_gui(self): """Import kivy UI system from make_gui() and start running it as self.ui_task.""" ui_class = self.make_gui() self.ui = ui_class(self) self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI") def run_cli(self): if sys.stdin: if sys.stdin.fileno() != 0: from multiprocessing import parent_process if parent_process(): return # ignore MultiProcessing pipe # steam overlay breaks when starting console_loop if 'gameoverlayrenderer' in os.environ.get('LD_PRELOAD', ''): logger.info("Skipping terminal input, due to conflicting Steam Overlay detected. Please use GUI only.") else: self.input_task = asyncio.create_task(console_loop(self), name="Input") async def keep_alive(ctx: CommonContext, seconds_between_checks=100): """some ISPs/network configurations drop TCP connections if no payload is sent (ignore TCP-keep-alive) so we send a payload to prevent drop and if we were dropped anyway this will cause an auto-reconnect.""" seconds_elapsed = 0 while not ctx.exit_event.is_set(): await asyncio.sleep(1) # short sleep to not block program shutdown if ctx.server and ctx.slot: seconds_elapsed += 1 if seconds_elapsed > seconds_between_checks: await ctx.send_msgs([{"cmd": "Bounce", "slots": [ctx.slot]}]) seconds_elapsed = 0 async def server_loop(ctx: CommonContext, address: typing.Optional[str] = None) -> None: if ctx.server and ctx.server.socket: logger.error('Already connected') return if address is None: # set through CLI or APBP address = ctx.server_address # Wait for the user to provide a multiworld server address if not address: logger.info('Please connect to an Archipelago server.') return ctx.cancel_autoreconnect() if ctx._messagebox_connection_loss: ctx._messagebox_connection_loss.dismiss() ctx._messagebox_connection_loss = None address = f"ws://{address}" if "://" not in address \ else address.replace("archipelago://", "ws://") server_url = urllib.parse.urlparse(address) if server_url.username: ctx.username = server_url.username if server_url.password: ctx.password = server_url.password def reconnect_hint() -> str: return ", type /connect to reconnect" if ctx.server_address else "" logger.info(f'Connecting to Archipelago server at {address}') try: port = server_url.port or 38281 # raises ValueError if invalid socket = await websockets.connect(address, port=port, ping_timeout=None, ping_interval=None, ssl=get_ssl_context() if address.startswith("wss://") else None, max_size=ctx.max_size) if ctx.ui is not None: ctx.ui.update_address_bar(server_url.netloc) ctx.server = Endpoint(socket) logger.info('Connected') ctx.server_address = address ctx.current_reconnect_delay = ctx.starting_reconnect_delay ctx.disconnected_intentionally = False async for data in ctx.server.socket: for msg in decode(data): await process_server_cmd(ctx, msg) logger.warning(f"Disconnected from multiworld server{reconnect_hint()}") except websockets.InvalidMessage: # probably encrypted if address.startswith("ws://"): # try wss await server_loop(ctx, "ws" + address[1:]) else: ctx.handle_connection_loss(f"Lost connection to the multiworld server due to InvalidMessage" f"{reconnect_hint()}") except ConnectionRefusedError: ctx.handle_connection_loss("Connection refused by the server. " "May not be running Archipelago on that address or port.") except websockets.InvalidURI: ctx.handle_connection_loss("Failed to connect to the multiworld server (invalid URI)") except OSError: ctx.handle_connection_loss("Failed to connect to the multiworld server") except Exception: ctx.handle_connection_loss(f"Lost connection to the multiworld server{reconnect_hint()}") finally: await ctx.connection_closed() if ctx.server_address and ctx.username and not ctx.disconnected_intentionally: logger.info(f"... automatically reconnecting in {ctx.current_reconnect_delay} seconds") assert ctx.autoreconnect_task is None ctx.autoreconnect_task = asyncio.create_task(server_autoreconnect(ctx), name="server auto reconnect") ctx.current_reconnect_delay *= 2 async def server_autoreconnect(ctx: CommonContext): await asyncio.sleep(ctx.current_reconnect_delay) if ctx.server_address and ctx.server_task is None: ctx.server_task = asyncio.create_task(server_loop(ctx), name="server loop") async def process_server_cmd(ctx: CommonContext, args: dict): try: cmd = args["cmd"] except: logger.exception(f"Could not get command from {args}") raise if cmd == 'RoomInfo': if ctx.seed_name and ctx.seed_name != args["seed_name"]: msg = "The server is running a different multiworld than your client is. (invalid seed_name)" logger.info(msg, extra={'compact_gui': True}) ctx.gui_error('Error', msg) else: logger.info('--------------------------------') logger.info('Room Information:') logger.info('--------------------------------') ctx.server_version = Version.from_network_dict(args["version"]) if "generator_version" in args: ctx.generator_version = Version.from_network_dict(args["generator_version"]) logger.info(f'Server protocol version: {ctx.server_version.as_simple_string()}, ' f'generator version: {ctx.generator_version.as_simple_string()}, ' f'tags: {", ".join(args["tags"])}') else: logger.info(f'Server protocol version: {ctx.server_version.as_simple_string()}, ' f'tags: {", ".join(args["tags"])}') if args['password']: logger.info('Password required') ctx.update_permissions(args.get("permissions", {})) logger.info( f"A !hint costs {args['hint_cost']}% of your total location count as points" f" and you get {args['location_check_points']}" f" for each location checked. Use !hint for more information.") ctx.hint_cost = int(args['hint_cost']) ctx.check_points = int(args['location_check_points']) if "players" in args: # TODO remove when servers sending this are outdated players = args.get("players", []) if len(players) < 1: logger.info('No player connected') else: players.sort() current_team = -1 logger.info('Connected Players:') for network_player in players: if network_player.team != current_team: logger.info(f' Team #{network_player.team + 1}') current_team = network_player.team logger.info(' %s (Player %d)' % (network_player.alias, network_player.slot)) # update data package data_package_checksums = args.get("datapackage_checksums", {}) await ctx.prepare_data_package(set(args["games"]), data_package_checksums) await ctx.server_auth(args['password']) elif cmd == 'DataPackage': ctx.consume_network_data_package(args['data']) elif cmd == 'ConnectionRefused': errors = args["errors"] if 'InvalidSlot' in errors: ctx.disconnected_intentionally = True ctx.event_invalid_slot() elif 'InvalidGame' in errors: ctx.disconnected_intentionally = True ctx.event_invalid_game() elif 'IncompatibleVersion' in errors: ctx.disconnected_intentionally = True raise Exception('Server reported your client version as incompatible. ' 'This probably means you have to update.') elif 'InvalidItemsHandling' in errors: raise Exception('The item handling flags requested by the client are not supported') # last to check, recoverable problem elif 'InvalidPassword' in errors: logger.error('Invalid password') ctx.password = None await ctx.server_auth(True) elif errors: raise Exception("Unknown connection errors: " + str(errors)) else: raise Exception('Connection refused by the multiworld host, no reason provided') elif cmd == 'Connected': ctx.username = ctx.auth ctx.team = args["team"] ctx.slot = args["slot"] # int keys get lost in JSON transfer ctx.slot_info = {0: NetworkSlot("Archipelago", "Archipelago", SlotType.player)} ctx.slot_info.update({int(pid): NetworkSlot.from_network_dict(data) for pid, data in args["slot_info"].items()}) ctx.hint_points = args.get("hint_points", 0) ctx.consume_players_package([NetworkPlayer.from_network_dict(player) for player in args["players"]]) ctx.stored_data_notification_keys.add(f"_read_hints_{ctx.team}_{ctx.slot}") if ctx.game: game = ctx.game else: game = ctx.slot_info[ctx.slot][1] ctx.stored_data_notification_keys.add(f"_read_item_name_groups_{game}") ctx.stored_data_notification_keys.add(f"_read_location_name_groups_{game}") msgs = [] if ctx.locations_checked: msgs.append({"cmd": "LocationChecks", "locations": list(ctx.locations_checked)}) if ctx.locations_scouted: msgs.append({"cmd": "LocationScouts", "locations": list(ctx.locations_scouted)}) if ctx.stored_data_notification_keys: msgs.append({"cmd": "Get", "keys": list(ctx.stored_data_notification_keys)}) msgs.append({"cmd": "SetNotify", "keys": list(ctx.stored_data_notification_keys)}) if msgs: await ctx.send_msgs(msgs) if ctx.finished_game: await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}]) # Get the server side view of missing as of time of connecting. # This list is used to only send to the server what is reported as ACTUALLY Missing. # This also serves to allow an easy visual of what locations were already checked previously # when /missing is used for the client side view of what is missing. ctx.missing_locations = set(args["missing_locations"]) ctx.checked_locations = set(args["checked_locations"]) ctx.server_locations = ctx.missing_locations | ctx. checked_locations server_url = urllib.parse.urlparse(ctx.server_address) Utils.persistent_store("client", "last_server_address", server_url.netloc) elif cmd == 'ReceivedItems': start_index = args["index"] if start_index == 0: ctx.items_received = [] elif start_index != len(ctx.items_received): sync_msg = [{'cmd': 'Sync'}] if ctx.locations_checked: sync_msg.append({"cmd": "LocationChecks", "locations": list(ctx.locations_checked)}) await ctx.send_msgs(sync_msg) if start_index == len(ctx.items_received): for item in args['items']: ctx.items_received.append(NetworkItem.from_network_dict(item)) ctx.watcher_event.set() elif cmd == 'LocationInfo': for item in [NetworkItem.from_network_dict(item) for item in args['locations']]: ctx.locations_info[item.location] = item ctx.watcher_event.set() elif cmd == "RoomUpdate": if "players" in args: ctx.consume_players_package([NetworkPlayer.from_network_dict(player) for player in args["players"]]) if "hint_points" in args: ctx.hint_points = args['hint_points'] if "checked_locations" in args: checked = set(args["checked_locations"]) ctx.checked_locations |= checked ctx.missing_locations -= checked if "permissions" in args: ctx.update_permissions(args["permissions"]) elif cmd == 'Print': ctx.on_print(args) elif cmd == 'PrintJSON': ctx.on_print_json(args) elif cmd == 'InvalidPacket': logger.warning(f"Invalid Packet of {args['type']}: {args['text']}") elif cmd == "Bounced": tags = args.get("tags", []) # we can skip checking "DeathLink" in ctx.tags, as otherwise we wouldn't have been send this if "DeathLink" in tags and ctx.last_death_link != args["data"]["time"]: ctx.on_deathlink(args["data"]) elif cmd == "Retrieved": ctx.stored_data.update(args["keys"]) if ctx.ui and f"_read_hints_{ctx.team}_{ctx.slot}" in args["keys"]: ctx.ui.update_hints() if f"_read_item_name_groups_{ctx.game}" in args["keys"]: ctx.consume_network_item_groups() if f"_read_location_name_groups_{ctx.game}" in args["keys"]: ctx.consume_network_location_groups() elif cmd == "SetReply": ctx.stored_data[args["key"]] = args["value"] if ctx.ui and f"_read_hints_{ctx.team}_{ctx.slot}" == args["key"]: ctx.ui.update_hints() elif f"_read_item_name_groups_{ctx.game}" == args["key"]: ctx.consume_network_item_groups() elif f"_read_location_name_groups_{ctx.game}" == args["key"]: ctx.consume_network_location_groups() elif args["key"].startswith("EnergyLink"): ctx.current_energy_link_value = args["value"] if ctx.ui: ctx.ui.set_new_energy_link_value() else: logger.debug(f"unknown command {cmd}") ctx.on_package(cmd, args) async def console_loop(ctx: CommonContext): commandprocessor = ctx.command_processor(ctx) queue = asyncio.Queue() stream_input(sys.stdin, queue) while not ctx.exit_event.is_set(): try: input_text = await queue.get() queue.task_done() if ctx.input_requests > 0: ctx.input_requests -= 1 ctx.input_queue.put_nowait(input_text) continue if input_text: commandprocessor(input_text) except Exception as e: logger.exception(e) def get_base_parser(description: typing.Optional[str] = None): """Base argument parser to be reused for components subclassing off of CommonClient""" import argparse parser = argparse.ArgumentParser(description=description) parser.add_argument('--connect', default=None, help='Address of the multiworld host.') parser.add_argument('--password', default=None, help='Password of the multiworld host.') if sys.stdout: # If terminal output exists, offer gui-less mode parser.add_argument('--nogui', default=False, action='store_true', help="Turns off Client GUI.") return parser def handle_url_arg(args: "argparse.Namespace", parser: "typing.Optional[argparse.ArgumentParser]" = None) -> "argparse.Namespace": """ Parse the url arg "archipelago://name:pass@host:port" from launcher into correct launch args for CommonClient If alternate data is required the urlparse response is saved back to args.url if valid """ if not args.url: return args url = urllib.parse.urlparse(args.url) if url.scheme != "archipelago": if not parser: parser = get_base_parser() parser.error(f"bad url, found {args.url}, expected url in form of archipelago://archipelago.gg:38281") return args args.url = url args.connect = url.netloc if url.username: args.name = urllib.parse.unquote(url.username) if url.password: args.password = urllib.parse.unquote(url.password) return args def run_as_textclient(*args): class TextContext(CommonContext): # Text Mode to use !hint and such with games that have no text entry tags = CommonContext.tags | {"TextOnly"} game = "" # empty matches any game since 0.3.2 items_handling = 0b111 # receive all items for /received want_slot_data = False # Can't use game specific slot_data async def server_auth(self, password_requested: bool = False): if password_requested and not self.password: await super(TextContext, self).server_auth(password_requested) await self.get_username() await self.send_connect(game="") def on_package(self, cmd: str, args: dict): if cmd == "Connected": self.game = self.slot_info[self.slot].game async def disconnect(self, allow_autoreconnect: bool = False): self.game = "" await super().disconnect(allow_autoreconnect) async def main(args): ctx = TextContext(args.connect, args.password) ctx.auth = args.name ctx.server_task = asyncio.create_task(server_loop(ctx), name="server loop") if gui_enabled: ctx.run_gui() ctx.run_cli() await ctx.exit_event.wait() await ctx.shutdown() import colorama parser = get_base_parser(description="Gameless Archipelago Client, for text interfacing.") parser.add_argument('--name', default=None, help="Slot Name to connect as.") parser.add_argument("url", nargs="?", help="Archipelago connection url") args = parser.parse_args(args) args = handle_url_arg(args, parser=parser) # use colorama to display colored text highlighting on windows colorama.just_fix_windows_console() asyncio.run(main(args)) colorama.deinit() if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) # force log-level to work around log level resetting to WARNING run_as_textclient(*sys.argv[1:]) # default value for parse_args