Files
Archipelago/worlds/civ_6/TunerClient.py
Carter Hesterman 5cbbe9a0b1 Minor changes
2024-08-06 14:11:42 -06:00

105 lines
3.4 KiB
Python

import asyncio
from logging import Logger
import socket
# Host and port of the tuner socket; TunerClient.send_command opens a TCP
# connection to (ADDRESS, PORT) for every command it sends.
ADDRESS = "127.0.0.1"
PORT = 4318
# Markers that bracket the payload inside a tuner response. The response
# parser returns the text found between CLIENT_PREFIX and CLIENT_POSTFIX.
CLIENT_PREFIX = "APSTART:"
CLIENT_POSTFIX = ":APEND"
def decode_mixed_string(data):
    """Render *data* as text, masking everything that is not printable ASCII.

    *data* is iterated item by item (e.g. a ``bytes`` object yielding ints).
    Values from 32 up to and including 126 are converted with ``chr``; every
    other value is replaced by ``'?'``.
    """
    rendered = []
    for byte in data:
        if byte < 32 or byte >= 127:
            rendered.append('?')
        else:
            rendered.append(chr(byte))
    return ''.join(rendered)
class TunerException(Exception):
    """Base class for the errors raised by the tuner client."""
class TunerTimeoutException(TunerException):
    """Raised by the tuner client when a socket timeout is caught."""
class TunerErrorException(TunerException):
    """Raised for an "ERR:" tuner response or an unexpected socket error."""
class TunerConnectionException(TunerException):
    """Raised when the connection to the tuner socket is refused."""
class TunerClient:
    """Interfaces with Civilization via the tuner socket.

    Every command opens a fresh TCP connection to ``(ADDRESS, PORT)``, sends
    one framed command, reads a single response and closes the socket again.
    """

    logger: Logger

    def __init__(self, logger: Logger):
        self.logger = logger

    def __parse_response(self, response: str) -> str:
        """Parses the response from the tuner socket.

        Returns the text between the first ``CLIENT_PREFIX`` and the
        ``CLIENT_POSTFIX`` that follows it, or ``""`` when the response
        carries neither a payload nor an error.

        Raises:
            TunerErrorException: the response has no payload marker but
                contains ``"ERR:"``.
        """
        parts = response.split(CLIENT_PREFIX)
        if len(parts) > 1:
            return parts[1].split(CLIENT_POSTFIX)[0]
        if "ERR:" in response:
            # "?" is the placeholder decode_mixed_string substitutes for
            # non-printable bytes; drop it so the message stays readable.
            raise TunerErrorException(response.replace("?", ""))
        return ""

    @staticmethod
    def _build_packet(command_string: str) -> bytes:
        """Frame *command_string* as the bytes sent over the tuner socket."""
        message = b"CMD:0:" + command_string.encode('utf-8') + b"\x00"
        # game expects this to be added before any command that is sent,
        # indicates payload size.
        # NOTE(review): the header used to be a single length byte followed by
        # 00 00 00 03 00 00 00, which looks like two little-endian 32-bit
        # fields (length, then the constant 3). Encoding the length in four
        # bytes yields identical output for messages shorter than 256 bytes
        # and no longer raises OverflowError for longer ones - confirm
        # against the game if very long commands are ever sent.
        header = len(message).to_bytes(4, byteorder='little') + b"\x03\x00\x00\x00"
        return header + message

    async def send_game_command(self, command_string: str, size: int = 64):
        """Small helper that prefixes a command with GameCore.Game."""
        return await self.send_command("GameCore.Game." + command_string, size)

    async def send_command(self, command_string: str, size: int = 64):
        """Send a raw command and return the parsed response payload.

        Args:
            command_string: command text, sent UTF-8 encoded.
            size: currently unused; kept so existing callers keep working.

        Returns:
            The payload extracted from the response (``""`` if there is none).

        Raises:
            TunerTimeoutException: a timeout occurred.
            TunerConnectionException: the connection was refused.
            TunerErrorException: the tuner answered with an error, or any
                other failure occurred while talking to the socket.
        """
        # Build the packet first so an encoding failure cannot leak a socket.
        data = self._build_packet(command_string)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setblocking(False)
            loop = asyncio.get_running_loop()
            await loop.sock_connect(sock, (ADDRESS, PORT))
            await loop.sock_sendall(sock, data)

            # Add a delay before receiving data
            await asyncio.sleep(.02)

            received_data = await self.async_recv(sock)
            response = decode_mixed_string(received_data)
            return self.__parse_response(response)
        except (socket.timeout, asyncio.TimeoutError) as e:
            # asyncio.wait_for raises asyncio.TimeoutError, which is only the
            # same class as socket.timeout from Python 3.11 on.
            self.logger.debug('Timeout occurred while receiving data')
            raise TunerTimeoutException() from e
        except Exception as e:
            self.logger.debug(f'Error occurred while receiving data: {str(e)}')

            # check if No connection could be made is present in the error message
            connection_errors = [
                "The remote computer refused the network connection",
            ]
            # The message above is the English text of Windows error 1225
            # (ERROR_CONNECTION_REFUSED); also match the error code and the
            # builtin exception so detection does not depend on the locale
            # or the platform.
            refused = (
                isinstance(e, ConnectionRefusedError)
                or getattr(e, "winerror", None) == 1225
                or any(error in str(e) for error in connection_errors)
            )
            if refused:
                raise TunerConnectionException(e) from e
            raise TunerErrorException(e) from e
        finally:
            sock.close()

    async def async_recv(self, sock, timeout=2.0, size=4096):
        """Receive up to *size* bytes from *sock*, waiting at most *timeout* seconds.

        Raises:
            asyncio.TimeoutError: nothing was received within *timeout*.
        """
        loop = asyncio.get_running_loop()
        return await asyncio.wait_for(loop.sock_recv(sock, size), timeout)