implement server-client handshake and move hint system to optional colorama support

This commit is contained in:
Fabian Dill
2020-02-16 15:32:40 +01:00
parent 0986b36b39
commit b04db006e0
5 changed files with 275 additions and 116 deletions

View File

@@ -8,6 +8,7 @@ import shlex
import urllib.request
import zlib
import collections
import typing
import ModuleUpdate
ModuleUpdate.update()
@@ -20,7 +21,11 @@ import Regions
import Utils
from MultiClient import ReceivedItem, get_item_name_from_id, get_location_name_from_address
class Client:
version: typing.List[int] = [0, 0, 0]
tags: typing.List[str] = []
def __init__(self, socket):
self.socket = socket
self.auth = False
@@ -28,9 +33,12 @@ class Client:
self.team = None
self.slot = None
self.send_index = 0
self.tags = []
self.version = [0, 0, 0]
class Context:
def __init__(self, host:str, port:int, password:str, location_check_points:int, hint_cost:int):
def __init__(self, host: str, port: int, password: str, location_check_points: int, hint_cost: int):
self.data_filename = None
self.save_filename = None
self.disable_save = False
@@ -69,6 +77,7 @@ class Context:
logging.info(f'Loaded save file with {sum([len(p) for p in received_items.values()])} received items '
f'for {len(received_items)} players')
async def send_msgs(websocket, msgs):
if not websocket or not websocket.open or websocket.closed:
return
@@ -91,17 +100,35 @@ def notify_all(ctx : Context, text):
logging.info("Notice (all): %s" % text)
broadcast_all(ctx, [['Print', text]])
def notify_team(ctx : Context, team : int, text : str):
logging.info("Notice (Team #%d): %s" % (team+1, text))
def notify_team(ctx: Context, team: int, text: str):
logging.info("Notice (Team #%d): %s" % (team + 1, text))
broadcast_team(ctx, team, [['Print', text]])
def notify_client(client : Client, text : str):
def notify_client(client: Client, text: str):
if not client.auth:
return
logging.info("Notice (Player %s in team %d): %s" % (client.name, client.team+1, text))
asyncio.create_task(send_msgs(client.socket, [['Print', text]]))
logging.info("Notice (Player %s in team %d): %s" % (client.name, client.team + 1, text))
asyncio.create_task(send_msgs(client.socket, [['Print', text]]))
async def server(websocket, path, ctx : Context):
# separated out, due to compatibilty between client's
def notify_hints(ctx: Context, team: int, hints: typing.List[Utils.Hint]):
cmd = [["Hint", hints]]
texts = [['Print', format_hint(ctx, team, hint)] for hint in hints]
for cmd, text in texts:
logging.info("Notice (Team #%d): %s" % (team + 1, text))
for client in ctx.clients:
if client.auth and client.team == team:
if "Berserker" in client.tags:
payload = cmd
else:
payload = texts
asyncio.create_task(send_msgs(client.socket, payload))
async def server(websocket, path, ctx: Context):
client = Client(websocket)
ctx.clients.append(client)
@@ -126,7 +153,9 @@ async def server(websocket, path, ctx : Context):
async def on_client_connected(ctx : Context, client : Client):
await send_msgs(client.socket, [['RoomInfo', {
'password': ctx.password is not None,
'players': [(client.team, client.slot, client.name) for client in ctx.clients if client.auth]
'players': [(client.team, client.slot, client.name) for client in ctx.clients if client.auth],
'tags': ['Berserker'],
'version': "1.0.0"
}]])
async def on_client_disconnected(ctx : Context, client : Client):
@@ -167,7 +196,8 @@ def get_connected_players_string(ctx : Context):
text += f'{c.name} '
return 'Connected players: ' + text[:-1]
def get_received_items(ctx : Context, team, player):
def get_received_items(ctx: Context, team: int, player: int):
return ctx.received_items.setdefault((team, player), [])
def tuplize_received_items(items):
@@ -182,12 +212,14 @@ def send_new_items(ctx : Context):
asyncio.create_task(send_msgs(client.socket, [['ReceivedItems', (client.send_index, tuplize_received_items(items)[client.send_index:])]]))
client.send_index = len(items)
def forfeit_player(ctx : Context, team, slot):
def forfeit_player(ctx: Context, team, slot):
all_locations = [values[0] for values in Regions.location_table.values() if type(values[0]) is int]
notify_all(ctx, "%s (Team #%d) has forfeited" % (ctx.player_names[(team, slot)], team + 1))
register_location_checks(ctx, team, slot, all_locations)
def register_location_checks(ctx : Context, team, slot, locations):
def register_location_checks(ctx: Context, team, slot, locations):
ctx.location_checks[team, slot] |= set(locations)
found_items = False
@@ -214,7 +246,8 @@ def register_location_checks(ctx : Context, team, slot, locations):
if found_items:
save(ctx)
def save(ctx:Context):
def save(ctx: Context):
if not ctx.disable_save:
try:
with open(ctx.save_filename, "wb") as f:
@@ -223,21 +256,29 @@ def save(ctx:Context):
except Exception as e:
logging.exception(e)
def collect_hints(ctx:Context, team, slot, item:str) -> list:
def collect_hints(ctx: Context, team: int, slot: int, item: str) -> typing.List[Utils.Hint]:
hints = []
seeked_item_id = Items.item_table[item][3]
seeked_item_id = Items.lookup_lower_name_to_id[item]
for check, result in ctx.locations.items():
item_id, receiving_player = result
if receiving_player == slot and item_id == seeked_item_id:
location_id, finding_player = check
found = location_id in ctx.location_checks[team, finding_player]
hinttext = f"[Hint]: {ctx.player_names[(team, slot)]}'s {item} can be found at " \
f"{get_location_name_from_address(location_id)} in {ctx.player_names[team, finding_player]}'s World."
hints.append((found, hinttext + (" (found)" if found else "")))
hints.append(Utils.Hint(receiving_player, finding_player, location_id, item_id, found))
return hints
async def process_client_cmd(ctx : Context, client : Client, cmd, args):
def format_hint(ctx: Context, team: int, hint: Utils.Hint) -> str:
return f"[Hint]: {ctx.player_names[team, hint.receiving_player]}'s " \
f"{Items.lookup_id_to_name[hint.item]} can be found " \
f"at {get_location_name_from_address(hint.location)} " \
f"in {ctx.player_names[team, hint.finding_player]}'s World." \
+ (" (found)" if hint.found else "")
async def process_client_cmd(ctx: Context, client: Client, cmd, args):
if type(cmd) is not str:
await send_msgs(client.socket, [['InvalidCmd']])
return
@@ -268,7 +309,10 @@ async def process_client_cmd(ctx : Context, client : Client, cmd, args):
await send_msgs(client.socket, [['ConnectionRefused', list(errors)]])
else:
client.auth = True
reply = [['Connected', [(client.team, client.slot), [(p, n) for (t, p), n in ctx.player_names.items() if t == client.team]]]]
client.version = args.get('version', Client.version)
client.tags = args.get('tags', Client.tags)
reply = [['Connected', [(client.team, client.slot),
[(p, n) for (t, p), n in ctx.player_names.items() if t == client.team]]]]
items = get_received_items(ctx, client.team, client.slot)
if items:
reply.append(['ReceivedItems', (0, tuplize_received_items(items))])
@@ -331,20 +375,20 @@ async def process_client_cmd(ctx : Context, client : Client, cmd, args):
timer = 10
asyncio.create_task(countdown(ctx, timer))
elif args.startswith("!hint"):
points_available = ctx.location_check_points * len(ctx.location_checks[client.team, client.slot]) - ctx.hint_cost*ctx.hints_used[client.team, client.slot]
itemname = args[6:]
if not itemname:
notify_client(client, "Use !hint {itemname}, for example !hint Lamp. "
points_available = ctx.location_check_points * len(ctx.location_checks[client.team, client.slot]) - \
ctx.hint_cost * ctx.hints_used[client.team, client.slot]
item_name = args[6:].lower()
if not item_name:
notify_client(client, "Use !hint {item_name}, for example !hint Lamp. "
f"A hint costs {ctx.hint_cost} points. "
f"You have {points_available} points.")
elif itemname in Items.item_table:
hints = collect_hints(ctx, client.team, client.slot, itemname)
elif item_name in Items.lookup_lower_name_to_id:
hints = collect_hints(ctx, client.team, client.slot, item_name)
found = 0
for already_found, hint in hints:
found += 1 - already_found
for hint in hints:
found += 1 - hint.found
if not found:
for already_found, hint in hints:
notify_team(ctx, client.team, hint)
notify_hints(ctx, client.team, format_hint(ctx, client.team, hints))
notify_client(client, "No new items found, points refunded.")
else:
if ctx.hint_cost:
@@ -354,15 +398,14 @@ async def process_client_cmd(ctx : Context, client : Client, cmd, args):
if can_pay:
ctx.hints_used[client.team, client.slot] += found
for already_found, hint in hints:
notify_team(ctx, client.team, hint)
notify_hints(ctx, client.team, format_hint(ctx, client.team, hints))
save(ctx)
else:
notify_client(client, f"You can't afford the hint. "
f"You have {points_available} points and need at least {ctx.hint_cost}, "
f"more if multiple items are still be found.")
f"You have {points_available} points and need at least {ctx.hint_cost}, "
f"more if multiple items are still to be found.")
else:
notify_client(client, f'Item "{itemname}" not found.')
notify_client(client, f'Item "{item_name}" not found.')
def set_password(ctx : Context, password):
ctx.password = password
@@ -407,10 +450,11 @@ async def console(ctx : Context):
forfeit_player(ctx, team, slot)
if command[0] == '/senditem' and len(command) > 2:
[(player, item)] = re.findall(r'\S* (\S*) (.*)', input)
if item in Items.item_table:
item = item.lower()
if item in Items.lookup_lower_name_to_id:
for client in ctx.clients:
if client.auth and client.name.lower() == player.lower():
new_item = ReceivedItem(Items.item_table[item][3], "cheat console", client.slot)
if client.name.lower() == player.lower():
new_item = ReceivedItem(Items.lookup_lower_name_to_name[item], "cheat console", client.slot)
get_received_items(ctx, client.team, client.slot).append(new_item)
notify_all(ctx, 'Cheat console: sending "' + item + '" to ' + client.name)
send_new_items(ctx)
@@ -421,11 +465,10 @@ async def console(ctx : Context):
if len(command) == 1:
logging.info("Use /hint {Playername} {itemname}\nFor example /hint Berserker Lamp")
elif name.lower() == command[1].lower():
item = " ".join(command[2:])
if item in Items.item_table:
item = " ".join(command[2:]).lower()
if item in Items.lookup_lower_name_to_id:
hints = collect_hints(ctx, team, slot, item)
for already_found, hint in hints:
notify_team(ctx, team, hint)
notify_hints(ctx, team, hints)
else:
logging.warning("Unknown item: " + item)
if command[0][0] != '/':