Files
dockipelago/worlds/portal2/client/Portal2Client.py
Jonathan Tinney 7971961166
Some checks failed
Analyze modified files / flake8 (push) Failing after 2m28s
Build / build-win (push) Has been cancelled
Build / build-ubuntu2204 (push) Has been cancelled
ctest / Test C++ ubuntu-latest (push) Has been cancelled
ctest / Test C++ windows-latest (push) Has been cancelled
Analyze modified files / mypy (push) Has been cancelled
Build and Publish Docker Images / Push Docker image to Docker Hub (push) Successful in 5m4s
Native Code Static Analysis / scan-build (push) Failing after 5m2s
type check / pyright (push) Successful in 1m7s
unittests / Test Python 3.11.2 ubuntu-latest (push) Failing after 16m23s
unittests / Test Python 3.12 ubuntu-latest (push) Failing after 28m19s
unittests / Test Python 3.13 ubuntu-latest (push) Failing after 14m49s
unittests / Test hosting with 3.13 on ubuntu-latest (push) Successful in 5m0s
unittests / Test Python 3.13 macos-latest (push) Has been cancelled
unittests / Test Python 3.11 windows-latest (push) Has been cancelled
unittests / Test Python 3.13 windows-latest (push) Has been cancelled
add schedule I, sonic 1/frontiers/heroes, spirit island
2026-04-02 23:46:36 -07:00

459 lines
19 KiB
Python

