Files
dockipelago/worlds/trackmania/client.py
Jonathan Tinney 7971961166
Some checks failed
Analyze modified files / flake8 (push) Failing after 2m28s
Build / build-win (push) Has been cancelled
Build / build-ubuntu2204 (push) Has been cancelled
ctest / Test C++ ubuntu-latest (push) Has been cancelled
ctest / Test C++ windows-latest (push) Has been cancelled
Analyze modified files / mypy (push) Has been cancelled
Build and Publish Docker Images / Push Docker image to Docker Hub (push) Successful in 5m4s
Native Code Static Analysis / scan-build (push) Failing after 5m2s
type check / pyright (push) Successful in 1m7s
unittests / Test Python 3.11.2 ubuntu-latest (push) Failing after 16m23s
unittests / Test Python 3.12 ubuntu-latest (push) Failing after 28m19s
unittests / Test Python 3.13 ubuntu-latest (push) Failing after 14m49s
unittests / Test hosting with 3.13 on ubuntu-latest (push) Successful in 5m0s
unittests / Test Python 3.13 macos-latest (push) Has been cancelled
unittests / Test Python 3.11 windows-latest (push) Has been cancelled
unittests / Test Python 3.13 windows-latest (push) Has been cancelled
add schedule I, sonic 1/frontiers/heroes, spirit island
2026-04-02 23:46:36 -07:00

259 lines
8.8 KiB
Python

import asyncio
import Utils
import websockets
import functools
from copy import deepcopy
from typing import List, Any, Iterable
from NetUtils import decode, encode, JSONtoTextParser, NetworkItem, NetworkPlayer
from MultiServer import Endpoint
from CommonClient import CommonContext, gui_enabled, ClientCommandProcessor, logger, get_base_parser
# When True, traffic relayed to and from the Trackmania plugin (and the trimmed
# "Connected" payload) is logged/printed for troubleshooting.
DEBUG = False
class TrackmaniaCommandProcessor(ClientCommandProcessor):
    # Console commands added by the Trackmania client. Each command only acts
    # when the processor is attached to a TrackmaniaContext.
    # NOTE: the _cmd_* docstrings are left untouched on purpose; they may be
    # surfaced to the user as help text - confirm before rewording them.

    def _cmd_trackmania(self):
        """Check Trackmania Plugin Connection State"""
        if isinstance(self.ctx, TrackmaniaContext):
            logger.info(f"Trackmania Plugin Status: {self.ctx.get_trackmania_status()}")

    def _cmd_reroll(self, series_number: int = -1, map_number: int = -1):
        """Reroll a map from Trackmania Exchange.
        If series_number and map_number are not present, rerolls the currently loaded map."""
        if not isinstance(self.ctx, TrackmaniaContext):
            return

        if not self.ctx.is_proxy_connected():
            # Tell the user why nothing happens instead of silently ignoring
            # the command.
            logger.info("Cannot reroll: not connected to Trackmania Plugin")
            return

        # The values are cast with int() because they may not already be
        # integers. Validate them before announcing the reroll so a bad
        # argument does not produce a misleading "Rerolling" message.
        try:
            series_index = int(series_number)
            map_index = int(map_number)
        except (TypeError, ValueError):
            logger.info("Cannot reroll: series_number and map_number must be whole numbers")
            return

        logger.info("Rerolling Loaded Map...")
        msg: dict = {"cmd": "Reroll", "series_index": series_index, "map_index": map_index}
        # Queue the encoded packet; proxy_loop forwards queued messages to the plugin.
        self.ctx.server_msgs.append(encode([msg]))
