mirror of
https://github.com/ArchipelagoMW/Archipelago.git
synced 2026-04-23 19:13:32 -07:00
Merge branch 'main' into core_filler_creation_reason
This commit is contained in:
528
BaseClasses.py
528
BaseClasses.py
@@ -5,12 +5,14 @@ import functools
|
||||
import logging
|
||||
import random
|
||||
import secrets
|
||||
import warnings
|
||||
from argparse import Namespace
|
||||
from collections import Counter, deque
|
||||
from collections.abc import Collection, MutableSequence
|
||||
from collections import Counter, deque, defaultdict
|
||||
from collections.abc import Callable, Collection, Iterable, Iterator, Mapping, MutableSequence, Set
|
||||
from enum import IntEnum, IntFlag
|
||||
from typing import (AbstractSet, Any, Callable, ClassVar, Dict, Iterable, Iterator, List, Mapping, NamedTuple,
|
||||
Optional, Protocol, Set, Tuple, Union, TYPE_CHECKING)
|
||||
from typing import (AbstractSet, Any, ClassVar, Dict, List, Literal, NamedTuple,
|
||||
Optional, Protocol, Tuple, Union, TYPE_CHECKING, overload)
|
||||
import dataclasses
|
||||
|
||||
from typing_extensions import NotRequired, TypedDict
|
||||
|
||||
@@ -20,6 +22,7 @@ import Utils
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from entrance_rando import ERPlacementState
|
||||
from rule_builder.rules import Rule
|
||||
from worlds import AutoWorld
|
||||
|
||||
|
||||
@@ -54,12 +57,21 @@ class HasNameAndPlayer(Protocol):
|
||||
player: int
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class PlandoItemBlock:
|
||||
player: int
|
||||
from_pool: bool
|
||||
force: bool | Literal["silent"]
|
||||
worlds: set[int] = dataclasses.field(default_factory=set)
|
||||
items: list[str] = dataclasses.field(default_factory=list)
|
||||
locations: list[str] = dataclasses.field(default_factory=list)
|
||||
resolved_locations: list[Location] = dataclasses.field(default_factory=list)
|
||||
count: dict[str, int] = dataclasses.field(default_factory=dict)
|
||||
|
||||
|
||||
class MultiWorld():
|
||||
debug_types = False
|
||||
player_name: Dict[int, str]
|
||||
plando_texts: List[Dict[str, str]]
|
||||
plando_items: List[List[Dict[str, Any]]]
|
||||
plando_connections: List
|
||||
worlds: Dict[int, "AutoWorld.World"]
|
||||
groups: Dict[int, Group]
|
||||
regions: RegionManager
|
||||
@@ -74,7 +86,7 @@ class MultiWorld():
|
||||
local_items: Dict[int, Options.LocalItems]
|
||||
non_local_items: Dict[int, Options.NonLocalItems]
|
||||
progression_balancing: Dict[int, Options.ProgressionBalancing]
|
||||
completion_condition: Dict[int, Callable[[CollectionState], bool]]
|
||||
completion_condition: Dict[int, CollectionRule]
|
||||
indirect_connections: Dict[Region, Set[Entrance]]
|
||||
exclude_locations: Dict[int, Options.ExcludeLocations]
|
||||
priority_locations: Dict[int, Options.PriorityLocations]
|
||||
@@ -83,6 +95,8 @@ class MultiWorld():
|
||||
start_location_hints: Dict[int, Options.StartLocationHints]
|
||||
item_links: Dict[int, Options.ItemLinks]
|
||||
|
||||
plando_item_blocks: Dict[int, List[PlandoItemBlock]]
|
||||
|
||||
game: Dict[int, str]
|
||||
|
||||
random: random.Random
|
||||
@@ -141,17 +155,11 @@ class MultiWorld():
|
||||
self.algorithm = 'balanced'
|
||||
self.groups = {}
|
||||
self.regions = self.RegionManager(players)
|
||||
self.shops = []
|
||||
self.itempool = []
|
||||
self.seed = None
|
||||
self.seed_name: str = "Unavailable"
|
||||
self.precollected_items = {player: [] for player in self.player_ids}
|
||||
self.required_locations = []
|
||||
self.light_world_light_cone = False
|
||||
self.dark_world_light_cone = False
|
||||
self.rupoor_cost = 10
|
||||
self.aga_randomness = True
|
||||
self.save_and_quit_from_boss = True
|
||||
self.custom = False
|
||||
self.customitemarray = []
|
||||
self.shuffle_ganon = True
|
||||
@@ -160,18 +168,17 @@ class MultiWorld():
|
||||
self.local_early_items = {player: {} for player in self.player_ids}
|
||||
self.indirect_connections = {}
|
||||
self.start_inventory_from_pool: Dict[int, Options.StartInventoryPool] = {}
|
||||
self.plando_item_blocks = {}
|
||||
|
||||
for player in range(1, players + 1):
|
||||
def set_player_attr(attr: str, val) -> None:
|
||||
self.__dict__.setdefault(attr, {})[player] = val
|
||||
set_player_attr('plando_items', [])
|
||||
set_player_attr('plando_texts', {})
|
||||
set_player_attr('plando_connections', [])
|
||||
set_player_attr('plando_item_blocks', [])
|
||||
set_player_attr('game', "Archipelago")
|
||||
set_player_attr('completion_condition', lambda state: True)
|
||||
self.worlds = {}
|
||||
self.per_slot_randoms = Utils.DeprecateDict("Using per_slot_randoms is now deprecated. Please use the "
|
||||
"world's random object instead (usually self.random)")
|
||||
"world's random object instead (usually self.random)", True)
|
||||
self.plando_options = PlandoOptions.none
|
||||
|
||||
def get_all_ids(self) -> Tuple[int, ...]:
|
||||
@@ -216,17 +223,8 @@ class MultiWorld():
|
||||
self.seed_name = name if name else str(self.seed)
|
||||
|
||||
def set_options(self, args: Namespace) -> None:
|
||||
# TODO - remove this section once all worlds use options dataclasses
|
||||
from worlds import AutoWorld
|
||||
|
||||
all_keys: Set[str] = {key for player in self.player_ids for key in
|
||||
AutoWorld.AutoWorldRegister.world_types[self.game[player]].options_dataclass.type_hints}
|
||||
for option_key in all_keys:
|
||||
option = Utils.DeprecateDict(f"Getting options from multiworld is now deprecated. "
|
||||
f"Please use `self.options.{option_key}` instead.")
|
||||
option.update(getattr(args, option_key, {}))
|
||||
setattr(self, option_key, option)
|
||||
|
||||
for player in self.player_ids:
|
||||
world_type = AutoWorld.AutoWorldRegister.world_types[self.game[player]]
|
||||
self.worlds[player] = world_type(self, player)
|
||||
@@ -264,6 +262,7 @@ class MultiWorld():
|
||||
"local_items": set(item_link.get("local_items", [])),
|
||||
"non_local_items": set(item_link.get("non_local_items", [])),
|
||||
"link_replacement": replacement_prio.index(item_link["link_replacement"]),
|
||||
"skip_if_solo": item_link.get("skip_if_solo", False),
|
||||
}
|
||||
|
||||
for _name, item_link in item_links.items():
|
||||
@@ -287,6 +286,8 @@ class MultiWorld():
|
||||
|
||||
for group_name, item_link in item_links.items():
|
||||
game = item_link["game"]
|
||||
if item_link["skip_if_solo"] and len(item_link["players"]) == 1:
|
||||
continue
|
||||
group_id, group = self.add_group(group_name, game, set(item_link["players"]))
|
||||
|
||||
group["item_pool"] = item_link["item_pool"]
|
||||
@@ -428,23 +429,39 @@ class MultiWorld():
|
||||
def get_location(self, location_name: str, player: int) -> Location:
|
||||
return self.regions.location_cache[player][location_name]
|
||||
|
||||
def get_all_state(self, use_cache: bool, allow_partial_entrances: bool = False) -> CollectionState:
|
||||
cached = getattr(self, "_all_state", None)
|
||||
if use_cache and cached:
|
||||
return cached.copy()
|
||||
def get_all_state(self, use_cache: bool | None = None, allow_partial_entrances: bool = False,
|
||||
collect_pre_fill_items: bool = True, perform_sweep: bool = True) -> CollectionState:
|
||||
"""
|
||||
Creates a new CollectionState, and collects all precollected items, all items in the multiworld itempool, those
|
||||
specified in each worlds' `get_pre_fill_items()`, and then sweeps the multiworld collecting any other items
|
||||
it is able to reach, building as complete of a completed game state as possible.
|
||||
|
||||
:param use_cache: Deprecated and unused.
|
||||
:param allow_partial_entrances: Whether the CollectionState should allow for disconnected entrances while
|
||||
sweeping, such as before entrance randomization is complete.
|
||||
:param collect_pre_fill_items: Whether the items in each worlds' `get_pre_fill_items()` should be added to this
|
||||
state.
|
||||
:param perform_sweep: Whether this state should perform a sweep for reachable locations, collecting any placed
|
||||
items it can.
|
||||
|
||||
:return: The completed CollectionState.
|
||||
"""
|
||||
if __debug__ and use_cache is not None:
|
||||
# TODO swap to Utils.deprecate when we want this to crash on source and warn on frozen
|
||||
warnings.warn("multiworld.get_all_state no longer caches all_state and this argument will be removed.",
|
||||
DeprecationWarning)
|
||||
ret = CollectionState(self, allow_partial_entrances)
|
||||
|
||||
for item in self.itempool:
|
||||
self.worlds[item.player].collect(ret, item)
|
||||
for player in self.player_ids:
|
||||
subworld = self.worlds[player]
|
||||
for item in subworld.get_pre_fill_items():
|
||||
subworld.collect(ret, item)
|
||||
ret.sweep_for_advancements()
|
||||
if collect_pre_fill_items:
|
||||
for player in self.player_ids:
|
||||
subworld = self.worlds[player]
|
||||
for item in subworld.get_pre_fill_items():
|
||||
subworld.collect(ret, item)
|
||||
if perform_sweep:
|
||||
ret.sweep_for_advancements()
|
||||
|
||||
if use_cache:
|
||||
self._all_state = ret
|
||||
return ret
|
||||
|
||||
def get_items(self) -> List[Item]:
|
||||
@@ -546,7 +563,9 @@ class MultiWorld():
|
||||
else:
|
||||
return all((self.has_beaten_game(state, p) for p in range(1, self.players + 1)))
|
||||
|
||||
def can_beat_game(self, starting_state: Optional[CollectionState] = None) -> bool:
|
||||
def can_beat_game(self,
|
||||
starting_state: Optional[CollectionState] = None,
|
||||
locations: Optional[Iterable[Location]] = None) -> bool:
|
||||
if starting_state:
|
||||
if self.has_beaten_game(starting_state):
|
||||
return True
|
||||
@@ -555,25 +574,10 @@ class MultiWorld():
|
||||
state = CollectionState(self)
|
||||
if self.has_beaten_game(state):
|
||||
return True
|
||||
prog_locations = {location for location in self.get_locations() if location.item
|
||||
and location.item.advancement and location not in state.locations_checked}
|
||||
|
||||
while prog_locations:
|
||||
sphere: Set[Location] = set()
|
||||
# build up spheres of collection radius.
|
||||
# Everything in each sphere is independent from each other in dependencies and only depends on lower spheres
|
||||
for location in prog_locations:
|
||||
if location.can_reach(state):
|
||||
sphere.add(location)
|
||||
|
||||
if not sphere:
|
||||
# ran out of places and did not finish yet, quit
|
||||
return False
|
||||
|
||||
for location in sphere:
|
||||
state.collect(location.item, True, location)
|
||||
prog_locations -= sphere
|
||||
|
||||
for _ in state.sweep_for_advancements(locations,
|
||||
yield_each_sweep=True,
|
||||
checked_locations=state.locations_checked):
|
||||
if self.has_beaten_game(state):
|
||||
return True
|
||||
|
||||
@@ -617,7 +621,7 @@ class MultiWorld():
|
||||
locations: Set[Location] = set()
|
||||
events: Set[Location] = set()
|
||||
for location in self.get_filled_locations():
|
||||
if type(location.item.code) is int:
|
||||
if type(location.item.code) is int and type(location.address) is int:
|
||||
locations.add(location)
|
||||
else:
|
||||
events.add(location)
|
||||
@@ -689,6 +693,12 @@ class MultiWorld():
|
||||
sphere.append(locations.pop(n))
|
||||
|
||||
if not sphere:
|
||||
if __debug__:
|
||||
from Fill import FillError
|
||||
raise FillError(
|
||||
f"Could not access required locations for accessibility check. Missing: {locations}",
|
||||
multiworld=self,
|
||||
)
|
||||
# ran out of places and did not finish yet, quit
|
||||
logging.warning(f"Could not access required locations for accessibility check."
|
||||
f" Missing: {locations}")
|
||||
@@ -718,12 +728,14 @@ class CollectionState():
|
||||
advancements: Set[Location]
|
||||
path: Dict[Union[Region, Entrance], PathValue]
|
||||
locations_checked: Set[Location]
|
||||
"""Internal cache for Advancement Locations already checked by this CollectionState. Not for use in logic."""
|
||||
stale: Dict[int, bool]
|
||||
allow_partial_entrances: bool
|
||||
additional_init_functions: List[Callable[[CollectionState, MultiWorld], None]] = []
|
||||
additional_copy_functions: List[Callable[[CollectionState, CollectionState], CollectionState]] = []
|
||||
|
||||
def __init__(self, parent: MultiWorld, allow_partial_entrances: bool = False):
|
||||
assert parent.worlds, "CollectionState created without worlds initialized in parent"
|
||||
self.prog_items = {player: Counter() for player in parent.get_all_ids()}
|
||||
self.multiworld = parent
|
||||
self.reachable_regions = {player: set() for player in parent.get_all_ids()}
|
||||
@@ -757,7 +769,7 @@ class CollectionState():
|
||||
else:
|
||||
self._update_reachable_regions_auto_indirect_conditions(player, queue)
|
||||
|
||||
def _update_reachable_regions_explicit_indirect_conditions(self, player: int, queue: deque):
|
||||
def _update_reachable_regions_explicit_indirect_conditions(self, player: int, queue: deque[Entrance]):
|
||||
reachable_regions = self.reachable_regions[player]
|
||||
blocked_connections = self.blocked_connections[player]
|
||||
# run BFS on all connections, and keep track of those blocked by missing items
|
||||
@@ -775,13 +787,16 @@ class CollectionState():
|
||||
blocked_connections.update(new_region.exits)
|
||||
queue.extend(new_region.exits)
|
||||
self.path[new_region] = (new_region.name, self.path.get(connection, None))
|
||||
self.multiworld.worlds[player].reached_region(self, new_region)
|
||||
|
||||
# Retry connections if the new region can unblock them
|
||||
for new_entrance in self.multiworld.indirect_connections.get(new_region, set()):
|
||||
if new_entrance in blocked_connections and new_entrance not in queue:
|
||||
queue.append(new_entrance)
|
||||
entrances = self.multiworld.indirect_connections.get(new_region)
|
||||
if entrances is not None:
|
||||
relevant_entrances = entrances.intersection(blocked_connections)
|
||||
relevant_entrances.difference_update(queue)
|
||||
queue.extend(relevant_entrances)
|
||||
|
||||
def _update_reachable_regions_auto_indirect_conditions(self, player: int, queue: deque):
|
||||
def _update_reachable_regions_auto_indirect_conditions(self, player: int, queue: deque[Entrance]):
|
||||
reachable_regions = self.reachable_regions[player]
|
||||
blocked_connections = self.blocked_connections[player]
|
||||
new_connection: bool = True
|
||||
@@ -803,6 +818,7 @@ class CollectionState():
|
||||
queue.extend(new_region.exits)
|
||||
self.path[new_region] = (new_region.name, self.path.get(connection, None))
|
||||
new_connection = True
|
||||
self.multiworld.worlds[player].reached_region(self, new_region)
|
||||
# sweep for indirect connections, mostly Entrance.can_reach(unrelated_Region)
|
||||
queue.extend(blocked_connections)
|
||||
|
||||
@@ -851,40 +867,172 @@ class CollectionState():
|
||||
"Please switch over to sweep_for_advancements.")
|
||||
return self.sweep_for_advancements(locations)
|
||||
|
||||
def sweep_for_advancements(self, locations: Optional[Iterable[Location]] = None) -> None:
|
||||
if locations is None:
|
||||
locations = self.multiworld.get_filled_locations()
|
||||
reachable_advancements = True
|
||||
# since the loop has a good chance to run more than once, only filter the advancements once
|
||||
locations = {location for location in locations if location.advancement and location not in self.advancements}
|
||||
def _sweep_for_advancements_impl(self, advancements_per_player: List[Tuple[int, List[Location]]],
|
||||
yield_each_sweep: bool) -> Iterator[None]:
|
||||
"""
|
||||
The implementation for sweep_for_advancements is separated here because it returns a generator due to the use
|
||||
of a yield statement.
|
||||
"""
|
||||
all_players = {player for player, _ in advancements_per_player}
|
||||
players_to_check = all_players
|
||||
# As an optimization, it is assumed that each player's world only logically depends on itself. However, worlds
|
||||
# are allowed to logically depend on other worlds, so once there are no more players that should be checked
|
||||
# under this assumption, an extra sweep iteration is performed that checks every player, to confirm that the
|
||||
# sweep is finished.
|
||||
checking_if_finished = False
|
||||
while players_to_check:
|
||||
next_advancements_per_player: List[Tuple[int, List[Location]]] = []
|
||||
next_players_to_check = set()
|
||||
|
||||
while reachable_advancements:
|
||||
reachable_advancements = {location for location in locations if location.can_reach(self)}
|
||||
locations -= reachable_advancements
|
||||
for advancement in reachable_advancements:
|
||||
self.advancements.add(advancement)
|
||||
assert isinstance(advancement.item, Item), "tried to collect Event with no Item"
|
||||
self.collect(advancement.item, True, advancement)
|
||||
for player, locations in advancements_per_player:
|
||||
if player not in players_to_check:
|
||||
next_advancements_per_player.append((player, locations))
|
||||
continue
|
||||
|
||||
# Accessibility of each location is checked first because a player's region accessibility cache becomes
|
||||
# stale whenever one of their own items is collected into the state.
|
||||
reachable_locations: List[Location] = []
|
||||
unreachable_locations: List[Location] = []
|
||||
for location in locations:
|
||||
if location.can_reach(self):
|
||||
# Locations containing items that do not belong to `player` could be collected immediately
|
||||
# because they won't stale `player`'s region accessibility cache, but, for simplicity, all the
|
||||
# items at reachable locations are collected in a single loop.
|
||||
reachable_locations.append(location)
|
||||
else:
|
||||
unreachable_locations.append(location)
|
||||
if unreachable_locations:
|
||||
next_advancements_per_player.append((player, unreachable_locations))
|
||||
|
||||
# A previous player's locations processed in the current `while players_to_check` iteration could have
|
||||
# collected items belonging to `player`, but now that all of `player`'s reachable locations have been
|
||||
# found, it can be assumed that `player` will not gain any more reachable locations until another one of
|
||||
# their items is collected.
|
||||
# It would be clearer to not add players to `next_players_to_check` in the first place if they have yet
|
||||
# to be processed in the current `while players_to_check` iteration, but checking if a player should be
|
||||
# added to `next_players_to_check` would need to be run once for every item that is collected, so it is
|
||||
# more performant to instead discard `player` from `next_players_to_check` once their locations have
|
||||
# been processed.
|
||||
next_players_to_check.discard(player)
|
||||
|
||||
# Collect the items from the reachable locations.
|
||||
for advancement in reachable_locations:
|
||||
self.advancements.add(advancement)
|
||||
item = advancement.item
|
||||
assert isinstance(item, Item), "tried to collect advancement Location with no Item"
|
||||
if self.collect(item, True, advancement):
|
||||
# The player the item belongs to may be able to reach additional locations in the next sweep
|
||||
# iteration.
|
||||
next_players_to_check.add(item.player)
|
||||
|
||||
if not next_players_to_check:
|
||||
if not checking_if_finished:
|
||||
# It is assumed that each player's world only logically depends on itself, which may not be the
|
||||
# case, so confirm that the sweep is finished by doing an extra iteration that checks every player.
|
||||
checking_if_finished = True
|
||||
next_players_to_check = all_players
|
||||
else:
|
||||
checking_if_finished = False
|
||||
|
||||
players_to_check = next_players_to_check
|
||||
advancements_per_player = next_advancements_per_player
|
||||
|
||||
if yield_each_sweep:
|
||||
yield
|
||||
|
||||
@overload
|
||||
def sweep_for_advancements(self, locations: Optional[Iterable[Location]] = None, *,
|
||||
yield_each_sweep: Literal[True],
|
||||
checked_locations: Optional[Set[Location]] = None) -> Iterator[None]: ...
|
||||
|
||||
@overload
|
||||
def sweep_for_advancements(self, locations: Optional[Iterable[Location]] = None,
|
||||
yield_each_sweep: Literal[False] = False,
|
||||
checked_locations: Optional[Set[Location]] = None) -> None: ...
|
||||
|
||||
def sweep_for_advancements(self, locations: Optional[Iterable[Location]] = None, yield_each_sweep: bool = False,
|
||||
checked_locations: Optional[Set[Location]] = None) -> Optional[Iterator[None]]:
|
||||
"""
|
||||
Sweep through the locations that contain uncollected advancement items, collecting the items into the state
|
||||
until there are no more reachable locations that contain uncollected advancement items.
|
||||
|
||||
:param locations: The locations to sweep through, defaulting to all locations in the multiworld.
|
||||
:param yield_each_sweep: When True, return a generator that yields at the end of each sweep iteration.
|
||||
:param checked_locations: Optional override of locations to filter out from the locations argument, defaults to
|
||||
self.advancements when None.
|
||||
"""
|
||||
if checked_locations is None:
|
||||
checked_locations = self.advancements
|
||||
|
||||
# Since the sweep loop usually performs many iterations, the locations are filtered in advance.
|
||||
# A list of tuples is used, instead of a dictionary, because it is faster to iterate.
|
||||
advancements_per_player: List[Tuple[int, List[Location]]]
|
||||
if locations is None:
|
||||
# `location.advancement` can only be True for filled locations, so unfilled locations are filtered out.
|
||||
advancements_per_player = []
|
||||
for player, locations_dict in self.multiworld.regions.location_cache.items():
|
||||
filtered_locations = [location for location in locations_dict.values()
|
||||
if location.advancement and location not in checked_locations]
|
||||
if filtered_locations:
|
||||
advancements_per_player.append((player, filtered_locations))
|
||||
else:
|
||||
# Filter and separate the locations into a list for each player.
|
||||
advancements_per_player_dict: Dict[int, List[Location]] = defaultdict(list)
|
||||
for location in locations:
|
||||
if location.advancement and location not in checked_locations:
|
||||
advancements_per_player_dict[location.player].append(location)
|
||||
# Convert to a list of tuples.
|
||||
advancements_per_player = list(advancements_per_player_dict.items())
|
||||
del advancements_per_player_dict
|
||||
|
||||
if yield_each_sweep:
|
||||
# Return a generator that will yield at the end of each sweep iteration.
|
||||
return self._sweep_for_advancements_impl(advancements_per_player, True)
|
||||
else:
|
||||
# Create the generator, but tell it not to yield anything, so it will run to completion in zero iterations
|
||||
# once started, then start and exhaust the generator by attempting to iterate it.
|
||||
for _ in self._sweep_for_advancements_impl(advancements_per_player, False):
|
||||
assert False, "Generator yielded when it should have run to completion without yielding"
|
||||
return None
|
||||
|
||||
# item name related
|
||||
def has(self, item: str, player: int, count: int = 1) -> bool:
|
||||
return self.prog_items[player][item] >= count
|
||||
|
||||
# for loops are specifically used in all/any/count methods, instead of all()/any()/sum(), to avoid the overhead of
|
||||
# creating and iterating generator instances. In `return all(player_prog_items[item] for item in items)`, the
|
||||
# argument to all() would be a new generator instance, for example.
|
||||
def has_all(self, items: Iterable[str], player: int) -> bool:
|
||||
"""Returns True if each item name of items is in state at least once."""
|
||||
return all(self.prog_items[player][item] for item in items)
|
||||
player_prog_items = self.prog_items[player]
|
||||
for item in items:
|
||||
if not player_prog_items[item]:
|
||||
return False
|
||||
return True
|
||||
|
||||
def has_any(self, items: Iterable[str], player: int) -> bool:
|
||||
"""Returns True if at least one item name of items is in state at least once."""
|
||||
return any(self.prog_items[player][item] for item in items)
|
||||
player_prog_items = self.prog_items[player]
|
||||
for item in items:
|
||||
if player_prog_items[item]:
|
||||
return True
|
||||
return False
|
||||
|
||||
def has_all_counts(self, item_counts: Mapping[str, int], player: int) -> bool:
|
||||
"""Returns True if each item name is in the state at least as many times as specified."""
|
||||
return all(self.prog_items[player][item] >= count for item, count in item_counts.items())
|
||||
player_prog_items = self.prog_items[player]
|
||||
for item, count in item_counts.items():
|
||||
if player_prog_items[item] < count:
|
||||
return False
|
||||
return True
|
||||
|
||||
def has_any_count(self, item_counts: Mapping[str, int], player: int) -> bool:
|
||||
"""Returns True if at least one item name is in the state at least as many times as specified."""
|
||||
return any(self.prog_items[player][item] >= count for item, count in item_counts.items())
|
||||
player_prog_items = self.prog_items[player]
|
||||
for item, count in item_counts.items():
|
||||
if player_prog_items[item] >= count:
|
||||
return True
|
||||
return False
|
||||
|
||||
def count(self, item: str, player: int) -> int:
|
||||
return self.prog_items[player][item]
|
||||
@@ -912,11 +1060,20 @@ class CollectionState():
|
||||
|
||||
def count_from_list(self, items: Iterable[str], player: int) -> int:
|
||||
"""Returns the cumulative count of items from a list present in state."""
|
||||
return sum(self.prog_items[player][item_name] for item_name in items)
|
||||
player_prog_items = self.prog_items[player]
|
||||
total = 0
|
||||
for item_name in items:
|
||||
total += player_prog_items[item_name]
|
||||
return total
|
||||
|
||||
def count_from_list_unique(self, items: Iterable[str], player: int) -> int:
|
||||
"""Returns the cumulative count of items from a list present in state. Ignores duplicates of the same item."""
|
||||
return sum(self.prog_items[player][item_name] > 0 for item_name in items)
|
||||
player_prog_items = self.prog_items[player]
|
||||
total = 0
|
||||
for item_name in items:
|
||||
if player_prog_items[item_name] > 0:
|
||||
total += 1
|
||||
return total
|
||||
|
||||
# item name group related
|
||||
def has_group(self, item_name_group: str, player: int, count: int = 1) -> bool:
|
||||
@@ -972,6 +1129,17 @@ class CollectionState():
|
||||
|
||||
return changed
|
||||
|
||||
def add_item(self, item: str, player: int, count: int = 1) -> None:
|
||||
"""
|
||||
Adds the item to state.
|
||||
|
||||
:param item: The item to be added.
|
||||
:param player: The player the item is for.
|
||||
:param count: How many of the item to add.
|
||||
"""
|
||||
assert count > 0
|
||||
self.prog_items[player][item] += count
|
||||
|
||||
def remove(self, item: Item):
|
||||
changed = self.multiworld.worlds[item.player].remove(self, item)
|
||||
if changed:
|
||||
@@ -980,6 +1148,37 @@ class CollectionState():
|
||||
self.blocked_connections[item.player] = set()
|
||||
self.stale[item.player] = True
|
||||
|
||||
def remove_item(self, item: str, player: int, count: int = 1) -> None:
|
||||
"""
|
||||
Removes the item from state.
|
||||
|
||||
:param item: The item to be removed.
|
||||
:param player: The player the item is for.
|
||||
:param count: How many of the item to remove.
|
||||
"""
|
||||
assert count > 0
|
||||
self.prog_items[player][item] -= count
|
||||
if self.prog_items[player][item] < 1:
|
||||
del (self.prog_items[player][item])
|
||||
|
||||
def set_item(self, item: str, player: int, count: int) -> None:
|
||||
"""
|
||||
Sets the item in state equal to the provided count.
|
||||
|
||||
:param item: The item to modify.
|
||||
:param player: The player the item is for.
|
||||
:param count: How many of the item to now have.
|
||||
"""
|
||||
assert count >= 0
|
||||
if count == 0:
|
||||
del (self.prog_items[player][item])
|
||||
else:
|
||||
self.prog_items[player][item] = count
|
||||
|
||||
|
||||
CollectionRule = Callable[[CollectionState], bool]
|
||||
DEFAULT_COLLECTION_RULE: CollectionRule = staticmethod(lambda state: True)
|
||||
|
||||
|
||||
class EntranceType(IntEnum):
|
||||
ONE_WAY = 1
|
||||
@@ -987,7 +1186,7 @@ class EntranceType(IntEnum):
|
||||
|
||||
|
||||
class Entrance:
|
||||
access_rule: Callable[[CollectionState], bool] = staticmethod(lambda state: True)
|
||||
access_rule: CollectionRule = DEFAULT_COLLECTION_RULE
|
||||
hide_path: bool = False
|
||||
player: int
|
||||
name: str
|
||||
@@ -995,9 +1194,6 @@ class Entrance:
|
||||
connected_region: Optional[Region] = None
|
||||
randomization_group: int
|
||||
randomization_type: EntranceType
|
||||
# LttP specific, TODO: should make a LttPEntrance
|
||||
addresses = None
|
||||
target = None
|
||||
|
||||
def __init__(self, player: int, name: str = "", parent: Optional[Region] = None,
|
||||
randomization_group: int = 0, randomization_type: EntranceType = EntranceType.ONE_WAY) -> None:
|
||||
@@ -1016,10 +1212,8 @@ class Entrance:
|
||||
|
||||
return False
|
||||
|
||||
def connect(self, region: Region, addresses: Any = None, target: Any = None) -> None:
|
||||
def connect(self, region: Region) -> None:
|
||||
self.connected_region = region
|
||||
self.target = target
|
||||
self.addresses = addresses
|
||||
region.entrances.append(self)
|
||||
|
||||
def is_valid_source_transition(self, er_state: "ERPlacementState") -> bool:
|
||||
@@ -1071,13 +1265,16 @@ class Region:
|
||||
self.region_manager = region_manager
|
||||
|
||||
def __getitem__(self, index: int) -> Location:
|
||||
return self._list.__getitem__(index)
|
||||
return self._list[index]
|
||||
|
||||
def __setitem__(self, index: int, value: Location) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
def __len__(self) -> int:
|
||||
return self._list.__len__()
|
||||
return len(self._list)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._list)
|
||||
|
||||
# This seems to not be needed, but that's a bit suspicious.
|
||||
# def __del__(self):
|
||||
@@ -1088,8 +1285,8 @@ class Region:
|
||||
|
||||
class LocationRegister(Register):
|
||||
def __delitem__(self, index: int) -> None:
|
||||
location: Location = self._list.__getitem__(index)
|
||||
self._list.__delitem__(index)
|
||||
location: Location = self._list[index]
|
||||
del self._list[index]
|
||||
del(self.region_manager.location_cache[location.player][location.name])
|
||||
|
||||
def insert(self, index: int, value: Location) -> None:
|
||||
@@ -1100,8 +1297,8 @@ class Region:
|
||||
|
||||
class EntranceRegister(Register):
|
||||
def __delitem__(self, index: int) -> None:
|
||||
entrance: Entrance = self._list.__getitem__(index)
|
||||
self._list.__delitem__(index)
|
||||
entrance: Entrance = self._list[index]
|
||||
del self._list[index]
|
||||
del(self.region_manager.entrance_cache[entrance.player][entrance.name])
|
||||
|
||||
def insert(self, index: int, value: Entrance) -> None:
|
||||
@@ -1160,8 +1357,7 @@ class Region:
|
||||
for entrance in self.entrances: # BFS might be better here, trying DFS for now.
|
||||
return entrance.parent_region.get_connecting_entrance(is_main_entrance)
|
||||
|
||||
def add_locations(self, locations: Dict[str, Optional[int]],
|
||||
location_type: Optional[type[Location]] = None) -> None:
|
||||
def add_locations(self, locations: Mapping[str, int | None], location_type: type[Location] | None = None) -> None:
|
||||
"""
|
||||
Adds locations to the Region object, where location_type is your Location class and locations is a dict of
|
||||
location names to address.
|
||||
@@ -1173,8 +1369,50 @@ class Region:
|
||||
for location, address in locations.items():
|
||||
self.locations.append(location_type(self.player, location, address, self))
|
||||
|
||||
def add_event(
|
||||
self,
|
||||
location_name: str,
|
||||
item_name: str | None = None,
|
||||
rule: CollectionRule | Rule[Any] | None = None,
|
||||
location_type: type[Location] | None = None,
|
||||
item_type: type[Item] | None = None,
|
||||
show_in_spoiler: bool = True,
|
||||
) -> Item:
|
||||
"""
|
||||
Adds an event location/item pair to the region.
|
||||
|
||||
:param location_name: Name for the event location.
|
||||
:param item_name: Name for the event item. If not provided, defaults to location_name.
|
||||
:param rule: Callable to determine access for this event location within its region.
|
||||
:param location_type: Location class to create the event location with. Defaults to BaseClasses.Location.
|
||||
:param item_type: Item class to create the event item with. Defaults to BaseClasses.Item.
|
||||
:param show_in_spoiler: Will be passed along to the created event Location's show_in_spoiler attribute.
|
||||
:return: The created Event Item
|
||||
"""
|
||||
if location_type is None:
|
||||
location_type = Location
|
||||
|
||||
if item_name is None:
|
||||
item_name = location_name
|
||||
|
||||
if item_type is None:
|
||||
item_type = Item
|
||||
|
||||
event_location = location_type(self.player, location_name, None, self)
|
||||
event_location.show_in_spoiler = show_in_spoiler
|
||||
if rule is not None:
|
||||
self.multiworld.worlds[self.player].set_rule(event_location, rule)
|
||||
|
||||
event_item = item_type(item_name, ItemClassification.progression, None, self.player)
|
||||
|
||||
event_location.place_locked_item(event_item)
|
||||
|
||||
self.locations.append(event_location)
|
||||
|
||||
return event_item
|
||||
|
||||
def connect(self, connecting_region: Region, name: Optional[str] = None,
|
||||
rule: Optional[Callable[[CollectionState], bool]] = None) -> Entrance:
|
||||
rule: Optional[CollectionRule | Rule[Any]] = None) -> Entrance:
|
||||
"""
|
||||
Connects this Region to another Region, placing the provided rule on the connection.
|
||||
|
||||
@@ -1182,8 +1420,8 @@ class Region:
|
||||
:param name: name of the connection being created
|
||||
:param rule: callable to determine access of this connection to go from self to the exiting_region"""
|
||||
exit_ = self.create_exit(name if name else f"{self.name} -> {connecting_region.name}")
|
||||
if rule:
|
||||
exit_.access_rule = rule
|
||||
if rule is not None:
|
||||
self.multiworld.worlds[self.player].set_rule(exit_, rule)
|
||||
exit_.connect(connecting_region)
|
||||
return exit_
|
||||
|
||||
@@ -1207,16 +1445,16 @@ class Region:
|
||||
entrance.connect(self)
|
||||
return entrance
|
||||
|
||||
def add_exits(self, exits: Union[Iterable[str], Dict[str, Optional[str]]],
|
||||
rules: Dict[str, Callable[[CollectionState], bool]] = None) -> List[Entrance]:
|
||||
def add_exits(self, exits: Iterable[str] | Mapping[str, str | None],
|
||||
rules: Mapping[str, CollectionRule | Rule[Any]] | None = None) -> List[Entrance]:
|
||||
"""
|
||||
Connects current region to regions in exit dictionary. Passed region names must exist first.
|
||||
|
||||
:param exits: exits from the region. format is {"connecting_region": "exit_name"}. if a non dict is provided,
|
||||
created entrances will be named "self.name -> connecting_region"
|
||||
:param rules: rules for the exits from this region. format is {"connecting_region", rule}
|
||||
created entrances will be named "self.name -> connecting_region"
|
||||
:param rules: rules for the exits from this region. format is {"connecting_region": rule}
|
||||
"""
|
||||
if not isinstance(exits, Dict):
|
||||
if not isinstance(exits, Mapping):
|
||||
exits = dict.fromkeys(exits)
|
||||
return [
|
||||
self.connect(
|
||||
@@ -1247,7 +1485,7 @@ class Location:
|
||||
show_in_spoiler: bool = True
|
||||
progress_type: LocationProgressType = LocationProgressType.DEFAULT
|
||||
always_allow: Callable[[CollectionState, Item], bool] = staticmethod(lambda state, item: False)
|
||||
access_rule: Callable[[CollectionState], bool] = staticmethod(lambda state: True)
|
||||
access_rule: CollectionRule = DEFAULT_COLLECTION_RULE
|
||||
item_rule: Callable[[Item], bool] = staticmethod(lambda item: True)
|
||||
item: Optional[Item] = None
|
||||
|
||||
@@ -1283,9 +1521,6 @@ class Location:
|
||||
multiworld = self.parent_region.multiworld if self.parent_region and self.parent_region.multiworld else None
|
||||
return multiworld.get_name_string_for_object(self) if multiworld else f'{self.name} (Player {self.player})'
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.name, self.player))
|
||||
|
||||
def __lt__(self, other: Location):
|
||||
return (self.player, self.name) < (other.player, other.name)
|
||||
|
||||
@@ -1309,31 +1544,47 @@ class Location:
|
||||
|
||||
|
||||
class ItemClassification(IntFlag):
|
||||
filler = 0b0000
|
||||
filler = 0b00000
|
||||
""" aka trash, as in filler items like ammo, currency etc """
|
||||
|
||||
progression = 0b0001
|
||||
progression = 0b00001
|
||||
""" Item that is logically relevant.
|
||||
Protects this item from being placed on excluded or unreachable locations. """
|
||||
|
||||
useful = 0b0010
|
||||
useful = 0b00010
|
||||
""" Item that is especially useful.
|
||||
Protects this item from being placed on excluded or unreachable locations.
|
||||
When combined with another flag like "progression", it means "an especially useful progression item". """
|
||||
|
||||
trap = 0b0100
|
||||
trap = 0b00100
|
||||
""" Item that is detrimental in some way. """
|
||||
|
||||
skip_balancing = 0b1000
|
||||
skip_balancing = 0b01000
|
||||
""" should technically never occur on its own
|
||||
Item that is logically relevant, but progression balancing should not touch.
|
||||
Typically currency or other counted items. """
|
||||
|
||||
progression_skip_balancing = 0b1001 # only progression gets balanced
|
||||
Possible reasons for why an item should not be pulled ahead by progression balancing:
|
||||
1. This item is quite insignificant, so pulling it earlier doesn't help (currency/etc.)
|
||||
2. It is important for the player experience that this item is evenly distributed in the seed (e.g. goal items) """
|
||||
|
||||
deprioritized = 0b10000
|
||||
""" Should technically never occur on its own.
|
||||
Will not be considered for priority locations,
|
||||
unless Priority Locations Fill runs out of regular progression items before filling all priority locations.
|
||||
|
||||
Should be used for items that would feel bad for the player to find on a priority location.
|
||||
Usually, these are items that are plentiful or insignificant. """
|
||||
|
||||
progression_deprioritized_skip_balancing = 0b11001
|
||||
""" Since a common case of both skip_balancing and deprioritized is "insignificant progression",
|
||||
these items often want both flags. """
|
||||
|
||||
progression_skip_balancing = 0b01001 # only progression gets balanced
|
||||
progression_deprioritized = 0b10001 # only progression can be placed during priority fill
|
||||
|
||||
def as_flag(self) -> int:
|
||||
"""As Network API flag int."""
|
||||
return int(self & 0b0111)
|
||||
return int(self & 0b00111)
|
||||
|
||||
|
||||
class Item:
|
||||
@@ -1377,6 +1628,10 @@ class Item:
|
||||
def trap(self) -> bool:
|
||||
return ItemClassification.trap in self.classification
|
||||
|
||||
@property
|
||||
def deprioritized(self) -> bool:
|
||||
return ItemClassification.deprioritized in self.classification
|
||||
|
||||
@property
|
||||
def filler(self) -> bool:
|
||||
return not (self.advancement or self.useful or self.trap)
|
||||
@@ -1389,6 +1644,10 @@ class Item:
|
||||
def flags(self) -> int:
|
||||
return self.classification.as_flag()
|
||||
|
||||
@property
|
||||
def is_event(self) -> bool:
|
||||
return self.code is None
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if not isinstance(other, Item):
|
||||
return NotImplemented
|
||||
@@ -1473,30 +1732,29 @@ class Spoiler:
|
||||
logging.debug('The following items could not be reached: %s', ['%s (Player %d) at %s (Player %d)' % (
|
||||
location.item.name, location.item.player, location.name, location.player) for location in
|
||||
sphere_candidates])
|
||||
if any([multiworld.worlds[location.item.player].options.accessibility != 'minimal' for location in sphere_candidates]):
|
||||
raise RuntimeError(f'Not all progression items reachable ({sphere_candidates}). '
|
||||
f'Something went terribly wrong here.')
|
||||
if not multiworld.has_beaten_game(state):
|
||||
raise RuntimeError("During playthrough generation, the game was determined to be unbeatable. "
|
||||
"Something went terribly wrong here. "
|
||||
f"Unreachable progression items: {sphere_candidates}")
|
||||
else:
|
||||
self.unreachables = sphere_candidates
|
||||
break
|
||||
|
||||
# in the second phase, we cull each sphere such that the game is still beatable,
|
||||
# reducing each range of influence to the bare minimum required inside it
|
||||
restore_later: Dict[Location, Item] = {}
|
||||
required_locations = {location for sphere in collection_spheres for location in sphere}
|
||||
for num, sphere in reversed(tuple(enumerate(collection_spheres))):
|
||||
to_delete: Set[Location] = set()
|
||||
for location in sphere:
|
||||
# we remove the item at location and check if game is still beatable
|
||||
# we remove the location from required_locations to sweep from, and check if the game is still beatable
|
||||
logging.debug('Checking if %s (Player %d) is required to beat the game.', location.item.name,
|
||||
location.item.player)
|
||||
old_item = location.item
|
||||
location.item = None
|
||||
if multiworld.can_beat_game(state_cache[num]):
|
||||
required_locations.remove(location)
|
||||
if multiworld.can_beat_game(state_cache[num], required_locations):
|
||||
to_delete.add(location)
|
||||
restore_later[location] = old_item
|
||||
else:
|
||||
# still required, got to keep it around
|
||||
location.item = old_item
|
||||
required_locations.add(location)
|
||||
|
||||
# cull entries in spheres for spoiler walkthrough at end
|
||||
sphere -= to_delete
|
||||
@@ -1513,7 +1771,7 @@ class Spoiler:
|
||||
logging.debug('Checking if %s (Player %d) is required to beat the game.', item.name, item.player)
|
||||
precollected_items.remove(item)
|
||||
multiworld.state.remove(item)
|
||||
if not multiworld.can_beat_game():
|
||||
if not multiworld.can_beat_game(multiworld.state, required_locations):
|
||||
# Add the item back into `precollected_items` and collect it into `multiworld.state`.
|
||||
multiworld.push_precollected(item)
|
||||
else:
|
||||
@@ -1555,9 +1813,6 @@ class Spoiler:
|
||||
self.create_paths(state, collection_spheres)
|
||||
|
||||
# repair the multiworld again
|
||||
for location, item in restore_later.items():
|
||||
location.item = item
|
||||
|
||||
for item in removed_precollected:
|
||||
multiworld.push_precollected(item)
|
||||
|
||||
@@ -1614,6 +1869,9 @@ class Spoiler:
|
||||
Utils.__version__, self.multiworld.seed))
|
||||
outfile.write('Filling Algorithm: %s\n' % self.multiworld.algorithm)
|
||||
outfile.write('Players: %d\n' % self.multiworld.players)
|
||||
if self.multiworld.players > 1:
|
||||
loc_count = len([loc for loc in self.multiworld.get_locations() if not loc.is_event])
|
||||
outfile.write('Total Location Count: %d\n' % loc_count)
|
||||
outfile.write(f'Plando Options: {self.multiworld.plando_options}\n')
|
||||
AutoWorld.call_stage(self.multiworld, "write_spoiler_header", outfile)
|
||||
|
||||
@@ -1622,6 +1880,9 @@ class Spoiler:
|
||||
outfile.write('\nPlayer %d: %s\n' % (player, self.multiworld.get_player_name(player)))
|
||||
outfile.write('Game: %s\n' % self.multiworld.game[player])
|
||||
|
||||
loc_count = len([loc for loc in self.multiworld.get_locations(player) if not loc.is_event])
|
||||
outfile.write('Location Count: %d\n' % loc_count)
|
||||
|
||||
for f_option, option in self.multiworld.worlds[player].options_dataclass.type_hints.items():
|
||||
write_option(f_option, option)
|
||||
|
||||
@@ -1658,7 +1919,8 @@ class Spoiler:
|
||||
if self.unreachables:
|
||||
outfile.write('\n\nUnreachable Progression Items:\n\n')
|
||||
outfile.write(
|
||||
'\n'.join(['%s: %s' % (unreachable.item, unreachable) for unreachable in self.unreachables]))
|
||||
'\n'.join(['%s: %s' % (unreachable.item, unreachable)
|
||||
for unreachable in sorted(self.unreachables)]))
|
||||
|
||||
if self.paths:
|
||||
outfile.write('\n\nPaths:\n\n')
|
||||
@@ -1685,7 +1947,7 @@ class Tutorial(NamedTuple):
|
||||
description: str
|
||||
language: str
|
||||
file_name: str
|
||||
link: str
|
||||
link: str # unused
|
||||
authors: List[str]
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user