Merge remote-tracking branch 'Main/main' into rework_accessibility

# Conflicts:
#	BaseClasses.py
#	playerSettings.yaml
#	worlds/alttp/Options.py
#	worlds/alttp/Rules.py
#	worlds/alttp/test/dungeons/TestDungeon.py
#	worlds/alttp/test/inverted/TestInverted.py
#	worlds/alttp/test/inverted_minor_glitches/TestInvertedMinor.py
#	worlds/alttp/test/inverted_owg/TestInvertedOWG.py
#	worlds/alttp/test/minor_glitches/TestMinor.py
#	worlds/alttp/test/owg/TestVanillaOWG.py
#	worlds/alttp/test/vanilla/TestVanilla.py
#	worlds/messenger/Options.py
#	worlds/messenger/Rules.py
#	worlds/pokemon_rb/options.py
#	worlds/pokemon_rb/rules.py
This commit is contained in:
alwaysintreble
2023-10-10 17:47:40 -05:00
876 changed files with 84124 additions and 49325 deletions

View File

@@ -7,9 +7,11 @@ import random
import secrets
import typing # this can go away when Python 3.8 support is dropped
from argparse import Namespace
from collections import OrderedDict, Counter, deque, ChainMap
from collections import ChainMap, Counter, deque
from collections.abc import Collection
from enum import IntEnum, IntFlag
from typing import List, Dict, Optional, Set, Iterable, Union, Any, Tuple, TypedDict, Callable, NamedTuple
from typing import Any, Callable, Dict, Iterable, Iterator, List, NamedTuple, Optional, Set, Tuple, TypedDict, Union, \
Type, ClassVar
import NetUtils
import Options
@@ -28,15 +30,15 @@ class Group(TypedDict, total=False):
link_replacement: bool
class ThreadBarrierProxy():
class ThreadBarrierProxy:
"""Passes through getattr while passthrough is True"""
def __init__(self, obj: Any):
def __init__(self, obj: object) -> None:
self.passthrough = True
self.obj = obj
def __getattr__(self, item):
def __getattr__(self, name: str) -> Any:
if self.passthrough:
return getattr(self.obj, item)
return getattr(self.obj, name)
else:
raise RuntimeError("You are in a threaded context and global random state was removed for your safety. "
"Please use multiworld.per_slot_randoms[player] or randomize ahead of output.")
@@ -81,6 +83,7 @@ class MultiWorld():
random: random.Random
per_slot_randoms: Dict[int, random.Random]
"""Deprecated. Please use `self.random` instead."""
class AttributeProxy():
def __init__(self, rule):
@@ -96,7 +99,6 @@ class MultiWorld():
self.player_types = {player: NetUtils.SlotType.player for player in self.player_ids}
self.glitch_triforce = False
self.algorithm = 'balanced'
self.dungeons: Dict[Tuple[str, int], Dungeon] = {}
self.groups = {}
self.regions = []
self.shops = []
@@ -113,7 +115,6 @@ class MultiWorld():
self.dark_world_light_cone = False
self.rupoor_cost = 10
self.aga_randomness = True
self.lock_aga_door_in_escape = False
self.save_and_quit_from_boss = True
self.custom = False
self.customitemarray = []
@@ -122,6 +123,7 @@ class MultiWorld():
self.early_items = {player: {} for player in self.player_ids}
self.local_early_items = {player: {} for player in self.player_ids}
self.indirect_connections = {}
self.start_inventory_from_pool: Dict[int, Options.StartInventoryPool] = {}
self.fix_trock_doors = self.AttributeProxy(
lambda player: self.shuffle[player] != 'vanilla' or self.mode[player] == 'inverted')
self.fix_skullwoods_exit = self.AttributeProxy(
@@ -135,7 +137,6 @@ class MultiWorld():
def set_player_attr(attr, val):
self.__dict__.setdefault(attr, {})[player] = val
set_player_attr('tech_tree_layout_prerequisites', {})
set_player_attr('_region_cache', {})
set_player_attr('shuffle', "vanilla")
set_player_attr('logic', "noglitches")
@@ -202,14 +203,7 @@ class MultiWorld():
self.player_types[new_id] = NetUtils.SlotType.group
self._region_cache[new_id] = {}
world_type = AutoWorld.AutoWorldRegister.world_types[game]
for option_key, option in world_type.option_definitions.items():
getattr(self, option_key)[new_id] = option(option.default)
for option_key, option in Options.common_options.items():
getattr(self, option_key)[new_id] = option(option.default)
for option_key, option in Options.per_game_common_options.items():
getattr(self, option_key)[new_id] = option(option.default)
self.worlds[new_id] = world_type(self, new_id)
self.worlds[new_id] = world_type.create_group(self, new_id, players)
self.worlds[new_id].collect_item = classmethod(AutoWorld.World.collect_item).__get__(self.worlds[new_id])
self.player_name[new_id] = name
@@ -232,24 +226,24 @@ class MultiWorld():
range(1, self.players + 1)}
def set_options(self, args: Namespace) -> None:
for option_key in Options.common_options:
setattr(self, option_key, getattr(args, option_key, ))
for option_key in Options.per_game_common_options:
setattr(self, option_key, getattr(args, option_key, {}))
for player in self.player_ids:
self.custom_data[player] = {}
world_type = AutoWorld.AutoWorldRegister.world_types[self.game[player]]
for option_key in world_type.option_definitions:
setattr(self, option_key, getattr(args, option_key, {}))
self.worlds[player] = world_type(self, player)
self.worlds[player].random = self.per_slot_randoms[player]
for option_key in world_type.options_dataclass.type_hints:
option_values = getattr(args, option_key, {})
setattr(self, option_key, option_values)
# TODO - remove this loop once all worlds use options dataclasses
options_dataclass: typing.Type[Options.PerGameCommonOptions] = self.worlds[player].options_dataclass
self.worlds[player].options = options_dataclass(**{option_key: getattr(args, option_key)[player]
for option_key in options_dataclass.type_hints})
def set_item_links(self):
item_links = {}
replacement_prio = [False, True, None]
for player in self.player_ids:
for item_link in self.item_links[player].value:
for item_link in self.worlds[player].options.item_links.value:
if item_link["name"] in item_links:
if item_links[item_link["name"]]["game"] != self.game[player]:
raise Exception(f"Cannot ItemLink across games. Link: {item_link['name']}")
@@ -304,14 +298,6 @@ class MultiWorld():
group["non_local_items"] = item_link["non_local_items"]
group["link_replacement"] = replacement_prio[item_link["link_replacement"]]
# intended for unittests
def set_default_common_options(self):
for option_key, option in Options.common_options.items():
setattr(self, option_key, {player_id: option(option.default) for player_id in self.player_ids})
for option_key, option in Options.per_game_common_options.items():
setattr(self, option_key, {player_id: option(option.default) for player_id in self.player_ids})
self.state = CollectionState(self)
def secure(self):
self.random = ThreadBarrierProxy(secrets.SystemRandom())
self.is_race = True
@@ -363,7 +349,7 @@ class MultiWorld():
for r_location in region.locations:
self._location_cache[r_location.name, player] = r_location
def get_regions(self, player=None):
def get_regions(self, player: Optional[int] = None) -> Collection[Region]:
return self.regions if player is None else self._region_cache[player].values()
def get_region(self, regionname: str, player: int) -> Region:
@@ -387,12 +373,6 @@ class MultiWorld():
self._recache()
return self._location_cache[location, player]
def get_dungeon(self, dungeonname: str, player: int) -> Dungeon:
try:
return self.dungeons[dungeonname, player]
except KeyError as e:
raise KeyError('No such dungeon %s for player %d' % (dungeonname, player)) from e
def get_all_state(self, use_cache: bool) -> CollectionState:
cached = getattr(self, "_all_state", None)
if use_cache and cached:
@@ -445,7 +425,6 @@ class MultiWorld():
self.state.collect(item, True)
def push_item(self, location: Location, item: Item, collect: bool = True):
assert location.can_fill(self.state, item, False), f"Cannot place {item} into {location}."
location.item = item
item.location = location
if collect:
@@ -493,8 +472,10 @@ class MultiWorld():
def get_unfilled_locations_for_players(self, location_names: List[str], players: Iterable[int]):
for player in players:
if not location_names:
location_names = [location.name for location in self.get_unfilled_locations(player)]
for location_name in location_names:
valid_locations = [location.name for location in self.get_unfilled_locations(player)]
else:
valid_locations = location_names
for location_name in valid_locations:
location = self._location_cache.get((location_name, player), None)
if location is not None and location.item is None:
yield location
@@ -743,9 +724,11 @@ class CollectionState():
return self.prog_items[item, player] >= count
def has_all(self, items: Set[str], player: int) -> bool:
"""Returns True if each item name of items is in state at least once."""
return all(self.prog_items[item, player] for item in items)
def has_any(self, items: Set[str], player: int) -> bool:
"""Returns True if at least one item name of items is in state at least once."""
return any(self.prog_items[item, player] for item in items)
def count(self, item: str, player: int) -> int:
@@ -794,56 +777,6 @@ class CollectionState():
self.stale[item.player] = True
class Region:
name: str
_hint_text: str
player: int
multiworld: Optional[MultiWorld]
entrances: List[Entrance]
exits: List[Entrance]
locations: List[Location]
dungeon: Optional[Dungeon] = None
def __init__(self, name: str, player: int, multiworld: MultiWorld, hint: Optional[str] = None):
self.name = name
self.entrances = []
self.exits = []
self.locations = []
self.multiworld = multiworld
self._hint_text = hint
self.player = player
def can_reach(self, state: CollectionState) -> bool:
if state.stale[self.player]:
state.update_reachable_regions(self.player)
return self in state.reachable_regions[self.player]
def can_reach_private(self, state: CollectionState) -> bool:
for entrance in self.entrances:
if entrance.can_reach(state):
if not self in state.path:
state.path[self] = (self.name, state.path.get(entrance, None))
return True
return False
@property
def hint_text(self) -> str:
return self._hint_text if self._hint_text else self.name
def get_connecting_entrance(self, is_main_entrance: typing.Callable[[Entrance], bool]) -> Entrance:
for entrance in self.entrances:
if is_main_entrance(entrance):
return entrance
for entrance in self.entrances: # BFS might be better here, trying DFS for now.
return entrance.parent_region.get_connecting_entrance(is_main_entrance)
def __repr__(self):
return self.__str__()
def __str__(self):
return self.multiworld.get_name_string_for_object(self) if self.multiworld else f'{self.name} (Player {self.player})'
class Entrance:
access_rule: Callable[[CollectionState], bool] = staticmethod(lambda state: True)
hide_path: bool = False
@@ -882,41 +815,92 @@ class Entrance:
return world.get_name_string_for_object(self) if world else f'{self.name} (Player {self.player})'
class Dungeon(object):
def __init__(self, name: str, regions: List[Region], big_key: Item, small_keys: List[Item],
dungeon_items: List[Item], player: int):
class Region:
name: str
_hint_text: str
player: int
multiworld: Optional[MultiWorld]
entrances: List[Entrance]
exits: List[Entrance]
locations: List[Location]
entrance_type: ClassVar[Type[Entrance]] = Entrance
def __init__(self, name: str, player: int, multiworld: MultiWorld, hint: Optional[str] = None):
self.name = name
self.regions = regions
self.big_key = big_key
self.small_keys = small_keys
self.dungeon_items = dungeon_items
self.bosses = dict()
self.entrances = []
self.exits = []
self.locations = []
self.multiworld = multiworld
self._hint_text = hint
self.player = player
self.multiworld = None
def can_reach(self, state: CollectionState) -> bool:
if state.stale[self.player]:
state.update_reachable_regions(self.player)
return self in state.reachable_regions[self.player]
@property
def boss(self) -> Optional[Boss]:
return self.bosses.get(None, None)
def hint_text(self) -> str:
return self._hint_text if self._hint_text else self.name
@boss.setter
def boss(self, value: Optional[Boss]):
self.bosses[None] = value
def get_connecting_entrance(self, is_main_entrance: Callable[[Entrance], bool]) -> Entrance:
for entrance in self.entrances:
if is_main_entrance(entrance):
return entrance
for entrance in self.entrances: # BFS might be better here, trying DFS for now.
return entrance.parent_region.get_connecting_entrance(is_main_entrance)
@property
def keys(self) -> List[Item]:
return self.small_keys + ([self.big_key] if self.big_key else [])
def add_locations(self, locations: Dict[str, Optional[int]],
location_type: Optional[Type[Location]] = None) -> None:
"""
Adds locations to the Region object, where location_type is your Location class and locations is a dict of
location names to address.
@property
def all_items(self) -> List[Item]:
return self.dungeon_items + self.keys
:param locations: dictionary of locations to be created and added to this Region `{name: ID}`
:param location_type: Location class to be used to create the locations with"""
if location_type is None:
location_type = Location
for location, address in locations.items():
self.locations.append(location_type(self.player, location, address, self))
def is_dungeon_item(self, item: Item) -> bool:
return item.player == self.player and item.name in (dungeon_item.name for dungeon_item in self.all_items)
def connect(self, connecting_region: Region, name: Optional[str] = None,
rule: Optional[Callable[[CollectionState], bool]] = None) -> None:
"""
Connects this Region to another Region, placing the provided rule on the connection.
def __eq__(self, other: Dungeon) -> bool:
if not other:
return False
return self.name == other.name and self.player == other.player
:param connecting_region: Region object to connect to path is `self -> exiting_region`
:param name: name of the connection being created
:param rule: callable to determine access of this connection to go from self to the exiting_region"""
exit_ = self.create_exit(name if name else f"{self.name} -> {connecting_region.name}")
if rule:
exit_.access_rule = rule
exit_.connect(connecting_region)
def create_exit(self, name: str) -> Entrance:
"""
Creates and returns an Entrance object as an exit of this region.
:param name: name of the Entrance being created
"""
exit_ = self.entrance_type(self.player, name, self)
self.exits.append(exit_)
return exit_
def add_exits(self, exits: Union[Iterable[str], Dict[str, Optional[str]]],
rules: Dict[str, Callable[[CollectionState], bool]] = None) -> None:
"""
Connects current region to regions in exit dictionary. Passed region names must exist first.
:param exits: exits from the region. format is {"connecting_region": "exit_name"}. if a non dict is provided,
created entrances will be named "self.name -> connecting_region"
:param rules: rules for the exits from this region. format is {"connecting_region", rule}
"""
if not isinstance(exits, Dict):
exits = dict.fromkeys(exits)
for connecting_region, name in exits.items():
self.connect(self.multiworld.get_region(connecting_region, self.player),
name,
rules[connecting_region] if rules and connecting_region in rules else None)
def __repr__(self):
return self.__str__()
@@ -925,20 +909,6 @@ class Dungeon(object):
return self.multiworld.get_name_string_for_object(self) if self.multiworld else f'{self.name} (Player {self.player})'
class Boss():
def __init__(self, name: str, enemizer_name: str, defeat_rule: Callable, player: int):
self.name = name
self.enemizer_name = enemizer_name
self.defeat_rule = defeat_rule
self.player = player
def can_defeat(self, state) -> bool:
return self.defeat_rule(state, self.player)
def __repr__(self):
return f"Boss({self.name})"
class LocationProgressType(IntEnum):
DEFAULT = 1
PRIORITY = 2
@@ -1071,15 +1041,19 @@ class Item:
def flags(self) -> int:
return self.classification.as_flag()
def __eq__(self, other):
def __eq__(self, other: object) -> bool:
if not isinstance(other, Item):
return NotImplemented
return self.name == other.name and self.player == other.player
def __lt__(self, other: Item) -> bool:
def __lt__(self, other: object) -> bool:
if not isinstance(other, Item):
return NotImplemented
if other.player != self.player:
return other.player < self.player
return self.name < other.name
def __hash__(self):
def __hash__(self) -> int:
return hash((self.name, self.player))
def __repr__(self) -> str:
@@ -1091,33 +1065,44 @@ class Item:
return f"{self.name} (Player {self.player})"
class Spoiler():
multiworld: MultiWorld
unreachables: Set[Location]
class EntranceInfo(TypedDict, total=False):
player: int
entrance: str
exit: str
direction: str
def __init__(self, world):
self.multiworld = world
class Spoiler:
multiworld: MultiWorld
hashes: Dict[int, str]
entrances: Dict[Tuple[str, str, int], EntranceInfo]
playthrough: Dict[str, Union[List[str], Dict[str, str]]] # sphere "0" is list, others are dict
unreachables: Set[Location]
paths: Dict[str, List[Union[Tuple[str, str], Tuple[str, None]]]] # last step takes no further exits
def __init__(self, multiworld: MultiWorld) -> None:
self.multiworld = multiworld
self.hashes = {}
self.entrances = OrderedDict()
self.entrances = {}
self.playthrough = {}
self.unreachables = set()
self.paths = {}
def set_entrance(self, entrance: str, exit_: str, direction: str, player: int):
def set_entrance(self, entrance: str, exit_: str, direction: str, player: int) -> None:
if self.multiworld.players == 1:
self.entrances[(entrance, direction, player)] = OrderedDict(
[('entrance', entrance), ('exit', exit_), ('direction', direction)])
self.entrances[(entrance, direction, player)] = \
{"entrance": entrance, "exit": exit_, "direction": direction}
else:
self.entrances[(entrance, direction, player)] = OrderedDict(
[('player', player), ('entrance', entrance), ('exit', exit_), ('direction', direction)])
self.entrances[(entrance, direction, player)] = \
{"player": player, "entrance": entrance, "exit": exit_, "direction": direction}
def create_playthrough(self, create_paths: bool = True):
def create_playthrough(self, create_paths: bool = True) -> None:
"""Destructive to the world while it is run, damage gets repaired afterwards."""
from itertools import chain
# get locations containing progress items
multiworld = self.multiworld
prog_locations = {location for location in multiworld.get_filled_locations() if location.item.advancement}
state_cache = [None]
state_cache: List[Optional[CollectionState]] = [None]
collection_spheres: List[Set[Location]] = []
state = CollectionState(multiworld)
sphere_candidates = set(prog_locations)
@@ -1226,17 +1211,17 @@ class Spoiler():
for item in removed_precollected:
multiworld.push_precollected(item)
def create_paths(self, state: CollectionState, collection_spheres: List[Set[Location]]):
def create_paths(self, state: CollectionState, collection_spheres: List[Set[Location]]) -> None:
from itertools import zip_longest
multiworld = self.multiworld
def flist_to_iter(node):
while node:
value, node = node
yield value
def flist_to_iter(path_value: Optional[PathValue]) -> Iterator[str]:
while path_value:
region_or_entrance, path_value = path_value
yield region_or_entrance
def get_path(state, region):
reversed_path_as_flist = state.path.get(region, (region, None))
def get_path(state: CollectionState, region: Region) -> List[Union[Tuple[str, str], Tuple[str, None]]]:
reversed_path_as_flist: PathValue = state.path.get(region, (str(region), None))
string_path_flat = reversed(list(map(str, flist_to_iter(reversed_path_as_flist))))
# Now we combine the flat string list into (region, exit) pairs
pathsiter = iter(string_path_flat)
@@ -1262,14 +1247,11 @@ class Spoiler():
self.paths[str(multiworld.get_region('Inverted Big Bomb Shop', player))] = \
get_path(state, multiworld.get_region('Inverted Big Bomb Shop', player))
def to_file(self, filename: str):
def write_option(option_key: str, option_obj: type(Options.Option)):
res = getattr(self.multiworld, option_key)[player]
def to_file(self, filename: str) -> None:
def write_option(option_key: str, option_obj: Options.AssembleOptions) -> None:
res = getattr(self.multiworld.worlds[player].options, option_key)
display_name = getattr(option_obj, "display_name", option_key)
try:
outfile.write(f'{display_name + ":":33}{res.current_option_name}\n')
except:
raise Exception
outfile.write(f"{display_name + ':':33}{res.current_option_name}\n")
with open(filename, 'w', encoding="utf-8-sig") as outfile:
outfile.write(
@@ -1285,8 +1267,7 @@ class Spoiler():
outfile.write('\nPlayer %d: %s\n' % (player, self.multiworld.get_player_name(player)))
outfile.write('Game: %s\n' % self.multiworld.game[player])
options = ChainMap(Options.per_game_common_options, self.multiworld.worlds[player].option_definitions)
for f_option, option in options.items():
for f_option, option in self.multiworld.worlds[player].options_dataclass.type_hints.items():
write_option(f_option, option)
AutoWorld.call_single(self.multiworld, "write_spoiler_header", player, outfile)
@@ -1302,15 +1283,15 @@ class Spoiler():
AutoWorld.call_all(self.multiworld, "write_spoiler", outfile)
locations = [(str(location), str(location.item) if location.item is not None else "Nothing")
for location in self.multiworld.get_locations() if location.show_in_spoiler]
for location in self.multiworld.get_locations() if location.show_in_spoiler]
outfile.write('\n\nLocations:\n\n')
outfile.write('\n'.join(
['%s: %s' % (location, item) for location, item in locations]))
outfile.write('\n\nPlaythrough:\n\n')
outfile.write('\n'.join(['%s: {\n%s\n}' % (sphere_nr, '\n'.join(
[' %s: %s' % (location, item) for (location, item) in sphere.items()] if sphere_nr != '0' else [
f' {item}' for item in sphere])) for (sphere_nr, sphere) in self.playthrough.items()]))
[f" {location}: {item}" for (location, item) in sphere.items()] if isinstance(sphere, dict) else
[f" {item}" for item in sphere])) for (sphere_nr, sphere) in self.playthrough.items()]))
if self.unreachables:
outfile.write('\n\nUnreachable Items:\n\n')
outfile.write(
@@ -1371,23 +1352,21 @@ class PlandoOptions(IntFlag):
@classmethod
def _handle_part(cls, part: str, base: PlandoOptions) -> PlandoOptions:
try:
part = cls[part]
return base | cls[part]
except Exception as e:
raise KeyError(f"{part} is not a recognized name for a plando module. "
f"Known options: {', '.join(flag.name for flag in cls)}") from e
else:
return base | part
f"Known options: {', '.join(str(flag.name) for flag in cls)}") from e
def __str__(self) -> str:
if self.value:
return ", ".join(flag.name for flag in PlandoOptions if self.value & flag.value)
return ", ".join(str(flag.name) for flag in PlandoOptions if self.value & flag.value)
return "None"
seeddigits = 20
def get_seed(seed=None) -> int:
def get_seed(seed: Optional[int] = None) -> int:
if seed is None:
random.seed(None)
return random.randint(0, pow(10, seeddigits) - 1)