class TrackmaniaContext(CommonContext):
    """Client context bridging the Archipelago server and the Trackmania plugin.

    Packets received from the Archipelago server are trimmed where needed,
    encoded, and queued in ``server_msgs``; ``proxy_loop`` drains that queue
    to the plugin's websocket (``endpoint``).
    """

    command_processor = TrackmaniaCommandProcessor
    game = "Trackmania"

    def __init__(self, server_address, password):
        """Create the context with no plugin attached and empty queues."""
        super().__init__(server_address, password)
        self.proxy = None  # websockets server awaitable, assigned in launch()
        self.proxy_task = None  # task running proxy_loop(), assigned in launch()
        self.gamejsontotext = JSONtoTextParser(self)  # renders PrintJSON data for the plugin
        self.autoreconnect_task = None
        self.endpoint = None  # Endpoint wrapping the plugin websocket, set by proxy()
        self.items_handling = 0b111  # all three items_handling flag bits set
        self.room_info = None  # encoded RoomInfo packet, stored by on_package()
        self.connected_msg = None  # encoded, trimmed Connected packet
        self.game_connected = False
        self.awaiting_info = False  # set when the plugin connects before room info is usable
        self.full_inventory: List[Any] = []  # NetworkItem entries received so far
        self.server_msgs: List[Any] = []  # encoded messages waiting to go to the plugin

    async def server_auth(self, password_requested: bool = False):
        """Authenticate with the server, deferring to the base class only when a
        password is requested and none is stored, then send the connect packet."""
        if password_requested and not self.password:
            await super().server_auth(password_requested)
        await self.get_username()
        await self.send_connect()

    def get_trackmania_status(self) -> str:
        """Return a human-readable description of the plugin connection state."""
        if not self.is_proxy_connected():
            return "Not connected to Trackmania Plugin"
        return "Connected to Trackmania Plugin"

    async def send_msgs_proxy(self, msgs: Iterable[dict]) -> bool:
        """ `msgs` JSON serializable

        Send ``msgs`` to the plugin websocket unchanged.

        Callers in this file pass strings already produced by ``encode``.
        Returns False without sending when no plugin socket is open, True
        once the payload has been sent.
        """
        if not self.endpoint or not self.endpoint.socket.open or self.endpoint.socket.closed:
            return False
        if DEBUG:
            logger.info(f"Outgoing message: {msgs}")
        await self.endpoint.socket.send(msgs)
        return True

    async def disconnect(self, allow_autoreconnect: bool = False):
        """Disconnect from the Archipelago server (delegates to the base class)."""
        await super().disconnect(allow_autoreconnect)

    async def disconnect_proxy(self):
        """Close the plugin websocket if it is not closed, then await the proxy task."""
        if self.endpoint and not self.endpoint.socket.closed:
            await self.endpoint.socket.close()
        # NOTE(review): proxy_task is the proxy_loop() task created in launch();
        # it only finishes once exit_event is set or it errors, so this await
        # blocks the caller until then - confirm that is intended.
        if self.proxy_task is not None:
            await self.proxy_task

    def is_connected(self) -> bool:
        """Return True when a server connection exists and its socket is open."""
        # bool() so the annotated return type holds; the bare `and` chain
        # returned None (or another object) instead of a boolean.
        return bool(self.server and self.server.socket.open)

    def is_proxy_connected(self) -> bool:
        """Return True when a plugin endpoint exists and its socket is open."""
        return bool(self.endpoint and self.endpoint.socket.open)

    def on_print_json(self, args: dict):
        """Queue a plain-text copy of a PrintJSON packet for the plugin and
        show the original in the UI (or the log when there is no UI)."""
        # deepcopy: the parser receives its own copy so args["data"] is left
        # intact for the UI/log output below.
        text = self.gamejsontotext(deepcopy(args["data"]))
        msg = {"cmd": "PrintJSON", "data": [{"text": text}], "type": "Chat"}
        self.server_msgs.append(encode([msg]))
        if self.ui:
            self.ui.print_json(args["data"])
        else:
            text = self.jsontotextparser(args["data"])
            logger.info(text)

    def update_items(self):
        """Queue the complete inventory for the plugin as one ReceivedItems packet."""
        # just to be safe - we might still have an inventory from a different room
        if not self.is_connected():
            return
        self.server_msgs.append(encode([{"cmd": "ReceivedItems", "index": 0, "items": self.full_inventory}]))

    def on_package(self, cmd: str, args: dict):
        """Handle a packet from the Archipelago server and queue what the plugin needs.

        ``Connected`` and ``RoomUpdate`` are trimmed (in place) before being
        encoded, ``ReceivedItems`` also updates ``full_inventory``,
        ``RoomInfo`` is stored for later, ``PrintJSON`` is ignored here, and
        every other packet is forwarded as-is.
        """
        if cmd == "Connected":
            payload = args
            # This data is not needed and causes the game to freeze for long periods of time in large asyncs.
            if "slot_info" in payload:
                payload["slot_info"] = {}
            if "players" in payload:
                # Find our own entry; fall back to None so a missing entry
                # no longer raises UnboundLocalError.
                me = next(
                    (player for player in payload["players"]
                     if player.slot == payload["slot"] and player.team == payload["team"]),
                    None,
                )
                # Only put our player info in there as we actually need it
                payload["players"] = [me] if me is not None else []
            if DEBUG:
                print(payload)
            self.connected_msg = encode([payload])
            if self.awaiting_info:
                # The plugin connected before the room info was usable; send it
                # now together with the current inventory.
                self.server_msgs.append(self.room_info)
                self.update_items()
                self.awaiting_info = False
            elif self.is_proxy_connected():
                # send all checks we have whenever we reconnect to the server
                # NOTE(review): encoded as a bare object, not a one-element
                # list like most packets here - confirm the plugin expects that.
                msg: dict = {"cmd": "Resync"}
                self.server_msgs.append(encode(msg))
        elif cmd == "RoomUpdate":
            # Same story as above
            payload = args
            if "players" in payload:
                payload["players"] = []
            # NOTE(review): also encoded without the list wrapper - confirm.
            self.server_msgs.append(encode(payload))
        elif cmd == "ReceivedItems":
            # Index 0 marks a full resend, so start the cached inventory over.
            if args["index"] == 0:
                self.full_inventory.clear()
            for item in args["items"]:
                self.full_inventory.append(NetworkItem(*item))
            self.server_msgs.append(encode([args]))
        elif cmd == "RoomInfo":
            self.seed_name = args["seed_name"]
            self.room_info = encode([args])
        elif cmd != "PrintJSON":
            # PrintJSON is queued by on_print_json(); everything else is relayed untouched.
            self.server_msgs.append(encode([args]))

    def run_gui(self):
        """Create the client window and start its UI task."""
        from kvui import GameManager

        class TrackmaniaManager(GameManager):
            logging_pairs = [
                ("Client", "Archipelago")
            ]
            base_title = "Archipelago Trackmania Client"

        self.ui = TrackmaniaManager(self)
        self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")
