mirror of
https://github.com/ArchipelagoMW/Archipelago.git
synced 2026-04-19 13:33:32 -07:00
Merge branch 'main' into custom-port-range
This commit is contained in:
@@ -19,7 +19,10 @@ from pony.orm import commit, db_session, select
|
||||
|
||||
import Utils
|
||||
|
||||
from MultiServer import Context, server, auto_shutdown, ServerCommandProcessor, ClientMessageProcessor, load_server_cert
|
||||
from MultiServer import (
|
||||
Context, server, auto_shutdown, ServerCommandProcessor, ClientMessageProcessor, load_server_cert,
|
||||
server_per_message_deflate_factory,
|
||||
)
|
||||
from Utils import restricted_loads, cache_argsless
|
||||
|
||||
from .locker import Locker
|
||||
@@ -87,18 +90,24 @@ class WebHostContext(Context):
|
||||
setattr(self, key, value)
|
||||
self.non_hintable_names = collections.defaultdict(frozenset, self.non_hintable_names)
|
||||
|
||||
def listen_to_db_commands(self):
|
||||
async def listen_to_db_commands(self):
|
||||
cmdprocessor = DBCommandProcessor(self)
|
||||
|
||||
while not self.exit_event.is_set():
|
||||
with db_session:
|
||||
commands = select(command for command in Command if command.room.id == self.room_id)
|
||||
if commands:
|
||||
for command in commands:
|
||||
self.main_loop.call_soon_threadsafe(cmdprocessor, command.commandtext)
|
||||
command.delete()
|
||||
commit()
|
||||
time.sleep(5)
|
||||
await self.main_loop.run_in_executor(None, self._process_db_commands, cmdprocessor)
|
||||
try:
|
||||
await asyncio.wait_for(self.exit_event.wait(), 5)
|
||||
except asyncio.TimeoutError:
|
||||
pass
|
||||
|
||||
def _process_db_commands(self, cmdprocessor):
|
||||
with db_session:
|
||||
commands = select(command for command in Command if command.room.id == self.room_id)
|
||||
if commands:
|
||||
for command in commands:
|
||||
self.main_loop.call_soon_threadsafe(cmdprocessor, command.commandtext)
|
||||
command.delete()
|
||||
commit()
|
||||
|
||||
@db_session
|
||||
def load(self, room_id: int, game_ports: str):
|
||||
@@ -130,7 +139,7 @@ class WebHostContext(Context):
|
||||
else:
|
||||
row = GameDataPackage.get(checksum=game_data["checksum"])
|
||||
if row: # None if rolled on >= 0.3.9 but uploaded to <= 0.3.8. multidata should be complete
|
||||
game_data_packages[game] = Utils.restricted_loads(row.data)
|
||||
game_data_packages[game] = restricted_loads(row.data)
|
||||
continue
|
||||
else:
|
||||
self.logger.warning(f"Did not find game_data_package for {game}: {game_data['checksum']}")
|
||||
@@ -147,19 +156,20 @@ class WebHostContext(Context):
|
||||
self.location_name_groups = static_location_name_groups
|
||||
return self._load(multidata, game_data_packages, True)
|
||||
|
||||
@db_session
|
||||
def init_save(self, enabled: bool = True):
|
||||
self.saving = enabled
|
||||
if self.saving:
|
||||
savegame_data = Room.get(id=self.room_id).multisave
|
||||
if savegame_data:
|
||||
self.set_save(restricted_loads(Room.get(id=self.room_id).multisave))
|
||||
with db_session:
|
||||
savegame_data = Room.get(id=self.room_id).multisave
|
||||
if savegame_data:
|
||||
self.set_save(restricted_loads(savegame_data))
|
||||
self._start_async_saving(atexit_save=False)
|
||||
threading.Thread(target=self.listen_to_db_commands, daemon=True).start()
|
||||
asyncio.create_task(self.listen_to_db_commands())
|
||||
|
||||
@db_session
|
||||
def _save(self, exit_save: bool = False) -> bool:
|
||||
room = Room.get(id=self.room_id)
|
||||
# Does not use Utils.restricted_dumps because we'd rather make a save than not make one
|
||||
room.multisave = pickle.dumps(self.get_save())
|
||||
# saving only occurs on activity, so we can "abuse" this information to mark this as last_activity
|
||||
if not exit_save: # we don't want to count a shutdown as activity, which would restart the server again
|
||||
@@ -262,9 +272,24 @@ def set_up_logging(room_id) -> logging.Logger:
|
||||
return logger
|
||||
|
||||
|
||||
def tear_down_logging(room_id):
|
||||
"""Close logging handling for a room."""
|
||||
logger_name = f"RoomLogger {room_id}"
|
||||
if logger_name in logging.Logger.manager.loggerDict:
|
||||
logger = logging.getLogger(logger_name)
|
||||
for handler in logger.handlers[:]:
|
||||
logger.removeHandler(handler)
|
||||
handler.close()
|
||||
del logging.Logger.manager.loggerDict[logger_name]
|
||||
|
||||
|
||||
def run_server_process(name: str, ponyconfig: dict, static_server_data: dict,
|
||||
cert_file: typing.Optional[str], cert_key_file: typing.Optional[str],
|
||||
host: str, game_ports: str, rooms_to_run: multiprocessing.Queue, rooms_shutting_down: multiprocessing.Queue):
|
||||
host: str, game_ports: str, rooms_to_run: multiprocessing.Queue,
|
||||
rooms_shutting_down: multiprocessing.Queue):
|
||||
from setproctitle import setproctitle
|
||||
|
||||
setproctitle(name)
|
||||
Utils.init_logging(name)
|
||||
try:
|
||||
import resource
|
||||
@@ -285,8 +310,23 @@ def run_server_process(name: str, ponyconfig: dict, static_server_data: dict,
|
||||
raise Exception("Worlds system should not be loaded in the custom server.")
|
||||
|
||||
import gc
|
||||
ssl_context = load_server_cert(cert_file, cert_key_file) if cert_file else None
|
||||
del cert_file, cert_key_file, ponyconfig
|
||||
|
||||
if not cert_file:
|
||||
def get_ssl_context():
|
||||
return None
|
||||
else:
|
||||
load_date = None
|
||||
ssl_context = load_server_cert(cert_file, cert_key_file)
|
||||
|
||||
def get_ssl_context():
|
||||
nonlocal load_date, ssl_context
|
||||
today = datetime.date.today()
|
||||
if load_date != today:
|
||||
ssl_context = load_server_cert(cert_file, cert_key_file)
|
||||
load_date = today
|
||||
return ssl_context
|
||||
|
||||
del ponyconfig
|
||||
gc.collect() # free intermediate objects used during setup
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
@@ -301,12 +341,16 @@ def run_server_process(name: str, ponyconfig: dict, static_server_data: dict,
|
||||
assert ctx.server is None
|
||||
try:
|
||||
ctx.server = websockets.serve(
|
||||
functools.partial(server, ctx=ctx), ctx.host, ctx.port, ssl=ssl_context)
|
||||
|
||||
functools.partial(server, ctx=ctx),
|
||||
ctx.host,
|
||||
ctx.port,
|
||||
ssl=get_ssl_context(),
|
||||
extensions=[server_per_message_deflate_factory],
|
||||
)
|
||||
await ctx.server
|
||||
except OSError: # likely port in use
|
||||
ctx.server = websockets.serve(
|
||||
functools.partial(server, ctx=ctx), ctx.host, 0, ssl=ssl_context)
|
||||
functools.partial(server, ctx=ctx), ctx.host, 0, ssl=get_ssl_context())
|
||||
|
||||
await ctx.server
|
||||
port = 0
|
||||
@@ -323,6 +367,7 @@ def run_server_process(name: str, ponyconfig: dict, static_server_data: dict,
|
||||
with db_session:
|
||||
room = Room.get(id=ctx.room_id)
|
||||
room.last_port = port
|
||||
del room
|
||||
else:
|
||||
ctx.logger.exception("Could not determine port. Likely hosting failure.")
|
||||
with db_session:
|
||||
@@ -335,28 +380,36 @@ def run_server_process(name: str, ponyconfig: dict, static_server_data: dict,
|
||||
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
if ctx.saving:
|
||||
ctx._save()
|
||||
ctx._save(True)
|
||||
setattr(asyncio.current_task(), "save", None)
|
||||
except Exception as e:
|
||||
with db_session:
|
||||
room = Room.get(id=room_id)
|
||||
room.last_port = -1
|
||||
del room
|
||||
logger.exception(e)
|
||||
raise
|
||||
else:
|
||||
if ctx.saving:
|
||||
ctx._save()
|
||||
ctx._save(True)
|
||||
setattr(asyncio.current_task(), "save", None)
|
||||
finally:
|
||||
try:
|
||||
ctx.save_dirty = False # make sure the saving thread does not write to DB after final wakeup
|
||||
ctx.exit_event.set() # make sure the saving thread stops at some point
|
||||
# NOTE: async saving should probably be an async task and could be merged with shutdown_task
|
||||
with (db_session):
|
||||
|
||||
if ctx.server and hasattr(ctx.server, "ws_server"):
|
||||
ctx.server.ws_server.close()
|
||||
await ctx.server.ws_server.wait_closed()
|
||||
|
||||
with db_session:
|
||||
# ensure the Room does not spin up again on its own, minute of safety buffer
|
||||
room = Room.get(id=room_id)
|
||||
room.last_activity = datetime.datetime.utcnow() - \
|
||||
datetime.timedelta(minutes=1, seconds=room.timeout)
|
||||
del room
|
||||
tear_down_logging(room_id)
|
||||
logging.info(f"Shutting down room {room_id} on {name}.")
|
||||
finally:
|
||||
await asyncio.sleep(5)
|
||||
|
||||
Reference in New Issue
Block a user