forked from mirror/Archipelago
Some checks failed
Analyze modified files / flake8 (push) Failing after 2m28s
Build / build-win (push) Has been cancelled
Build / build-ubuntu2204 (push) Has been cancelled
ctest / Test C++ ubuntu-latest (push) Has been cancelled
ctest / Test C++ windows-latest (push) Has been cancelled
Analyze modified files / mypy (push) Has been cancelled
Build and Publish Docker Images / Push Docker image to Docker Hub (push) Successful in 5m4s
Native Code Static Analysis / scan-build (push) Failing after 5m2s
type check / pyright (push) Successful in 1m7s
unittests / Test Python 3.11.2 ubuntu-latest (push) Failing after 16m23s
unittests / Test Python 3.12 ubuntu-latest (push) Failing after 28m19s
unittests / Test Python 3.13 ubuntu-latest (push) Failing after 14m49s
unittests / Test hosting with 3.13 on ubuntu-latest (push) Successful in 5m0s
unittests / Test Python 3.13 macos-latest (push) Has been cancelled
unittests / Test Python 3.11 windows-latest (push) Has been cancelled
unittests / Test Python 3.13 windows-latest (push) Has been cancelled
259 lines
8.8 KiB
Python
259 lines
8.8 KiB
Python
import asyncio
|
|
import Utils
|
|
import websockets
|
|
import functools
|
|
from copy import deepcopy
|
|
from typing import List, Any, Iterable
|
|
from NetUtils import decode, encode, JSONtoTextParser, NetworkItem, NetworkPlayer
|
|
from MultiServer import Endpoint
|
|
from CommonClient import CommonContext, gui_enabled, ClientCommandProcessor, logger, get_base_parser
|
|
|
|
# When True, every message relayed to or from the plugin is logged, and the
# trimmed "Connected" packet is printed before it is cached.
DEBUG = False
|
|
|
|
|
|
class TrackmaniaCommandProcessor(ClientCommandProcessor):
    # Client commands that talk to the Trackmania plugin bridge.
    #
    # NOTE(review): the method docstrings below are kept verbatim because
    # command processors typically surface them as user-facing help text --
    # confirm against ClientCommandProcessor before rewording them.

    def _cmd_trackmania(self):
        """Check Trackmania Plugin Connection State"""
        if isinstance(self.ctx, TrackmaniaContext):
            logger.info(f"Trackmania Plugin Status: {self.ctx.get_trackmania_status()}")

    def _cmd_reroll(self, series_number: int = -1, map_number: int = -1):
        """Reroll a map from Trackmania Exchange.
        If series_number and map_number are not present, rerolls the currently loaded map."""
        if not isinstance(self.ctx, TrackmaniaContext):
            return

        # Tell the user why nothing happens instead of silently ignoring the
        # command when no plugin is attached.
        if not self.ctx.is_proxy_connected():
            logger.info("Cannot reroll: not connected to Trackmania Plugin")
            return

        # Arguments may arrive as text, hence the explicit conversion. Validate
        # before announcing the reroll so a typo does not leave a misleading
        # "Rerolling..." line followed by an exception.
        try:
            series_index = int(series_number)
            map_index = int(map_number)
        except (TypeError, ValueError):
            logger.info("Cannot reroll: series_number and map_number must be whole numbers")
            return

        logger.info("Rerolling Loaded Map...")
        msg: dict = {"cmd": "Reroll", "series_index": series_index, "map_index": map_index}
        # Queue for the plugin; proxy_loop() drains ctx.server_msgs.
        self.ctx.server_msgs.append(encode([msg]))