async def proxy(websocket, path: str = "/", ctx: TrackmaniaContext = None):
    """Websocket handler for one Trackmania plugin connection.

    Registers the socket on ``ctx``, then relays each decoded packet from the
    plugin to the Archipelago server. A ``Connect`` packet is answered locally
    (cached Connected message plus inventory) instead of being relayed, and a
    ``Connect`` for any other game closes the plugin connection.
    """
    ctx.endpoint = Endpoint(websocket)
    try:
        await on_client_connected(ctx)
        if not ctx.is_proxy_connected():
            return

        async for raw in websocket:
            if DEBUG:
                logger.info(f"Incoming message: {raw}")

            for packet in decode(raw):
                if packet["cmd"] != "Connect":
                    # Ordinary packet: relay it while the plugin is still attached.
                    if not ctx.is_proxy_connected():
                        break
                    await ctx.send_msgs([packet])
                    continue

                # Proxy is connecting, make sure it is valid
                if packet["game"] != "Trackmania":
                    logger.info("Aborting proxy connection: game is not Trackmania")
                    await ctx.disconnect_proxy()
                    break

                if ctx.connected_msg and ctx.is_connected():
                    await ctx.send_msgs_proxy(ctx.connected_msg)
                    ctx.update_items()
    except websockets.WebSocketException:
        # Websocket-level failures are not logged.
        pass
    except Exception as exc:
        logger.exception(exc)
    finally:
        await ctx.disconnect_proxy()
async def on_client_connected(ctx: TrackmaniaContext):
    """Greet a newly attached plugin.

    Sends the stored room info right away when it exists and the server
    connection is up; otherwise flags the context as awaiting that info.
    """
    room_info_ready = ctx.room_info and ctx.is_connected()
    if not room_info_ready:
        ctx.awaiting_info = True
        return
    await ctx.send_msgs_proxy(ctx.room_info)
async def proxy_loop(ctx: TrackmaniaContext):
    """Forward queued server messages to the plugin until the client exits.

    Every 0.1 seconds the queue ``ctx.server_msgs`` is sent message by message
    through ``ctx.send_msgs_proxy`` and then emptied. Any exception is logged
    and ends the loop.
    """
    try:
        while True:
            if ctx.exit_event.is_set():
                break
            if ctx.server_msgs:
                for queued in ctx.server_msgs:
                    await ctx.send_msgs_proxy(queued)
                ctx.server_msgs.clear()
            await asyncio.sleep(0.1)
    except Exception as exc:
        logger.exception(exc)
        logger.info("Aborting Trackmania Proxy Client due to errors")
def launch():
    """Run the Trackmania client: start the plugin-facing websocket server,
    the proxy loop, and the UI/CLI, then block until the client exits."""

    async def main():
        parser = get_base_parser()
        args = parser.parse_args()

        ctx = TrackmaniaContext(args.connect, args.password)
        logger.info("Starting Trackmania proxy server")
        # NOTE(review): host="" leaves the bind address unspecified, which
        # asyncio treats as "all interfaces" - confirm the plugin port is
        # meant to be reachable from other machines.
        # The very large ping values effectively disable keepalive timeouts.
        ctx.proxy = websockets.serve(functools.partial(proxy, ctx=ctx),
                                     host="", port=22422, ping_timeout=999999, ping_interval=999999)
        ctx.proxy_task = asyncio.create_task(proxy_loop(ctx), name="ProxyLoop")

        if gui_enabled:
            ctx.run_gui()
        ctx.run_cli()

        await ctx.proxy  # starts the websocket server
        await ctx.proxy_task  # proxy_loop returns once exit_event is set or it errors
        await ctx.exit_event.wait()

    Utils.init_logging("TrackmaniaClient")

    import colorama
    colorama.just_fix_windows_console()
    try:
        asyncio.run(main())
    finally:
        # Always undo the console patching, even when the client is
        # interrupted or main() raises.
        colorama.deinit()