Files
Archipelago/worlds/mm3/client.py
2026-03-08 21:42:06 +01:00

784 lines
29 KiB
Python

import logging
import time
from enum import IntEnum
from base64 import b64encode
from typing import TYPE_CHECKING, Any
from NetUtils import ClientStatus, color, NetworkItem
from worlds._bizhawk.client import BizHawkClient
if TYPE_CHECKING:
from worlds._bizhawk.context import BizHawkClientContext, BizHawkClientCommandProcessor
nes_logger = logging.getLogger("NES")
logger = logging.getLogger("Client")
MM3_CURRENT_STAGE = 0x22
MM3_MEGAMAN_STATE = 0x30
MM3_PROG_STATE = 0x60
MM3_ROBOT_MASTERS_DEFEATED = 0x61
MM3_DOC_STATUS = 0x62
MM3_HEALTH = 0xA2
MM3_WEAPON_ENERGY = 0xA3
MM3_WEAPONS = {
1: 1,
2: 3,
3: 0,
4: 2,
5: 4,
6: 5,
7: 7,
8: 9,
0x11: 6,
0x12: 8,
0x13: 10,
}
MM3_DOC_REMAP = {
0: 0,
1: 1,
2: 2,
3: 3,
4: 6,
5: 7,
6: 4,
7: 5
}
MM3_LIVES = 0xAE
MM3_E_TANKS = 0xAF
MM3_ENERGY_BAR = 0xB2
MM3_CONSUMABLES = 0x150
MM3_ROBOT_MASTERS_UNLOCKED = 0x680
MM3_DOC_ROBOT_UNLOCKED = 0x681
MM3_ENERGYLINK = 0x682
MM3_LAST_WILY = 0x683
MM3_RBM_STROBE = 0x684
MM3_SFX_QUEUE = 0x685
MM3_DOC_ROBOT_DEFEATED = 0x686
MM3_COMPLETED_STAGES = 0x687
MM3_RECEIVED_ITEMS = 0x688
MM3_RUSH_RECEIVED = 0x689
MM3_CONSUMABLE_TABLE: dict[int, dict[int, tuple[int, int]]] = {
# Stage:
# Item: (byte offset, bit mask)
0: {
0x0200: (0, 5),
0x0201: (3, 2),
},
1: {
0x0202: (2, 6),
0x0203: (2, 5),
0x0204: (2, 4),
0x0205: (2, 3),
0x0206: (3, 6),
0x0207: (3, 5),
0x0208: (3, 7),
0x0209: (4, 0)
},
2: {
0x020A: (2, 7),
0x020B: (3, 0),
0x020C: (3, 1),
0x020D: (3, 2),
0x020E: (4, 2),
0x020F: (4, 3),
0x0210: (4, 7),
0x0211: (5, 1),
0x0212: (6, 1),
0x0213: (7, 0)
},
3: {
0x0214: (0, 6),
0x0215: (1, 5),
0x0216: (2, 3),
0x0217: (2, 7),
0x0218: (2, 6),
0x0219: (2, 5),
0x021A: (4, 5),
},
4: {
0x021B: (1, 3),
0x021C: (1, 5),
0x021D: (1, 7),
0x021E: (2, 0),
0x021F: (1, 6),
0x0220: (2, 4),
0x0221: (2, 5),
0x0222: (4, 5)
},
5: {
0x0223: (3, 0),
0x0224: (3, 2),
0x0225: (4, 5),
0x0226: (4, 6),
0x0227: (6, 4),
},
6: {
0x0228: (2, 0),
0x0229: (2, 1),
0x022A: (3, 1),
0x022B: (3, 2),
0x022C: (3, 3),
0x022D: (3, 4),
},
7: {
0x022E: (3, 5),
0x022F: (3, 4),
0x0230: (3, 3),
0x0231: (3, 2),
},
8: {
0x0232: (1, 4),
0x0233: (2, 1),
0x0234: (2, 2),
0x0235: (2, 5),
0x0236: (3, 5),
0x0237: (4, 2),
0x0238: (4, 4),
0x0239: (5, 3),
0x023A: (6, 0),
0x023B: (6, 1),
0x023C: (7, 5),
},
9: {
0x023D: (3, 2),
0x023E: (3, 6),
0x023F: (4, 5),
0x0240: (5, 4),
},
10: {
0x0241: (0, 2),
0x0242: (2, 4)
},
11: {
0x0243: (4, 1),
0x0244: (6, 0),
0x0245: (6, 1),
0x0246: (6, 2),
0x0247: (6, 3),
},
12: {
0x0248: (0, 0),
0x0249: (0, 3),
0x024A: (0, 5),
0x024B: (1, 6),
0x024C: (2, 7),
0x024D: (2, 3),
0x024E: (2, 1),
0x024F: (2, 2),
0x0250: (3, 5),
0x0251: (3, 4),
0x0252: (3, 6),
0x0253: (3, 7)
},
13: {
0x0254: (0, 3),
0x0255: (0, 6),
0x0256: (1, 0),
0x0257: (3, 0),
0x0258: (3, 2),
0x0259: (3, 3),
0x025A: (3, 4),
0x025B: (3, 5),
0x025C: (3, 6),
0x025D: (4, 0),
0x025E: (3, 7),
0x025F: (4, 1),
0x0260: (4, 2),
},
14: {
0x0261: (0, 3),
0x0262: (0, 2),
0x0263: (0, 6),
0x0264: (1, 2),
0x0265: (1, 7),
0x0266: (2, 0),
0x0267: (2, 1),
0x0268: (2, 2),
0x0269: (2, 3),
0x026A: (5, 2),
0x026B: (5, 3),
},
15: {
0x026C: (0, 0),
0x026D: (0, 1),
0x026E: (0, 2),
0x026F: (0, 3),
0x0270: (0, 4),
0x0271: (0, 6),
0x0272: (1, 0),
0x0273: (1, 2),
0x0274: (1, 3),
0x0275: (1, 1),
0x0276: (0, 7),
0x0277: (3, 2),
0x0278: (2, 2),
0x0279: (2, 3),
0x027A: (2, 4),
0x027B: (2, 5),
0x027C: (3, 1),
0x027D: (3, 0),
0x027E: (2, 7),
0x027F: (2, 6),
},
16: {
0x0280: (0, 0),
0x0281: (0, 3),
0x0282: (0, 1),
0x0283: (0, 2),
},
17: {
0x0284: (0, 2),
0x0285: (0, 6),
0x0286: (0, 1),
0x0287: (0, 5),
0x0288: (0, 3),
0x0289: (0, 0),
0x028A: (0, 4)
}
}
def to_oneup_format(val: int) -> int:
return ((val // 10) * 0x10) + val % 10
def from_oneup_format(val: int) -> int:
return ((val // 0x10) * 10) + val % 0x10
class MM3EnergyLinkType(IntEnum):
Life = 0
NeedleCannon = 1
MagnetMissile = 2
GeminiLaser = 3
HardKnuckle = 4
TopSpin = 5
SearchSnake = 6
SparkShot = 7
ShadowBlade = 8
OneUP = 12
RushCoil = 0x11
RushMarine = 0x12
RushJet = 0x13
request_to_name: dict[str, str] = {
"HP": "health",
"NE": "Needle Cannon energy",
"MA": "Magnet Missile energy",
"GE": "Gemini Laser energy",
"HA": "Hard Knuckle energy",
"TO": "Top Spin energy",
"SN": "Search Snake energy",
"SP": "Spark Shot energy",
"SH": "Shadow Blade energy",
"RC": "Rush Coil energy",
"RM": "Rush Marine energy",
"RJ": "Rush Jet energy",
"1U": "lives"
}
HP_EXCHANGE_RATE = 500000000
WEAPON_EXCHANGE_RATE = 250000000
ONEUP_EXCHANGE_RATE = 14000000000
def cmd_pool(self: "BizHawkClientCommandProcessor") -> None:
"""Check the current pool of EnergyLink, and requestable refills from it."""
if self.ctx.game != "Mega Man 3":
logger.warning("This command can only be used when playing Mega Man 3.")
return
if not self.ctx.server or not self.ctx.slot:
logger.warning("You must be connected to a server to use this command.")
return
energylink = self.ctx.stored_data.get(f"EnergyLink{self.ctx.team}", 0)
health_points = energylink // HP_EXCHANGE_RATE
weapon_points = energylink // WEAPON_EXCHANGE_RATE
lives = energylink // ONEUP_EXCHANGE_RATE
logger.info(f"Healing available: {health_points}\n"
f"Weapon refill available: {weapon_points}\n"
f"Lives available: {lives}")
def cmd_request(self: "BizHawkClientCommandProcessor", amount: str, target: str) -> None:
"""Request a refill from EnergyLink."""
from worlds._bizhawk.context import BizHawkClientContext
if self.ctx.game != "Mega Man 3":
logger.warning("This command can only be used when playing Mega Man 3.")
return
if not self.ctx.server or not self.ctx.slot:
logger.warning("You must be connected to a server to use this command.")
return
valid_targets: dict[str, MM3EnergyLinkType] = {
"HP": MM3EnergyLinkType.Life,
"NE": MM3EnergyLinkType.NeedleCannon,
"MA": MM3EnergyLinkType.MagnetMissile,
"GE": MM3EnergyLinkType.GeminiLaser,
"HA": MM3EnergyLinkType.HardKnuckle,
"TO": MM3EnergyLinkType.TopSpin,
"SN": MM3EnergyLinkType.SearchSnake,
"SP": MM3EnergyLinkType.SparkShot,
"SH": MM3EnergyLinkType.ShadowBlade,
"RC": MM3EnergyLinkType.RushCoil,
"RM": MM3EnergyLinkType.RushMarine,
"RJ": MM3EnergyLinkType.RushJet,
"1U": MM3EnergyLinkType.OneUP
}
if target.upper() not in valid_targets:
logger.warning(f"Unrecognized target {target.upper()}. Available targets: {', '.join(valid_targets.keys())}")
return
ctx = self.ctx
assert isinstance(ctx, BizHawkClientContext)
client = ctx.client_handler
assert isinstance(client, MegaMan3Client)
client.refill_queue.append((valid_targets[target.upper()], int(amount)))
logger.info(f"Restoring {amount} {request_to_name[target.upper()]}.")
def cmd_autoheal(self: "BizHawkClientCommandProcessor") -> None:
"""Enable auto heal from EnergyLink."""
if self.ctx.game != "Mega Man 3":
logger.warning("This command can only be used when playing Mega Man 3.")
return
if not self.ctx.server or not self.ctx.slot:
logger.warning("You must be connected to a server to use this command.")
return
else:
assert isinstance(self.ctx.client_handler, MegaMan3Client)
if self.ctx.client_handler.auto_heal:
self.ctx.client_handler.auto_heal = False
logger.info(f"Auto healing disabled.")
else:
self.ctx.client_handler.auto_heal = True
logger.info(f"Auto healing enabled.")
def get_sfx_writes(sfx: int) -> tuple[int, bytes, str]:
return MM3_SFX_QUEUE, sfx.to_bytes(1, 'little'), "RAM"
class MegaMan3Client(BizHawkClient):
game = "Mega Man 3"
system = "NES"
patch_suffix = ".apmm3"
item_queue: list[NetworkItem] = []
pending_death_link: bool = False
# default to true, as we don't want to send a deathlink until Mega Man's HP is initialized once
sending_death_link: bool = True
death_link: bool = False
energy_link: bool = False
rom: bytes | None = None
weapon_energy: int = 0
health_energy: int = 0
auto_heal: bool = False
refill_queue: list[tuple[MM3EnergyLinkType, int]] = []
last_wily: int | None = None # default to wily 1
doc_status: int | None = None # default to no doc progress
async def validate_rom(self, ctx: "BizHawkClientContext") -> bool:
from worlds._bizhawk import RequestFailedError, read, get_memory_size
from . import MM3World
try:
if (await get_memory_size(ctx.bizhawk_ctx, "PRG ROM")) < 0x3FFB0:
# not the entire size, but enough to check validation
if "pool" in ctx.command_processor.commands:
ctx.command_processor.commands.pop("pool")
if "request" in ctx.command_processor.commands:
ctx.command_processor.commands.pop("request")
if "autoheal" in ctx.command_processor.commands:
ctx.command_processor.commands.pop("autoheal")
return False
game_name, version = (await read(ctx.bizhawk_ctx, [(0x3F320, 21, "PRG ROM"),
(0x3F33C, 3, "PRG ROM")]))
if game_name[:3] != b"MM3" or version != bytes(MM3World.world_version):
if game_name[:3] == b"MM3":
# I think this is an easier check than the other?
older_version = f"{version[0]}.{version[1]}.{version[2]}"
logger.warning(f"This Mega Man 3 patch was generated for an different version of the apworld. "
f"Please use that version to connect instead.\n"
f"Patch version: ({older_version})\n"
f"Client version: ({'.'.join([str(i) for i in MM3World.world_version])})")
if "pool" in ctx.command_processor.commands:
ctx.command_processor.commands.pop("pool")
if "request" in ctx.command_processor.commands:
ctx.command_processor.commands.pop("request")
if "autoheal" in ctx.command_processor.commands:
ctx.command_processor.commands.pop("autoheal")
return False
except UnicodeDecodeError:
return False
except RequestFailedError:
return False # Should verify on the next pass
ctx.game = self.game
self.rom = game_name
ctx.items_handling = 0b111
ctx.want_slot_data = False
deathlink = (await read(ctx.bizhawk_ctx, [(0x3F336, 1, "PRG ROM")]))[0][0]
if deathlink & 0x01:
self.death_link = True
await ctx.update_death_link(self.death_link)
if deathlink & 0x02:
self.energy_link = True
if self.energy_link:
if "pool" not in ctx.command_processor.commands:
ctx.command_processor.commands["pool"] = cmd_pool
if "request" not in ctx.command_processor.commands:
ctx.command_processor.commands["request"] = cmd_request
if "autoheal" not in ctx.command_processor.commands:
ctx.command_processor.commands["autoheal"] = cmd_autoheal
return True
async def set_auth(self, ctx: "BizHawkClientContext") -> None:
if self.rom:
ctx.auth = b64encode(self.rom).decode()
def on_package(self, ctx: "BizHawkClientContext", cmd: str, args: dict[str, Any]) -> None:
if cmd == "Bounced":
if "tags" in args:
assert ctx.slot is not None
if "DeathLink" in args["tags"] and args["data"]["source"] != ctx.slot_info[ctx.slot].name:
self.on_deathlink(ctx)
elif cmd == "Retrieved":
if f"MM3_LAST_WILY_{ctx.team}_{ctx.slot}" in args["keys"]:
self.last_wily = args["keys"][f"MM3_LAST_WILY_{ctx.team}_{ctx.slot}"]
if f"MM3_DOC_STATUS_{ctx.team}_{ctx.slot}" in args["keys"]:
self.doc_status = args["keys"][f"MM3_DOC_STATUS_{ctx.team}_{ctx.slot}"]
elif cmd == "Connected":
if self.energy_link:
ctx.set_notify(f"EnergyLink{ctx.team}")
if ctx.ui:
ctx.ui.enable_energy_link()
async def send_deathlink(self, ctx: "BizHawkClientContext") -> None:
self.sending_death_link = True
ctx.last_death_link = time.time()
await ctx.send_death("Mega Man was defeated.")
def on_deathlink(self, ctx: "BizHawkClientContext") -> None:
ctx.last_death_link = time.time()
self.pending_death_link = True
async def game_watcher(self, ctx: "BizHawkClientContext") -> None:
from worlds._bizhawk import read, write
if ctx.server is None:
return
if ctx.slot is None:
return
# get our relevant bytes
(prog_state, robot_masters_unlocked, robot_masters_defeated, doc_status, doc_robo_unlocked, doc_robo_defeated,
rush_acquired, received_items, completed_stages, consumable_checks,
e_tanks, lives, weapon_energy, health, state, bar_state, current_stage,
energy_link_packet, last_wily) = await read(ctx.bizhawk_ctx, [
(MM3_PROG_STATE, 1, "RAM"),
(MM3_ROBOT_MASTERS_UNLOCKED, 1, "RAM"),
(MM3_ROBOT_MASTERS_DEFEATED, 1, "RAM"),
(MM3_DOC_STATUS, 1, "RAM"),
(MM3_DOC_ROBOT_UNLOCKED, 1, "RAM"),
(MM3_DOC_ROBOT_DEFEATED, 1, "RAM"),
(MM3_RUSH_RECEIVED, 1, "RAM"),
(MM3_RECEIVED_ITEMS, 1, "RAM"),
(MM3_COMPLETED_STAGES, 0x1, "RAM"),
(MM3_CONSUMABLES, 16, "RAM"), # Could be more but 16 definitely catches all current
(MM3_E_TANKS, 1, "RAM"),
(MM3_LIVES, 1, "RAM"),
(MM3_WEAPON_ENERGY, 11, "RAM"),
(MM3_HEALTH, 1, "RAM"),
(MM3_MEGAMAN_STATE, 1, "RAM"),
(MM3_ENERGY_BAR, 2, "RAM"),
(MM3_CURRENT_STAGE, 1, "RAM"),
(MM3_ENERGYLINK, 1, "RAM"),
(MM3_LAST_WILY, 1, "RAM"),
])
if bar_state[0] not in (0x00, 0x80):
return # Game is not initialized
# Bit of a trick here, bar state can only be 0x00 or 0x80 (display health bar, or don't)
# This means it can double as init guard and in-stage tracker
if not ctx.finished_game and completed_stages[0] & 0x20:
await ctx.send_msgs([{
"cmd": "StatusUpdate",
"status": ClientStatus.CLIENT_GOAL
}])
writes = []
# deathlink
# only handle deathlink in bar state 0x80 (in stage)
if bar_state[0] == 0x80:
if self.pending_death_link:
writes.append((MM3_MEGAMAN_STATE, bytes([0x0E]), "RAM"))
self.pending_death_link = False
self.sending_death_link = True
if "DeathLink" in ctx.tags and ctx.last_death_link + 1 < time.time():
if state[0] == 0x0E and not self.sending_death_link:
await self.send_deathlink(ctx)
elif state[0] != 0x0E:
self.sending_death_link = False
if self.last_wily != last_wily[0]:
if self.last_wily is None:
# revalidate last wily from data storage
await ctx.send_msgs([{"cmd": "Set", "key": f"MM3_LAST_WILY_{ctx.team}_{ctx.slot}", "operations": [
{"operation": "default", "value": 0xC}
]}])
await ctx.send_msgs([{"cmd": "Get", "keys": [f"MM3_LAST_WILY_{ctx.team}_{ctx.slot}"]}])
elif last_wily[0] == 0:
writes.append((MM3_LAST_WILY, self.last_wily.to_bytes(1, "little"), "RAM"))
else:
# correct our setting
self.last_wily = last_wily[0]
await ctx.send_msgs([{"cmd": "Set", "key": f"MM3_LAST_WILY_{ctx.team}_{ctx.slot}", "operations": [
{"operation": "replace", "value": self.last_wily}
]}])
if self.doc_status != doc_status[0]:
if self.doc_status is None:
# revalidate doc status from data storage
await ctx.send_msgs([{"cmd": "Set", "key": f"MM3_DOC_STATUS_{ctx.team}_{ctx.slot}", "operations": [
{"operation": "default", "value": 0}
]}])
await ctx.send_msgs([{"cmd": "Get", "keys": [f"MM3_DOC_STATUS_{ctx.team}_{ctx.slot}"]}])
elif doc_status[0] == 0:
writes.append((MM3_DOC_STATUS, self.doc_status.to_bytes(1, "little"), "RAM"))
else:
# correct our setting
# shouldn't be possible to desync, but we'll account for it anyways
self.doc_status |= doc_status[0]
await ctx.send_msgs([{"cmd": "Set", "key": f"MM3_DOC_STATUS_{ctx.team}_{ctx.slot}", "operations": [
{"operation": "replace", "value": self.doc_status}
]}])
weapon_energy = bytearray(weapon_energy)
# handle receiving items
recv_amount = received_items[0]
if recv_amount < len(ctx.items_received):
item = ctx.items_received[recv_amount]
logging.info('Received %s from %s (%s) (%d/%d in list)' % (
color(ctx.item_names.lookup_in_slot(item.item), 'red', 'bold'),
color(ctx.player_names[item.player], 'yellow'),
ctx.location_names.lookup_in_slot(item.location, item.player), recv_amount, len(ctx.items_received)))
if item.item & 0x120 == 0:
# Robot Master Weapon, or Rush
new_weapons = item.item & 0xFF
weapon_energy[MM3_WEAPONS[new_weapons]] |= 0x9C
writes.append((MM3_WEAPON_ENERGY, weapon_energy, "RAM"))
writes.append(get_sfx_writes(0x32))
elif item.item & 0x20 == 0:
# Robot Master Stage Access
# Catch the Doc Robo here
if item.item & 0x10:
ptr = MM3_DOC_ROBOT_UNLOCKED
unlocked = doc_robo_unlocked
else:
ptr = MM3_ROBOT_MASTERS_UNLOCKED
unlocked = robot_masters_unlocked
new_stages = unlocked[0] | (1 << ((item.item & 0xF) - 1))
print(new_stages)
writes.append((ptr, new_stages.to_bytes(1, 'little'), "RAM"))
writes.append(get_sfx_writes(0x34))
writes.append((MM3_RBM_STROBE, b"\x01", "RAM"))
else:
# append to the queue, so we handle it later
self.item_queue.append(item)
recv_amount += 1
writes.append((MM3_RECEIVED_ITEMS, recv_amount.to_bytes(1, 'little'), "RAM"))
if energy_link_packet[0]:
pickup = energy_link_packet[0]
if pickup in (0x64, 0x65):
# Health pickups
if pickup == 0x65:
value = 2
else:
value = 10
exchange_rate = HP_EXCHANGE_RATE
elif pickup in (0x66, 0x67):
# Weapon Energy
if pickup == 0x67:
value = 2
else:
value = 10
exchange_rate = WEAPON_EXCHANGE_RATE
elif pickup == 0x69:
# 1-Up
value = 1
exchange_rate = ONEUP_EXCHANGE_RATE
else:
# if we managed to pickup something else, we should just fall through
value = 0
exchange_rate = 0
contribution = (value * exchange_rate) >> 1
if contribution:
await ctx.send_msgs([{
"cmd": "Set", "key": f"EnergyLink{ctx.team}", "slot": ctx.slot, "operations":
[{"operation": "add", "value": contribution},
{"operation": "max", "value": 0}]}])
logger.info(f"Deposited {contribution / HP_EXCHANGE_RATE} health into the pool.")
writes.append((MM3_ENERGYLINK, 0x00.to_bytes(1, "little"), "RAM"))
if self.weapon_energy:
# Weapon Energy
# We parse the whole thing to spread it as thin as possible
current_energy = self.weapon_energy
for i, weapon in zip(range(len(weapon_energy)), weapon_energy):
if weapon & 0x80 and (weapon & 0x7F) < 0x1C:
missing = 0x1C - (weapon & 0x7F)
if missing > self.weapon_energy:
missing = self.weapon_energy
self.weapon_energy -= missing
weapon_energy[i] = weapon + missing
if not self.weapon_energy:
writes.append((MM3_WEAPON_ENERGY, weapon_energy, "RAM"))
break
else:
if current_energy != self.weapon_energy:
writes.append((MM3_WEAPON_ENERGY, weapon_energy, "RAM"))
if self.health_energy or self.auto_heal:
# Health Energy
# We save this if the player has not taken any damage
current_health = health[0]
if 0 < (current_health & 0x7F) < 0x1C:
health_diff = 0x1C - (current_health & 0x7F)
if self.health_energy:
if health_diff > self.health_energy:
health_diff = self.health_energy
self.health_energy -= health_diff
else:
pool = ctx.stored_data.get(f"EnergyLink{ctx.team}", 0)
if health_diff * HP_EXCHANGE_RATE > pool:
health_diff = int(pool // HP_EXCHANGE_RATE)
await ctx.send_msgs([{
"cmd": "Set", "key": f"EnergyLink{ctx.team}", "slot": ctx.slot, "operations":
[{"operation": "add", "value": -health_diff * HP_EXCHANGE_RATE},
{"operation": "max", "value": 0}]}])
current_health += health_diff
writes.append((MM3_HEALTH, current_health.to_bytes(1, 'little'), "RAM"))
if self.refill_queue:
refill_type, refill_amount = self.refill_queue.pop()
if refill_type == MM3EnergyLinkType.Life:
exchange_rate = HP_EXCHANGE_RATE
elif refill_type == MM3EnergyLinkType.OneUP:
exchange_rate = ONEUP_EXCHANGE_RATE
else:
exchange_rate = WEAPON_EXCHANGE_RATE
pool = ctx.stored_data.get(f"EnergyLink{ctx.team}", 0)
request = exchange_rate * refill_amount
if request > pool:
logger.warning(
f"Not enough energy to fulfill the request. Maximum request: {pool // exchange_rate}")
else:
await ctx.send_msgs([{
"cmd": "Set", "key": f"EnergyLink{ctx.team}", "slot": ctx.slot, "operations":
[{"operation": "add", "value": -request},
{"operation": "max", "value": 0}]}])
if refill_type == MM3EnergyLinkType.Life:
refill_ptr = MM3_HEALTH
elif refill_type == MM3EnergyLinkType.OneUP:
refill_ptr = MM3_LIVES
else:
refill_ptr = MM3_WEAPON_ENERGY + MM3_WEAPONS[refill_type]
current_value = (await read(ctx.bizhawk_ctx, [(refill_ptr, 1, "RAM")]))[0][0]
if refill_type == MM3EnergyLinkType.OneUP:
current_value = from_oneup_format(current_value)
new_value = min(0x9C if refill_type != MM3EnergyLinkType.OneUP else 99, current_value + refill_amount)
if refill_type == MM3EnergyLinkType.OneUP:
new_value = to_oneup_format(new_value)
writes.append((refill_ptr, new_value.to_bytes(1, "little"), "RAM"))
if len(self.item_queue):
item = self.item_queue.pop(0)
idx = item.item & 0xF
if idx == 0:
# 1-Up
current_lives = from_oneup_format(lives[0])
if current_lives > 99:
self.item_queue.append(item)
else:
current_lives += 1
current_lives = to_oneup_format(current_lives)
writes.append((MM3_LIVES, current_lives.to_bytes(1, 'little'), "RAM"))
writes.append(get_sfx_writes(0x14))
elif idx == 1:
self.weapon_energy += 0xE
writes.append(get_sfx_writes(0x1C))
elif idx == 2:
self.health_energy += 0xE
writes.append(get_sfx_writes(0x1C))
elif idx == 3:
current_tanks = from_oneup_format(e_tanks[0])
if current_tanks > 99:
self.item_queue.append(item)
else:
current_tanks += 1
current_tanks = to_oneup_format(current_tanks)
writes.append((MM3_E_TANKS, current_tanks.to_bytes(1, 'little'), "RAM"))
writes.append(get_sfx_writes(0x14))
await write(ctx.bizhawk_ctx, writes)
new_checks = []
# check for locations
for i in range(8):
flag = 1 << i
if robot_masters_defeated[0] & flag:
rbm_id = 0x0001 + i
if rbm_id not in ctx.checked_locations:
new_checks.append(rbm_id)
wep_id = 0x0101 + i
if wep_id not in ctx.checked_locations:
new_checks.append(wep_id)
if doc_robo_defeated[0] & flag:
doc_id = 0x0010 + MM3_DOC_REMAP[i]
if doc_id not in ctx.checked_locations:
new_checks.append(doc_id)
for i in range(2):
flag = 1 << i
if rush_acquired[0] & flag:
itm_id = 0x0111 + i
if itm_id not in ctx.checked_locations:
new_checks.append(itm_id)
for i in (0, 1, 2, 4):
# Wily 4 does not have a boss check
boss_id = 0x0009 + i
if completed_stages[0] & (1 << i) != 0:
if boss_id not in ctx.checked_locations:
new_checks.append(boss_id)
if completed_stages[0] & 0x80 and 0x000F not in ctx.checked_locations:
new_checks.append(0x000F)
if bar_state[0] == 0x80: # currently in stage
if (prog_state[0] > 0x00 and current_stage[0] >= 8) or prog_state[0] == 0x00:
# need to block the specific state of Break Man prog=0x12 stage=0x5
# it doesn't clean the consumable table and he doesn't have any anyways
for consumable in MM3_CONSUMABLE_TABLE[current_stage[0]]:
consumable_info = MM3_CONSUMABLE_TABLE[current_stage[0]][consumable]
if consumable not in ctx.checked_locations:
is_checked = consumable_checks[consumable_info[0]] & (1 << consumable_info[1])
if is_checked:
new_checks.append(consumable)
for new_check_id in new_checks:
ctx.locations_checked.add(new_check_id)
location = ctx.location_names.lookup_in_game(new_check_id)
nes_logger.info(
f'New Check: {location} ({len(ctx.locations_checked)}/'
f'{len(ctx.missing_locations) + len(ctx.checked_locations)})')
await ctx.send_msgs([{"cmd": 'LocationChecks', "locations": [new_check_id]}])