forked from mirror/Archipelago
Some checks failed
Analyze modified files / flake8 (push) Failing after 2m28s
Build / build-win (push) Has been cancelled
Build / build-ubuntu2204 (push) Has been cancelled
ctest / Test C++ ubuntu-latest (push) Has been cancelled
ctest / Test C++ windows-latest (push) Has been cancelled
Analyze modified files / mypy (push) Has been cancelled
Build and Publish Docker Images / Push Docker image to Docker Hub (push) Successful in 5m4s
Native Code Static Analysis / scan-build (push) Failing after 5m2s
type check / pyright (push) Successful in 1m7s
unittests / Test Python 3.11.2 ubuntu-latest (push) Failing after 16m23s
unittests / Test Python 3.12 ubuntu-latest (push) Failing after 28m19s
unittests / Test Python 3.13 ubuntu-latest (push) Failing after 14m49s
unittests / Test hosting with 3.13 on ubuntu-latest (push) Successful in 5m0s
unittests / Test Python 3.13 macos-latest (push) Has been cancelled
unittests / Test Python 3.11 windows-latest (push) Has been cancelled
unittests / Test Python 3.13 windows-latest (push) Has been cancelled
419 lines
20 KiB
Python
419 lines
20 KiB
Python
from typing import Optional, Set, Dict, Any
|
|
import asyncio, multiprocessing, traceback
|
|
|
|
from CommonClient import ClientCommandProcessor, CommonContext, get_base_parser, logger, server_loop, gui_enabled
|
|
from NetUtils import ClientStatus
|
|
import Utils
|
|
|
|
from .Interface import KONInterface
|
|
from .Data import EVENTS, SONGS, SNACKS, SNACK_NAME_FROM_ID, SONG_NAME_FROM_ID, PROP_NAME_FROM_ID, SONG_CLEARS, CHARACTER_CLEARS, PLAYABLE_CHARACTER_NAME_FROM_ID, PROPS, SONG_COMPLETIONIST_CLEARS, CHARACTER_CLEAR_NAME_FROM_ID, OUTFIT_NAME_FROM_ID, SONG_RANK_CLEARS, SONG_COMBO_CLEARS, CHARACTER_RANK_CLEARS, CHARACTER_COMBO_CLEARS, HARD_SONG_RANK_CLEARS, HARD_SONG_COMBO_CLEARS, HARD_CHARACTER_RANK_CLEARS, HARD_CHARACTER_COMBO_CLEARS, HARD_SONG_COMPLETIONIST_CLEARS, HARD_SONG_CLEARS, HARD_CHARACTER_CLEARS, HARD_CHARACTER_CLEAR_NAME_FROM_ID, CHARACTERS
|
|
|
|
class KONCommandProcessor(ClientCommandProcessor):
|
|
def __init__(self, ctx: CommonContext) -> None:
|
|
super().__init__(ctx)
|
|
|
|
def _cmd_characters(self) -> None:
|
|
if isinstance(self.ctx, KONContext):
|
|
log_characters(self.ctx)
|
|
|
|
def _cmd_progress(self) -> None:
|
|
if isinstance(self.ctx, KONContext):
|
|
log_tokens(self.ctx)
|
|
|
|
class KONContext(CommonContext):
|
|
client_version: str = "v1.1.3"
|
|
|
|
game: str = "K-On! After School Live!!"
|
|
|
|
command_processor = KONCommandProcessor
|
|
items_handling = 0b111
|
|
|
|
is_connected = False
|
|
interface_sync_task : asyncio.tasks = None
|
|
last_error_message : Optional[str] = None
|
|
|
|
cached_received_items : Set[int]
|
|
|
|
slot_data: Dict[str, Any]
|
|
|
|
def __init__(self, address, password: str) -> None:
|
|
super().__init__(address, password)
|
|
Utils.init_logging(f"K-On! After School Live!! Archipelago Client {self.client_version}")
|
|
|
|
self.interface = KONInterface(logger)
|
|
self.tokens_reported = False
|
|
self.cached_received_items = set()
|
|
self.slot_data: Dict[str, Any] = {}
|
|
self.sent_deaths = 0
|
|
self.snack_cache_retrieved = False
|
|
self.most_recent_instruction = None
|
|
|
|
def on_package(self, cmd: str, args: Dict[str, Any]) -> None:
|
|
if cmd == "Connected":
|
|
self.slot_data = args["slot_data"]
|
|
self.interface.food_duration = self.slot_data["default_food_duration"]
|
|
self.previously_checked_locations = args["checked_locations"]
|
|
self.interface.goal_song = self.slot_data["goal_song"]
|
|
self.interface.token_requirement = self.slot_data["token_requirement"]
|
|
self.interface.tape_requirement = self.slot_data["tape_requirement"]
|
|
self.interface.matching_outfits_goal = self.slot_data["matching_outfits_goal"]
|
|
self.deathlink_pending = False
|
|
self.cleared_songs = {}
|
|
self.snack_cache = {}
|
|
|
|
if "snack_upgrades_enabled" in self.slot_data and self.slot_data["snack_upgrades_enabled"] == True:
|
|
self.interface.snack_upgrades_enabled = True
|
|
|
|
if cmd == "Retrieved":
|
|
if "keys" in args:
|
|
if f"k-on_cleared_songs_{self.team}_{self.slot}" in args["keys"]:
|
|
self.cleared_songs = args["keys"].get(f"k-on_cleared_songs_{self.team}_{self.slot}", {})
|
|
if self.cleared_songs == None:
|
|
self.cleared_songs = {}
|
|
if f"k-on_snack_cache_{self.team}_{self.slot}" in args["keys"]:
|
|
self.snack_cache = args["keys"].get(f"k-on_snack_cache_{self.team}_{self.slot}", {})
|
|
if self.snack_cache == None:
|
|
self.snack_cache = {}
|
|
|
|
self.snack_cache_retrieved = True
|
|
|
|
async def server_auth(self, password_requested : bool = False) -> None:
|
|
if password_requested and not self.password:
|
|
await super().server_auth(password_requested)
|
|
|
|
await self.get_username()
|
|
await self.send_connect()
|
|
|
|
def on_deathlink(self, data: dict):
|
|
if "deathlink_enabled" in self.slot_data and self.slot_data["deathlink_enabled"] == True:
|
|
self.deathlink_pending = True
|
|
text = data.get("cause", "")
|
|
if text:
|
|
logger.info(f"DeathLink: {text}")
|
|
else:
|
|
logger.info(f"DeathLink: Received from {data['source']}")
|
|
|
|
def run_gui(self) -> None:
|
|
from kvui import GameManager
|
|
|
|
class KONManager(GameManager):
|
|
logging_pairs = [("Client", "Archipelago")]
|
|
base_title = "K-On! After School Live!! Archipelago"
|
|
|
|
self.ui = KONManager(self)
|
|
self.ui_task = asyncio.create_task(self.ui.async_run(), name = "ui")
|
|
|
|
def update_connection_status(ctx, status) -> None:
|
|
if ctx.is_connected == status:
|
|
return
|
|
|
|
if status:
|
|
logger.info("Communications with PPSSPP began successfully.")
|
|
else:
|
|
logger.info("Not connected to PPSSPP.")
|
|
|
|
ctx.is_connected = status
|
|
|
|
async def interface_sync_task(ctx) -> None:
|
|
logger.info("Beginning communication with PPSSPP...")
|
|
await ctx.interface.connect_to_ppsspp()
|
|
await asyncio.sleep(4)
|
|
|
|
while not ctx.exit_event.is_set():
|
|
try:
|
|
is_connected = await ctx.interface.get_connection_state()
|
|
update_connection_status(ctx, is_connected)
|
|
if is_connected:
|
|
await asyncio.sleep(1)
|
|
if ctx.interface.loaded_kon:
|
|
await check_game(ctx)
|
|
else:
|
|
logger.info("K-On! After School Live!! is not currently running. Please load the game in PPSSPP.")
|
|
await asyncio.sleep(3)
|
|
await ctx.interface.get_loaded_game_status()
|
|
else:
|
|
await reconnect_game(ctx)
|
|
except ConnectionError:
|
|
await ctx.interface.disconnect()
|
|
except Exception as e:
|
|
if isinstance(e, RuntimeError):
|
|
logger.error(str(e))
|
|
else:
|
|
logger.error(traceback.format_exc())
|
|
|
|
await asyncio.sleep(3)
|
|
continue
|
|
|
|
async def check_game(ctx) -> None:
|
|
if ctx.server:
|
|
if not ctx.slot:
|
|
await asyncio.sleep(1)
|
|
return
|
|
elif not ctx.most_recent_instruction == "welcome":
|
|
#Enable deathlink once connected
|
|
if "deathlink_enabled" in ctx.slot_data and ctx.slot_data["deathlink_enabled"] == True:
|
|
await ctx.update_death_link(True)
|
|
|
|
await ctx.send_msgs([{"cmd": "Get", "keys": [f"k-on_cleared_songs_{ctx.team}_{ctx.slot}", f"k-on_snack_cache_{ctx.team}_{ctx.slot}"]}]) #Retrieve previously played songs and update snack cache once connected
|
|
|
|
logger.info("You are now connected and ready to play. Let's rock!")
|
|
logger.info("Use /progress to see your progress towards unlocking your Goal Song. Use /characters to see your currently unlocked characters.")
|
|
ctx.most_recent_instruction = "welcome"
|
|
|
|
checked_locations : Set[int] = set()
|
|
|
|
#Check for events
|
|
for event in ctx.interface.event_clears:
|
|
checked_locations.add(EVENTS[event]["location_id"])
|
|
|
|
#Check for basic clears
|
|
for song_clear in ctx.interface.song_clears:
|
|
checked_locations.add(SONG_CLEARS[song_clear]["location_id"])
|
|
for character_clear in ctx.interface.character_clears:
|
|
checked_locations.add(CHARACTER_CLEARS[character_clear]["location_id"])
|
|
|
|
#Check for combo clears
|
|
for combo_clear in ctx.interface.combo_clears:
|
|
checked_locations.add(SONG_COMBO_CLEARS[combo_clear]["location_id"])
|
|
for character_combo_clear in ctx.interface.character_combo_clears:
|
|
checked_locations.add(CHARACTER_COMBO_CLEARS[character_combo_clear]["location_id"])
|
|
|
|
#Check for A rank clears
|
|
for rank_clear in ctx.interface.rank_clears:
|
|
checked_locations.add(SONG_RANK_CLEARS[rank_clear]["location_id"])
|
|
for character_rank_clear in ctx.interface.character_rank_clears:
|
|
checked_locations.add(CHARACTER_RANK_CLEARS[character_rank_clear]["location_id"])
|
|
|
|
#Check for basic clears on Hard difficulty
|
|
for hard_song_clear in ctx.interface.hard_song_clears:
|
|
checked_locations.add(HARD_SONG_CLEARS[hard_song_clear]["location_id"])
|
|
for hard_character_clear in ctx.interface.hard_character_clears:
|
|
checked_locations.add(HARD_CHARACTER_CLEARS[hard_character_clear]["location_id"])
|
|
|
|
#Check for combo clears on Hard difficulty
|
|
for hard_combo_clear in ctx.interface.hard_combo_clears:
|
|
checked_locations.add(HARD_SONG_COMBO_CLEARS[hard_combo_clear]["location_id"])
|
|
for hard_character_combo_clear in ctx.interface.hard_character_combo_clears:
|
|
checked_locations.add(HARD_CHARACTER_COMBO_CLEARS[hard_character_combo_clear]["location_id"])
|
|
|
|
#Check for A rank clears on Hard difficulty
|
|
for hard_rank_clear in ctx.interface.hard_rank_clears:
|
|
checked_locations.add(HARD_SONG_RANK_CLEARS[hard_rank_clear]["location_id"])
|
|
for hard_character_rank_clear in ctx.interface.hard_character_rank_clears:
|
|
checked_locations.add(HARD_CHARACTER_RANK_CLEARS[hard_character_rank_clear]["location_id"])
|
|
|
|
#Full Band Clear
|
|
if len(set(ctx.interface.characters_received)) >= 5: #Must have obtained all characters in order to start sending Full Band Clear checks
|
|
for song in ctx.cleared_songs: #Must have at least played the song once in order to send Full Band Clear check
|
|
if SONG_CLEARS[f"{song}: Clear"]["location_id"] in ctx.checked_locations and not (SONG_COMPLETIONIST_CLEARS[f"{song}: Full Band Clear"]["location_id"] in ctx.checked_locations):
|
|
full_band_cleared = True
|
|
for character in CHARACTERS:
|
|
if full_band_cleared and not character in ctx.cleared_songs[song]:
|
|
full_band_cleared = False
|
|
|
|
if full_band_cleared:
|
|
checked_locations.add(SONG_COMPLETIONIST_CLEARS[f"{song}: Full Band Clear"]["location_id"])
|
|
|
|
if ctx.slot_data["hard_clear_locations"] > 1 and HARD_SONG_CLEARS[f"{song}: Clear on Hard"]["location_id"] in ctx.checked_locations and not HARD_SONG_COMPLETIONIST_CLEARS[f"{song}: Full Band Clear on Hard"]["location_id"] in ctx.checked_locations:
|
|
full_band_cleared = True
|
|
for character in CHARACTERS:
|
|
if full_band_cleared and not f"{character}_hard" in ctx.cleared_songs[song]:
|
|
full_band_cleared = False
|
|
|
|
if full_band_cleared:
|
|
checked_locations.add(HARD_SONG_COMPLETIONIST_CLEARS[f"{song}: Full Band Clear on Hard"]["location_id"])
|
|
|
|
if ctx.interface.new_song_clears:
|
|
ctx.interface.new_song_clears = False
|
|
|
|
for song in ctx.interface.cleared_songs:
|
|
if not song in ctx.cleared_songs:
|
|
ctx.cleared_songs[song] = ctx.interface.cleared_songs[song]
|
|
else:
|
|
ctx.cleared_songs[song] = list(set(ctx.cleared_songs[song]) | set(ctx.interface.cleared_songs[song]))
|
|
|
|
await ctx.send_msgs([{"cmd": "Set", "key": f"k-on_cleared_songs_{ctx.team}_{ctx.slot}", "default": {}, "want_reply": False, "operations": [{"operation": "replace", "value": ctx.cleared_songs}]}])
|
|
|
|
checked_locations = checked_locations.difference(ctx.checked_locations)
|
|
|
|
#If there are unsent locations, send them now
|
|
if checked_locations:
|
|
await ctx.send_msgs([{"cmd" : "LocationChecks", "locations" : checked_locations}])
|
|
|
|
#Init vars for receiving items
|
|
collected_tokens = 0
|
|
food_upgrades = 0
|
|
collected_tapes = 0
|
|
tension_upgrades = 0
|
|
new_characters = False
|
|
new_snacks = {}
|
|
|
|
#Receive items from server
|
|
for server_item in ctx.items_received:
|
|
if not server_item.item in ctx.cached_received_items:
|
|
if server_item.item in SNACK_NAME_FROM_ID:
|
|
snack_name = SNACK_NAME_FROM_ID[server_item.item]
|
|
if not snack_name in new_snacks:
|
|
new_snacks[snack_name] = 1
|
|
else:
|
|
new_snacks[snack_name] += 1
|
|
elif server_item.item in SONG_NAME_FROM_ID:
|
|
ctx.interface.unlock_song(SONG_NAME_FROM_ID[server_item.item])
|
|
ctx.cached_received_items.add(server_item.item)
|
|
elif server_item.item in PROP_NAME_FROM_ID:
|
|
ctx.interface.unlock_prop(PROP_NAME_FROM_ID[server_item.item])
|
|
ctx.cached_received_items.add(server_item.item)
|
|
elif server_item.item in OUTFIT_NAME_FROM_ID:
|
|
ctx.interface.unlock_outfit(OUTFIT_NAME_FROM_ID[server_item.item])
|
|
ctx.cached_received_items.add(server_item.item)
|
|
elif server_item.item in PLAYABLE_CHARACTER_NAME_FROM_ID:
|
|
ctx.interface.characters_received.append(PLAYABLE_CHARACTER_NAME_FROM_ID[server_item.item])
|
|
ctx.cached_received_items.add(server_item.item)
|
|
new_characters = True
|
|
elif server_item.item == 301: #Happy End item
|
|
await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
|
|
ctx.cached_received_items.add(server_item.item)
|
|
elif server_item.item == 700: #Teatime Token item
|
|
collected_tokens += 1
|
|
elif server_item.item == 701: #Cassette Tape item
|
|
collected_tapes += 1
|
|
elif server_item.item == 800: #Snack Upgrade item
|
|
food_upgrades += 1
|
|
elif server_item.item == 302: #Hard Difficulty
|
|
ctx.interface.hard_unlocked = True
|
|
else: #Unknown item received
|
|
ctx.cached_received_items.add(server_item.item)
|
|
|
|
if collected_tokens > ctx.interface.token_count or collected_tapes > ctx.interface.tape_count: #Amount of Tokens or Tapes is higher than before
|
|
ctx.interface.token_count = collected_tokens
|
|
ctx.interface.tape_count = collected_tapes
|
|
if collected_tokens >= ctx.slot_data["token_requirement"] and collected_tapes >= ctx.slot_data["tape_requirement"]: #Goal hit
|
|
log_tokens(ctx)
|
|
elif ctx.slot_data["token_requirement"] > 0 and ctx.slot_data["tape_requirement"] > 0 and not ctx.tokens_reported: #If Token count hasn't been reported yet (i.e. the client only just connected for the first time in the play session)
|
|
log_tokens(ctx)
|
|
ctx.tokens_reported = True
|
|
|
|
ctx.interface.food_duration = ctx.slot_data["default_food_duration"] + (food_upgrades * 1)
|
|
ctx.interface.tension_upgrade = tension_upgrades #Currently unused
|
|
|
|
#Figure out which snacks have already been applied before giving them again
|
|
if ctx.snack_cache_retrieved:
|
|
for snack_name in new_snacks:
|
|
updated_snack_cache = False
|
|
if (not new_snacks[snack_name] in ctx.snack_cache) or (ctx.snack_cache[snack_name] < new_snacks[snack_name]):
|
|
if snack_name in ctx.snack_cache:
|
|
snacks_to_add = new_snacks[snack_name] - ctx.snack_cache[snack_name]
|
|
else:
|
|
snacks_to_add = new_snacks[snack_name]
|
|
ctx.interface.receive_snack(snack_name, snacks_to_add)
|
|
ctx.snack_cache[snack_name] = new_snacks[snack_name]
|
|
updated_snack_cache = True
|
|
|
|
if updated_snack_cache:
|
|
await ctx.send_msgs([{"cmd": "Set", "key": f"k-on_snack_cache_{ctx.team}_{ctx.slot}", "default": {}, "want_reply": False, "operations": [{"operation": "replace", "value": ctx.snack_cache}]}])
|
|
|
|
if new_characters: #If a new character has been unlocked
|
|
log_characters(ctx) #Let the player know their current list of characters
|
|
|
|
if ctx.deathlink_pending == True:
|
|
await ctx.interface.deathlink_received()
|
|
ctx.deathlink_pending = False
|
|
elif "deathlink_enabled" in ctx.slot_data and ctx.slot_data["deathlink_enabled"] == True and ctx.interface.deaths > ctx.sent_deaths:
|
|
ctx.sent_deaths = ctx.interface.deaths
|
|
await ctx.send_death(f"{ctx.player_names[ctx.slot]} {ctx.interface.death_text}")
|
|
|
|
elif not ctx.most_recent_instruction == "connect":
|
|
ctx.most_recent_instruction = "connect"
|
|
logger.info("You are not currently connected to an Archipelago server. Connect to an Archipelago server now!")
|
|
|
|
async def reconnect_game(ctx) -> None:
|
|
if not ctx.most_recent_instruction == "ppsspp":
|
|
ctx.most_recent_instruction = "ppsspp"
|
|
logger.info("Communication with PPSSPP failed. Please ensure that PPSSPP is open and K-On! After School Live!! is loaded.")
|
|
await asyncio.sleep(5)
|
|
await ctx.interface.connect_to_ppsspp()
|
|
|
|
def log_characters(ctx) -> None:
|
|
chars = [name.split(" ")[-1] for name in ctx.interface.characters_received]
|
|
if not chars:
|
|
return
|
|
if len(chars) == 1:
|
|
name_str = f"{chars[0]}."
|
|
else:
|
|
name_str = ", ".join(chars[:-1]) + f" and {chars[-1]}."
|
|
logger.info(f"You can now play as {name_str}")
|
|
|
|
def log_tokens(ctx) -> None:
|
|
tokens_needed = max(ctx.slot_data["token_requirement"] - ctx.interface.token_count, 0)
|
|
tapes_needed = max(ctx.slot_data["tape_requirement"] - ctx.interface.tape_count, 0)
|
|
|
|
token_count = ctx.interface.token_count
|
|
tape_count = ctx.interface.tape_count
|
|
token_req = ctx.slot_data["token_requirement"]
|
|
tape_req = ctx.slot_data["tape_requirement"]
|
|
outfits_required = ctx.slot_data["matching_outfits_goal"]
|
|
|
|
inventory_items = []
|
|
requirement_items = []
|
|
|
|
if token_req > 0:
|
|
inventory_items.append(f"{token_count} Teatime Tokens")
|
|
requirement_items.append(f"{token_req} Teatime Tokens")
|
|
if tape_req > 0:
|
|
inventory_items.append(f"{tape_count} Cassette Tapes")
|
|
requirement_items.append(f"{tape_req} Cassette Tapes")
|
|
|
|
inventory_text = " and ".join(inventory_items)
|
|
requirement_text = " and ".join(requirement_items)
|
|
|
|
if outfits_required:
|
|
requirement_text += " along with matching outfits equipped"
|
|
|
|
if tokens_needed + tapes_needed == 0:
|
|
advice_text = f"You can play {ctx.slot_data['goal_song']}"
|
|
if outfits_required:
|
|
advice_text += " after equipping matching outfits"
|
|
else:
|
|
needs = []
|
|
if tokens_needed:
|
|
needs.append(f"{tokens_needed} Teatime Tokens")
|
|
if tapes_needed:
|
|
needs.append(f"{tapes_needed} Cassette Tapes")
|
|
advice_text = f"You still need {' and '.join(needs)}"
|
|
|
|
logger.info(f"You currently have {inventory_text}. You need a total of {requirement_text} to unlock your goal song. {advice_text}!")
|
|
|
|
def launch() -> None:
|
|
async def main() -> None:
|
|
multiprocessing.freeze_support()
|
|
|
|
parser = get_base_parser()
|
|
args = parser.parse_args()
|
|
|
|
ctx = KONContext(args.connect, args.password)
|
|
ctx.server_task = asyncio.create_task(server_loop(ctx), name="Server Loop")
|
|
|
|
if gui_enabled:
|
|
ctx.run_gui()
|
|
ctx.run_cli()
|
|
|
|
ctx.interface_sync_task = asyncio.create_task(interface_sync_task(ctx), name="PPSSPP Sync")
|
|
|
|
await ctx.exit_event.wait()
|
|
ctx.server_address = None
|
|
await ctx.interface.disconnect()
|
|
await ctx.shutdown()
|
|
|
|
if ctx.interface_sync_task:
|
|
await ctx.interface_sync_task
|
|
|
|
#Run Client
|
|
import colorama
|
|
|
|
colorama.init()
|
|
asyncio.run(main())
|
|
colorama.deinit()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
launch() |