diff --git a/WebHostLib/customserver.py b/WebHostLib/customserver.py index 8dfe73f44c..44271d7069 100644 --- a/WebHostLib/customserver.py +++ b/WebHostLib/customserver.py @@ -186,7 +186,7 @@ class WebHostContext(Context): @functools.cache def parse_game_ports(game_ports: tuple[str | int]): - available_ports: list[range | list[int]] = [] + parsed_ports: list[range | list[int]] = [] weights = [] ephemeral_allowed = False total_length = 0 @@ -197,23 +197,23 @@ def parse_game_ports(game_ports: tuple[str | int]): x = range(start, end + 1) total_length += len(x) weights.append(total_length) - available_ports.append(x) + parsed_ports.append(x) elif int(item) == 0: ephemeral_allowed = True else: total_length += 1 weights.append(total_length) - available_ports.append([int(item)]) + parsed_ports.append([int(item)]) - return available_ports, weights, total_length, ephemeral_allowed + return parsed_ports, weights, total_length, ephemeral_allowed -def get_random_port(game_ports: list[str | int], host: str) -> socket.socket: +def create_random_port_socket(game_ports: list[str | int], host: str) -> socket.socket: # convert to tuple because its hashable - available_ports, weights, length, ephemeral_allowed = parse_game_ports(tuple(game_ports)) - ports = random.choices(available_ports, cum_weights=weights, k=len(available_ports)) + parsed_ports, weights, length, ephemeral_allowed = parse_game_ports(tuple(game_ports)) + port_ranges = random.choices(parsed_ports, cum_weights=weights, k=len(parsed_ports)) remaining = 1024 - for r in ports: + for r in port_ranges: r_length = len(r) if isinstance(r, range): random_range = itertools.islice( @@ -222,9 +222,9 @@ def get_random_port(game_ports: list[str | int], host: str) -> socket.socket: map(lambda _: random.randint(r.start, r.stop), range(r_length)) ), remaining) - port = get_port_from_list(random_range, host) + port = create_socket_from_port_list(random_range, host) else: - port = get_port_from_list(filter(lambda p: p not in get_used_ports(), r), host) + port = create_socket_from_port_list(filter(lambda p: p not in get_used_ports(), r), host) remaining -= r_length if port is not None: return port @@ -235,13 +235,15 @@ def get_random_port(game_ports: list[str | int], host: str) -> socket.socket: raise OSError(98, "No available ports") -def try_processes(p): + +def try_processes(p: psutil.Process) -> typing.Iterable[int]: try: - return map(lambda c: c.laddr.port, p.net_connections("tcp4")) + return map(lambda c: c.laddr.port, p.get_active_net_connections("tcp4")) except psutil.AccessDenied: return [] -def net_connections() -> typing.Iterable[int]: + +def get_active_net_connections() -> typing.Iterable[int]: # Don't even try to check if system using AIX if psutil._common.AIX: return [] @@ -259,17 +261,17 @@ def net_connections() -> typing.Iterable[int]: )) -_last_used_ports = (frozenset(net_connections()), round(time.time() / 900)) +_last_used_ports = (frozenset(get_active_net_connections()), round(time.time() / 900)) def get_used_ports(): global _last_used_ports t_hash = round(time.time() / 900) if _last_used_ports[1] != t_hash: - _last_used_ports = (frozenset(net_connections()), t_hash) + _last_used_ports = (frozenset(get_active_net_connections()), t_hash) return _last_used_ports[0] -def get_port_from_list(available_ports: typing.Iterable[int], host: str) -> socket.socket | None: +def create_socket_from_port_list(available_ports: typing.Iterable[int], host: str) -> socket.socket | None: for port in available_ports: try: return socket.create_server((host, port)) @@ -410,7 +412,7 @@ def run_server_process(name: str, ponyconfig: dict, static_server_data: dict, if ctx.port == 0: ctx.server = websockets.serve( functools.partial(server, ctx=ctx), - sock=get_random_port(game_ports, ctx.host), + sock=create_random_port_socket(game_ports, ctx.host), ssl=get_ssl_context(), extensions=[server_per_message_deflate_factory], )