Core: Add rule builder (#5048)

* initial commit of rules engine

* implement most of the stuff

* add docs and fill out rest of the functionality

* add in explain functions

* dedupe items and add more docs

* pr feedback and optimization updates

* Self is not in typing on 3.10

* fix test

* Update docs/rule builder.md

Co-authored-by: BadMagic100 <dempsey.sean@outlook.com>

* pr feedback

* love it when CI gives me different results than local

* add composition with bitwise and and or

* strongly typed option filtering

* skip resolving location parent region

* update docs

* update typing and add decorator

* add string explains

* move simplify code to world

* add wrapper rule

* I may need to abandon the generic typing

* missing space for faris

* fix hashing for resolved rules

* thank u typing extensions ilu

* remove bad cacheable check

* add decorator to assign hash and rule name

* more type crimes...

* region access rules are now cached

* break compatibility so new features work

* update docs

* replace decorators with __init_subclass__

* ok now the frozen dataclass is automatic

* one more type fix for the road

* small fixes and caching tests

* play nicer with tests

* ok actually fix the tests

* add item_mapping for faris

* add more state helpers as rules

* fix has from list rules

* fix can reach location caching and add set completion condition

* fix can reach entrance caching

* implement HasGroup and HasGroupUnique

* add more tests and fix some bugs

* Add name arg to create_entrance

Co-authored-by: roseasromeo <11944660+roseasromeo@users.noreply.github.com>

* fix json dumping option filters

* restructure and test serialization

* add prop to disable caching

* switch to __call__ and revert access_rule changes

* update docs and make edge cases match

* ruff has lured me into a false sense of security

* also unused

* fix disabling caching

* move filter function to filter class

* add more docs

* tests for explain functions

* Update docs/rule builder.md

Co-authored-by: roseasromeo <11944660+roseasromeo@users.noreply.github.com>

* chore: Strip out uses of TYPE_CHECKING as much as possible

* chore: add empty webworld for test

* chore: optimize rule evaluations

* remove getattr from hot code paths

* testing new cache flags

* only clear cache for rules cached as false in collect

* update test for new behaviour

* do not have rules inherit from each other

* update docs on caching

* fix name of attribute

* make explain messages more colorful

* fix issue with combining rules with different options

* add convenience functions for filtering

* use an operator with higher precedence

* name conflicts less with optionfilter

* move simplify and instance caching code

* update docs

* kill resolve_rule

* kill true_rule and false_rule

* move helpers to base classes

* update docs

* I really should finish all of my

* fix test

* rename mixin

* fix typos

* refactor rule builder into folder for better imports

* update docs

* do not dupe collectionrule

* docs review feedback

* missed a file

* remove rule_caching_enabled from base World

* update docs on caching

* shuffle around some docs

* use option instead of option.value

* add in operator and more testing

* rm World = object

* test fixes

* move cache to logic mixin

* keep test rule builder world out of global registry

* todone

* call register_dependencies automatically

* move register deps call to call_single

* add filtered_resolution

* allow bool opts on filters

* fix serialization tests

* allow reverse operations

---------

Co-authored-by: BadMagic100 <dempsey.sean@outlook.com>
Co-authored-by: roseasromeo <11944660+roseasromeo@users.noreply.github.com>
This commit is contained in:
Ian Robinson
2026-02-08 11:00:23 -05:00
committed by GitHub
parent 1dd91ec85b
commit 286769a0f3
10 changed files with 3956 additions and 38 deletions

View File

@@ -2,11 +2,15 @@
"include": [
"../BizHawkClient.py",
"../Patch.py",
"../rule_builder/cached_world.py",
"../rule_builder/options.py",
"../rule_builder/rules.py",
"../test/param.py",
"../test/general/test_groups.py",
"../test/general/test_helpers.py",
"../test/general/test_memory.py",
"../test/general/test_names.py",
"../test/general/test_rule_builder.py",
"../test/multiworld/__init__.py",
"../test/multiworld/test_multiworlds.py",
"../test/netutils/__init__.py",

View File

@@ -8,10 +8,10 @@ import secrets
import warnings
from argparse import Namespace
from collections import Counter, deque, defaultdict
from collections.abc import Collection, MutableSequence
from collections.abc import Callable, Collection, Iterable, Iterator, Mapping, MutableSequence, Set
from enum import IntEnum, IntFlag
from typing import (AbstractSet, Any, Callable, ClassVar, Dict, Iterable, Iterator, List, Literal, Mapping, NamedTuple,
Optional, Protocol, Set, Tuple, Union, TYPE_CHECKING, Literal, overload)
from typing import (AbstractSet, Any, ClassVar, Dict, List, Literal, NamedTuple,
Optional, Protocol, Tuple, Union, TYPE_CHECKING, overload)
import dataclasses
from typing_extensions import NotRequired, TypedDict
@@ -85,7 +85,7 @@ class MultiWorld():
local_items: Dict[int, Options.LocalItems]
non_local_items: Dict[int, Options.NonLocalItems]
progression_balancing: Dict[int, Options.ProgressionBalancing]
completion_condition: Dict[int, Callable[[CollectionState], bool]]
completion_condition: Dict[int, CollectionRule]
indirect_connections: Dict[Region, Set[Entrance]]
exclude_locations: Dict[int, Options.ExcludeLocations]
priority_locations: Dict[int, Options.PriorityLocations]
@@ -766,7 +766,7 @@ class CollectionState():
else:
self._update_reachable_regions_auto_indirect_conditions(player, queue)
def _update_reachable_regions_explicit_indirect_conditions(self, player: int, queue: deque):
def _update_reachable_regions_explicit_indirect_conditions(self, player: int, queue: deque[Entrance]):
reachable_regions = self.reachable_regions[player]
blocked_connections = self.blocked_connections[player]
# run BFS on all connections, and keep track of those blocked by missing items
@@ -784,13 +784,14 @@ class CollectionState():
blocked_connections.update(new_region.exits)
queue.extend(new_region.exits)
self.path[new_region] = (new_region.name, self.path.get(connection, None))
self.multiworld.worlds[player].reached_region(self, new_region)
# Retry connections if the new region can unblock them
for new_entrance in self.multiworld.indirect_connections.get(new_region, set()):
if new_entrance in blocked_connections and new_entrance not in queue:
queue.append(new_entrance)
def _update_reachable_regions_auto_indirect_conditions(self, player: int, queue: deque):
def _update_reachable_regions_auto_indirect_conditions(self, player: int, queue: deque[Entrance]):
reachable_regions = self.reachable_regions[player]
blocked_connections = self.blocked_connections[player]
new_connection: bool = True
@@ -812,6 +813,7 @@ class CollectionState():
queue.extend(new_region.exits)
self.path[new_region] = (new_region.name, self.path.get(connection, None))
new_connection = True
self.multiworld.worlds[player].reached_region(self, new_region)
# sweep for indirect connections, mostly Entrance.can_reach(unrelated_Region)
queue.extend(blocked_connections)
@@ -1169,13 +1171,17 @@ class CollectionState():
self.prog_items[player][item] = count
CollectionRule = Callable[[CollectionState], bool]
DEFAULT_COLLECTION_RULE: CollectionRule = staticmethod(lambda state: True)
class EntranceType(IntEnum):
ONE_WAY = 1
TWO_WAY = 2
class Entrance:
access_rule: Callable[[CollectionState], bool] = staticmethod(lambda state: True)
access_rule: CollectionRule = DEFAULT_COLLECTION_RULE
hide_path: bool = False
player: int
name: str
@@ -1362,7 +1368,7 @@ class Region:
self,
location_name: str,
item_name: str | None = None,
rule: Callable[[CollectionState], bool] | None = None,
rule: CollectionRule | None = None,
location_type: type[Location] | None = None,
item_type: type[Item] | None = None,
show_in_spoiler: bool = True,
@@ -1401,7 +1407,7 @@ class Region:
return event_item
def connect(self, connecting_region: Region, name: Optional[str] = None,
rule: Optional[Callable[[CollectionState], bool]] = None) -> Entrance:
rule: Optional[CollectionRule] = None) -> Entrance:
"""
Connects this Region to another Region, placing the provided rule on the connection.
@@ -1435,7 +1441,7 @@ class Region:
return entrance
def add_exits(self, exits: Iterable[str] | Mapping[str, str | None],
rules: Mapping[str, Callable[[CollectionState], bool]] | None = None) -> List[Entrance]:
rules: Mapping[str, CollectionRule] | None = None) -> List[Entrance]:
"""
Connects current region to regions in exit dictionary. Passed region names must exist first.
@@ -1474,7 +1480,7 @@ class Location:
show_in_spoiler: bool = True
progress_type: LocationProgressType = LocationProgressType.DEFAULT
always_allow: Callable[[CollectionState, Item], bool] = staticmethod(lambda state, item: False)
access_rule: Callable[[CollectionState], bool] = staticmethod(lambda state: True)
access_rule: CollectionRule = DEFAULT_COLLECTION_RULE
item_rule: Callable[[Item], bool] = staticmethod(lambda item: True)
item: Optional[Item] = None

482
docs/rule builder.md Normal file
View File

@@ -0,0 +1,482 @@
# Rule Builder
This document describes the API provided for the rule builder. Using this API provides you with with a simple interface to define rules and the following advantages:
- Rule classes that avoid all the common pitfalls
- Logic optimization
- Automatic result caching (opt-in)
- Serialization/deserialization
- Human-readable logic explanations for players
## Overview
The rule builder consists of 3 main parts:
1. The rules, which are classes that inherit from `rule_builder.rules.Rule`. These are what you write for your logic. They can be combined and take into account your world's options. There are a number of default rules listed below, and you can create as many custom rules for your world as needed. When assigning the rules to a location or entrance they must be resolved.
1. Resolved rules, which are classes that inherit from `rule_builder.rules.Rule.Resolved`. These are the optimized rules specific to one player that are set as a location or entrance's access rule. You generally shouldn't be directly creating these but they'll be created when assigning rules to locations or entrances. These are what power the human-readable logic explanations.
1. The optional rule builder world subclass `CachedRuleBuilderWorld`, which is a class your world can inherit from instead of `World`. It adds a caching system to the rules that will lazy evaluate and cache the result.
## Usage
For the most part the only difference in usage is instead of writing lambdas for your logic, you write static Rule objects. You then must use `world.set_rule` to assign the rule to a location or entrance.
```python
# In your world's create_regions method
location = MyWorldLocation(...)
self.set_rule(location, Has("A Big Gun"))
```
The rule builder comes with a number of rules by default:
- `True_`: Always returns true
- `False_`: Always returns false
- `And`: Checks that all child rules are true (also provided by `&` operator)
- `Or`: Checks that at least one child rule is true (also provided by `|` operator)
- `Has`: Checks that the player has the given item with the given count (default 1)
- `HasAll`: Checks that the player has all given items
- `HasAny`: Checks that the player has at least one of the given items
- `HasAllCounts`: Checks that the player has all of the counts for the given items
- `HasAnyCount`: Checks that the player has any of the counts for the given items
- `HasFromList`: Checks that the player has some number of given items
- `HasFromListUnique`: Checks that the player has some number of given items, ignoring duplicates of the same item
- `HasGroup`: Checks that the player has some number of items from a given item group
- `HasGroupUnique`: Checks that the player has some number of items from a given item group, ignoring duplicates of the same item
- `CanReachLocation`: Checks that the player can logically reach the given location
- `CanReachRegion`: Checks that the player can logically reach the given region
- `CanReachEntrance`: Checks that the player can logically reach the given entrance
You can combine these rules together to describe the logic required for something. For example, to check if a player either has `Movement ability` or they have both `Key 1` and `Key 2`, you can do:
```python
rule = Has("Movement ability") | HasAll("Key 1", "Key 2")
```
> ⚠️ Composing rules with the `and` and `or` keywords will not work. You must use the bitwise `&` and `|` operators. In order to catch mistakes, the rule builder will not let you do boolean operations. As a consequence, in order to check if a rule is defined you must use `if rule is not None`.
### Assigning rules
When assigning the rule you must use the `set_rule` helper to correctly resolve and register the rule.
```python
self.set_rule(location_or_entrance, rule)
```
There is also a `create_entrance` helper that will resolve the rule, check if it's `False`, and if not create the entrance and set the rule. This allows you to skip creating entrances that will never be valid. You can also specify `force_creation=True` if you would like to create the entrance even if the rule is `False`.
```python
self.create_entrance(from_region, to_region, rule)
```
> ⚠️ If you use a `CanReachLocation` rule on an entrance, you will either have to create the locations first, or specify the location's parent region name with the `parent_region_name` argument of `CanReachLocation`.
You can also set a rule for your world's completion condition:
```python
self.set_completion_rule(rule)
```
### Restricting options
Every rule allows you to specify which options it's applicable for. You can provide the argument `options` which is an iterable of `OptionFilter` instances. Rules that pass the options check will be resolved as normal, and those that fail will be resolved as `False`.
If you want a comparison that isn't equals, you can specify with the `operator` argument. The following operators are allowed:
- `eq`: `==`
- `ne`: `!=`
- `gt`: `>`
- `lt`: `<`
- `ge`: `>=`
- `le`: `<=`
- `contains`: `in`
By default rules that are excluded by their options will default to `False`. If you want to default to `True` instead, you can specify `filtered_resolution=True` on your rule.
To check if the player can reach a switch, or if they've received the switch item if switches are randomized:
```python
rule = (
Has("Red switch", options=[OptionFilter(SwitchRando, 1)])
| CanReachLocation("Red switch", options=[OptionFilter(SwitchRando, 0)])
)
```
To add an extra logic requirement on the easiest difficulty which is ignored for other difficulties:
```python
rule = (
# ...the rest of the logic
& Has("QoL item", options=[OptionFilter(Difficulty, Difficulty.option_easy)], filtered_resolution=True)
)
```
If you would like to provide option filters when reusing or composing rules, you can use the `Filtered` helper rule:
```python
common_rule = Has("A") | HasAny("B", "C")
...
rule = (
Filtered(common_rule, options=[OptionFilter(Opt, 0)]),
| Filtered(Has("X") | CanReachRegion("Y"), options=[OptionFilter(Opt, 1)]),
)
```
You can also use the & and | operators to apply options to rules:
```python
common_rule = Has("A")
easy_filter = [OptionFilter(Difficulty, Difficulty.option_easy)]
common_rule_only_on_easy = common_rule & easy_filter
common_rule_skipped_on_easy = common_rule | easy_filter
```
## Enabling caching
The rule builder provides a `CachedRuleBuilderWorld` base class for your `World` class that enables caching on your rules.
```python
class MyWorld(CachedRuleBuilderWorld):
game = "My Game"
```
If your world's logic is very simple and you don't have many nested rules, the caching system may have more overhead cost than time it saves. You'll have to benchmark your own world to see if it should be enabled or not.
### Item name mapping
If you have multiple real items that map to a single logic item, add a `item_mapping` class dict to your world that maps actual item names to real item names so the cache system knows what to invalidate.
For example, if you have multiple `Currency x<num>` items on locations, but your rules only check a singular logical `Currency` item, eg `Has("Currency", 1000)`, you'll want to map each numerical currency item to the single logical `Currency`.
```python
class MyWorld(CachedRuleBuilderWorld):
item_mapping = {
"Currency x10": "Currency",
"Currency x50": "Currency",
"Currency x100": "Currency",
"Currency x500": "Currency",
}
```
## Defining custom rules
You can create a custom rule by creating a class that inherits from `Rule` or any of the default rules. You must provide the game name as an argument to the class. It's recommended to use the `@dataclass` decorator to reduce boilerplate, and to also provide your world as a type argument to add correct type checking to the `_instantiate` method.
You must provide or inherit a `Resolved` child class that defines an `_evaluate` method. This class will automatically be converted into a frozen `dataclass`. If your world has caching enabled you may need to define one or more dependencies functions as outlined below.
To add a rule that checks if the user has enough mcguffins to goal, with a randomized requirement:
```python
@dataclasses.dataclass()
class CanGoal(Rule["MyWorld"], game="My Game"):
@override
def _instantiate(self, world: "MyWorld") -> Rule.Resolved:
# caching_enabled only needs to be passed in when your world inherits from CachedRuleBuilderWorld
return self.Resolved(world.required_mcguffins, player=world.player, caching_enabled=True)
class Resolved(Rule.Resolved):
goal: int
@override
def _evaluate(self, state: CollectionState) -> bool:
return state.has("McGuffin", self.player, count=self.goal)
@override
def item_dependencies(self) -> dict[str, set[int]]:
# this function is only required if you have caching enabled
return {"McGuffin": {id(self)}}
@override
def explain_json(self, state: CollectionState | None = None) -> list[JSONMessagePart]:
# this method can be overridden to display custom explanations
return [
{"type": "text", "text": "Goal with "},
{"type": "color", "color": "green" if state and self(state) else "salmon", "text": str(self.goal)},
{"type": "text", "text": " McGuffins"},
]
```
Your custom rule can also resolve to builtin rules instead of needing to define your own:
```python
@dataclasses.dataclass()
class ComplicatedFilter(Rule["MyWorld"], game="My Game"):
def _instantiate(self, world: "MyWorld") -> Rule.Resolved:
if world.some_precalculated_bool:
return Has("Item 1").resolve(world)
if world.options.some_option:
return CanReachRegion("Region 1").resolve(world)
return False_().resolve(world)
```
### Item dependencies
If your world inherits from `CachedRuleBuilderWorld` and there are items that when collected will affect the result of your rule evaluation, it must define an `item_dependencies` function that returns a mapping of the item name to the id of your rule. These dependencies will be combined to inform the caching system. It may be worthwhile to define this function even when caching is disabled as more things may use it in the future.
```python
@dataclasses.dataclass()
class MyRule(Rule["MyWorld"], game="My Game"):
class Resolved(Rule.Resolved):
item_name: str
@override
def item_dependencies(self) -> dict[str, set[int]]:
return {self.item_name: {id(self)}}
```
All of the default `Has*` rules define this function already.
### Region dependencies
If your custom rule references other regions, it must define a `region_dependencies` function that returns a mapping of region names to the id of your rule regardless of if your world inherits from `CachedRuleBuilderWorld`. These dependencies will be combined to register indirect connections when you set this rule on an entrance and inform the caching system if applicable.
```python
@dataclasses.dataclass()
class MyRule(Rule["MyWorld"], game="My Game"):
class Resolved(Rule.Resolved):
region_name: str
@override
def region_dependencies(self) -> dict[str, set[int]]:
return {self.region_name: {id(self)}}
```
The default `CanReachLocation`, `CanReachRegion`, and `CanReachEntrance` rules define this function already.
### Location dependencies
If your custom rule references other locations, it must define a `location_dependencies` function that returns a mapping of the location name to the id of your rule regardless of if your world inherits from `CachedRuleBuilderWorld`. These dependencies will be combined to register indirect connections when you set this rule on an entrance and inform the caching system if applicable.
```python
@dataclasses.dataclass()
class MyRule(Rule["MyWorld"], game="My Game"):
class Resolved(Rule.Resolved):
location_name: str
@override
def location_dependencies(self) -> dict[str, set[int]]:
return {self.location_name: {id(self)}}
```
The default `CanReachLocation` rule defines this function already.
### Entrance dependencies
If your custom rule references other entrances, it must define a `entrance_dependencies` function that returns a mapping of the entrance name to the id of your rule regardless of if your world inherits from `CachedRuleBuilderWorld`. These dependencies will be combined to register indirect connections when you set this rule on an entrance and inform the caching system if applicable.
```python
@dataclasses.dataclass()
class MyRule(Rule["MyWorld"], game="My Game"):
class Resolved(Rule.Resolved):
entrance_name: str
@override
def entrance_dependencies(self) -> dict[str, set[int]]:
return {self.entrance_name: {id(self)}}
```
The default `CanReachEntrance` rule defines this function already.
### Rule explanations
Resolved rules have a default implementation for `explain_json` and `explain_str` functions. The former optionally accepts a `CollectionState` and returns a list of `JSONMessagePart` appropriate for `print_json` in a client. It will display a human-readable message that explains what the rule requires. The latter is similar but returns a string. It is useful when debugging. There is also a `__str__` method defined to check what a rule is without a state.
To implement a custom message with a custom rule, override the `explain_json` and/or `explain_str` method on your `Resolved` class:
```python
class MyRule(Rule, game="My Game"):
class Resolved(Rule.Resolved):
@override
def explain_json(self, state: CollectionState | None = None) -> list[JSONMessagePart]:
has_item = state and state.has("growth spurt", self.player)
color = "yellow"
start = "You must be "
if has_item:
start = "You are "
color = "green"
elif state is not None:
start = "You are not "
color = "salmon"
return [
{"type": "text", "text": start},
{"type": "color", "color": color, "text": "THIS"},
{"type": "text", "text": " tall to beat the game"},
]
@override
def explain_str(self, state: CollectionState | None = None) -> str:
if state is None:
return str(self)
if state.has("growth spurt", self.player):
return "You ARE this tall and can beat the game"
return "You are not THIS tall and cannot beat the game"
@override
def __str__(self) -> str:
return "You must be THIS tall to beat the game"
```
### Cache control
By default your custom rule will work through the cache system as any other rule if caching is enabled. There are two class attributes on the `Resolved` class you can override to change this behavior.
- `force_recalculate`: Setting this to `True` will cause your custom rule to skip going through the caching system and always recalculate when being evaluated. When a rule with this flag enabled is composed with `And` or `Or` it will cause any parent rules to always force recalculate as well. Use this flag when it's difficult to determine when your rule should be marked as stale.
- `skip_cache`: Setting this to `True` will also cause your custom rule to skip going through the caching system when being evaluated. However, it will **not** affect any other rules when composed with `And` or `Or`, so it must still define its `*_dependencies` functions as required. Use this flag when the evaluation of this rule is trivial and the overhead of the caching system will slow it down.
### Caveats
- Ensure you are passing `caching_enabled=True` in your `_instantiate` function when creating resolved rule instances if your world has opted into caching.
- Resolved rules are forced to be frozen dataclasses. They and all their attributes must be immutable and hashable.
- If your rule creates child rules ensure they are being resolved through the world rather than creating `Resolved` instances directly.
## Serialization
The rule builder is intended to be written first in Python for optimization and type safety. To facilitate exporting the rules to a client or tracker, rules have a `to_dict` method that returns a JSON-compatible dict. Since the location and entrance logic structure varies greatly from world to world, the actual JSON dumping is left up to the world dev.
The dict contains a `rule` key with the name of the rule, an `options` key with the rule's list of option filters, and an `args` key that contains any other arguments the individual rule has. For example, this is what a simple `Has` rule would look like:
```python
{
"rule": "Has",
"options": [],
"args": {
"item_name": "Some item",
"count": 1,
},
}
```
For `And` and `Or` rules, instead of an `args` key, they have a `children` key containing a list of their child rules in the same serializable format:
```python
{
"rule": "And",
"options": [],
"children": [
..., # each serialized rule
]
}
```
A full example is as follows:
```python
rule = And(
Has("a", options=[OptionFilter(ToggleOption, 0)]),
Or(Has("b", count=2), CanReachRegion("c"), options=[OptionFilter(ToggleOption, 1)]),
)
assert rule.to_dict() == {
"rule": "And",
"options": [],
"children": [
{
"rule": "Has",
"options": [
{
"option": "worlds.my_world.options.ToggleOption",
"value": 0,
"operator": "eq",
},
],
"args": {
"item_name": "a",
"count": 1,
},
},
{
"rule": "Or",
"options": [
{
"option": "worlds.my_world.options.ToggleOption",
"value": 1,
"operator": "eq",
},
],
"children": [
{
"rule": "Has",
"options": [],
"args": {
"item_name": "b",
"count": 2,
},
},
{
"rule": "CanReachRegion",
"options": [],
"args": {
"region_name": "c",
},
},
],
},
],
}
```
### Custom serialization
To define a different format for your custom rules, override the `to_dict` function:
```python
class BasicLogicRule(Rule, game="My Game"):
items = ("one", "two")
def to_dict(self) -> dict[str, Any]:
# Return whatever format works best for you
return {
"logic": "basic",
"items": self.items,
}
```
If your logic has been done in custom JSON first, you can define a `from_dict` class method on your rules to parse it correctly:
```python
class BasicLogicRule(Rule, game="My Game"):
@classmethod
def from_dict(cls, data: Mapping[str, Any], world_cls: type[World]) -> Self:
items = data.get("items", ())
return cls(*items)
```
## APIs
This section is provided for reference, refer to the above sections for examples.
### World API
These are properties and helpers that are available to you in your world.
#### Methods
- `rule_from_dict(data)`: Create a rule instance from a deserialized dict representation
- `register_rule_builder_dependencies()`: Register all rules that depend on location or entrance access with the inherited dependencies, gets called automatically after set_rules
- `set_rule(spot: Location | Entrance, rule: Rule)`: Resolve a rule, register its dependencies, and set it on the given location or entrance
- `set_completion_rule(rule: Rule)`: Sets the completion condition for this world
- `create_entrance(from_region: Region, to_region: Region, rule: Rule | None, name: str | None = None, force_creation: bool = False)`: Attempt to create an entrance from `from_region` to `to_region`, skipping creation if `rule` is defined and evaluates to `False_()` unless force_creation is `True`
#### CachedRuleBuilderWorld Properties
The following property is only available when inheriting from `CachedRuleBuilderWorld`
- `item_mapping: dict[str, str]`: A mapping of actual item name to logical item name
### Rule API
These are properties and helpers that you can use or override for custom rules.
- `_instantiate(world: World)`: Create a new resolved rule instance, override for custom rules as required
- `to_dict()`: Create a JSON-compatible dict representation of this rule, override if you want to customize your rule's serialization
- `from_dict(data, world_cls: type[World])`: Return a new rule instance from a deserialized representation, override if you've overridden `to_dict`
- `__str__()`: Basic string representation of a rule, useful for debugging
#### Resolved rule API
- `player: int`: The slot this rule is resolved for
- `_evaluate(state: CollectionState)`: Evaluate this rule against the given state, override this to define the logic for this rule
- `item_dependencies()`: A mapping of item name to set of ids, override this if your custom rule depends on item collection
- `region_dependencies()`: A mapping of region name to set of ids, override this if your custom rule depends on reaching regions
- `location_dependencies()`: A mapping of location name to set of ids, override this if your custom rule depends on reaching locations
- `entrance_dependencies()`: A mapping of entrance name to set of ids, override this if your custom rule depends on reaching entrances
- `explain_json(state: CollectionState | None = None)`: Return a list of printJSON messages describing this rule's logic (and if state is defined its evaluation) in a human readable way, override to explain custom rules
- `explain_str(state: CollectionState | None = None)`: Return a string describing this rule's logic (and if state is defined its evaluation) in a human readable way, override to explain custom rules, more useful for debugging
- `__str__()`: A string describing this rule's logic without its evaluation, override to explain custom rules

0
rule_builder/__init__.py Normal file
View File

View File

@@ -0,0 +1,146 @@
from collections import defaultdict
from typing import ClassVar, cast
from typing_extensions import override
from BaseClasses import CollectionState, Item, MultiWorld, Region
from worlds.AutoWorld import LogicMixin, World
from .rules import Rule
class CachedRuleBuilderWorld(World):
"""A World subclass that provides helpers for interacting with the rule builder"""
rule_item_dependencies: dict[str, set[int]]
"""A mapping of item name to set of rule ids"""
rule_region_dependencies: dict[str, set[int]]
"""A mapping of region name to set of rule ids"""
rule_location_dependencies: dict[str, set[int]]
"""A mapping of location name to set of rule ids"""
rule_entrance_dependencies: dict[str, set[int]]
"""A mapping of entrance name to set of rule ids"""
item_mapping: ClassVar[dict[str, str]] = {}
"""A mapping of actual item name to logical item name.
Useful when there are multiple versions of a collected item but the logic only uses one. For example:
item = Item("Currency x500"), rule = Has("Currency", count=1000), item_mapping = {"Currency x500": "Currency"}"""
rule_caching_enabled: ClassVar[bool] = True
"""Flag to inform rules that the caching system for this world is enabled. It should not be overridden."""
def __init__(self, multiworld: MultiWorld, player: int) -> None:
super().__init__(multiworld, player)
self.rule_item_dependencies = defaultdict(set)
self.rule_region_dependencies = defaultdict(set)
self.rule_location_dependencies = defaultdict(set)
self.rule_entrance_dependencies = defaultdict(set)
@override
def register_rule_dependencies(self, resolved_rule: Rule.Resolved) -> None:
for item_name, rule_ids in resolved_rule.item_dependencies().items():
self.rule_item_dependencies[item_name] |= rule_ids
for region_name, rule_ids in resolved_rule.region_dependencies().items():
self.rule_region_dependencies[region_name] |= rule_ids
for location_name, rule_ids in resolved_rule.location_dependencies().items():
self.rule_location_dependencies[location_name] |= rule_ids
for entrance_name, rule_ids in resolved_rule.entrance_dependencies().items():
self.rule_entrance_dependencies[entrance_name] |= rule_ids
def register_rule_builder_dependencies(self) -> None:
"""Register all rules that depend on locations or entrances with their dependencies"""
for location_name, rule_ids in self.rule_location_dependencies.items():
try:
location = self.get_location(location_name)
except KeyError:
continue
if not isinstance(location.access_rule, Rule.Resolved):
continue
for item_name in location.access_rule.item_dependencies():
self.rule_item_dependencies[item_name] |= rule_ids
for region_name in location.access_rule.region_dependencies():
self.rule_region_dependencies[region_name] |= rule_ids
for entrance_name, rule_ids in self.rule_entrance_dependencies.items():
try:
entrance = self.get_entrance(entrance_name)
except KeyError:
continue
if not isinstance(entrance.access_rule, Rule.Resolved):
continue
for item_name in entrance.access_rule.item_dependencies():
self.rule_item_dependencies[item_name] |= rule_ids
for region_name in entrance.access_rule.region_dependencies():
self.rule_region_dependencies[region_name] |= rule_ids
@override
def collect(self, state: CollectionState, item: Item) -> bool:
changed = super().collect(state, item)
if changed and self.rule_item_dependencies:
player_results = cast(dict[int, bool], state.rule_builder_cache[self.player]) # pyright: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
mapped_name = self.item_mapping.get(item.name, "")
rule_ids = self.rule_item_dependencies[item.name] | self.rule_item_dependencies[mapped_name]
for rule_id in rule_ids:
if player_results.get(rule_id, None) is False:
del player_results[rule_id]
return changed
@override
def remove(self, state: CollectionState, item: Item) -> bool:
changed = super().remove(state, item)
if not changed:
return changed
player_results = cast(dict[int, bool], state.rule_builder_cache[self.player]) # pyright: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
if self.rule_item_dependencies:
mapped_name = self.item_mapping.get(item.name, "")
rule_ids = self.rule_item_dependencies[item.name] | self.rule_item_dependencies[mapped_name]
for rule_id in rule_ids:
player_results.pop(rule_id, None)
# clear all region dependent caches as none can be trusted
if self.rule_region_dependencies:
for rule_ids in self.rule_region_dependencies.values():
for rule_id in rule_ids:
player_results.pop(rule_id, None)
# clear all location dependent caches as they may have lost region access
if self.rule_location_dependencies:
for rule_ids in self.rule_location_dependencies.values():
for rule_id in rule_ids:
player_results.pop(rule_id, None)
# clear all entrance dependent caches as they may have lost region access
if self.rule_entrance_dependencies:
for rule_ids in self.rule_entrance_dependencies.values():
for rule_id in rule_ids:
player_results.pop(rule_id, None)
return changed
@override
def reached_region(self, state: CollectionState, region: Region) -> None:
super().reached_region(state, region)
if self.rule_region_dependencies:
player_results = cast(dict[int, bool], state.rule_builder_cache[self.player]) # pyright: ignore[reportAttributeAccessIssue, reportUnknownMemberType]
for rule_id in self.rule_region_dependencies[region.name]:
player_results.pop(rule_id, None)
class CachedRuleBuilderLogicMixin(LogicMixin):
multiworld: MultiWorld # pyright: ignore[reportUninitializedInstanceVariable]
rule_builder_cache: dict[int, dict[int, bool]] # pyright: ignore[reportUninitializedInstanceVariable]
def init_mixin(self, multiworld: "MultiWorld") -> None:
players = multiworld.get_all_ids()
self.rule_builder_cache = {player: {} for player in players}
def copy_mixin(self, new_state: "CachedRuleBuilderLogicMixin") -> "CachedRuleBuilderLogicMixin":
new_state.rule_builder_cache = {
player: player_results.copy() for player, player_results in self.rule_builder_cache.items()
}
return new_state

91
rule_builder/options.py Normal file
View File

@@ -0,0 +1,91 @@
import dataclasses
import importlib
import operator
from collections.abc import Callable, Iterable
from typing import Any, Final, Literal, Self, cast
from typing_extensions import override
from Options import CommonOptions, Option
Operator = Literal["eq", "ne", "gt", "lt", "ge", "le", "contains", "in"]
OPERATORS: Final[dict[Operator, Callable[..., bool]]] = {
"eq": operator.eq,
"ne": operator.ne,
"gt": operator.gt,
"lt": operator.lt,
"ge": operator.ge,
"le": operator.le,
"contains": operator.contains,
"in": operator.contains,
}
OPERATOR_STRINGS: Final[dict[Operator, str]] = {
"eq": "==",
"ne": "!=",
"gt": ">",
"lt": "<",
"ge": ">=",
"le": "<=",
}
REVERSE_OPERATORS: Final[tuple[Operator, ...]] = ("in",)
@dataclasses.dataclass(frozen=True)
class OptionFilter:
option: type[Option[Any]]
value: Any
operator: Operator = "eq"
def to_dict(self) -> dict[str, Any]:
"""Returns a JSON compatible dict representation of this option filter"""
return {
"option": f"{self.option.__module__}.{self.option.__name__}",
"value": self.value,
"operator": self.operator,
}
def check(self, options: CommonOptions) -> bool:
"""Tests the given options dataclass to see if it passes this option filter"""
option_name = next(
(name for name, cls in options.__class__.type_hints.items() if cls is self.option),
None,
)
if option_name is None:
raise ValueError(f"Cannot find option {self.option.__name__} in options class {options.__class__.__name__}")
opt = cast(Option[Any] | None, getattr(options, option_name, None))
if opt is None:
raise ValueError(f"Invalid option: {option_name}")
fn = OPERATORS[self.operator]
return fn(self.value, opt) if self.operator in REVERSE_OPERATORS else fn(opt, self.value)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Self:
"""Returns a new OptionFilter instance from a dict representation"""
if "option" not in data or "value" not in data:
raise ValueError("Missing required value and/or option")
option_path = data["option"]
try:
option_mod_name, option_cls_name = option_path.rsplit(".", 1)
option_module = importlib.import_module(option_mod_name)
option = getattr(option_module, option_cls_name, None)
except (ValueError, ImportError) as e:
raise ValueError(f"Cannot parse option '{option_path}'") from e
if option is None or not issubclass(option, Option):
raise ValueError(f"Invalid option '{option_path}' returns type '{option}' instead of Option subclass")
value = data["value"]
operator = data.get("operator", "eq")
return cls(option=cast(type[Option[Any]], option), value=value, operator=operator)
@classmethod
def multiple_from_dict(cls, data: Iterable[dict[str, Any]]) -> tuple[Self, ...]:
"""Returns a tuple of OptionFilters instances from an iterable of dict representations"""
return tuple(cls.from_dict(o) for o in data)
@override
def __str__(self) -> str:
op = OPERATOR_STRINGS.get(self.operator, self.operator)
return f"{self.option.__name__} {op} {self.value}"

1791
rule_builder/rules.py Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -5,17 +5,18 @@ import logging
import pathlib
import sys
import time
from collections.abc import Callable, Iterable, Mapping
from random import Random
from dataclasses import make_dataclass
from typing import (Any, Callable, ClassVar, Dict, FrozenSet, Iterable, List, Mapping, Optional, Set, TextIO, Tuple,
from typing import (Any, ClassVar, Dict, FrozenSet, List, Optional, Self, Set, TextIO, Tuple,
TYPE_CHECKING, Type, Union)
from Options import item_and_loc_options, ItemsAccessibility, OptionGroup, PerGameCommonOptions
from BaseClasses import CollectionState
from BaseClasses import CollectionState, Entrance
from rule_builder.rules import CustomRuleRegister, Rule
from Utils import Version
if TYPE_CHECKING:
from BaseClasses import MultiWorld, Item, Location, Tutorial, Region, Entrance
from BaseClasses import CollectionRule, Item, Location, MultiWorld, Region, Tutorial
from NetUtils import GamesPackage, MultiData
from settings import Group
@@ -177,7 +178,8 @@ def _timed_call(method: Callable[..., Any], *args: Any,
def call_single(multiworld: "MultiWorld", method_name: str, player: int, *args: Any) -> Any:
method = getattr(multiworld.worlds[player], method_name)
world = multiworld.worlds[player]
method = getattr(world, method_name)
try:
ret = _timed_call(method, *args, multiworld=multiworld, player=player)
except Exception as e:
@@ -188,6 +190,10 @@ def call_single(multiworld: "MultiWorld", method_name: str, player: int, *args:
logging.error(message)
raise e
else:
# Convenience for CachedRuleBuilderWorld users: Ensure that caching setup function is called
# Can be removed once dependency system is improved
if method_name == "set_rules" and hasattr(world, "register_rule_builder_dependencies"):
call_single(multiworld, "register_rule_builder_dependencies", player)
return ret
@@ -549,6 +555,10 @@ class World(metaclass=AutoWorldRegister):
return True
return False
def reached_region(self, state: "CollectionState", region: "Region") -> None:
"""Called when a region is newly reachable by the state."""
pass
# following methods should not need to be overridden.
def create_filler(self) -> "Item":
return self.create_item(self.get_filler_item_name())
@@ -597,6 +607,64 @@ class World(metaclass=AutoWorldRegister):
res["checksum"] = data_package_checksum(res)
return res
@classmethod
def get_rule_cls(cls, name: str) -> type[Rule[Self]]:
"""Returns the world-registered or default rule with the given name"""
return CustomRuleRegister.get_rule_cls(cls.game, name)
@classmethod
def rule_from_dict(cls, data: Mapping[str, Any]) -> Rule[Self]:
"""Create a rule instance from a serialized dict representation"""
name = data.get("rule", "")
rule_class = cls.get_rule_cls(name)
return rule_class.from_dict(data, cls)
def set_rule(self, spot: Location | Entrance, rule: CollectionRule | Rule[Any]) -> None:
"""Sets an access rule for a location or entrance"""
if isinstance(rule, Rule):
rule = rule.resolve(self)
self.register_rule_dependencies(rule)
if isinstance(spot, Entrance):
self._register_rule_indirects(rule, spot)
spot.access_rule = rule
def set_completion_rule(self, rule: CollectionRule | Rule[Any]) -> None:
"""Set the completion rule for this world"""
if isinstance(rule, Rule):
rule = rule.resolve(self)
self.register_rule_dependencies(rule)
self.multiworld.completion_condition[self.player] = rule
def create_entrance(
self,
from_region: Region,
to_region: Region,
rule: CollectionRule | Rule[Any] | None = None,
name: str | None = None,
force_creation: bool = False,
) -> Entrance | None:
"""Try to create an entrance between regions with the given rule,
skipping it if the rule resolves to False (unless force_creation is True)"""
if rule is not None and isinstance(rule, Rule):
rule = rule.resolve(self)
if rule.always_false and not force_creation:
return None
self.register_rule_dependencies(rule)
entrance = from_region.connect(to_region, name, rule=rule)
if rule and isinstance(rule, Rule.Resolved):
self._register_rule_indirects(rule, entrance)
return entrance
def register_rule_dependencies(self, resolved_rule: Rule.Resolved) -> None:
"""Hook for registering dependencies when a rule is assigned for this world"""
pass
def _register_rule_indirects(self, resolved_rule: Rule.Resolved, entrance: Entrance) -> None:
if self.explicit_indirect_conditions:
for indirect_region in resolved_rule.region_dependencies().keys():
self.multiworld.register_indirect_condition(self.get_region(indirect_region), entrance)
# any methods attached to this can be used as part of CollectionState,
# please use a prefix as all of them get clobbered together

View File

@@ -2,16 +2,10 @@ import collections
import logging
import typing
from BaseClasses import LocationProgressType, MultiWorld, Location, Region, Entrance
from BaseClasses import (CollectionRule, CollectionState, Entrance, Item, Location,
LocationProgressType, MultiWorld, Region)
if typing.TYPE_CHECKING:
import BaseClasses
CollectionRule = typing.Callable[[BaseClasses.CollectionState], bool]
ItemRule = typing.Callable[[BaseClasses.Item], bool]
else:
CollectionRule = typing.Callable[[object], bool]
ItemRule = typing.Callable[[object], bool]
ItemRule = typing.Callable[[Item], bool]
def locality_needed(multiworld: MultiWorld) -> bool:
@@ -96,11 +90,11 @@ def exclusion_rules(multiworld: MultiWorld, player: int, exclude_locations: typi
logging.warning(f"Unable to exclude location {loc_name} in player {player}'s world.")
def set_rule(spot: typing.Union["BaseClasses.Location", "BaseClasses.Entrance"], rule: CollectionRule):
def set_rule(spot: typing.Union[Location, Entrance], rule: CollectionRule):
spot.access_rule = rule
def add_rule(spot: typing.Union["BaseClasses.Location", "BaseClasses.Entrance"], rule: CollectionRule, combine="and"):
def add_rule(spot: typing.Union[Location, Entrance], rule: CollectionRule, combine="and"):
old_rule = spot.access_rule
# empty rule, replace instead of add
if old_rule is Location.access_rule or old_rule is Entrance.access_rule:
@@ -112,7 +106,7 @@ def add_rule(spot: typing.Union["BaseClasses.Location", "BaseClasses.Entrance"],
spot.access_rule = lambda state: rule(state) or old_rule(state)
def forbid_item(location: "BaseClasses.Location", item: str, player: int):
def forbid_item(location: Location, item: str, player: int):
old_rule = location.item_rule
# empty rule
if old_rule is Location.item_rule:
@@ -121,18 +115,18 @@ def forbid_item(location: "BaseClasses.Location", item: str, player: int):
location.item_rule = lambda i: (i.name != item or i.player != player) and old_rule(i)
def forbid_items_for_player(location: "BaseClasses.Location", items: typing.Set[str], player: int):
def forbid_items_for_player(location: Location, items: typing.Set[str], player: int):
old_rule = location.item_rule
location.item_rule = lambda i: (i.player != player or i.name not in items) and old_rule(i)
def forbid_items(location: "BaseClasses.Location", items: typing.Set[str]):
def forbid_items(location: Location, items: typing.Set[str]):
"""unused, but kept as a debugging tool."""
old_rule = location.item_rule
location.item_rule = lambda i: i.name not in items and old_rule(i)
def add_item_rule(location: "BaseClasses.Location", rule: ItemRule, combine: str = "and"):
def add_item_rule(location: Location, rule: ItemRule, combine: str = "and"):
old_rule = location.item_rule
# empty rule, replace instead of add
if old_rule is Location.item_rule:
@@ -144,7 +138,7 @@ def add_item_rule(location: "BaseClasses.Location", rule: ItemRule, combine: str
location.item_rule = lambda item: rule(item) or old_rule(item)
def item_name_in_location_names(state: "BaseClasses.CollectionState", item: str, player: int,
def item_name_in_location_names(state: CollectionState, item: str, player: int,
location_name_player_pairs: typing.Sequence[typing.Tuple[str, int]]) -> bool:
for location in location_name_player_pairs:
if location_item_name(state, location[0], location[1]) == (item, player):
@@ -153,14 +147,14 @@ def item_name_in_location_names(state: "BaseClasses.CollectionState", item: str,
def item_name_in_locations(item: str, player: int,
locations: typing.Sequence["BaseClasses.Location"]) -> bool:
locations: typing.Sequence[Location]) -> bool:
for location in locations:
if location.item and location.item.name == item and location.item.player == player:
return True
return False
def location_item_name(state: "BaseClasses.CollectionState", location: str, player: int) -> \
def location_item_name(state: CollectionState, location: str, player: int) -> \
typing.Optional[typing.Tuple[str, int]]:
location = state.multiworld.get_location(location, player)
if location.item is None: