Merge branch 'main' into NewSoupVi-patch-26

This commit is contained in:
NewSoupVi
2025-10-14 10:21:21 +02:00
committed by GitHub
1468 changed files with 284743 additions and 60207 deletions

541
Fill.py
View File

@@ -4,7 +4,7 @@ import logging
import typing
from collections import Counter, deque
from BaseClasses import CollectionState, Item, Location, LocationProgressType, MultiWorld
from BaseClasses import CollectionState, Item, Location, LocationProgressType, MultiWorld, PlandoItemBlock
from Options import Accessibility
from worlds.AutoWorld import call_all
@@ -36,7 +36,8 @@ def sweep_from_pool(base_state: CollectionState, itempool: typing.Sequence[Item]
def fill_restrictive(multiworld: MultiWorld, base_state: CollectionState, locations: typing.List[Location],
item_pool: typing.List[Item], single_player_placement: bool = False, lock: bool = False,
swap: bool = True, on_place: typing.Optional[typing.Callable[[Location], None]] = None,
allow_partial: bool = False, allow_excluded: bool = False, name: str = "Unknown") -> None:
allow_partial: bool = False, allow_excluded: bool = False, one_item_per_player: bool = True,
name: str = "Unknown") -> None:
"""
:param multiworld: Multiworld to be filled.
:param base_state: State assumed before fill.
@@ -63,14 +64,24 @@ def fill_restrictive(multiworld: MultiWorld, base_state: CollectionState, locati
placed = 0
while any(reachable_items.values()) and locations:
# grab one item per player
items_to_place = [items.pop()
for items in reachable_items.values() if items]
if one_item_per_player:
# grab one item per player
items_to_place = [items.pop()
for items in reachable_items.values() if items]
else:
next_player = multiworld.random.choice([player for player, items in reachable_items.items() if items])
items_to_place = []
if item_pool:
items_to_place.append(reachable_items[next_player].pop())
for item in items_to_place:
for p, pool_item in enumerate(item_pool):
# The items added into `reachable_items` are placed starting from the end of each deque in
# `reachable_items`, so the items being placed are more likely to be found towards the end of `item_pool`.
for p, pool_item in enumerate(reversed(item_pool), start=1):
if pool_item is item:
item_pool.pop(p)
del item_pool[-p]
break
maximum_exploration_state = sweep_from_pool(
base_state, item_pool + unplaced_items, multiworld.get_filled_locations(item.player)
if single_player_placement else None)
@@ -89,7 +100,7 @@ def fill_restrictive(multiworld: MultiWorld, base_state: CollectionState, locati
# if minimal accessibility, only check whether location is reachable if game not beatable
if multiworld.worlds[item_to_place.player].options.accessibility == Accessibility.option_minimal:
perform_access_check = not multiworld.has_beaten_game(maximum_exploration_state,
item_to_place.player) \
item_to_place.player) \
if single_player_placement else not has_beaten_game
else:
perform_access_check = True
@@ -105,12 +116,23 @@ def fill_restrictive(multiworld: MultiWorld, base_state: CollectionState, locati
else:
# we filled all reachable spots.
if swap:
# Keep a cache of previous safe swap states that might be usable to sweep from to produce the next
# swap state, instead of sweeping from `base_state` each time.
previous_safe_swap_state_cache: typing.Deque[CollectionState] = deque()
# Almost never are more than 2 states needed. The rare cases that do are usually highly restrictive
# single_player_placement=True pre-fills which can go through more than 10 states in some seeds.
max_swap_base_state_cache_length = 3
# try swapping this item with previously placed items in a safe way then in an unsafe way
swap_attempts = ((i, location, unsafe)
for unsafe in (False, True)
for i, location in enumerate(placements))
for (i, location, unsafe) in swap_attempts:
placed_item = location.item
if item_to_place == placed_item:
# The number of allowed swaps is limited, so do not allow a swap of an item with a copy of
# itself.
continue
# Unplaceable items can sometimes be swapped infinitely. Limit the
# number of times we will swap an individual item to prevent this
swap_count = swapped_items[placed_item.player, placed_item.name, unsafe]
@@ -119,40 +141,50 @@ def fill_restrictive(multiworld: MultiWorld, base_state: CollectionState, locati
location.item = None
placed_item.location = None
swap_state = sweep_from_pool(base_state, [placed_item, *item_pool] if unsafe else item_pool,
multiworld.get_filled_locations(item.player)
if single_player_placement else None)
for previous_safe_swap_state in previous_safe_swap_state_cache:
# If a state has already checked the location of the swap, then it cannot be used.
if location not in previous_safe_swap_state.advancements:
# Previous swap states will have collected all items in `item_pool`, so the new
# `swap_state` can skip having to collect them again.
# Previous swap states will also have already checked many locations, making the sweep
# faster.
swap_state = sweep_from_pool(previous_safe_swap_state, (placed_item,) if unsafe else (),
multiworld.get_filled_locations(item.player)
if single_player_placement else None)
break
else:
# No previous swap_state was usable as a base state to sweep from, so create a new one.
swap_state = sweep_from_pool(base_state, [placed_item, *item_pool] if unsafe else item_pool,
multiworld.get_filled_locations(item.player)
if single_player_placement else None)
# Unsafe states should not be added to the cache because they have collected `placed_item`.
if not unsafe:
if len(previous_safe_swap_state_cache) >= max_swap_base_state_cache_length:
# Remove the oldest cached state.
previous_safe_swap_state_cache.pop()
# Add the new state to the start of the cache.
previous_safe_swap_state_cache.appendleft(swap_state)
# unsafe means swap_state assumes we can somehow collect placed_item before item_to_place
# by continuing to swap, which is not guaranteed. This is unsafe because there is no mechanic
# to clean that up later, so there is a chance generation fails.
if (not single_player_placement or location.player == item_to_place.player) \
and location.can_fill(swap_state, item_to_place, perform_access_check):
# Add this item to the existing placement, and
# add the old item to the back of the queue
spot_to_fill = placements.pop(i)
# Verify placing this item won't reduce available locations, which would be a useless swap.
prev_state = swap_state.copy()
prev_loc_count = len(
multiworld.get_reachable_locations(prev_state))
swap_count += 1
swapped_items[placed_item.player, placed_item.name, unsafe] = swap_count
swap_state.collect(item_to_place, True)
new_loc_count = len(
multiworld.get_reachable_locations(swap_state))
reachable_items[placed_item.player].appendleft(
placed_item)
item_pool.append(placed_item)
if new_loc_count >= prev_loc_count:
# Add this item to the existing placement, and
# add the old item to the back of the queue
spot_to_fill = placements.pop(i)
# cleanup at the end to hopefully get better errors
cleanup_required = True
swap_count += 1
swapped_items[placed_item.player, placed_item.name, unsafe] = swap_count
reachable_items[placed_item.player].appendleft(
placed_item)
item_pool.append(placed_item)
# cleanup at the end to hopefully get better errors
cleanup_required = True
break
break
# Item can't be placed here, restore original item
location.item = placed_item
@@ -231,18 +263,30 @@ def remaining_fill(multiworld: MultiWorld,
locations: typing.List[Location],
itempool: typing.List[Item],
name: str = "Remaining",
move_unplaceable_to_start_inventory: bool = False) -> None:
move_unplaceable_to_start_inventory: bool = False,
check_location_can_fill: bool = False) -> None:
unplaced_items: typing.List[Item] = []
placements: typing.List[Location] = []
swapped_items: typing.Counter[typing.Tuple[int, str]] = Counter()
total = min(len(itempool), len(locations))
total = min(len(itempool), len(locations))
placed = 0
# Optimisation: Decide whether to do full location.can_fill check (respect excluded), or only check the item rule
if check_location_can_fill:
state = CollectionState(multiworld)
def location_can_fill_item(location_to_fill: Location, item_to_fill: Item):
return location_to_fill.can_fill(state, item_to_fill, check_access=False)
else:
def location_can_fill_item(location_to_fill: Location, item_to_fill: Item):
return location_to_fill.item_rule(item_to_fill)
while locations and itempool:
item_to_place = itempool.pop()
spot_to_fill: typing.Optional[Location] = None
for i, location in enumerate(locations):
if location.item_rule(item_to_place):
if location_can_fill_item(location, item_to_place):
# popping by index is faster than removing by content,
spot_to_fill = locations.pop(i)
# skipping a scan for the element
@@ -263,7 +307,7 @@ def remaining_fill(multiworld: MultiWorld,
location.item = None
placed_item.location = None
if location.item_rule(item_to_place):
if location_can_fill_item(location, item_to_place):
# Add this item to the existing placement, and
# add the old item to the back of the queue
spot_to_fill = placements.pop(i)
@@ -323,19 +367,26 @@ def fast_fill(multiworld: MultiWorld,
return item_pool[placing:], fill_locations[placing:]
def accessibility_corrections(multiworld: MultiWorld, state: CollectionState, locations, pool=[]):
def accessibility_corrections(multiworld: MultiWorld,
state: CollectionState,
locations: list[Location],
pool: list[Item] | None = None) -> None:
if pool is None:
pool = []
maximum_exploration_state = sweep_from_pool(state, pool)
minimal_players = {player for player in multiworld.player_ids if multiworld.worlds[player].options.accessibility == "minimal"}
unreachable_locations = [location for location in multiworld.get_locations() if location.player in minimal_players and
minimal_players = {player for player in multiworld.player_ids if
multiworld.worlds[player].options.accessibility == "minimal"}
unreachable_locations = [location for location in multiworld.get_locations() if
location.player in minimal_players and
not location.can_reach(maximum_exploration_state)]
for location in unreachable_locations:
if (location.item is not None and location.item.advancement and location.address is not None and not
location.locked and location.item.player not in minimal_players):
pool.append(location.item)
state.remove(location.item)
location.item = None
if location in state.advancements:
state.advancements.remove(location)
state.remove(location.item)
locations.append(location)
if pool and locations:
locations.sort(key=lambda loc: loc.progress_type != LocationProgressType.PRIORITY)
@@ -347,7 +398,7 @@ def inaccessible_location_rules(multiworld: MultiWorld, state: CollectionState,
unreachable_locations = [location for location in locations if not location.can_reach(maximum_exploration_state)]
if unreachable_locations:
def forbid_important_item_rule(item: Item):
return not ((item.classification & 0b0011) and multiworld.worlds[item.player].options.accessibility != 'minimal')
return not ((item.classification & 0b0011) and multiworld.worlds[item.player].options.accessibility != "minimal")
for location in unreachable_locations:
add_item_rule(location, forbid_important_item_rule)
@@ -441,6 +492,12 @@ def distribute_early_items(multiworld: MultiWorld,
def distribute_items_restrictive(multiworld: MultiWorld,
panic_method: typing.Literal["swap", "raise", "start_inventory"] = "swap") -> None:
assert all(item.location is None for item in multiworld.itempool), (
"At the start of distribute_items_restrictive, "
"there are items in the multiworld itempool that are already placed on locations:\n"
f"{[(item.location, item) for item in multiworld.itempool if item.location is not None]}"
)
fill_locations = sorted(multiworld.get_unfilled_locations())
multiworld.random.shuffle(fill_locations)
# get items to distribute
@@ -483,22 +540,64 @@ def distribute_items_restrictive(multiworld: MultiWorld,
single_player = multiworld.players == 1 and not multiworld.groups
if prioritylocations:
regular_progression = []
deprioritized_progression = []
for item in progitempool:
if item.deprioritized:
deprioritized_progression.append(item)
else:
regular_progression.append(item)
# "priority fill"
fill_restrictive(multiworld, multiworld.state, prioritylocations, progitempool,
single_player_placement=single_player, swap=False, on_place=mark_for_locking, name="Priority")
# try without deprioritized items in the mix at all. This means they need to be collected into state first.
priority_fill_state = sweep_from_pool(multiworld.state, deprioritized_progression)
fill_restrictive(multiworld, priority_fill_state, prioritylocations, regular_progression,
single_player_placement=single_player, swap=False, on_place=mark_for_locking,
name="Priority", one_item_per_player=True, allow_partial=True)
if prioritylocations and regular_progression:
# retry with one_item_per_player off because some priority fills can fail to fill with that optimization
# deprioritized items are still not in the mix, so they need to be collected into state first.
# allow_partial should only be set if there is deprioritized progression to fall back on.
priority_retry_state = sweep_from_pool(multiworld.state, deprioritized_progression)
fill_restrictive(multiworld, priority_retry_state, prioritylocations, regular_progression,
single_player_placement=single_player, swap=False, on_place=mark_for_locking,
name="Priority Retry", one_item_per_player=False,
allow_partial=bool(deprioritized_progression))
if prioritylocations and deprioritized_progression:
# There are no more regular progression items that can be placed on any priority locations.
# We'd still prefer to place deprioritized progression items on priority locations over filler items.
# Since we're leaving out the remaining regular progression now, we need to collect it into state first.
priority_retry_2_state = sweep_from_pool(multiworld.state, regular_progression)
fill_restrictive(multiworld, priority_retry_2_state, prioritylocations, deprioritized_progression,
single_player_placement=single_player, swap=False, on_place=mark_for_locking,
name="Priority Retry 2", one_item_per_player=True, allow_partial=True)
if prioritylocations and deprioritized_progression:
# retry with deprioritized items AND without one_item_per_player optimisation
# Since we're leaving out the remaining regular progression now, we need to collect it into state first.
priority_retry_3_state = sweep_from_pool(multiworld.state, regular_progression)
fill_restrictive(multiworld, priority_retry_3_state, prioritylocations, deprioritized_progression,
single_player_placement=single_player, swap=False, on_place=mark_for_locking,
name="Priority Retry 3", one_item_per_player=False)
# restore original order of progitempool
progitempool[:] = [item for item in progitempool if not item.location]
accessibility_corrections(multiworld, multiworld.state, prioritylocations, progitempool)
defaultlocations = prioritylocations + defaultlocations
if progitempool:
# "advancement/progression fill"
maximum_exploration_state = sweep_from_pool(multiworld.state)
if panic_method == "swap":
fill_restrictive(multiworld, multiworld.state, defaultlocations, progitempool, swap=True,
fill_restrictive(multiworld, maximum_exploration_state, defaultlocations, progitempool, swap=True,
name="Progression", single_player_placement=single_player)
elif panic_method == "raise":
fill_restrictive(multiworld, multiworld.state, defaultlocations, progitempool, swap=False,
fill_restrictive(multiworld, maximum_exploration_state, defaultlocations, progitempool, swap=False,
name="Progression", single_player_placement=single_player)
elif panic_method == "start_inventory":
fill_restrictive(multiworld, multiworld.state, defaultlocations, progitempool, swap=False,
fill_restrictive(multiworld, maximum_exploration_state, defaultlocations, progitempool, swap=False,
allow_partial=True, name="Progression", single_player_placement=single_player)
if progitempool:
for item in progitempool:
@@ -514,7 +613,8 @@ def distribute_items_restrictive(multiworld: MultiWorld,
if progitempool:
raise FillError(
f"Not enough locations for progression items. "
f"There are {len(progitempool)} more progression items than there are available locations.",
f"There are {len(progitempool)} more progression items than there are available locations.\n"
f"Unfilled locations:\n{multiworld.get_unfilled_locations()}.",
multiworld=multiworld,
)
accessibility_corrections(multiworld, multiworld.state, defaultlocations)
@@ -532,7 +632,7 @@ def distribute_items_restrictive(multiworld: MultiWorld,
if excludedlocations:
raise FillError(
f"Not enough filler items for excluded locations. "
f"There are {len(excludedlocations)} more excluded locations than filler or trap items.",
f"There are {len(excludedlocations)} more excluded locations than excludable items.",
multiworld=multiworld,
)
@@ -553,6 +653,26 @@ def distribute_items_restrictive(multiworld: MultiWorld,
print_data = {"items": items_counter, "locations": locations_counter}
logging.info(f"Per-Player counts: {print_data})")
more_locations = locations_counter - items_counter
more_items = items_counter - locations_counter
for player in multiworld.player_ids:
if more_locations[player]:
logging.error(
f"Player {multiworld.get_player_name(player)} had {more_locations[player]} more locations than items.")
elif more_items[player]:
logging.warning(
f"Player {multiworld.get_player_name(player)} had {more_items[player]} more items than locations.")
if unfilled:
raise FillError(
f"Unable to fill all locations.\n" +
f"Unfilled locations({len(unfilled)}): {unfilled}"
)
else:
logging.warning(
f"Unable to place all items.\n" +
f"Unplaced items({len(unplaced)}): {unplaced}"
)
def flood_items(multiworld: MultiWorld) -> None:
# get items to distribute
@@ -628,9 +748,9 @@ def balance_multiworld_progression(multiworld: MultiWorld) -> None:
if multiworld.worlds[player].options.progression_balancing > 0
}
if not balanceable_players:
logging.info('Skipping multiworld progression balancing.')
logging.info("Skipping multiworld progression balancing.")
else:
logging.info(f'Balancing multiworld progression for {len(balanceable_players)} Players.')
logging.info(f"Balancing multiworld progression for {len(balanceable_players)} Players.")
logging.debug(balanceable_players)
state: CollectionState = CollectionState(multiworld)
checked_locations: typing.Set[Location] = set()
@@ -728,7 +848,7 @@ def balance_multiworld_progression(multiworld: MultiWorld) -> None:
if player in threshold_percentages):
break
elif not balancing_sphere:
raise RuntimeError('Not all required items reachable. Something went terribly wrong here.')
raise RuntimeError("Not all required items reachable. Something went terribly wrong here.")
# Gather a set of locations which we can swap items into
unlocked_locations: typing.Dict[int, typing.Set[Location]] = collections.defaultdict(set)
for l in unchecked_locations:
@@ -744,8 +864,8 @@ def balance_multiworld_progression(multiworld: MultiWorld) -> None:
testing = items_to_test.pop()
reducing_state = state.copy()
for location in itertools.chain((
l for l in items_to_replace
if l.item.player == player
l for l in items_to_replace
if l.item.player == player
), items_to_test):
reducing_state.collect(location.item, True, location)
@@ -818,52 +938,30 @@ def swap_location_item(location_1: Location, location_2: Location, check_locked:
location_2.item.location = location_2
def distribute_planned(multiworld: MultiWorld) -> None:
def warn(warning: str, force: typing.Union[bool, str]) -> None:
if force in [True, 'fail', 'failure', 'none', False, 'warn', 'warning']:
logging.warning(f'{warning}')
def parse_planned_blocks(multiworld: MultiWorld) -> dict[int, list[PlandoItemBlock]]:
def warn(warning: str, force: bool | str) -> None:
if isinstance(force, bool):
logging.warning(f"{warning}")
else:
logging.debug(f'{warning}')
logging.debug(f"{warning}")
def failed(warning: str, force: typing.Union[bool, str]) -> None:
if force in [True, 'fail', 'failure']:
def failed(warning: str, force: bool | str) -> None:
if force is True:
raise Exception(warning)
else:
warn(warning, force)
swept_state = multiworld.state.copy()
swept_state.sweep_for_advancements()
reachable = frozenset(multiworld.get_reachable_locations(swept_state))
early_locations: typing.Dict[int, typing.List[str]] = collections.defaultdict(list)
non_early_locations: typing.Dict[int, typing.List[str]] = collections.defaultdict(list)
for loc in multiworld.get_unfilled_locations():
if loc in reachable:
early_locations[loc.player].append(loc.name)
else: # not reachable with swept state
non_early_locations[loc.player].append(loc.name)
world_name_lookup = multiworld.world_name_lookup
block_value = typing.Union[typing.List[str], typing.Dict[str, typing.Any], str]
plando_blocks: typing.List[typing.Dict[str, typing.Any]] = []
player_ids = set(multiworld.player_ids)
plando_blocks: dict[int, list[PlandoItemBlock]] = dict()
player_ids: set[int] = set(multiworld.player_ids)
for player in player_ids:
for block in multiworld.plando_items[player]:
block['player'] = player
if 'force' not in block:
block['force'] = 'silent'
if 'from_pool' not in block:
block['from_pool'] = True
elif not isinstance(block['from_pool'], bool):
from_pool_type = type(block['from_pool'])
raise Exception(f'Plando "from_pool" has to be boolean, not {from_pool_type} for player {player}.')
if 'world' not in block:
target_world = False
else:
target_world = block['world']
plando_blocks[player] = []
for block in multiworld.worlds[player].options.plando_items:
new_block: PlandoItemBlock = PlandoItemBlock(player, block.from_pool, block.force)
target_world = block.world
if target_world is False or multiworld.players == 1: # target own world
worlds: typing.Set[int] = {player}
worlds: set[int] = {player}
elif target_world is True: # target any worlds besides own
worlds = set(multiworld.player_ids) - {player}
elif target_world is None: # target all worlds
@@ -872,156 +970,201 @@ def distribute_planned(multiworld: MultiWorld) -> None:
worlds = set()
for listed_world in target_world:
if listed_world not in world_name_lookup:
failed(f"Cannot place item to {target_world}'s world as that world does not exist.",
block['force'])
failed(f"Cannot place item to {listed_world}'s world as that world does not exist.",
block.force)
continue
worlds.add(world_name_lookup[listed_world])
elif type(target_world) == int: # target world by slot number
if target_world not in range(1, multiworld.players + 1):
failed(
f"Cannot place item in world {target_world} as it is not in range of (1, {multiworld.players})",
block['force'])
block.force)
continue
worlds = {target_world}
else: # target world by slot name
if target_world not in world_name_lookup:
failed(f"Cannot place item to {target_world}'s world as that world does not exist.",
block['force'])
block.force)
continue
worlds = {world_name_lookup[target_world]}
block['world'] = worlds
new_block.worlds = worlds
items: block_value = []
if "items" in block:
items = block["items"]
if 'count' not in block:
block['count'] = False
elif "item" in block:
items = block["item"]
if 'count' not in block:
block['count'] = 1
else:
failed("You must specify at least one item to place items with plando.", block['force'])
continue
items: list[str] | dict[str, typing.Any] = block.items
if isinstance(items, dict):
item_list: typing.List[str] = []
item_list: list[str] = []
for key, value in items.items():
if value is True:
value = multiworld.itempool.count(multiworld.worlds[player].create_item(key))
item_list += [key] * value
items = item_list
if isinstance(items, str):
items = [items]
block['items'] = items
new_block.items = items
locations: block_value = []
if 'location' in block:
locations = block['location'] # just allow 'location' to keep old yamls compatible
elif 'locations' in block:
locations = block['locations']
locations: list[str] = block.locations
if isinstance(locations, str):
locations = [locations]
if isinstance(locations, dict):
location_list = []
for key, value in locations.items():
location_list += [key] * value
locations = location_list
resolved_locations: list[Location] = []
for target_player in worlds:
locations_from_groups: list[str] = []
world_locations = multiworld.get_unfilled_locations(target_player)
for group in multiworld.worlds[target_player].location_name_groups:
if group in locations:
locations_from_groups.extend(multiworld.worlds[target_player].location_name_groups[group])
resolved_locations.extend(location for location in world_locations
if location.name in [*locations, *locations_from_groups])
new_block.locations = sorted(dict.fromkeys(locations))
new_block.resolved_locations = sorted(set(resolved_locations))
count = block.count
if not count:
count = (min(len(new_block.items), len(new_block.resolved_locations))
if new_block.resolved_locations else len(new_block.items))
if isinstance(count, int):
count = {"min": count, "max": count}
if "min" not in count:
count["min"] = 0
if "max" not in count:
count["max"] = (min(len(new_block.items), len(new_block.resolved_locations))
if new_block.resolved_locations else len(new_block.items))
new_block.count = count
plando_blocks[player].append(new_block)
return plando_blocks
def resolve_early_locations_for_planned(multiworld: MultiWorld):
def warn(warning: str, force: bool | str) -> None:
if isinstance(force, bool):
logging.warning(f"{warning}")
else:
logging.debug(f"{warning}")
def failed(warning: str, force: bool | str) -> None:
if force is True:
raise Exception(warning)
else:
warn(warning, force)
swept_state = multiworld.state.copy()
swept_state.sweep_for_advancements()
reachable = frozenset(multiworld.get_reachable_locations(swept_state))
early_locations: dict[int, list[Location]] = collections.defaultdict(list)
non_early_locations: dict[int, list[Location]] = collections.defaultdict(list)
for loc in multiworld.get_unfilled_locations():
if loc in reachable:
early_locations[loc.player].append(loc)
else: # not reachable with swept state
non_early_locations[loc.player].append(loc)
for player in multiworld.plando_item_blocks:
removed = []
for block in multiworld.plando_item_blocks[player]:
locations = block.locations
resolved_locations = block.resolved_locations
worlds = block.worlds
if "early_locations" in locations:
locations.remove("early_locations")
for target_player in worlds:
locations += early_locations[target_player]
resolved_locations += early_locations[target_player]
if "non_early_locations" in locations:
locations.remove("non_early_locations")
for target_player in worlds:
locations += non_early_locations[target_player]
resolved_locations += non_early_locations[target_player]
block['locations'] = list(dict.fromkeys(locations))
if block.count["max"] > len(block.items):
count = block.count["max"]
failed(f"Plando count {count} greater than items specified", block.force)
block.count["max"] = len(block.items)
if block.count["min"] > len(block.items):
block.count["min"] = len(block.items)
if block.count["max"] > len(block.resolved_locations) > 0:
count = block.count["max"]
failed(f"Plando count {count} greater than locations specified", block.force)
block.count["max"] = len(block.resolved_locations)
if block.count["min"] > len(block.resolved_locations):
block.count["min"] = len(block.resolved_locations)
block.count["target"] = multiworld.random.randint(block.count["min"],
block.count["max"])
if not block['count']:
block['count'] = (min(len(block['items']), len(block['locations'])) if
len(block['locations']) > 0 else len(block['items']))
if isinstance(block['count'], int):
block['count'] = {'min': block['count'], 'max': block['count']}
if 'min' not in block['count']:
block['count']['min'] = 0
if 'max' not in block['count']:
block['count']['max'] = (min(len(block['items']), len(block['locations'])) if
len(block['locations']) > 0 else len(block['items']))
if block['count']['max'] > len(block['items']):
count = block['count']
failed(f"Plando count {count} greater than items specified", block['force'])
block['count'] = len(block['items'])
if block['count']['max'] > len(block['locations']) > 0:
count = block['count']
failed(f"Plando count {count} greater than locations specified", block['force'])
block['count'] = len(block['locations'])
block['count']['target'] = multiworld.random.randint(block['count']['min'], block['count']['max'])
if not block.count["target"]:
removed.append(block)
if block['count']['target'] > 0:
plando_blocks.append(block)
for block in removed:
multiworld.plando_item_blocks[player].remove(block)
def distribute_planned_blocks(multiworld: MultiWorld, plando_blocks: list[PlandoItemBlock]):
def warn(warning: str, force: bool | str) -> None:
if isinstance(force, bool):
logging.warning(f"{warning}")
else:
logging.debug(f"{warning}")
def failed(warning: str, force: bool | str) -> None:
if force is True:
raise Exception(warning)
else:
warn(warning, force)
# shuffle, but then sort blocks by number of locations minus number of items,
# so less-flexible blocks get priority
multiworld.random.shuffle(plando_blocks)
plando_blocks.sort(key=lambda block: (len(block['locations']) - block['count']['target']
if len(block['locations']) > 0
else len(multiworld.get_unfilled_locations(player)) - block['count']['target']))
plando_blocks.sort(key=lambda block: (len(block.resolved_locations) - block.count["target"]
if len(block.resolved_locations) > 0
else len(multiworld.get_unfilled_locations(block.player)) -
block.count["target"]))
for placement in plando_blocks:
player = placement['player']
player = placement.player
try:
worlds = placement['world']
locations = placement['locations']
items = placement['items']
maxcount = placement['count']['target']
from_pool = placement['from_pool']
worlds = placement.worlds
locations = placement.resolved_locations
items = placement.items
maxcount = placement.count["target"]
from_pool = placement.from_pool
candidates = list(multiworld.get_unfilled_locations_for_players(locations, sorted(worlds)))
multiworld.random.shuffle(candidates)
multiworld.random.shuffle(items)
count = 0
err: typing.List[str] = []
successful_pairs: typing.List[typing.Tuple[Item, Location]] = []
for item_name in items:
item = multiworld.worlds[player].create_item(item_name)
for location in reversed(candidates):
if (location.address is None) == (item.code is None): # either both None or both not None
if not location.item:
if location.item_rule(item):
if location.can_fill(multiworld.state, item, False):
successful_pairs.append((item, location))
candidates.remove(location)
count = count + 1
break
else:
err.append(f"Can't place item at {location} due to fill condition not met.")
else:
err.append(f"{item_name} not allowed at {location}.")
else:
err.append(f"Cannot place {item_name} into already filled location {location}.")
item_candidates = []
if from_pool:
instances = [item for item in multiworld.itempool if item.player == player and item.name in items]
for item in multiworld.random.sample(items, maxcount):
candidate = next((i for i in instances if i.name == item), None)
if candidate is None:
warn(f"Could not remove {item} from pool for {multiworld.player_name[player]} as "
f"it's already missing from it", placement.force)
candidate = multiworld.worlds[player].create_item(item)
else:
err.append(f"Mismatch between {item_name} and {location}, only one is an event.")
if count == maxcount:
break
if count < placement['count']['min']:
m = placement['count']['min']
failed(
f"Plando block failed to place {m - count} of {m} item(s) for {multiworld.player_name[player]}, error(s): {' '.join(err)}",
placement['force'])
for (item, location) in successful_pairs:
multiworld.push_item(location, item, collect=False)
location.locked = True
logging.debug(f"Plando placed {item} at {location}")
if from_pool:
try:
multiworld.itempool.remove(item)
except ValueError:
warn(
f"Could not remove {item} from pool for {multiworld.player_name[player]} as it's already missing from it.",
placement['force'])
multiworld.itempool.remove(candidate)
instances.remove(candidate)
item_candidates.append(candidate)
else:
item_candidates = [multiworld.worlds[player].create_item(item)
for item in multiworld.random.sample(items, maxcount)]
if any(item.code is None for item in item_candidates) \
and not all(item.code is None for item in item_candidates):
failed(f"Plando block for player {player} ({multiworld.player_name[player]}) contains both "
f"event items and non-event items. "
f"Event items: {[item for item in item_candidates if item.code is None]}, "
f"Non-event items: {[item for item in item_candidates if item.code is not None]}",
placement.force)
continue
else:
is_real = item_candidates[0].code is not None
candidates = [candidate for candidate in locations if candidate.item is None
and bool(candidate.address) == is_real]
multiworld.random.shuffle(candidates)
allstate = multiworld.get_all_state(False)
mincount = placement.count["min"]
allowed_margin = len(item_candidates) - mincount
fill_restrictive(multiworld, allstate, candidates, item_candidates, lock=True,
allow_partial=True, name="Plando Main Fill")
if len(item_candidates) > allowed_margin:
failed(f"Could not place {len(item_candidates)} "
f"of {mincount + allowed_margin} item(s) "
f"for {multiworld.player_name[player]}, "
f"remaining items: {item_candidates}",
placement.force)
if from_pool:
multiworld.itempool.extend([item for item in item_candidates if item.code is not None])
except Exception as e:
raise Exception(
f"Error running plando for player {player} ({multiworld.player_name[player]})") from e