from argparse import Namespace
import asyncio
import logging
import sys
import time
import typing
from CommonClient import CommonContext, server_loop, ClientCommandProcessor, logger, gui_enabled
from NetUtils import ClientStatus, NetworkItem
from Utils import async_start, init_logging
from ..mod_helpers.ItemHandling import add_ratman_commands, handle_item, handle_map_start, handle_trap, portal_gun_upgrade_not_inplace, potatos_not_inplace
from ..mod_helpers.MapMenu import Menu
from ..Locations import location_names_to_map_codes, map_codes_to_location_names, wheatley_maps_to_monitor_names, all_locations_table
from .. import Portal2World
from ..Options import GameModeOption
if __name__ == "__main__":
init_logging("Portal2Client", exception_logger="Portal2Client")
logger = logging.getLogger("Portal2Client")
class Portal2CommandProcessor(ClientCommandProcessor):
def _cmd_check_connection(self):
"""Responds with the status of the client's connection to the Portal 2 mod"""
self.ctx.alert_game_connection()
def _cmd_command(self, *command):
"""Sends a command to the game. Should not be used unless you get softlocked"""
self.ctx.command_queue.append(' '.join(command) + "\n")
def _cmd_deathlink(self):
"""Toggles death link for this client"""
self.ctx.death_link_active = not self.ctx.death_link_active
async_start(self.ctx.update_death_link(self.ctx.death_link_active), "set_deathlink")
self.output(f"Death link has been {"enabled" if self.ctx.death_link_active else "disabled"}")
def _cmd_refresh_menu(self):
"""Refreshed the in game menu in case of maps being inaccessible when they should be"""
self.ctx.refresh_menu()
def _cmd_message_in_game(self, message: str, *color_string):
"""Send a message to be displayed in game (only works while in a map).
message can be any text
color_string is an optional RGB string e.g. 255 100 0"""
if len(color_string) == 3:
self.ctx.add_to_in_game_message_queue(message, ' '.join(color_string))
else:
self.ctx.add_to_in_game_message_queue(message)
def _cmd_needed(self, *location_name):
"""Get the requirements for the map separated by all requirements and ones not yet acquired"""
# Check if map name is in the list of map names
message = "Location not found, use /locations to get a list of locations"
location_name = ' '.join(location_name)
for location in location_names_to_map_codes.keys():
if location_name in location:
requirements = all_locations_table[location].required_items
requirements_not_collected = list(set(self.ctx.item_list) & set(requirements))
requirements.sort()
requirements_not_collected.sort()
message = ("Required Items: \n"
f"{", ".join(requirements)}\n"
f"{"All items acquired" if not requirements_not_collected else "Still needed: \n" + ", ".join(requirements_not_collected)}")
break
self.output(message)
class Portal2Context(CommonContext):
command_processor = Portal2CommandProcessor
game_command_sender_task: typing.Optional["asyncio.Task[None]"] = None
game_message_listener_task: typing.Optional["asyncio.Task[None]"] = None
game = "Portal 2"
items_handling = 0b111 # receive all items for /received
HOST = "localhost"
PORT = int(Portal2World.settings.default_portal2_port)
death_link_active = False
goal_map_code = ""
item_list: list[str] = []
item_remove_commands: list[str] = []
command_queue: list[str] = []
game_message_queue: list[str] = []
sender_active : bool = False
listener_active : bool = False
location_name_to_id: dict[str, int] = None
menu: Menu = None
def alert_game_connection(self):
if self.check_game_connection():
self.command_processor.output(self.command_processor, "Connection to Portal 2 is up and running")
else:
self.command_processor.output(self.command_processor, "Disconnected from Portal 2. Make sure the mod is open and the `-netconport 3000` launch option is set")
def create_level_begin_command(self):
'''Generates a command that deletes all entities not collected yet'''
return f"{';'.join(self.item_remove_commands)}\n"
def update_menu(self, location_id: int = None):
menu_file = Portal2World.settings.menu_file
if location_id is not None:
self.menu.complete_check(location_id)
# Write the menu to that file
with open(menu_file, "w", encoding='utf-8') as f:
f.write(str(self.menu))
def refresh_menu(self):
for location_id in self.checked_locations:
self.menu.complete_check(location_id)
self.update_menu()
def add_to_in_game_message_queue(self, message: str, color_string: str = None) -> None:
self.command_queue.append(f'script AddToTextQueue("{message}"{f',"{color_string}"' if color_string else ""})\n')
async def p2_message_listener(self):
'''Listener for the messages sent from portal 2 to the client'''
try:
while True:
try:
reader, writer = await asyncio.open_connection(self.HOST, self.PORT)
except ConnectionRefusedError:
self.listener_active = False
await asyncio.sleep(self.current_reconnect_delay)
continue
self.listener_active = True
try:
while True:
data = await reader.read(4096)
if not data:
# connection closed by server; break to reconnect
break
# Add messages to the queue for consumption
data_list = data.decode(errors="ignore").replace("\'", "").split('\r\n')
self.game_message_queue += data_list
except asyncio.CancelledError:
logger.info("Game listener closed from cancellation")
raise
except Exception as e:
logger.error(f"An error occurred in listener loop: {e}")
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
self.listener_active = False
await asyncio.sleep(self.current_reconnect_delay)
except asyncio.CancelledError:
logger.info("Game listener closed from cancellation")
raise
async def p2_command_sender(self):
'''Command sender for the console commands sent to portal 2 from the client'''
try:
while True:
try:
reader, writer = await asyncio.open_connection(self.HOST, self.PORT)
except ConnectionRefusedError:
self.sender_active = False
await asyncio.sleep(self.current_reconnect_delay)
continue
self.sender_active = True
try:
# Keep the connection open and send queued commands without blocking the loop
while True:
# handle commands
if self.command_queue:
c = self.command_queue.pop(0)
if c:
writer.write(c.encode())
await writer.drain()
# Handle messages
elif self.game_message_queue:
message = self.game_message_queue.pop(0)
await self.handle_message(message)
else:
# yield control briefly so other tasks (listener, etc.) run smoothly
await asyncio.sleep(0.1)
except asyncio.CancelledError:
logger.info("Game sender closed from cancellation")
raise
except Exception as e:
logger.error(f"An error occurred in sender loop: {e}")
finally:
try:
writer.close()
await writer.wait_closed()
except Exception:
pass
self.sender_active = False
await asyncio.sleep(self.current_reconnect_delay)
except asyncio.CancelledError:
logger.info("Game sender closed from cancellation")
raise
async def handle_message(self, message: str):
if message.startswith("map_name:"):
map_name = message.split(':', 1)[1]
# append the whole command string
command_string = self.create_level_begin_command()
self.command_queue.append(command_string)
self.command_queue += handle_map_start(map_name, self.item_list)
# For map complete checks
elif message.startswith("map_complete:"):
done_map = message.split(':', 1)[1]
if done_map == self.goal_map_code:
await self.handle_goal_completion()
map_id = self.map_code_to_location_id(done_map)
if map_id:
await self.check_locations([map_id])
self.update_menu(map_id)
# All other checks
elif message.startswith("item_collected:"):
item_collected = message.split(":", 1)[1]
check_id = all_locations_table[item_collected].id
await self.check_locations([check_id])
self.update_menu(check_id)
elif message.startswith("monitor_break:"):
map_name = message.split(":", 1)[1]
check_name = wheatley_maps_to_monitor_names[map_name]
check_id = all_locations_table[check_name].id
await self.check_locations([check_id])
self.update_menu(check_id)
# Custom buttons e.g. ratman dens
elif message.startswith("button_check:"):
check_name = message.split(":", 1)[1]
check_id = all_locations_table[check_name].id
await self.check_locations([check_id])
self.update_menu(check_id)
# Deathlink
elif message.startswith("send_deathlink"):
if self.death_link_active and time.time() - self.last_death_link > 10:
await self.send_death()
async def handle_goal_completion(self):
if self.finished_game:
return
self.finished_game = True
await self.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
def on_deathlink(self, data):
self.command_queue.append("restart\n")
return super().on_deathlink(data)
def check_game_connection(self):
logger.info("Sender active: " + str(self.sender_active) + " Listener active: " + str(self.listener_active))
return self.sender_active and self.listener_active
# Used for nothing?
def location_id_to_map_code(self, location_id: str) -> str:
'''Converts a location ID to a map code (if that id relates to a map location)'''
# Convert id to name
location_name = self.location_names.lookup_in_game(location_id)
# Get info for location name
if location_name in location_names_to_map_codes:
return location_names_to_map_codes[location_name]
return None
def map_code_to_location_id(self, map_code: str):
'''Convert in game map name to location id for location checks'''
if map_code not in map_codes_to_location_names:
return None
location_name = map_codes_to_location_names[map_code]
if not self.location_name_to_id:
raise Exception("location_name_to_id dict has not been created yet")
if location_name not in self.location_name_to_id:
return None
return self.location_name_to_id[location_name]
def handle_slot_data(self, slot_data: dict):
if "death_link" in slot_data:
self.death_link_active = slot_data["death_link"]
async_start(self.update_death_link(self.death_link_active), "set_deathlink")
if "goal_map_code" in slot_data:
self.goal_map_code = slot_data["goal_map_code"]
if "location_name_to_id" in slot_data:
self.location_name_to_id = slot_data["location_name_to_id"]
if "chapter_dict" in slot_data:
if "logic_difficulty" in slot_data:
self.menu = Menu(slot_data["chapter_dict"], self, logic_difficulty=slot_data["logic_difficulty"])
else:
self.menu = Menu(slot_data["chapter_dict"], self)
else:
raise Exception("chapter_dict not found in slot data")
if "game_mode" in slot_data:
self.menu.is_open_world = slot_data["game_mode"] == GameModeOption.OPEN_WORLD
if "wheatley_monitors" in slot_data:
if slot_data["wheatley_monitors"]:
self.menu.has_wheatley_monitors = True
if "ratman_dens" in slot_data:
if slot_data["ratman_dens"]:
add_ratman_commands()
self.menu.has_ratman_dens = True
# Don't remove the portal gun upgrade after pickup
if "portal_gun_upgrade_inplace" not in slot_data:
portal_gun_upgrade_not_inplace()
# Don't disable potatos in PotatOS level
if "potatos_inplace" not in slot_data:
potatos_not_inplace()
self.menu.generate_menu()
# self.refresh_menu()
def on_package(self, cmd, args):
def update_item_list():
# Update item list to only include items not collected
items_received_names = [self.item_names.lookup_in_game(i.item, self.game) for i in self.items_received]
self.item_list = list(set(self.item_list) - set(items_received_names))
self.refresh_menu()
# Add item names to list
if cmd == "Retrieved":
if f"_read_item_name_groups_{self.game}" in args["keys"]:
self.item_list = args["keys"][f"_read_item_name_groups_{self.game}"]["Everything"]
update_item_list()
self.update_item_remove_commands()
if cmd == "ReceivedItems":
items = args["items"]
traps = [i for i in items if i.flags == 0b100]
for trap in traps:
self.command_queue.append(handle_trap(self.item_names.lookup_in_game(trap.item, self.game)))
update_item_list()
self.update_item_remove_commands()
if cmd == "Connected":
self.handle_slot_data(args["slot_data"])
self.alert_game_connection()
if cmd == "PrintJSON":
if "type" in args:
if args["type"] == "ItemSend" and args["receiving"] == self.slot:
item: NetworkItem = args["item"]
text = self.parse_message(args["data"], sending = item.player)
elif args["type"] == "Goal":
text = self.parse_message(args["data"])
else:
if args["type"] == "Collect":
self.update_menu()
return # Don't send text to game
self.add_to_in_game_message_queue(text)
def parse_message(self, data: list[dict], sending: int | None = None) -> str: # data pats not cast to JSONMessagePart as expected, dict instead
message = ""
for part in data:
text = part["text"]
if "type" in part:
if part["type"] == "item_id":
text = self.item_names.lookup_in_slot(int(text), self.slot)
elif part["type"] == "location_id":
text = self.location_names.lookup_in_slot(int(text), sending)
elif part["type"] == "player_id":
text = self.player_names[int(text)]
message += text
return message
def update_item_remove_commands(self):
temp_commands = []
for item_name in self.item_list:
item_commands = handle_item(item_name)
if item_commands:
temp_commands += item_commands
self.item_remove_commands = temp_commands
def make_gui(self):
from kvui import GameManager
class Portal2TextManager(GameManager):
base_title = "Portal 2 Text Client"
def __init__(self, ctx):
super().__init__(ctx)
self.icon = r"worlds/portal2/data/Portalpelago.png"
return Portal2TextManager
async def shutdown(self):
self.server_address = ""
self.username = None
self.password = None
self.cancel_autoreconnect()
if self.server and not self.server.socket.closed:
await self.server.socket.close()
if self.server_task:
await self.server_task
if self.game_command_sender_task:
self.game_command_sender_task.cancel()
if self.game_message_listener_task:
self.game_message_listener_task.cancel()
while self.input_requests > 0:
self.input_queue.put_nowait(None)
self.input_requests -= 1
self.keep_alive_task.cancel()
if self.ui_task:
await self.ui_task
if self.input_task:
self.input_task.cancel()
async def server_auth(self, password_requested: bool = False) -> None:
if password_requested and not self.password:
await super().server_auth(password_requested)
await self.get_username()
await self.send_connect(game=self.game)
async def main(args: Namespace):
ctx = Portal2Context(args.connect, args.password)
ctx.server_task = asyncio.create_task(server_loop(ctx), name="server loop")
ctx.game_command_sender_task = asyncio.create_task(ctx.p2_command_sender(), name="sender loop")
ctx.game_message_listener_task = asyncio.create_task(ctx.p2_message_listener(), name="listener loop")
if gui_enabled:
ctx.run_gui()
ctx.run_cli()
await ctx.exit_event.wait()
await ctx.shutdown()
def launch(*args: str) -> None:
from .Launch import launch_portal_2_client
launch_portal_2_client(*args)
if __name__ == "__main__":
launch(*sys.argv[1:])