Files
dockipelago/worlds/generic/Rules.py
Ian Robinson 286769a0f3 Core: Add rule builder (#5048)
* initial commit of rules engine

* implement most of the stuff

* add docs and fill out rest of the functionality

* add in explain functions

* dedupe items and add more docs

* pr feedback and optimization updates

* Self is not in typing on 3.10

* fix test

* Update docs/rule builder.md

Co-authored-by: BadMagic100 <dempsey.sean@outlook.com>

* pr feedback

* love it when CI gives me different results than local

* add composition with bitwise and and or

* strongly typed option filtering

* skip resolving location parent region

* update docs

* update typing and add decorator

* add string explains

* move simplify code to world

* add wrapper rule

* I may need to abandon the generic typing

* missing space for faris

* fix hashing for resolved rules

* thank u typing extensions ilu

* remove bad cacheable check

* add decorator to assign hash and rule name

* more type crimes...

* region access rules are now cached

* break compatibility so new features work

* update docs

* replace decorators with __init_subclass__

* ok now the frozen dataclass is automatic

* one more type fix for the road

* small fixes and caching tests

* play nicer with tests

* ok actually fix the tests

* add item_mapping for faris

* add more state helpers as rules

* fix has from list rules

* fix can reach location caching and add set completion condition

* fix can reach entrance caching

* implement HasGroup and HasGroupUnique

* add more tests and fix some bugs

* Add name arg to create_entrance

Co-authored-by: roseasromeo <11944660+roseasromeo@users.noreply.github.com>

* fix json dumping option filters

* restructure and test serialization

* add prop to disable caching

* switch to __call__ and revert access_rule changes

* update docs and make edge cases match

* ruff has lured me into a false sense of security

* also unused

* fix disabling caching

* move filter function to filter class

* add more docs

* tests for explain functions

* Update docs/rule builder.md

Co-authored-by: roseasromeo <11944660+roseasromeo@users.noreply.github.com>

* chore: Strip out uses of TYPE_CHECKING as much as possible

* chore: add empty webworld for test

* chore: optimize rule evaluations

* remove getattr from hot code paths

* testing new cache flags

* only clear cache for rules cached as false in collect

* update test for new behaviour

* do not have rules inherit from each other

* update docs on caching

* fix name of attribute

* make explain messages more colorful

* fix issue with combining rules with different options

* add convenience functions for filtering

* use an operator with higher precedence

* name conflicts less with optionfilter

* move simplify and instance caching code

* update docs

* kill resolve_rule

* kill true_rule and false_rule

* move helpers to base classes

* update docs

* I really should finish all of my

* fix test

* rename mixin

* fix typos

* refactor rule builder into folder for better imports

* update docs

* do not dupe collectionrule

* docs review feedback

* missed a file

* remove rule_caching_enabled from base World

* update docs on caching

* shuffle around some docs

* use option instead of option.value

* add in operator and more testing

* rm World = object

* test fixes

* move cache to logic mixin

* keep test rule builder world out of global registry

* todone

* call register_dependencies automatically

* move register deps call to call_single

* add filtered_resolution

* allow bool opts on filters

* fix serialization tests

* allow reverse operations

---------

Co-authored-by: BadMagic100 <dempsey.sean@outlook.com>
Co-authored-by: roseasromeo <11944660+roseasromeo@users.noreply.github.com>
2026-02-08 17:00:23 +01:00

190 lines
8.2 KiB
Python

import collections
import logging
import typing
from BaseClasses import (CollectionRule, CollectionState, Entrance, Item, Location,
LocationProgressType, MultiWorld, Region)
# Signature of an item rule: a callable that receives an Item and returns a bool.
# add_item_rule() accepts callables of this type and installs them as `location.item_rule`.
ItemRule = typing.Callable[[Item], bool]
def locality_needed(multiworld: MultiWorld) -> bool:
    """Report whether any player or group has local/non-local item settings.

    Returns True as soon as a player's ``local_items`` or ``non_local_items``
    option value is non-empty, or a group that does not consist of every
    player has a non-empty ``"local_items"`` or ``"non_local_items"`` entry.
    Returns False otherwise (previously the function fell off the end and
    returned None, contradicting its ``-> bool`` annotation).
    """
    for player in multiworld.player_ids:
        options = multiworld.worlds[player].options
        if options.local_items.value or options.non_local_items.value:
            return True

    # Group
    all_players = set(multiworld.player_ids)  # loop-invariant, build once
    for group in multiworld.groups.values():
        # groups whose member set equals the full player set are skipped
        if all_players == set(group["players"]):
            continue
        if group["local_items"] or group["non_local_items"]:
            return True

    return False
def locality_rules(multiworld: MultiWorld):
    """Install item rules on all locations that enforce local/non-local item settings.

    Does nothing unless ``locality_needed(multiworld)`` is truthy. Otherwise a
    table ``blocked[sender][receiver]`` of item names is collected from player
    options and group settings, and every location returned by
    ``multiworld.get_locations()`` gets its ``item_rule`` replaced by a rule that
    rejects an item whose name is listed under the item's player for that
    location's player. A location with a non-default previous rule must still
    satisfy that rule as well.
    """
    if not locality_needed(multiworld):
        return

    # blocked[sender][receiver] -> item names of `receiver` that may not sit in `sender`'s locations
    blocked: typing.Dict[int, typing.Dict[int, typing.Set[str]]] = \
        collections.defaultdict(lambda: collections.defaultdict(set))

    for receiver in multiworld.player_ids:
        receiver_options = multiworld.worlds[receiver].options
        local_items: typing.Set[str] = receiver_options.local_items.value
        if local_items:
            # local items are blocked from every other player's locations
            for sender in multiworld.player_ids:
                if sender != receiver:
                    blocked[sender][receiver].update(local_items)
        non_local_items: typing.Set[str] = receiver_options.non_local_items.value
        if non_local_items:
            # non-local items are blocked from the owner's own locations
            blocked[receiver][receiver].update(non_local_items)

    # Group
    for group_id, group in multiworld.groups.items():
        members = group["players"]
        if set(multiworld.player_ids) == set(members):
            continue
        if group["local_items"]:
            for sender in multiworld.player_ids:
                if sender not in members:
                    blocked[sender][group_id].update(group["local_items"])
        if group["non_local_items"]:
            for sender in multiworld.player_ids:
                if sender in members:
                    blocked[sender][group_id].update(group["non_local_items"])

    # create fewer lambdas to save memory and cache misses:
    # locations sharing (player, previous rule) share one replacement rule
    rule_cache = {}
    for location in multiworld.get_locations():
        previous_rule = location.item_rule
        cache_key = (location.player, previous_rule)

        if cache_key in rule_cache:
            location.item_rule = rule_cache[cache_key]
            continue

        player_blockers = blocked[location.player]
        if previous_rule is Location.item_rule:
            # empty rule that just returns True, overwrite
            # (old_rule is unused here but kept so both lambdas share one signature)
            rule_cache[cache_key] = location.item_rule = \
                lambda i, sending_blockers=player_blockers, old_rule=previous_rule: \
                i.name not in sending_blockers[i.player]
        else:
            # special rule, needs to also be fulfilled.
            rule_cache[cache_key] = location.item_rule = \
                lambda i, sending_blockers=player_blockers, old_rule=previous_rule: \
                i.name not in sending_blockers[i.player] and old_rule(i)
def exclusion_rules(multiworld: MultiWorld, player: int, exclude_locations: typing.Set[str]) -> None:
    """Mark the named locations of ``player`` as excluded.

    For each name the location is looked up via ``multiworld.get_location``.
    A found location that is not an advancement location gets
    ``LocationProgressType.EXCLUDED``; an advancement location is left alone
    and a warning is logged instead.

    Raises:
        Exception: if the lookup raises KeyError and the name is also absent
            from the player's world's ``location_name_to_id``. Names that are
            present there but could not be looked up are skipped silently.
    """
    known_names = multiworld.worlds[player].location_name_to_id if False else None  # placeholder, see below
    del known_names  # the mapping is deliberately looked up lazily, only on a failed lookup

    for loc_name in exclude_locations:
        try:
            target = multiworld.get_location(loc_name, player)
        except KeyError as e:
            # failed to find the given location. Check if it's a legitimate location
            if loc_name not in multiworld.worlds[player].location_name_to_id:
                raise Exception(f"Unable to exclude location {loc_name} in player {player}'s world.") from e
            continue

        if target.advancement:
            logging.warning(f"Unable to exclude location {loc_name} in player {player}'s world.")
        else:
            target.progress_type = LocationProgressType.EXCLUDED
def set_rule(spot: typing.Union[Location, Entrance], rule: CollectionRule):
    """Replace the access rule of ``spot`` with ``rule``, discarding the previous one."""
    spot.access_rule = rule
def add_rule(spot: typing.Union[Location, Entrance], rule: CollectionRule, combine="and"):
    """Combine ``rule`` with the access rule already set on ``spot``.

    With ``combine == "and"`` both rules must hold; any other value combines
    them with ``or``. If ``spot`` still carries the class-level default rule of
    Location or Entrance, no wrapper is built: "and" installs ``rule`` directly
    and anything else keeps the default rule.
    """
    existing = spot.access_rule
    conjunctive = combine == "and"

    # empty rule, replace instead of add
    if existing is Location.access_rule or existing is Entrance.access_rule:
        spot.access_rule = rule if conjunctive else existing
    elif conjunctive:
        spot.access_rule = lambda state: rule(state) and existing(state)
    else:
        spot.access_rule = lambda state: rule(state) or existing(state)
def forbid_item(location: Location, item: str, player: int):
    """Disallow the item named ``item`` belonging to ``player`` at ``location``.

    Any non-default item rule already on the location keeps applying.
    """
    previous = location.item_rule

    if previous is Location.item_rule:
        # empty rule
        location.item_rule = lambda i: not (i.name == item and i.player == player)
        return

    location.item_rule = lambda i: not (i.name == item and i.player == player) and previous(i)
def forbid_items_for_player(location: Location, items: typing.Set[str], player: int):
    """Disallow every item of ``player`` whose name is in ``items`` at ``location``.

    The location's previous item rule is always chained behind the new check.
    """
    previous = location.item_rule

    def is_forbidden(i) -> bool:
        # only items owned by `player` and named in `items` are rejected
        return i.player == player and i.name in items

    location.item_rule = lambda i: not is_forbidden(i) and previous(i)
def forbid_items(location: Location, items: typing.Set[str]):
    """Disallow items named in ``items`` at ``location``, regardless of owner.

    Unused, but kept as a debugging tool.
    """
    previous = location.item_rule
    location.item_rule = lambda i: not (i.name in items) and previous(i)
def add_item_rule(location: Location, rule: ItemRule, combine: str = "and"):
    """Combine ``rule`` with the item rule already set on ``location``.

    With ``combine == "and"`` both rules must hold; any other value combines
    them with ``or``. If the location still carries the class-level default
    item rule, no wrapper is built: "and" installs ``rule`` directly and
    anything else keeps the default rule.
    """
    existing = location.item_rule
    conjunctive = combine == "and"

    # empty rule, replace instead of add
    if existing is Location.item_rule:
        location.item_rule = rule if conjunctive else existing
    elif conjunctive:
        location.item_rule = lambda item: rule(item) and existing(item)
    else:
        location.item_rule = lambda item: rule(item) or existing(item)
def item_name_in_location_names(state: CollectionState, item: str, player: int,
                                location_name_player_pairs: typing.Sequence[typing.Tuple[str, int]]) -> bool:
    """Return True if any of the given (location name, player) pairs holds ``item`` of ``player``.

    Each pair is resolved through ``location_item_name``; the scan stops at
    the first match.
    """
    wanted = (item, player)
    return any(
        location_item_name(state, pair[0], pair[1]) == wanted
        for pair in location_name_player_pairs
    )
def item_name_in_locations(item: str, player: int,
                           locations: typing.Sequence[Location]) -> bool:
    """Return True if any of ``locations`` holds the item named ``item`` owned by ``player``.

    Locations whose ``item`` is falsy (e.g. None) never match.
    """
    return any(
        spot.item and spot.item.name == item and spot.item.player == player
        for spot in locations
    )
def location_item_name(state: CollectionState, location: str, player: int) -> \
        typing.Optional[typing.Tuple[str, int]]:
    """Return ``(item name, item player)`` for the item at the named location.

    The location is looked up through ``state.multiworld.get_location``.
    Returns None when the location has no item.
    """
    spot = state.multiworld.get_location(location, player)
    placed = spot.item
    return None if placed is None else (placed.name, placed.player)
def allow_self_locking_items(spot: typing.Union[Location, Region], *item_names: str) -> None:
    """
    Set rules on the supplied spot so that the supplied item name(s) can possibly be placed there.

    For a Region, every entrance of the region is combined with every location
    in the region; for a Location, the location acts as both the gated area and
    the placement target.

    spot: Location or Region that the item(s) are allowed to be placed in
    item_names: item name or names that are allowed to be placed in the Location or Region
    """
    player = spot.player

    def add_allowed_rules(area: typing.Union[Location, Entrance], location: Location) -> None:
        # OR one alternative per item onto the area's access rule: the area also
        # counts as accessible when `location` itself holds that item.
        for wanted in item_names:
            # item_name is bound as a default so each lambda keeps its own value
            add_rule(area, lambda state, item_name=wanted:
                     location_item_name(state, location.name, player) == (item_name, player), "or")

        # the player's own copies of the named items bypass the location's other checks
        location.always_allow = lambda state, item: item.player == player and item.name in item_names

    if not isinstance(spot, Region):
        add_allowed_rules(spot, spot)
        return

    for entrance in spot.entrances:
        for location in spot.locations:
            add_allowed_rules(entrance, location)