|
|
|
|
|
|
|
|
|
|
class TrackmaniaContext(CommonContext):
    """Client context that bridges the Archipelago server and the Trackmania plugin.

    Packets received from the server are (optionally trimmed and) queued on
    ``server_msgs`` as already-encoded payloads; ``proxy_loop()`` drains that
    queue and pushes each payload to the plugin through ``endpoint``.
    """

    command_processor = TrackmaniaCommandProcessor
    game = "Trackmania"

    def __init__(self, server_address, password):
        """Create the context; both arguments are forwarded to ``CommonContext``."""
        super().__init__(server_address, password)
        # Set by launch(): the websockets.serve(...) object and the proxy_loop() task.
        self.proxy = None
        self.proxy_task = None
        # Separate text renderer used for the chat lines forwarded to the plugin.
        self.gamejsontotext = JSONtoTextParser(self)
        self.autoreconnect_task = None
        # Endpoint wrapping the plugin's websocket; assigned by proxy().
        self.endpoint = None
        # All three item-handling flag bits set (see the Archipelago network protocol).
        self.items_handling = 0b111
        # Encoded "RoomInfo" / "Connected" packets cached for replay to the plugin.
        self.room_info = None
        self.connected_msg = None
        self.game_connected = False
        # True while a plugin is attached but the server has not sent "Connected" yet.
        self.awaiting_info = False
        # Every NetworkItem received so far; re-sent wholesale by update_items().
        self.full_inventory: List[Any] = []
        # Encoded payloads waiting to be delivered to the plugin.
        self.server_msgs: List[Any] = []

    async def server_auth(self, password_requested: bool = False):
        """Prompt for a password if one is required and missing, then connect."""
        if password_requested and not self.password:
            await super(TrackmaniaContext, self).server_auth(password_requested)

        await self.get_username()
        await self.send_connect()

    def get_trackmania_status(self) -> str:
        """Return a human-readable description of the plugin connection state."""
        if not self.is_proxy_connected():
            return "Not connected to Trackmania Plugin"

        return "Connected to Trackmania Plugin"

    async def send_msgs_proxy(self, msgs: str) -> bool:
        """Send one already-encoded payload to the plugin.

        Every caller in this module passes the result of ``encode(...)``; the
        value is handed to the socket unchanged, so it must not be a raw dict.

        Returns:
            False when no plugin socket is open (nothing is sent), True once
            the payload has been handed to the socket.
        """
        if not self.endpoint or not self.endpoint.socket.open or self.endpoint.socket.closed:
            return False

        if DEBUG:
            logger.info(f"Outgoing message: {msgs}")

        await self.endpoint.socket.send(msgs)
        return True

    async def disconnect(self, allow_autoreconnect: bool = False):
        """Disconnect from the Archipelago server (plain pass-through to the base class)."""
        await super().disconnect(allow_autoreconnect)

    async def disconnect_proxy(self):
        """Close the plugin socket if it is still open, then wait on ``proxy_task``."""
        if self.endpoint and not self.endpoint.socket.closed:
            await self.endpoint.socket.close()
        # NOTE(review): launch() stores the proxy_loop() task here, and that
        # loop only ends when exit_event is set -- so this await appears to
        # park the caller until the client shuts down. Confirm that is intended.
        if self.proxy_task is not None:
            await self.proxy_task

    def is_connected(self) -> bool:
        """Return True when a server endpoint exists and its socket is open."""
        # bool() so the declared return type holds even when self.server is None.
        return bool(self.server and self.server.socket.open)

    def is_proxy_connected(self) -> bool:
        """Return True when a plugin endpoint exists and its socket is open."""
        return bool(self.endpoint and self.endpoint.socket.open)

    def on_print_json(self, args: dict):
        """Forward a server text message to the plugin and show it locally."""
        # deepcopy: the parser is given its own copy so args["data"] is still
        # pristine for the UI / logger below.
        text = self.gamejsontotext(deepcopy(args["data"]))
        msg = {"cmd": "PrintJSON", "data": [{"text": text}], "type": "Chat"}
        self.server_msgs.append(encode([msg]))

        if self.ui:
            self.ui.print_json(args["data"])
        else:
            text = self.jsontotextparser(args["data"])
            logger.info(text)

    def update_items(self):
        """Queue the complete inventory for the plugin as one ReceivedItems packet."""
        # just to be safe - we might still have an inventory from a different room
        if not self.is_connected():
            return

        self.server_msgs.append(encode([{"cmd": "ReceivedItems", "index": 0, "items": self.full_inventory}]))

    def on_package(self, cmd: str, args: dict):
        """Relay a server packet to the plugin, trimming or caching it as needed.

        Args:
            cmd: The packet's command name.
            args: The decoded packet. The "Connected" and "RoomUpdate" branches
                modify it in place before encoding.
        """
        if cmd == "Connected":
            payload = args
            # This data is not needed and causes the game to freeze for long periods of time in large asyncs.
            if "slot_info" in payload:
                payload["slot_info"] = {}
            if "players" in payload:
                # Only put our player info in there as we actually need it.
                # Falls back to an empty list when no entry matches our
                # slot/team instead of failing on an unbound local.
                me = next(
                    (player for player in payload["players"]
                     if player.slot == payload["slot"] and player.team == payload["team"]),
                    None,
                )
                payload["players"] = [me] if me is not None else []
            if DEBUG:
                print(payload)
            self.connected_msg = encode([payload])
            if self.awaiting_info:
                # A plugin attached before the server handshake finished:
                # replay the room info and the inventory now.
                self.server_msgs.append(self.room_info)
                self.update_items()
                self.awaiting_info = False
            elif self.is_proxy_connected():
                # send all checks we have whenever we reconnect to the server
                msg: dict = {"cmd": "Resync"}
                # NOTE(review): encoded as a bare object, unlike the
                # list-wrapped packets elsewhere -- confirm the plugin expects this.
                self.server_msgs.append(encode(msg))

        elif cmd == "RoomUpdate":
            # Same story as above
            payload = args
            if "players" in payload:
                payload["players"] = []

            # NOTE(review): also encoded without the list wrapper -- confirm.
            self.server_msgs.append(encode(payload))

        elif cmd == "ReceivedItems":
            # Index 0 restarts the item list, so drop what we have cached.
            if args["index"] == 0:
                self.full_inventory.clear()

            self.full_inventory.extend(NetworkItem(*item) for item in args["items"])

            self.server_msgs.append(encode([args]))

        elif cmd == "RoomInfo":
            self.seed_name = args["seed_name"]
            self.room_info = encode([args])

        elif cmd != "PrintJSON":
            # PrintJSON is forwarded by on_print_json(); everything else is
            # relayed verbatim.
            self.server_msgs.append(encode([args]))

    def run_gui(self):
        """Create the Kivy UI and start it as an asyncio task."""
        # Imported lazily so the module can load without the GUI toolkit.
        from kvui import GameManager

        class TrackmaniaManager(GameManager):
            logging_pairs = [
                ("Client", "Archipelago")
            ]
            base_title = "Archipelago Trackmania Client"

        self.ui = TrackmaniaManager(self)
        self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")
|
|
|
|
|
|
async def proxy(websocket, path: str = "/", ctx: TrackmaniaContext = None):
    """Websocket handler for one Trackmania plugin connection.

    Registers the socket on ``ctx``, answers the plugin's "Connect" handshake
    from cached data, and forwards every other plugin message to the
    Archipelago server.

    Args:
        websocket: The plugin's connection.
        path: Unused in the body; part of the handler signature.
        ctx: The client context, bound by ``launch()`` via ``functools.partial``.
    """
    ctx.endpoint = Endpoint(websocket)
    try:
        await on_client_connected(ctx)

        if not ctx.is_proxy_connected():
            return

        async for data in websocket:
            if DEBUG:
                logger.info(f"Incoming message: {data}")

            for msg in decode(data):
                if msg["cmd"] == "Connect":
                    # Proxy is connecting, make sure it is valid
                    if msg["game"] != "Trackmania":
                        logger.info("Aborting proxy connection: game is not Trackmania")
                        await ctx.disconnect_proxy()
                        break

                    # Answer the handshake locally; it is never sent upstream.
                    if ctx.connected_msg and ctx.is_connected():
                        await ctx.send_msgs_proxy(ctx.connected_msg)
                        ctx.update_items()
                    continue

                # Stop handling this batch once the plugin socket is gone.
                if not ctx.is_proxy_connected():
                    break

                await ctx.send_msgs([msg])

    except websockets.WebSocketException:
        # Websocket-level failures are expected on disconnect; stay quiet.
        pass
    except Exception as err:
        logger.exception(err)
    finally:
        await ctx.disconnect_proxy()
|
|
|
|
|
|
async def on_client_connected(ctx: TrackmaniaContext):
    """Greet a newly attached plugin.

    Sends the cached room info right away when it exists and the server
    connection is up; otherwise sets ``ctx.awaiting_info`` so the info is
    delivered once the server's "Connected" packet arrives.
    """
    if not (ctx.room_info and ctx.is_connected()):
        ctx.awaiting_info = True
        return

    await ctx.send_msgs_proxy(ctx.room_info)
|
|
|
|
|
|
async def proxy_loop(ctx: TrackmaniaContext):
    """Pump queued payloads from ``ctx.server_msgs`` to the plugin.

    Runs until ``ctx.exit_event`` is set, waking every 0.1 seconds. Each pass
    sends every queued payload in order and then empties the queue; payloads
    are discarded regardless of whether ``send_msgs_proxy`` delivered them.
    """
    try:
        while not ctx.exit_event.is_set():
            if ctx.server_msgs:
                for encoded in ctx.server_msgs:
                    await ctx.send_msgs_proxy(encoded)

                ctx.server_msgs.clear()
            await asyncio.sleep(0.1)
    except Exception as err:
        logger.exception(err)
        logger.info("Aborting Trackmania Proxy Client due to errors")
|
|
|
|
|
|
def launch():
    """Entry point: start the Trackmania proxy client and block until it exits."""

    async def main():
        """Build the context, start the proxy server and pump, then wait for exit."""
        args = get_base_parser().parse_args()

        ctx = TrackmaniaContext(args.connect, args.password)
        logger.info("Starting Trackmania proxy server")

        # Bind the context into the handler so each plugin connection can reach it.
        handler = functools.partial(proxy, ctx=ctx)
        # NOTE(review): host="" presumably listens on every interface rather
        # than loopback only -- confirm that exposure is intended.
        ctx.proxy = websockets.serve(
            handler,
            host="",
            port=22422,
            ping_timeout=999999,
            ping_interval=999999,
        )
        ctx.proxy_task = asyncio.create_task(proxy_loop(ctx), name="ProxyLoop")

        if gui_enabled:
            ctx.run_gui()
        ctx.run_cli()

        # Awaiting the serve object starts the server; the pump task then
        # keeps main() alive until exit_event is set.
        await ctx.proxy
        await ctx.proxy_task
        await ctx.exit_event.wait()

    Utils.init_logging("TrackmaniaClient")

    import colorama

    colorama.just_fix_windows_console()
    asyncio.run(main())
    colorama.deinit()
|