exploration.analysis
- Authors: Peter Mawhorter
- Consulted:
- Date: 2022-10-24
- Purpose: Analysis functions for decision graphs an explorations.
1""" 2- Authors: Peter Mawhorter 3- Consulted: 4- Date: 2022-10-24 5- Purpose: Analysis functions for decision graphs an explorations. 6""" 7 8from typing import ( 9 List, Dict, Tuple, Optional, TypeVar, Callable, Union, Any, 10 ParamSpec, Concatenate, Set, cast, Type, TypeAlias, Literal, 11 TypedDict, Protocol, Sequence, Callable, Collection 12) 13 14from types import FunctionType 15 16from . import base, core, parsing 17 18import textwrap 19import functools 20import inspect 21import time 22 23import networkx as nx 24 25 26#-------------------# 27# Text descriptions # 28#-------------------# 29 30def describeConsequence(consequence: base.Consequence) -> str: 31 """ 32 Returns a string which concisely describes a consequence list. 33 Returns an empty string if given an empty consequence. Examples: 34 35 >>> describeConsequence([]) 36 '' 37 >>> describeConsequence([ 38 ... base.effect(gain=('gold', 5), delay=2, charges=3), 39 ... base.effect(lose='flight') 40 ... ]) 41 'gain gold*5 ,2 =3; lose flight' 42 >>> from . import commands 43 >>> d = describeConsequence([ 44 ... base.effect(edit=[ 45 ... [ 46 ... commands.command('val', '5'), 47 ... commands.command('empty', 'list'), 48 ... commands.command('append') 49 ... ], 50 ... [ 51 ... commands.command('val', '11'), 52 ... commands.command('assign', 'var'), 53 ... commands.command('op', '+', '$var', '$var') 54 ... ], 55 ... ]) 56 ... ]) 57 >>> d 58 'with consequences:\ 59\\n edit {\ 60\\n val 5;\ 61\\n empty list;\ 62\\n append $_;\ 63\\n } {\ 64\\n val 11;\ 65\\n assign var $_;\ 66\\n op + $var $var;\ 67\\n }\ 68\\n' 69 >>> for line in d.splitlines(): 70 ... print(line) 71 with consequences: 72 edit { 73 val 5; 74 empty list; 75 append $_; 76 } { 77 val 11; 78 assign var $_; 79 op + $var $var; 80 } 81 """ 82 edesc = '' 83 pf = parsing.ParseFormat() 84 if consequence: 85 parts = [] 86 for item in consequence: 87 # TODO: Challenges and Conditions here! 88 if 'skills' in item: # a Challenge 89 item = cast(base.Challenge, item) 90 parts.append(pf.unparseChallenge(item)) 91 elif 'value' in item: # an Effect 92 item = cast(base.Effect, item) 93 parts.append(pf.unparseEffect(item)) 94 elif 'condition' in item: # a Condition 95 item = cast(base.Condition, item) 96 parts.append(pf.unparseCondition(item)) 97 else: 98 raise TypeError( 99 f"Invalid consequence item (no 'skills', 'value', or" 100 f" 'condition' key found):\n{repr(item)}" 101 ) 102 edesc = '; '.join(parts) 103 if len(edesc) > 60 or '\n' in edesc: 104 edesc = 'with consequences:\n' + ';\n'.join( 105 textwrap.indent(part, ' ') 106 for part in parts 107 ) + '\n' 108 109 return edesc 110 111 112def describeProgress(exploration: core.DiscreteExploration) -> str: 113 """ 114 Describes the progress of an exploration by noting each room/zone 115 visited and explaining the options visible at each point plus which 116 option was taken. Notes powers/tokens gained/lost along the way. 117 Returns a string. 118 119 Example: 120 >>> from exploration import journal 121 >>> e = journal.convertJournal('''\\ 122 ... S Start::pit 123 ... A gain jump 124 ... A gain attack 125 ... n button check 126 ... zz Wilds 127 ... o up 128 ... q _flight 129 ... o left 130 ... x left left_nook right 131 ... a geo_rock 132 ... At gain geo*15 133 ... At deactivate 134 ... o up 135 ... q _tall_narrow 136 ... t right 137 ... o right 138 ... q attack 139 ... ''') 140 >>> for line in describeProgress(e).splitlines(): 141 ... print(line) 142 Start of the exploration 143 Start exploring domain main at 0 (Start::pit) 144 Gained capability 'attack' 145 Gained capability 'jump' 146 At decision 0 (Start::pit) 147 In zone Start 148 In region Wilds 149 There are transitions: 150 left to unconfirmed 151 up to unconfirmed; requires _flight 152 1 note(s) at this step 153 Explore left from decision 0 (Start::pit) to 2 (now Start::left_nook) 154 At decision 2 (Start::left_nook) 155 There are transitions: 156 right to 0 (Start::pit) 157 There are actions: 158 geo_rock 159 Do action geo_rock 160 Gained 15 geo(s) 161 Take right from decision 2 (Start::left_nook) to 0 (Start::pit) 162 At decision 0 (Start::pit) 163 There are transitions: 164 left to 2 (Start::left_nook) 165 right to unconfirmed; requires attack 166 up to unconfirmed; requires _flight 167 Waiting for another action... 168 End of the exploration. 169 """ 170 result = '' 171 172 regions: Set[base.Zone] = set() 173 zones: Set[base.Zone] = set() 174 last: Union[base.DecisionID, Set[base.DecisionID], None] = None 175 lastState: base.State = base.emptyState() 176 prevCapabilities = base.effectiveCapabilitySet(lastState) 177 prevMechanisms = lastState['mechanisms'] 178 oldActiveDecisions: Set[base.DecisionID] = set() 179 for i, situation in enumerate(exploration): 180 if i == 0: 181 result += "Start of the exploration\n" 182 183 # Extract info 184 graph = situation.graph 185 activeDecisions = exploration.getActiveDecisions(i) 186 newActive = activeDecisions - oldActiveDecisions 187 departedFrom = exploration.movementAtStep(i)[0] 188 # TODO: use the other parts of this? 189 nowZones: Set[base.Zone] = set() 190 for active in activeDecisions: 191 nowZones |= graph.zoneAncestors(active) 192 regionsHere = set( 193 z 194 for z in nowZones 195 if graph.zoneHierarchyLevel(z) == 1 196 ) 197 zonesHere = set( 198 z 199 for z in nowZones 200 if graph.zoneHierarchyLevel(z) == 0 201 ) 202 here = departedFrom 203 state = situation.state 204 capabilities = base.effectiveCapabilitySet(state) 205 mechanisms = state['mechanisms'] 206 207 # Describe capabilities gained/lost relative to previous step 208 # (i.e., as a result of the previous action) 209 gained = ( 210 capabilities['capabilities'] 211 - prevCapabilities['capabilities'] 212 ) 213 gainedTokens = [] 214 for tokenType in capabilities['tokens']: 215 net = ( 216 capabilities['tokens'][tokenType] 217 - prevCapabilities['tokens'].get(tokenType, 0) 218 ) 219 if net != 0: 220 gainedTokens.append((tokenType, net)) 221 changed = [ 222 mID 223 for mID in list(mechanisms.keys()) + list(prevMechanisms.keys()) 224 if mechanisms.get(mID) != prevMechanisms.get(mID) 225 ] 226 227 for capability in sorted(gained): 228 result += f" Gained capability '{capability}'\n" 229 230 for tokenType, net in gainedTokens: 231 if net > 0: 232 result += f" Gained {net} {tokenType}(s)\n" 233 else: 234 result += f" Lost {-net} {tokenType}(s)\n" 235 236 for mID in changed: 237 oldState = prevMechanisms.get(mID, base.DEFAULT_MECHANISM_STATE) 238 newState = mechanisms.get(mID, base.DEFAULT_MECHANISM_STATE) 239 240 details = graph.mechanismDetails(mID) 241 if details is None: 242 mName = "(unknown)" 243 else: 244 mName = details[1] 245 result += ( 246 f" Set mechanism {mID} ({mName}) to {newState} (was" 247 f" {oldState})" 248 ) 249 # TODO: Test this! 250 251 if isinstance(departedFrom, base.DecisionID): 252 # Print location info 253 if here != last: 254 if here is None: 255 result += "Without a position...\n" 256 elif isinstance(here, set): 257 result += f"With {len(here)} active decisions\n" 258 # TODO: List them using namesListing? 259 else: 260 result += f"At decision {graph.identityOf(here)}\n" 261 newZones = zonesHere - zones 262 for zone in sorted(newZones): 263 result += f" In zone {zone}\n" 264 newRegions = regionsHere - regions 265 for region in sorted(newRegions): 266 result += f" In region {region}\n" 267 268 elif isinstance(departedFrom, set): # active in spreading domain 269 spreadingDomain = graph.domainFor(list(departedFrom)[0]) 270 result += ( 271 f" In domain {spreadingDomain} with {len(departedFrom)}" 272 f" active decisions...\n" 273 ) 274 275 else: 276 assert departedFrom is None 277 278 # Describe new position/positions at start of this step 279 if len(newActive) > 1: 280 newListing = ', '.join( 281 sorted(graph.identityOf(n) for n in newActive) 282 ) 283 result += ( 284 f" There are {len(newActive)} new active decisions:" 285 f"\n {newListing}" 286 ) 287 288 elif len(newActive) == 1: 289 here = list(newActive)[0] 290 291 outgoing = graph.destinationsFrom(here) 292 293 transitions = {t: d for (t, d) in outgoing.items() if d != here} 294 actions = {t: d for (t, d) in outgoing.items() if d == here} 295 if transitions: 296 result += " There are transitions:\n" 297 for transition in sorted(transitions): 298 dest = transitions[transition] 299 if not graph.isConfirmed(dest): 300 destSpec = 'unconfirmed' 301 else: 302 destSpec = graph.identityOf(dest) 303 req = graph.getTransitionRequirement(here, transition) 304 rDesc = '' 305 if req != base.ReqNothing(): 306 rDesc = f"; requires {req.unparse()}" 307 cDesc = describeConsequence( 308 graph.getConsequence(here, transition) 309 ) 310 if cDesc: 311 cDesc = '; ' + cDesc 312 result += ( 313 f" {transition} to {destSpec}{rDesc}{cDesc}\n" 314 ) 315 316 if actions: 317 result += " There are actions:\n" 318 for action in sorted(actions): 319 req = graph.getTransitionRequirement(here, action) 320 rDesc = '' 321 if req != base.ReqNothing(): 322 rDesc = f"; requires {req.unparse()}" 323 cDesc = describeConsequence( 324 graph.getConsequence(here, action) 325 ) 326 if cDesc: 327 cDesc = '; ' + cDesc 328 if rDesc or cDesc: 329 desc = (rDesc + cDesc)[2:] # chop '; ' from either 330 result += f" {action} ({desc})\n" 331 else: 332 result += f" {action}\n" 333 334 # note annotations 335 if len(situation.annotations) > 0: 336 result += ( 337 f" {len(situation.annotations)} note(s) at this step\n" 338 ) 339 340 # Describe action taken 341 if situation.action is None and situation.type == "pending": 342 result += "Waiting for another action...\n" 343 else: 344 desc = base.describeExplorationAction(situation, situation.action) 345 desc = desc[0].capitalize() + desc[1:] 346 result += desc + '\n' 347 348 if i == len(exploration) - 1: 349 result += "End of the exploration.\n" 350 351 # Update state variables 352 oldActiveDecisions = activeDecisions 353 prevCapabilities = capabilities 354 prevMechanisms = mechanisms 355 regions = regionsHere 356 zones = zonesHere 357 if here is not None: 358 last = here 359 lastState = state 360 361 return result 362 363 364#-----------------------# 365# Analysis result types # 366#-----------------------# 367 368AnalysisUnit: 'TypeAlias' = Literal[ 369 'step', 370 'stepDecision', 371 'stepTransition', 372 'decision', 373 'transition', 374 'exploration', 375] 376""" 377The different kinds of analysis units we consider: per-step-per-decision, 378per-step-per-transition, per-step, per-final-decision, 379per-final-transition, and per-exploration (i.e. overall). 380""" 381 382 383AnalysisResults: 'TypeAlias' = Dict[str, Any] 384""" 385Analysis results are dictionaries that map analysis routine names to 386results from those routines, which can be of any type. 387""" 388 389 390SpecificTransition: 'TypeAlias' = Tuple[base.DecisionID, base.Transition] 391""" 392A specific transition is identified by its source decision ID and its 393transition name. Note that transitions which get renamed are treated as 394two separate transitions. 395""" 396 397OverspecificTransition: 'TypeAlias' = Tuple[ 398 base.DecisionID, 399 base.Transition, 400 base.DecisionID 401] 402""" 403In contrast to a `SpecificTransition`, an `OverspecificTransition` 404includes the destination of the transition, which might help disambiguate 405cases where a transition is created, then re-targeted or deleted and 406re-created with a different destination. Transitions which get renamed 407still are treated as two separate transitions. 408""" 409 410DecisionAnalyses: 'TypeAlias' = Dict[base.DecisionID, AnalysisResults] 411""" 412Decision analysis results are stored per-decision, with a dictionary of 413property-name → value associations. These properties either apply to 414decisions across all steps of an exploration, or apply to decisions in a 415particular `core.DecisionGraph`. 416""" 417 418TransitionAnalyses: 'TypeAlias' = Dict[OverspecificTransition, AnalysisResults] 419""" 420Per-transition analysis results, similar to `DecisionAnalyses`. 421""" 422 423StepAnalyses: 'TypeAlias' = List[AnalysisResults] 424""" 425Per-exploration-step analysis results are stored in a list and indexed by 426exploration step integers. 427""" 428 429StepwiseDecisionAnalyses: 'TypeAlias' = List[DecisionAnalyses] 430""" 431Per-step-per-decision analysis results are stored as a list of decision 432analysis results. 433""" 434 435StepwiseTransitionAnalyses: 'TypeAlias' = List[TransitionAnalyses] 436""" 437Per-step-per-transition analysis results are stored as a list of 438transition analysis results. 439""" 440 441ExplorationAnalyses: 'TypeAlias' = AnalysisResults 442""" 443Whole-exploration analyses are just a normal `AnalysisResults` dictionary. 444""" 445 446class FullAnalysisResults(TypedDict): 447 """ 448 Full analysis results hold every kind of analysis result in one 449 dictionary. 450 """ 451 perDecision: DecisionAnalyses 452 perTransition: TransitionAnalyses 453 perStep: StepAnalyses 454 perStepDecision: StepwiseDecisionAnalyses 455 perStepTransition: StepwiseTransitionAnalyses 456 overall: ExplorationAnalyses 457 458 459def newFullAnalysisResults() -> FullAnalysisResults: 460 """ 461 Returns a new empty `FullAnalysisResults` dictionary. 462 """ 463 return { 464 'perDecision': {}, 465 'perTransition': {}, 466 'perStep': [], 467 'perStepDecision': [], 468 'perStepTransition': [], 469 'overall': {} 470 } 471 472 473Params = ParamSpec('Params') 474'Parameter specification variable for `AnalysisFunction` definition.' 475 476 477class AnalysisFunction(Protocol[Params]): 478 """ 479 Analysis functions are callable, but also have a `_unit` attribute 480 which is a string. 481 """ 482 _unit: AnalysisUnit 483 __name__: str 484 __doc__: str 485 def __call__( 486 self, 487 exploration: core.DiscreteExploration, 488 *args: Params.args, 489 **kwargs: Params.kwargs 490 ) -> Any: 491 ... 492 493 494StepAnalyzer: 'TypeAlias' = AnalysisFunction[[int]] 495''' 496A step analyzer is a function which will receive a 497`core.DiscreteExploration` along with the step in that exploration being 498considered. It can return any type of analysis result. 499''' 500 501StepDecisionAnalyzer: 'TypeAlias' = AnalysisFunction[[int, base.DecisionID]] 502''' 503Like a `StepAnalyzer` but also gets a decision ID to consider. 504''' 505 506StepTransitionAnalyzer: 'TypeAlias' = AnalysisFunction[ 507 [int, base.DecisionID, base.Transition, base.DecisionID] 508] 509''' 510Like a `StepAnalyzer` but also gets a source decision ID, a transition 511name, and a destination decision ID to target. 512''' 513 514 515DecisionAnalyzer: 'TypeAlias' = AnalysisFunction[[base.DecisionID]] 516''' 517A decision analyzer gets full analysis results to update plus an 518exploration and a particular decision ID to consider. 519''' 520 521TransitionAnalyzer: 'TypeAlias' = AnalysisFunction[ 522 [base.DecisionID, base.Transition, base.DecisionID] 523] 524''' 525Like a `DecisionAnalyzer` but gets a transition name as well. 526''' 527 528ExplorationAnalyzer: 'TypeAlias' = AnalysisFunction[[]] 529''' 530Analyzes overall properties of an entire `core.DiscreteExploration`. 531''' 532 533 534#--------------------------# 535# Analysis caching support # 536#--------------------------# 537 538AnyAnalyzer: 'TypeAlias' = Union[ 539 ExplorationAnalyzer, 540 TransitionAnalyzer, 541 DecisionAnalyzer, 542 StepAnalyzer, 543 StepDecisionAnalyzer, 544 StepTransitionAnalyzer 545] 546 547 548ANALYSIS_RESULTS: Dict[int, FullAnalysisResults] = {} 549""" 550Caches analysis results, keyed by the `id` of the 551`core.DiscreteExploration` they're based on. 552""" 553 554 555class NotCached: 556 """ 557 Reference object for specifying that no cached value is available, 558 since `None` is a valid cached value. 559 """ 560 pass 561 562 563def lookupAnalysisResult( 564 cache: FullAnalysisResults, 565 analyzer: AnalysisFunction, 566 argsInOrder: Sequence[Any] 567) -> Union[Type[NotCached], Any]: 568 """ 569 Looks up an analysis result for the given function in the given 570 cache. The function must have been decorated with `analyzer`. The 571 bound arguments must match the unit of analysis, for example, if the 572 unit is 'stepDecision', the arguments must be those for a 573 `StepDecisionAnalyzer`. The bound arguments should have had 574 `apply_defaults` called already to fill in default argument values. 575 Returns the special object `NotCached` if there is no cached value 576 for the specified arguments yet. 577 """ 578 unit = analyzer._unit 579 if unit == 'step': 580 whichStep = argsInOrder[1] 581 perStep = cache['perStep'] 582 while len(perStep) <= whichStep: 583 perStep.append({}) 584 return perStep[whichStep].get(analyzer.__name__, NotCached) 585 elif unit == 'stepDecision': 586 whichStep = argsInOrder[1] 587 whichDecision = argsInOrder[2] 588 perStepDecision = cache['perStepDecision'] 589 while len(perStepDecision) <= whichStep: 590 perStepDecision.append({}) 591 forThis = perStepDecision[whichStep].get(whichDecision) 592 if forThis is None: 593 return NotCached 594 return forThis.get(analyzer.__name__, NotCached) 595 elif unit == 'stepTransition': 596 whichStep = argsInOrder[1] 597 whichTransition = (argsInOrder[2], argsInOrder[3], argsInOrder[4]) 598 perStepTransition = cache['perStepTransition'] 599 while len(perStepTransition) <= whichStep: 600 perStepTransition.append({}) 601 forThis = perStepTransition[whichStep].get(whichTransition) 602 if forThis is None: 603 return NotCached 604 return forThis.get(analyzer.__name__, NotCached) 605 elif unit == 'decision': 606 whichDecision = argsInOrder[1] 607 perDecision = cache['perDecision'] 608 if whichDecision not in perDecision: 609 return NotCached 610 return perDecision[whichDecision].get(analyzer.__name__, NotCached) 611 elif unit == 'transition': 612 whichTransition = (argsInOrder[1], argsInOrder[2], argsInOrder[3]) 613 perTransition = cache['perTransition'] 614 if whichTransition not in perTransition: 615 return NotCached 616 return perTransition[whichTransition].get( 617 analyzer.__name__, 618 NotCached 619 ) 620 elif unit == 'exploration': 621 return cache['overall'].get(analyzer.__name__, NotCached) 622 else: 623 raise ValueError(f"Invalid analysis unit {unit!r}.") 624 625 626def saveAnalysisResult( 627 cache: FullAnalysisResults, 628 result: Any, 629 analyzer: AnalysisFunction, 630 argsInOrder: Sequence[Any] 631) -> None: 632 """ 633 Saves an analysis result in the specified cache. The bound arguments 634 must match the unit, for example, if the unit is 'stepDecision', the 635 arguments must be those for a `StepDecisionAnalyzer`. 636 """ 637 unit = analyzer._unit 638 if unit == 'step': 639 whichStep = argsInOrder[1] 640 perStep = cache['perStep'] 641 while len(perStep) <= whichStep: 642 perStep.append({}) 643 perStep[whichStep][analyzer.__name__] = result 644 elif unit == 'stepDecision': 645 whichStep = argsInOrder[1] 646 whichDecision = argsInOrder[2] 647 perStepDecision = cache['perStepDecision'] 648 while len(perStepDecision) <= whichStep: 649 perStepDecision.append({}) 650 forThis = perStepDecision[whichStep].setdefault(whichDecision, {}) 651 forThis[analyzer.__name__] = result 652 elif unit == 'stepTransition': 653 whichStep = argsInOrder[1] 654 whichTransition = (argsInOrder[2], argsInOrder[3], argsInOrder[4]) 655 perStepTransition = cache['perStepTransition'] 656 while len(perStepTransition) <= whichStep: 657 perStepTransition.append({}) 658 forThis = perStepTransition[whichStep].setdefault(whichTransition, {}) 659 forThis[analyzer.__name__] = result 660 elif unit == 'decision': 661 whichDecision = argsInOrder[1] 662 perDecision = cache['perDecision'] 663 perDecision.setdefault(whichDecision, {})[analyzer.__name__] = result 664 elif unit == 'transition': 665 whichTransition = (argsInOrder[1], argsInOrder[2], argsInOrder[3]) 666 perTransition = cache['perTransition'] 667 perTransition.setdefault( 668 whichTransition, 669 {} 670 )[analyzer.__name__] = result 671 elif unit == 'exploration': 672 cache['overall'][analyzer.__name__] = result 673 else: 674 raise ValueError(f"Invalid analysis unit {unit!r}.") 675 676 677ALL_ANALYZERS: Dict[str, AnyAnalyzer] = {} 678""" 679Holds all analyzers indexed by name with the analysis unit plus function 680as the value. The `analyzer` decorator registers them. 681""" 682 683 684RECORD_PROFILE: bool = False 685""" 686Whether or not to record time spent by each analysis function. 687""" 688 689 690class AnalyzerPerf(TypedDict): 691 """ 692 Tracks performance of an analysis function, recording total calls, 693 non-cached calls, time spent looking up cached results, and time 694 spent in non-cached calls (including the failed cache lookup and 695 saving the result in the cache). 696 """ 697 calls: int 698 nonCached: int 699 lookupTime: float 700 analyzeTime: float 701 702 703def newAnalyzerPerf() -> AnalyzerPerf: 704 """ 705 Creates a new empty `AnalyzerPerf` dictionary. 706 """ 707 return { 708 "calls": 0, 709 "nonCached": 0, 710 "lookupTime": 0.0, 711 "analyzeTime": 0.0 712 } 713 714 715ANALYSIS_TIME_SPENT: Dict[str, AnalyzerPerf] = {} 716""" 717Records number-of-calls, number-of-non-cached calls, and time spent in 718each analysis function, when `RECORD_PROFILE` is set to `True`. 719""" 720 721 722ELIDE: Set[str] = set() 723""" 724Analyzers which should not be included in CSV output by default. 725""" 726 727FINAL_ONLY: Set[str] = set() 728""" 729Per-step/step-decision/step-transition analyzers which should by default 730only be applied to the final step of an exploration to save time. 731""" 732 733 734def getArgsInOrder( 735 f: Callable, 736 *args: Any, 737 **kwargs: Any 738) -> List[Any]: 739 """ 740 Given a callable and some arguments, returns a list of argument and 741 some arguments, returns a list of argument values in the same order 742 as that function would accept them from the given arguments, 743 accounting for things like keyword arguments and default values. 744 745 For example: 746 747 >>> def f(a, /, b, *more, x=3, y=10, **kw): 748 ... pass 749 >>> sig = inspect.Signature.from_callable(f) 750 >>> getArgsInOrder(f, 1, 2) 751 [1, 2, 3, 10] 752 >>> getArgsInOrder(f, 4, 5, y=2, x=8) 753 [4, 5, 8, 2] 754 >>> getArgsInOrder(f, 4, y=2, x=8, b=3) 755 [4, 3, 8, 2] 756 >>> getArgsInOrder(f, 4, y=2, x=8, b=3) 757 [4, 3, 8, 2] 758 >>> getArgsInOrder(f, 1, 2, 3, 4) 759 [1, 2, 3, 4, 3, 10] 760 >>> getArgsInOrder(f, 1, 2, 3, 4, q=5, k=9) 761 [1, 2, 3, 4, 3, 10, 5, 9] 762 """ 763 sig = inspect.Signature.from_callable(f) 764 bound = sig.bind(*args, **kwargs) 765 bound.apply_defaults() 766 result = [] 767 for paramName in sig.parameters: 768 param = sig.parameters[paramName] 769 if param.kind in ( 770 inspect.Parameter.POSITIONAL_ONLY, 771 inspect.Parameter.POSITIONAL_OR_KEYWORD, 772 ): 773 result.append(bound.arguments[paramName]) 774 elif param.kind == inspect.Parameter.VAR_POSITIONAL: 775 result.extend(bound.arguments[paramName]) 776 elif param.kind == inspect.Parameter.KEYWORD_ONLY: 777 result.append(bound.arguments[paramName]) 778 elif param.kind == inspect.Parameter.VAR_KEYWORD: 779 result.extend(bound.arguments[paramName].values()) 780 781 return result 782 783 784def analyzer(unit: AnalysisUnit) -> Callable[ 785 [Callable[Concatenate[core.DiscreteExploration, Params], Any]], 786 AnalysisFunction 787]: 788 ''' 789 Decorator which sets up caching for an analysis function in the 790 global `ANALYSIS_RESULTS` dictionary. Whenever the decorated function 791 is called, it will first check whether a cached result is available 792 for the same target exploration (by id) and additional target info 793 based on the analysis unit type. If so, the cached result will be 794 returned. This allows analysis functions to simply call each other 795 when they need results and themselves recursively if they need to 796 track things across steps/decisions, while avoiding tons of duplicate 797 work. 798 ''' 799 def makeCachedAnalyzer( 800 baseFunction: Callable[ 801 Concatenate[core.DiscreteExploration, Params], 802 Any 803 ] 804 ) -> AnalysisFunction: 805 """ 806 Decoration function which registers an analysis function with 807 pre-specified dependencies. 808 """ 809 analysisFunction = cast(AnalysisFunction, baseFunction) 810 analysisFunction._unit = unit 811 analyzerName= analysisFunction.__name__ 812 813 @functools.wraps(analysisFunction) 814 def cachingAnalyzer( 815 exploration: core.DiscreteExploration, 816 *args: Params.args, 817 **kwargs: Params.kwargs 818 ): 819 """ 820 This docstring will be replaced with the docstring of the 821 decorated function plus a note about caching. 822 """ 823 if RECORD_PROFILE: 824 ANALYSIS_TIME_SPENT.setdefault( 825 analyzerName, 826 newAnalyzerPerf() 827 ) 828 perf = ANALYSIS_TIME_SPENT[analyzerName] 829 perf["calls"] += 1 830 start = time.perf_counter() 831 cache = ANALYSIS_RESULTS.setdefault( 832 id(exploration), 833 newFullAnalysisResults() 834 ) 835 argsInOrder = getArgsInOrder( 836 baseFunction, 837 exploration, 838 *args, 839 **kwargs 840 ) 841 cachedResult = lookupAnalysisResult( 842 cache, 843 analysisFunction, 844 argsInOrder 845 ) 846 if cachedResult is not NotCached: 847 if RECORD_PROFILE: 848 perf["lookupTime"] += time.perf_counter() - start 849 return cachedResult 850 851 result = analysisFunction(exploration, *args, **kwargs) 852 saveAnalysisResult(cache, result, analysisFunction, argsInOrder) 853 if RECORD_PROFILE: 854 perf["nonCached"] += 1 855 perf["analyzeTime"] += time.perf_counter() - start 856 return result 857 858 cachingAnalyzer.__doc__ = ( 859 textwrap.dedent(analysisFunction.__doc__) 860 + """ 861 862This function's results are cached in the `ALL_ANALYZERS` dictionary, and 863it returns cached results when possible. Use `clearAnalysisCache` to 864clear the analysis cache. 865""" 866 ) 867 868 # Save caching version of analyzer 869 result = cast(AnalysisFunction, cachingAnalyzer) 870 ALL_ANALYZERS[analyzerName] = result 871 return result 872 873 return makeCachedAnalyzer 874 875 876T = TypeVar('T', bound=AnalysisFunction) 877'Type variable for `elide` and `finalOnly`.' 878 879def elide(analyzer: T) -> T: 880 """ 881 Returns the given analyzer after noting that its result should *not* 882 be included in CSV output by default. 883 """ 884 ELIDE.add(analyzer.__name__) 885 return analyzer 886 887 888def finalOnly(analyzer: T) -> T: 889 """ 890 Returns the given analyzer after noting that it should only be run on 891 the final exploration step by default. 892 """ 893 FINAL_ONLY.add(analyzer.__name__) 894 return analyzer 895 896 897#-----------------------# 898# Generalizer Functions # 899#-----------------------# 900 901AnalyzerType = TypeVar('AnalyzerType', bound=AnyAnalyzer) 902""" 903Type var to forward through analyzer types. 904""" 905 906def registerCount(target: AnalyzerType, sizeName: str) -> AnalyzerType: 907 """ 908 Registers a new analysis routine which uses the same analysis unit as 909 the target routine but which returns the length of that routine's 910 result. Returns `None` if the target routine does. 911 912 Needs the target routine and the name to register the new analysis 913 routine under. 914 915 Returns the analysis function it creates. 916 """ 917 def countAnalyzer(*args, **kwargs): 918 'To be replaced' 919 result = target(*args, **kwargs) 920 if result is None: 921 return None 922 else: 923 return len(result) 924 925 countAnalyzer.__doc__ = ( 926 f"Measures count of the {target.__name__!r} result applied to" 927 f" {target._unit!r}." 928 ) 929 countAnalyzer.__name__ = sizeName 930 931 # Register the new function & return the result 932 return cast( 933 AnalyzerType, 934 analyzer(target._unit)(countAnalyzer) 935 ) 936 937 938CombinerResult = TypeVar('CombinerResult') 939""" 940Type variable for the result of a combiner function. 941""" 942 943StepCombiner: 'TypeAlias' = Callable[ 944 [ 945 Dict[Union[base.DecisionID, OverspecificTransition], Any], 946 core.DiscreteExploration, 947 int 948 ], 949 CombinerResult 950] 951""" 952A combiner function which gets a dictionary of per-decision or 953per-transition values along with an exploration object and a step index 954and combines the values into a `CombinerResult` that's specific to that 955step. 956""" 957 958OverallCombiner: 'TypeAlias' = Callable[ 959 [ 960 Dict[Union[base.DecisionID, OverspecificTransition, int], Any], 961 core.DiscreteExploration 962 ], 963 CombinerResult 964] 965""" 966A combiner function which gets a dictionary of per-decision, 967per-transition, and/or per-step values along with an exploration object 968and combines the values into a `CombinerResult`. 969""" 970 971 972def registerStepCombined( 973 name: str, 974 resultName: str, 975 combiner: StepCombiner[CombinerResult] 976) -> StepAnalyzer: 977 """ 978 Registers a new analysis routine which combines results of another 979 routine either across all decisions/transitions at a step. The new 980 routine will have a 'step' analysis unit. 981 982 Needs the name of the target routine, the name to register the new 983 analysis routine under, and the function that will be called to 984 combine results, given a dictionary of results that maps 985 decisions/transitions to results for each. 986 987 Returns the analysis function it creates. 988 """ 989 # Target function 990 target = ALL_ANALYZERS[name] 991 # Analysis unit of the target function 992 targetUnit = target._unit 993 994 if targetUnit not in ('stepDecision', 'stepTransition'): 995 raise ValueError( 996 f"Target analysis routine {name!r} has incompatible analysis" 997 f" unit {targetUnit!r}." 998 ) 999 1000 def analysisCombiner( 1001 exploration: core.DiscreteExploration, 1002 step: int 1003 ) -> CombinerResult: 1004 'To be replaced' 1005 # Declare data here as generic type 1006 data: Dict[ 1007 Union[base.DecisionID, OverspecificTransition, int], 1008 Any 1009 ] 1010 graph = exploration[step].graph 1011 if targetUnit == "stepDecision": 1012 analyzeStepDecision = cast(StepDecisionAnalyzer, target) 1013 data = { 1014 dID: analyzeStepDecision(exploration, step, dID) 1015 for dID in graph 1016 } 1017 elif targetUnit == "stepTransition": 1018 edges = graph.allEdges() 1019 analyzeStepTransition = cast(StepTransitionAnalyzer, target) 1020 data = { 1021 (src, transition, dst): analyzeStepTransition( 1022 exploration, 1023 step, 1024 src, 1025 transition, 1026 dst 1027 ) 1028 for (src, dst, transition) in edges 1029 } 1030 else: 1031 raise ValueError( 1032 f"Target analysis routine {name!r} has inconsistent" 1033 f" analysis unit {targetUnit!r} for 'step' result" 1034 f" unit." 1035 ) 1036 return combiner(data, exploration, step) 1037 1038 analysisCombiner.__doc__ = ( 1039 f"Computes {combiner.__name__} for the {name!r} result over all" 1040 f" {targetUnit}s at each step." 1041 ) 1042 analysisCombiner.__name__ = resultName 1043 1044 # Register the new function & return it 1045 return analyzer("step")(analysisCombiner) 1046 1047 1048def registerFullCombined( 1049 name: str, 1050 resultName: str, 1051 combiner: OverallCombiner[CombinerResult] 1052) -> ExplorationAnalyzer: 1053 """ 1054 Works like `registerStepCombined` but combines results over 1055 decisions/transitions/steps across the entire exploration to get one 1056 result for the entire thing, not one result per step. May also 1057 target an existing `ExplorationAnalyzer` whose result is a 1058 dictionary, in which case it will combine that dictionary's values. 1059 1060 Needs the name of the target routine, the name to register the new 1061 analysis routine under, and the function that will be called to 1062 combine results, given a dictionary of results that maps 1063 decisions/transitions/steps to results for each. 1064 1065 Returns the analysis function it creates. 1066 """ 1067 # Target function 1068 target = ALL_ANALYZERS[name] 1069 # Analysis unit of the target function 1070 targetUnit = target._unit 1071 if targetUnit not in ('step', 'decision', 'transition', 'exploration'): 1072 raise ValueError( 1073 f"Target analysis routine {name!r} has incompatible analysis" 1074 f" unit {targetUnit!r}." 1075 ) 1076 1077 def analysisCombiner( # type: ignore 1078 exploration: core.DiscreteExploration, 1079 ) -> CombinerResult: 1080 'To be replaced' 1081 # Declare data here as generic type 1082 data: Dict[ 1083 Union[base.DecisionID, OverspecificTransition, int], 1084 Any 1085 ] 1086 if targetUnit == "step": 1087 analyzeStep = cast(StepAnalyzer, target) 1088 data = { 1089 step: analyzeStep(exploration, step) 1090 for step in range(len(exploration)) 1091 } 1092 elif targetUnit == "decision": 1093 analyzeDecision = cast(DecisionAnalyzer, target) 1094 data = { 1095 dID: analyzeDecision(exploration, dID) 1096 for dID in exploration.allDecisions() 1097 } 1098 elif targetUnit == "transition": 1099 analyzeTransition = cast(TransitionAnalyzer, target) 1100 data = { 1101 (src, transition, dst): analyzeTransition( 1102 exploration, 1103 src, 1104 transition, 1105 dst 1106 ) 1107 for (src, transition, dst) in exploration.allTransitions() 1108 } 1109 elif targetUnit == "exploration": 1110 analyzeExploration = cast(ExplorationAnalyzer, target) 1111 data = analyzeExploration(exploration) 1112 else: 1113 raise ValueError( 1114 f"Target analysis routine {name!r} has inconsistent" 1115 f" analysis unit {targetUnit!r} for 'step' result" 1116 f" unit." 1117 ) 1118 return combiner(data, exploration) 1119 1120 analysisCombiner.__doc__ = ( 1121 f"Computes {combiner.__name__} for the {name!r} result over all" 1122 f" {targetUnit}s." 1123 ) 1124 analysisCombiner.__name__ = resultName 1125 1126 # Register the new function & return it 1127 return analyzer("exploration")(analysisCombiner) 1128 1129 1130def sumCombiner(data, *_): 1131 """ 1132 Computes sum over numeric data as a "combiner" function to be used 1133 with `registerStepCombined` or `registerFullCombined`. 1134 1135 Only sums values which are `int`s, `float`s, or `complex`es, ignoring 1136 any other values. 1137 """ 1138 return sum( 1139 x for x in data.values() if isinstance(x, (int, float, complex)) 1140 ) 1141 1142 1143def meanCombiner(data, *_): 1144 """ 1145 Computes mean over numeric data as a "combiner" function to be used 1146 with `registerStepCombined` or `registerFullCombined`. 1147 1148 Only counts values which are `int`s, `float`s, or `complex`es, ignoring 1149 any other values. Uses `None` as the result when there are 0 numeric 1150 values. 1151 """ 1152 numeric = [ 1153 x for x in data.values() if isinstance(x, (int, float, complex)) 1154 ] 1155 if len(numeric) == 0: 1156 return None 1157 else: 1158 return sum(numeric) / len(numeric) 1159 1160 1161def medianCombiner(data, *_): 1162 """ 1163 Computes median over numeric data as a "combiner" function to be used 1164 with `registerStepCombined` or `registerFullCombined`. 1165 1166 Only counts values which are `int`s, `float`s, or `complex`es, ignoring 1167 any other values. Uses `None` as the result when there are 0 numeric 1168 values. 1169 """ 1170 numeric = sorted( 1171 x for x in data.values() if isinstance(x, (int, float, complex)) 1172 ) 1173 if len(numeric) == 0: 1174 return None 1175 elif len(numeric) == 1: 1176 return numeric[0] 1177 else: 1178 half = len(numeric) // 2 1179 if len(numeric) % 2 == 0: 1180 return (numeric[half - 1] + numeric[half]) / 2 1181 else: 1182 return numeric[half] 1183 1184 1185#---------------------------# 1186# Simple property functions # 1187#---------------------------# 1188 1189@analyzer('decision') 1190def finalIdentity( 1191 exploration: core.DiscreteExploration, 1192 decision: base.DecisionID 1193) -> str: 1194 """ 1195 Returns the `identityOf` result for the specified decision in the 1196 last step in which that decision existed. 1197 """ 1198 for i in range(-1, -len(exploration) - 1, -1): 1199 situation = exploration.getSituation(i) 1200 try: 1201 return situation.graph.identityOf(decision) 1202 except core.MissingDecisionError: 1203 pass 1204 raise core.MissingDecisionError( 1205 f"Decision {decision!r} never existed." 1206 ) 1207 1208 1209@analyzer('step') 1210def currentDecision( 1211 exploration: core.DiscreteExploration, 1212 step: int 1213) -> Optional[base.DecisionID]: 1214 """ 1215 Returns the `base.DecisionID` for the current decision in a given 1216 situation. 1217 """ 1218 return exploration[step].state['primaryDecision'] 1219 1220 1221@analyzer('step') 1222def currentDecisionIdentity( 1223 exploration: core.DiscreteExploration, 1224 step: int 1225) -> str: 1226 """ 1227 Returns the `identityOf` string for the current decision in a given 1228 situation. 1229 """ 1230 situation = exploration[step] 1231 return situation.graph.identityOf(situation.state['primaryDecision']) 1232 1233 1234@analyzer('step') 1235def observedSoFar( 1236 exploration: core.DiscreteExploration, 1237 step: int 1238) -> Set[base.DecisionID]: 1239 """ 1240 Returns the set of all decision IDs observed so far. Note that some 1241 of them may no longer be present in the graph at the given step if 1242 they got merged or deleted. 1243 """ 1244 # Can't allow negative steps (caching would fail) 1245 if step < 0: 1246 raise IndexError(f"Invalid step (can't be negative): {step!r}") 1247 elif step == 0: 1248 result = set() 1249 else: 1250 result = observedSoFar(exploration, step - 1) 1251 result |= set(exploration[step].graph) 1252 return result 1253 1254 1255totalDecisionsSoFar = registerCount(observedSoFar, 'totalDecisionsSoFar') 1256 1257 1258@analyzer('step') 1259def justObserved( 1260 exploration: core.DiscreteExploration, 1261 step: int 1262) -> Set[base.DecisionID]: 1263 """ 1264 Returns the set of new `base.DecisionID`s that first appeared at the 1265 given step. Will be empty for steps where no new decisions are 1266 observed. Note that this is about decisions whose existence becomes 1267 known, NOT decisions which get confirmed. 1268 """ 1269 if step == 0: 1270 return observedSoFar(exploration, step) 1271 else: 1272 return ( 1273 observedSoFar(exploration, step - 1) 1274 - observedSoFar(exploration, step) 1275 ) 1276 1277 1278newDecisionCount = registerCount(justObserved, 'newDecisionCount') 1279 1280 1281@elide 1282@analyzer('stepDecision') 1283def hasBeenObserved( 1284 exploration: core.DiscreteExploration, 1285 step: int, 1286 dID: base.DecisionID 1287) -> bool: 1288 """ 1289 Whether or not the specified decision has been observed at or prior 1290 to the specified step. Note that it may or may not actually be a 1291 decision in the specified step (e.g., if it was previously observed 1292 but then deleted). 1293 """ 1294 return dID in observedSoFar(exploration, step) 1295 1296 1297@analyzer('exploration') 1298def stepsObserved( 1299 exploration: core.DiscreteExploration, 1300) -> Dict[base.DecisionID, int]: 1301 """ 1302 Returns a dictionary that holds the step at which each decision was 1303 first observed, keyed by decision ID. 1304 """ 1305 result = {} 1306 soFar: Set[base.DecisionID] = set() 1307 for step, situation in enumerate(exploration): 1308 new = set(situation.graph) - soFar 1309 for dID in new: 1310 result[dID] = step 1311 soFar |= new 1312 return result 1313 1314 1315@analyzer('decision') 1316def stepObserved( 1317 exploration: core.DiscreteExploration, 1318 dID: base.DecisionID 1319) -> int: 1320 """ 1321 Returns the step at which the specified decision was first observed 1322 (NOT confirmed). 1323 """ 1324 try: 1325 return stepsObserved(exploration)[dID] 1326 except KeyError: 1327 raise core.MissingDecisionError( 1328 f"Decision {dID!r} was never observed." 1329 ) 1330 1331 1332@analyzer('exploration') 1333def stepsConfirmed( 1334 exploration: core.DiscreteExploration, 1335) -> Dict[base.DecisionID, int]: 1336 """ 1337 Given an exploration, returns a dictionary mapping decision IDs to 1338 the step at which each was first confirmed. Decisions which were 1339 never confirmed will not be included in the dictionary. 1340 """ 1341 result = {} 1342 for i, situation in enumerate(exploration): 1343 for dID in situation.graph: 1344 if ( 1345 dID not in result 1346 and 'unconfirmed' not in situation.graph.decisionTags(dID) 1347 ): 1348 result[dID] = i 1349 return result 1350 1351 1352@analyzer('decision') 1353def stepConfirmed( 1354 exploration: core.DiscreteExploration, 1355 dID: base.DecisionID 1356) -> Optional[int]: 1357 """ 1358 Returns the step at which the specified decision was first confirmed, 1359 or `None` if it was never confirmed. Returns `None` for invalid 1360 decision IDs. 1361 """ 1362 return stepsConfirmed(exploration).get(dID) 1363 1364 1365@analyzer('exploration') 1366def stepsVisited( 1367 exploration: core.DiscreteExploration, 1368) -> Dict[base.DecisionID, List[int]]: 1369 """ 1370 Given an exploration, returns a dictionary mapping decision IDs to 1371 the list of steps at which each was visited. Decisions which were 1372 never visited will not be included in the dictionary. 1373 """ 1374 result: Dict[base.DecisionID, List[int]] = {} 1375 for i, situation in enumerate(exploration): 1376 for dID in situation.graph: 1377 if dID in base.combinedDecisionSet(situation.state): 1378 result.setdefault(dID, []).append(i) 1379 return result 1380 1381 1382@finalOnly 1383@analyzer('stepDecision') 1384def hasBeenVisited( 1385 exploration: core.DiscreteExploration, 1386 step: int, 1387 dID: base.DecisionID 1388) -> bool: 1389 """ 1390 Whether or not the specified decision has been visited at or prior 1391 to the specified step. Note that it may or may not actually be a 1392 decision in the specified step (e.g., if it was previously observed 1393 but then deleted). 1394 """ 1395 visits = stepsVisited(exploration).get(dID, []) 1396 # No visits -> not visited yet 1397 if len(visits) == 0: 1398 return False 1399 else: 1400 # First visit was at or before this step 1401 return min(visits) <= step 1402 1403 1404@analyzer('decision') 1405def stepFirstVisited( 1406 exploration: core.DiscreteExploration, 1407 decision: base.DecisionID, 1408) -> Optional[int]: 1409 """ 1410 Returns the first step at which the given decision was visited, or 1411 `None` if the decision was never visited. 1412 """ 1413 vis = stepsVisited(exploration) 1414 if decision in vis: 1415 return min(vis[decision]) 1416 else: 1417 return None 1418 1419 1420@analyzer('decision') 1421def stepsActive( 1422 exploration: core.DiscreteExploration, 1423 decision: base.DecisionID, 1424) -> Optional[int]: 1425 """ 1426 Returns the total number of steps in which this decision was active. 1427 """ 1428 vis = stepsVisited(exploration) 1429 if decision in vis: 1430 return len(vis[decision]) 1431 else: 1432 return 0 1433 1434 1435@analyzer('exploration') 1436def stepsTransisionsObserved( 1437 exploration: core.DiscreteExploration 1438) -> Dict[OverspecificTransition, int]: 1439 """ 1440 Returns a dictionary that holds the step at which each transition was 1441 first observed, keyed by (source-decision, transition-name, 1442 destination-decision) triples. 1443 1444 Does NOT distinguish between cases where a once-deleted transition 1445 was later reinstated (unless it had a different destination in the 1446 end). 1447 """ 1448 result = {} 1449 for i, situation in enumerate(exploration): 1450 for dID in situation.graph: 1451 destinations = situation.graph.destinationsFrom(dID) 1452 for name, dest in destinations.items(): 1453 key = (dID, name, dest) 1454 if key not in result: 1455 result[key] = i 1456 return result 1457 1458 1459@analyzer('transition') 1460def stepObservedTransition( 1461 exploration: core.DiscreteExploration, 1462 source: base.DecisionID, 1463 transition: base.Transition, 1464 destination: base.DecisionID 1465) -> Optional[int]: 1466 """ 1467 Returns the step within the exploration at which the specified 1468 transition was first observed. Note that transitions which get 1469 renamed do NOT preserve their identities, so a search for a renamed 1470 transition will return the step on which it was renamed (assuming it 1471 didn't change destination). 1472 1473 Returns `None` if the specified transition never existed in the 1474 exploration. 1475 """ 1476 obs = stepsTransisionsObserved(exploration) 1477 return obs.get((source, transition, destination)) 1478 1479 1480@analyzer('step') 1481def transitionTaken( 1482 exploration: core.DiscreteExploration, 1483 step: int 1484) -> Optional[OverspecificTransition]: 1485 """ 1486 Returns the source decision Id, the name of the transition taken, and 1487 the destination decision ID at the given step. This is the transition 1488 chosen at that step whose consequences were triggered resulting in 1489 the next step. Returns `None` for steps where no transition was 1490 taken (e.g., wait, warp, etc.). 1491 1492 Note that in some cases due to e.g., a 'follow' effect, multiple 1493 transitions are taken at a step. In that case, this returns the name 1494 of the first transition taken (which would have triggered any 1495 others). 1496 1497 Also in some cases, there may be multiple starting nodes given, in 1498 which case the first such node (by ID order) which has a transition 1499 with the identified transition name will be returned, or None if none 1500 of them match. 1501 """ 1502 start, transition, end = exploration.movementAtStep(step) 1503 graph = exploration[step].graph 1504 if start is None or transition is None: 1505 return None 1506 if isinstance(start, set): 1507 for dID in sorted(start): 1508 destination = graph.getDestination(dID, transition) 1509 if destination is not None: 1510 return (dID, transition, destination) 1511 return None 1512 else: 1513 destination = graph.getDestination(start, transition) 1514 if destination is not None: 1515 return (start, transition, destination) 1516 else: 1517 return None 1518 1519 1520@analyzer('exploration') 1521def transitionStepsTaken( 1522 exploration: core.DiscreteExploration 1523) -> Dict[OverspecificTransition, List[int]]: 1524 """ 1525 Returns a dictionary mapping each specific transition that was taken 1526 at least once to the list of steps on which it was taken. Does NOT 1527 account for transitions elided by 'jaunt' warps, nor for transitions 1528 taken as a result of follow/bounce effects. 1529 1530 TODO: Account for those? 1531 """ 1532 result: Dict[OverspecificTransition, List[int]] = {} 1533 for i in range(len(exploration)): 1534 taken = transitionTaken(exploration, i) 1535 if taken is not None: 1536 if taken in result: 1537 result[taken].append(i) 1538 else: 1539 result[taken] = [i] 1540 1541 return result 1542 1543 1544@analyzer('transition') 1545def stepsTaken( 1546 exploration: core.DiscreteExploration, 1547 source: base.DecisionID, 1548 transition: base.Transition, 1549 destination: base.DecisionID 1550) -> int: 1551 """ 1552 Returns the list of exploration steps on which a particular 1553 transition has been taken. Returns an empty list for transitions that 1554 were never taken. 1555 1556 Note that this does NOT account for times taken as a result of 1557 follow/bounce effects, and it does NOT account for all times a 1558 transition was taken when warp effects are used as shorthand for 1559 jaunts across the graph. 1560 1561 TODO: Try to account for those? 1562 """ 1563 return transitionStepsTaken(exploration).get( 1564 (source, transition, destination), 1565 [] 1566 ) 1567 1568 1569@analyzer('transition') 1570def timesTaken( 1571 exploration: core.DiscreteExploration, 1572 source: base.DecisionID, 1573 transition: base.Transition, 1574 destination: base.DecisionID 1575) -> int: 1576 """ 1577 Returns the number of times a particular transition has been taken 1578 throughout the exploration. Returns 0 for transitions that were never 1579 taken. 1580 1581 Note that this does NOT account for times taken as a result of 1582 follow/bounce effects, and it does NOT account for all times a 1583 transition was taken when warp effects are used as shorthand for 1584 jaunts across the graph. 1585 1586 TODO: Try to account for those? 1587 """ 1588 return len(stepsTaken(exploration, source, transition, destination)) 1589 1590 1591#--------------------# 1592# Analysis functions # 1593#--------------------# 1594 1595def unexploredBranches( 1596 graph: core.DecisionGraph, 1597 context: Optional[base.RequirementContext] = None 1598) -> List[SpecificTransition]: 1599 """ 1600 Returns a list of from-decision, transition-at-that-decision pairs 1601 which each identify an unexplored branch in the given graph. 1602 1603 When a `context` is provided it only counts options whose 1604 requirements are satisfied in that `RequirementContext`, and the 1605 'searchFrom' part of the context will be replaced by both ends of 1606 each transition tested. This doesn't perfectly map onto actually 1607 reachability since nodes between where the player is and where the 1608 option is might force changes in the game state that make it 1609 un-takeable. 1610 1611 TODO: add logic to detect trivially-unblocked edges? 1612 """ 1613 result = [] 1614 # TODO: Fix networkx type stubs for MultiDiGraph! 1615 for (src, dst, transition) in graph.allEdges(): 1616 req = graph.getTransitionRequirement(src, transition) 1617 localContext: Optional[base.RequirementContext] = None 1618 if context is not None: 1619 localContext = base.RequirementContext( 1620 state=context.state, 1621 graph=context.graph, 1622 searchFrom=graph.bothEnds(src, transition) 1623 ) 1624 # Check if this edge goes from a confirmed to an unconfirmed node 1625 if ( 1626 graph.isConfirmed(src) 1627 and not graph.isConfirmed(dst) 1628 and (localContext is None or req.satisfied(localContext)) 1629 ): 1630 result.append((src, transition)) 1631 return result 1632 1633 1634@analyzer('step') 1635def allUnexploredBranches( 1636 exploration: core.DiscreteExploration, 1637 step: int 1638) -> List[SpecificTransition]: 1639 """ 1640 Returns the list of unexplored branches in the specified situation's 1641 graph, regardless of traversibility (see `unexploredBranches`). 1642 """ 1643 return unexploredBranches(exploration[step].graph) 1644 1645 1646unexploredBranchCount = registerCount( 1647 allUnexploredBranches, 1648 'unexploredBranchCount' 1649) 1650 1651 1652@analyzer('step') 1653def traversableUnexploredBranches( 1654 exploration: core.DiscreteExploration, 1655 step: int 1656) -> List[SpecificTransition]: 1657 """ 1658 Returns the list of traversable unexplored branches in the specified 1659 situation's graph (see `unexploredBranches`). Does not perfectly 1660 account for all traversibility information, because it uses a single 1661 context from which to judge traversibility (TODO: Fix that). 1662 """ 1663 situation = exploration[step] 1664 context = base.genericContextForSituation( 1665 situation, 1666 base.combinedDecisionSet(situation.state) 1667 ) 1668 return unexploredBranches(situation.graph, context) 1669 1670 1671traversableUnexploredCount = registerCount( 1672 traversableUnexploredBranches, 1673 'traversableUnexploredCount' 1674) 1675 1676 1677@finalOnly 1678@analyzer('stepDecision') 1679def actions( 1680 exploration: core.DiscreteExploration, 1681 step: int, 1682 decision: base.DecisionID 1683) -> Optional[Set[base.Transition]]: 1684 """ 1685 Given a particular decision at a particular step, returns the set of 1686 actions available at that decision in that step. Returns `None` if 1687 the specified decision does not exist. 1688 """ 1689 graph = exploration[step].graph 1690 if decision not in graph: 1691 return None 1692 return graph.decisionActions(decision) 1693 1694 1695actionCount = registerCount(actions, 'actionCount') 1696finalOnly(actionCount) 1697 1698totalActions = registerStepCombined( 1699 'actionCount', 1700 'totalActions', 1701 sumCombiner 1702) 1703finalOnly(totalActions) 1704 1705meanActions = registerStepCombined( 1706 'actionCount', 1707 'meanActions', 1708 meanCombiner 1709) 1710finalOnly(meanActions) 1711 1712medianActions = registerStepCombined( 1713 'actionCount', 1714 'medianActions', 1715 medianCombiner 1716) 1717finalOnly(medianActions) 1718 1719 1720@finalOnly 1721@analyzer('stepDecision') 1722def branches( 1723 exploration: core.DiscreteExploration, 1724 step: int, 1725 decision: base.DecisionID 1726) -> Optional[int]: 1727 """ 1728 Computes the number of branches at a particular decision, not 1729 counting actions, but counting as separate branches multiple 1730 transitions which lead to the same decision as each other. Returns 1731 `None` for unconfirmed and nonexistent decisions so that they aren't 1732 counted as part of averages, even though unconfirmed decisions do 1733 have countable branches. 1734 """ 1735 graph = exploration[step].graph 1736 if decision not in graph or not graph.isConfirmed(decision): 1737 return None 1738 1739 dests = graph.destinationsFrom(decision) 1740 branches = 0 1741 for transition, dest in dests.items(): 1742 if dest != decision: 1743 branches += 1 1744 1745 return branches 1746 1747 1748totalBranches = registerStepCombined( 1749 'branches', 1750 'totalBranches', 1751 sumCombiner 1752) 1753finalOnly(totalBranches) 1754 1755meanBranches = registerStepCombined( 1756 'branches', 1757 'meanBranches', 1758 meanCombiner 1759) 1760finalOnly(meanBranches) 1761 1762medianBranches = registerStepCombined( 1763 'branches', 1764 'medianBranches', 1765 medianCombiner 1766) 1767finalOnly(medianBranches) 1768 1769 1770@analyzer('decision') 1771def arrivals( 1772 exploration: core.DiscreteExploration, 1773 decision: base.DecisionID 1774) -> int: 1775 """ 1776 Given an `DiscreteExploration` object and a particular `Decision` 1777 which exists at some point during that exploration, counts the number 1778 of times that decision was in the active decision set for a step 1779 after not being in that set the previous step. Effectively, counts 1780 how many times we arrived at that decision, ignoring steps where we 1781 remained at it due to a wait or an action or the like. 1782 1783 Returns 0 even for decisions that aren't part of the exploration. 1784 """ 1785 visits = stepsVisited(exploration) 1786 result = 0 1787 prev = -2 # won't be contiguous with step 0 1788 for step in visits.get(decision, []): 1789 # if previous visited step wasn't the prior step it's a revisit 1790 if prev != step - 1: 1791 result += 1 1792 prev = step 1793 1794 return result 1795 1796 1797@analyzer('decision') 1798def revisits( 1799 exploration: core.DiscreteExploration, 1800 decision: base.DecisionID 1801) -> int: 1802 """ 1803 Returns the number of times we revisited the target decision, which 1804 is just `arrivals` minus 1 for the first arrival, but not < 0. 1805 """ 1806 return max(0, arrivals(exploration, decision) - 1) 1807 1808 1809totalRevisits = registerFullCombined( 1810 'revisits', 1811 'totalRevisits', 1812 sumCombiner 1813) 1814 1815meanRevisits = registerFullCombined( 1816 'revisits', 1817 'meanRevisits', 1818 meanCombiner 1819) 1820 1821medianRevisits = registerFullCombined( 1822 'revisits', 1823 'medianRevisits', 1824 medianCombiner 1825) 1826 1827 1828#-------------------# 1829# Paths & distances # 1830#-------------------# 1831 1832HopPaths: 'TypeAlias' = Dict[ 1833 Tuple[base.DecisionID, base.DecisionID], 1834 Optional[List[base.DecisionID]] 1835] 1836""" 1837Records paths between decisions ignoring edge directions & requirements. 1838Stores a list of decision IDs to traverse keyed by a decision ID pair 1839where the smaller decision ID comes first (since paths are symmetric). 1840""" 1841 1842def hopDistance( 1843 hopPaths: HopPaths, 1844 src: base.DecisionID, 1845 dst: base.DecisionID 1846) -> Optional[int]: 1847 """ 1848 Returns the number of hops required to move from the given source to 1849 the given destination, ignoring edge directions & requirements. 1850 Looks that up in the given `HopPaths` dictionary. Returns 0 when 1851 source and destination are the same. 1852 1853 For example: 1854 1855 >>> e = core.DiscreteExploration.example() 1856 >>> hops = shortestHopPaths(e) 1857 >>> hopDistance(hops, 0, 1) 1858 1 1859 >>> hopDistance(hops, 1, 0) 1860 1 1861 >>> hopDistance(hops, 0, 0) 1862 0 1863 >>> hopDistance(hops, 0, 0) 1864 0 1865 >>> hopDistance(hops, 0, 4) is None 1866 True 1867 >>> hopDistance(hops, 4, 0) is None 1868 True 1869 >>> hopDistance(hops, 0, 5) 1870 2 1871 >>> hopDistance(hops, 5, 0) 1872 2 1873 >>> hopDistance(hops, 5, 1) 1874 3 1875 >>> hopDistance(hops, 1, 5) 1876 3 1877 >>> hopDistance(hops, 3, 5) 1878 1 1879 >>> hopDistance(hops, 5, 3) 1880 1 1881 >>> dIDs = list(e[-1].graph) 1882 >>> for i, src in enumerate(dIDs): 1883 ... for j in range(i + 1, len(dIDs)): 1884 ... dst = dIDs[j] 1885 ... assert ( 1886 ... hopDistance(hops, src, dst) == hopDistance(hops, dst, src) 1887 ... ) 1888 """ 1889 if src == dst: 1890 return 0 1891 elif src < dst: 1892 path = hopPaths.get((src, dst)) 1893 if path is None: 1894 return None 1895 else: 1896 return 1 + len(path) 1897 else: 1898 path = hopPaths.get((dst, src)) 1899 if path is None: 1900 return None 1901 else: 1902 return 1 + len(path) 1903 1904 1905@elide 1906@analyzer('exploration') 1907def shortestHopPaths( 1908 exploration: core.DiscreteExploration, 1909 edgeFilter: Optional[Callable[ 1910 [base.DecisionID, base.Transition, base.DecisionID, core.DecisionGraph], 1911 bool 1912 ]] = None 1913) -> HopPaths: 1914 """ 1915 Creates a dictionary that holds shortest paths between pairs of 1916 nodes, ignoring edge directions and requirements entirely. 1917 1918 If given an `edgeFilter`, that function is applied with source ID, 1919 transition name, destination ID, and full graph as arguments and 1920 edges for which it returns False are ignored when computing hops. 1921 Note that you have to filter out all edges in both directions between 1922 two nodes for there not to be a 1-hop path between them. 1923 1924 Keys in the dictionary are pairs of decision IDs, where the decision 1925 with the smaller ID always comes first (because shortest hop paths 1926 are symmetric so we don't store the reverse paths). Values are lists 1927 of decision IDs that can be traversed to get from the first decision 1928 to the second, with an empty list indicating adjacent decisions 1929 (note that these "hop paths" cannot always be traversed in the 1930 actual graph because they may go the "wrong way" across one-way 1931 connections). The number of hops required to get between the nodes 1932 is one more than the length of the path. Decision pairs which are 1933 not reachable from each other will not be included in the 1934 dictionary. Only decisions present in the final graph in the 1935 exploration will be included, and only edges present in that final 1936 graph will be considered. 1937 1938 Where there are multiple shortest hop paths, an arbitrary one is 1939 included in the result. 1940 1941 TODO: EXAMPLE 1942 >>> e = core.DiscreteExploration.example() 1943 >>> print(e[-1].graph.namesListing(e[-1].graph)) 1944 0 (House) 1945 1 (_u.0) 1946 2 (Cellar) 1947 3 (Yard) 1948 5 (Lane) 1949 <BLANKLINE> 1950 >>> shortest = dict(nx.all_pairs_shortest_path(e[-1].graph.connections())) 1951 >>> for src in shortest: 1952 ... print(f"{src} -> {shortest[src]}") 1953 0 -> {0: [0], 1: [0, 1], 2: [0, 2], 3: [0, 3], 5: [0, 3, 5]} 1954 1 -> {1: [1], 0: [1, 0], 2: [1, 0, 2], 3: [1, 0, 3], 5: [1, 0, 3, 5]} 1955 2 -> {2: [2], 0: [2, 0], 3: [2, 3], 1: [2, 0, 1], 5: [2, 3, 5]} 1956 3 -> {3: [3], 0: [3, 0], 2: [3, 2], 5: [3, 5], 1: [3, 0, 1]} 1957 5 -> {5: [5], 3: [5, 3], 0: [5, 3, 0], 2: [5, 3, 2], 1: [5, 3, 0, 1]} 1958 >>> hops = shortestHopPaths(e) 1959 >>> for src in hops: 1960 ... print(f"{src} -> {hops[src]}") 1961 (0, 1) -> [] 1962 (0, 2) -> [] 1963 (0, 3) -> [] 1964 (0, 5) -> [3] 1965 (1, 2) -> [0] 1966 (1, 3) -> [0] 1967 (1, 5) -> [0, 3] 1968 (2, 3) -> [] 1969 (2, 5) -> [3] 1970 (3, 5) -> [] 1971 """ 1972 graph = exploration[-1].graph 1973 allIDs = sorted(graph) 1974 connections = graph.connections(edgeFilter) 1975 shortest = dict(nx.all_pairs_shortest_path(connections)) 1976 1977 result = {} 1978 for i, src in enumerate(allIDs): 1979 for j in range(i + 1, len(allIDs)): 1980 dst = allIDs[j] 1981 path = shortest.get(src, {}).get(dst, None) 1982 if path is not None: 1983 result[(src, dst)] = path[1:-1] 1984 1985 return result 1986 1987 1988#---------------# 1989# Full analysis # 1990#---------------# 1991 1992def runFullAnalysis( 1993 exploration: core.DiscreteExploration, 1994 elide: Collection[str] = ELIDE, 1995 finalOnly: Collection[str] = FINAL_ONLY 1996) -> FullAnalysisResults: 1997 """ 1998 Runs every single analysis function on every valid target for that 1999 function in the given exploration, building up the cache of 2000 `FullAnalysisResults` in `ALL_ANALYZERS`. Returns the relevant 2001 `FullAnalysisResults` object. 2002 2003 Skips analyzers in the provided `elide` collection, which by default 2004 is the `ELIDE` global set containing functions explicitly decorated 2005 with `elide`. Analyzers in the `FINAL_ONLY` set are only applied to 2006 the final decision graph in the exploration (although note that they 2007 might call other analyzers which recursively need to analyze prior 2008 steps). `finalOnly` only has an effect for analyzers with 'step', 2009 'stepDecision', or 'stepTransition' units. 2010 """ 2011 for aName, analyzer in ALL_ANALYZERS.items(): 2012 # Skip this one if we're told to 2013 if aName in elide: 2014 continue 2015 # Split out cases for each unit & apply as appropriate 2016 unit = analyzer._unit 2017 if unit == 'step': 2018 sa = cast(StepAnalyzer, analyzer) 2019 if aName in finalOnly: 2020 sa(exploration, len(exploration) - 1) 2021 else: 2022 for step in range(len(exploration)): 2023 sa(exploration, step) 2024 elif unit == 'stepDecision': 2025 sda = cast(StepDecisionAnalyzer, analyzer) 2026 # Only apply to final graph if it's in finalOnly 2027 if aName in finalOnly: 2028 step = len(exploration) - 1 2029 for dID in exploration[step].graph: 2030 sda(exploration, step, dID) 2031 else: 2032 for step in range(len(exploration)): 2033 for dID in exploration[step].graph: 2034 sda(exploration, step, dID) 2035 elif unit == 'stepTransition': 2036 sta = cast(StepTransitionAnalyzer, analyzer) 2037 if aName in finalOnly: 2038 step = len(exploration) - 1 2039 edges = exploration[step].graph.allEdges() 2040 for (src, dst, transition) in edges: 2041 sta(exploration, step, src, transition, dst) 2042 else: 2043 for step in range(len(exploration)): 2044 edges = exploration[step].graph.allEdges() 2045 for (src, dst, transition) in edges: 2046 sta(exploration, step, src, transition, dst) 2047 elif unit == 'decision': 2048 da = cast(DecisionAnalyzer, analyzer) 2049 for dID in exploration.allDecisions(): 2050 da(exploration, dID) 2051 elif unit == 'transition': 2052 ta = cast(TransitionAnalyzer, analyzer) 2053 for (src, trans, dst) in exploration.allTransitions(): 2054 ta(exploration, src, trans, dst) 2055 elif unit == 'exploration': 2056 ea = cast(ExplorationAnalyzer, analyzer) 2057 ea(exploration) 2058 else: 2059 raise ValueError(f"Invalid analysis unit {unit!r}.") 2060 2061 return ANALYSIS_RESULTS[id(exploration)] 2062 2063 2064#--------------------# 2065# Analysis accessors # 2066#--------------------# 2067 2068# These functions access pre-computed analysis results. Call 2069# `runFullAnalysis` first to populate those. 2070 2071 2072def getDecisionAnalyses( 2073 exploration: core.DiscreteExploration, 2074 dID: base.DecisionID 2075) -> AnalysisResults: 2076 """ 2077 Retrieves all pre-computed all-step analysis results for the 2078 specified decision. Use `runFullAnalysis` or call specific analysis 2079 functions of interest first to populate these results. Does not 2080 include per-step decision analyses. 2081 2082 Returns the dictionary of `AnalysisResults`, which can be modified to 2083 update stored results if necessary (although it's better to write 2084 additional analysis routines using the `@analyzer` decorator). 2085 """ 2086 cached = ANALYSIS_RESULTS.setdefault( 2087 id(exploration), 2088 newFullAnalysisResults() 2089 ) 2090 return cached["perDecision"].setdefault(dID, {}) 2091 2092 2093def getTransitionAnalyses( 2094 exploration: core.DiscreteExploration, 2095 source: base.DecisionID, 2096 transition: base.Transition, 2097 destination: base.DecisionID 2098) -> AnalysisResults: 2099 """ 2100 Like `getDecisionAnalyses` but returns analyses for a transition 2101 instead of a decision. 2102 """ 2103 cached = ANALYSIS_RESULTS.setdefault( 2104 id(exploration), 2105 newFullAnalysisResults() 2106 ) 2107 return cached["perTransition"].setdefault( 2108 (source, transition, destination), 2109 {} 2110 ) 2111 2112 2113def getStepDecisionAnalyses( 2114 exploration: core.DiscreteExploration, 2115 step: int, 2116 dID: base.DecisionID 2117) -> AnalysisResults: 2118 """ 2119 Like `getDecisionAnalyses` but for analyses applicable only to the 2120 specified exploration step. 2121 """ 2122 cached = ANALYSIS_RESULTS.setdefault( 2123 id(exploration), 2124 newFullAnalysisResults() 2125 ) 2126 stepwise = cached.setdefault("perStepDecision", []) 2127 while step >= len(stepwise): 2128 stepwise.append({}) 2129 return stepwise[step].setdefault(dID, {}) 2130 2131 2132def getStepTransitionAnalyses( 2133 exploration: core.DiscreteExploration, 2134 step: int, 2135 source: base.DecisionID, 2136 transition: base.Transition, 2137 destination: base.DecisionID 2138) -> AnalysisResults: 2139 """ 2140 Like `getStepDecisionAnalyses` but for a transition at a particular 2141 step, not a decision. 2142 """ 2143 cached = ANALYSIS_RESULTS.setdefault( 2144 id(exploration), 2145 newFullAnalysisResults() 2146 ) 2147 stepwise = cached.setdefault("perStepTransition", []) 2148 while step >= len(stepwise): 2149 stepwise.append({}) 2150 return stepwise[step].setdefault((source, transition, destination), {}) 2151 2152 2153def getStepAnalyses( 2154 exploration: core.DiscreteExploration, 2155 step: int 2156) -> AnalysisResults: 2157 """ 2158 Like `getDecisionAnalyses` but retrieves full-step analysis results 2159 for the specified exploration step. 2160 """ 2161 cached = ANALYSIS_RESULTS.setdefault( 2162 id(exploration), 2163 newFullAnalysisResults() 2164 ) 2165 stepwise = cached.setdefault("perStep", []) 2166 while step >= len(stepwise): 2167 stepwise.append({}) 2168 return stepwise[step] 2169 2170 2171def getExplorationAnalyses( 2172 exploration: core.DiscreteExploration 2173) -> AnalysisResults: 2174 """ 2175 Like `getDecisionAnalyses` but retrieves full-exploration analysis 2176 results. 2177 """ 2178 cached = ANALYSIS_RESULTS.setdefault( 2179 id(exploration), 2180 newFullAnalysisResults() 2181 ) 2182 return cached.setdefault("overall", {}) 2183 2184 2185class AnalyzersByUnit(TypedDict): 2186 """ 2187 Holds lists of analyzers for each analysis unit type. 2188 """ 2189 step: List[StepAnalyzer] 2190 stepDecision: List[StepDecisionAnalyzer] 2191 stepTransition: List[StepTransitionAnalyzer] 2192 decision: List[DecisionAnalyzer] 2193 transition: List[TransitionAnalyzer] 2194 exploration: List[ExplorationAnalyzer] 2195 2196 2197def analyzersByUnit(onlyInclude: Optional[Set[str]] = None) -> AnalyzersByUnit: 2198 """ 2199 Returns an `AnalyzersByUnit` dictionary containing all analyzers 2200 from `ALL_ANALYZERS` which are in the given `onlyInclude` set (or 2201 just all of them if no set is specified). This will by default be all 2202 analyzers registered so far. 2203 """ 2204 byUnit: AnalyzersByUnit = { 2205 "step": [], 2206 "stepDecision": [], 2207 "stepTransition": [], 2208 "decision": [], 2209 "transition": [], 2210 "exploration": [] 2211 } 2212 for analyzerName in ALL_ANALYZERS: 2213 if onlyInclude is not None and analyzerName not in onlyInclude: 2214 continue 2215 analyzer = ALL_ANALYZERS[analyzerName] 2216 unit = analyzer._unit 2217 byUnit[unit].append(analyzer) # type: ignore 2218 # Mypy will just have to trust that We've put the correct unit 2219 # values on each analyzer. That relationship is type-checked in 2220 # the `analyzer` definition. 2221 2222 return byUnit
31def describeConsequence(consequence: base.Consequence) -> str: 32 """ 33 Returns a string which concisely describes a consequence list. 34 Returns an empty string if given an empty consequence. Examples: 35 36 >>> describeConsequence([]) 37 '' 38 >>> describeConsequence([ 39 ... base.effect(gain=('gold', 5), delay=2, charges=3), 40 ... base.effect(lose='flight') 41 ... ]) 42 'gain gold*5 ,2 =3; lose flight' 43 >>> from . import commands 44 >>> d = describeConsequence([ 45 ... base.effect(edit=[ 46 ... [ 47 ... commands.command('val', '5'), 48 ... commands.command('empty', 'list'), 49 ... commands.command('append') 50 ... ], 51 ... [ 52 ... commands.command('val', '11'), 53 ... commands.command('assign', 'var'), 54 ... commands.command('op', '+', '$var', '$var') 55 ... ], 56 ... ]) 57 ... ]) 58 >>> d 59 'with consequences:\ 60\\n edit {\ 61\\n val 5;\ 62\\n empty list;\ 63\\n append $_;\ 64\\n } {\ 65\\n val 11;\ 66\\n assign var $_;\ 67\\n op + $var $var;\ 68\\n }\ 69\\n' 70 >>> for line in d.splitlines(): 71 ... print(line) 72 with consequences: 73 edit { 74 val 5; 75 empty list; 76 append $_; 77 } { 78 val 11; 79 assign var $_; 80 op + $var $var; 81 } 82 """ 83 edesc = '' 84 pf = parsing.ParseFormat() 85 if consequence: 86 parts = [] 87 for item in consequence: 88 # TODO: Challenges and Conditions here! 89 if 'skills' in item: # a Challenge 90 item = cast(base.Challenge, item) 91 parts.append(pf.unparseChallenge(item)) 92 elif 'value' in item: # an Effect 93 item = cast(base.Effect, item) 94 parts.append(pf.unparseEffect(item)) 95 elif 'condition' in item: # a Condition 96 item = cast(base.Condition, item) 97 parts.append(pf.unparseCondition(item)) 98 else: 99 raise TypeError( 100 f"Invalid consequence item (no 'skills', 'value', or" 101 f" 'condition' key found):\n{repr(item)}" 102 ) 103 edesc = '; '.join(parts) 104 if len(edesc) > 60 or '\n' in edesc: 105 edesc = 'with consequences:\n' + ';\n'.join( 106 textwrap.indent(part, ' ') 107 for part in parts 108 ) + '\n' 109 110 return edesc
Returns a string which concisely describes a consequence list. Returns an empty string if given an empty consequence. Examples:
>>> describeConsequence([])
''
>>> describeConsequence([
... base.effect(gain=('gold', 5), delay=2, charges=3),
... base.effect(lose='flight')
... ])
'gain gold*5 ,2 =3; lose flight'
>>> from . import commands
>>> d = describeConsequence([
... base.effect(edit=[
... [
... commands.command('val', '5'),
... commands.command('empty', 'list'),
... commands.command('append')
... ],
... [
... commands.command('val', '11'),
... commands.command('assign', 'var'),
... commands.command('op', '+', '$var', '$var')
... ],
... ])
... ])
>>> d
'with consequences:\n edit {\n val 5;\n empty list;\n append $_;\n } {\n val 11;\n assign var $_;\n op + $var $var;\n }\n'
>>> for line in d.splitlines():
... print(line)
with consequences:
edit {
val 5;
empty list;
append $_;
} {
val 11;
assign var $_;
op + $var $var;
}
113def describeProgress(exploration: core.DiscreteExploration) -> str: 114 """ 115 Describes the progress of an exploration by noting each room/zone 116 visited and explaining the options visible at each point plus which 117 option was taken. Notes powers/tokens gained/lost along the way. 118 Returns a string. 119 120 Example: 121 >>> from exploration import journal 122 >>> e = journal.convertJournal('''\\ 123 ... S Start::pit 124 ... A gain jump 125 ... A gain attack 126 ... n button check 127 ... zz Wilds 128 ... o up 129 ... q _flight 130 ... o left 131 ... x left left_nook right 132 ... a geo_rock 133 ... At gain geo*15 134 ... At deactivate 135 ... o up 136 ... q _tall_narrow 137 ... t right 138 ... o right 139 ... q attack 140 ... ''') 141 >>> for line in describeProgress(e).splitlines(): 142 ... print(line) 143 Start of the exploration 144 Start exploring domain main at 0 (Start::pit) 145 Gained capability 'attack' 146 Gained capability 'jump' 147 At decision 0 (Start::pit) 148 In zone Start 149 In region Wilds 150 There are transitions: 151 left to unconfirmed 152 up to unconfirmed; requires _flight 153 1 note(s) at this step 154 Explore left from decision 0 (Start::pit) to 2 (now Start::left_nook) 155 At decision 2 (Start::left_nook) 156 There are transitions: 157 right to 0 (Start::pit) 158 There are actions: 159 geo_rock 160 Do action geo_rock 161 Gained 15 geo(s) 162 Take right from decision 2 (Start::left_nook) to 0 (Start::pit) 163 At decision 0 (Start::pit) 164 There are transitions: 165 left to 2 (Start::left_nook) 166 right to unconfirmed; requires attack 167 up to unconfirmed; requires _flight 168 Waiting for another action... 169 End of the exploration. 170 """ 171 result = '' 172 173 regions: Set[base.Zone] = set() 174 zones: Set[base.Zone] = set() 175 last: Union[base.DecisionID, Set[base.DecisionID], None] = None 176 lastState: base.State = base.emptyState() 177 prevCapabilities = base.effectiveCapabilitySet(lastState) 178 prevMechanisms = lastState['mechanisms'] 179 oldActiveDecisions: Set[base.DecisionID] = set() 180 for i, situation in enumerate(exploration): 181 if i == 0: 182 result += "Start of the exploration\n" 183 184 # Extract info 185 graph = situation.graph 186 activeDecisions = exploration.getActiveDecisions(i) 187 newActive = activeDecisions - oldActiveDecisions 188 departedFrom = exploration.movementAtStep(i)[0] 189 # TODO: use the other parts of this? 190 nowZones: Set[base.Zone] = set() 191 for active in activeDecisions: 192 nowZones |= graph.zoneAncestors(active) 193 regionsHere = set( 194 z 195 for z in nowZones 196 if graph.zoneHierarchyLevel(z) == 1 197 ) 198 zonesHere = set( 199 z 200 for z in nowZones 201 if graph.zoneHierarchyLevel(z) == 0 202 ) 203 here = departedFrom 204 state = situation.state 205 capabilities = base.effectiveCapabilitySet(state) 206 mechanisms = state['mechanisms'] 207 208 # Describe capabilities gained/lost relative to previous step 209 # (i.e., as a result of the previous action) 210 gained = ( 211 capabilities['capabilities'] 212 - prevCapabilities['capabilities'] 213 ) 214 gainedTokens = [] 215 for tokenType in capabilities['tokens']: 216 net = ( 217 capabilities['tokens'][tokenType] 218 - prevCapabilities['tokens'].get(tokenType, 0) 219 ) 220 if net != 0: 221 gainedTokens.append((tokenType, net)) 222 changed = [ 223 mID 224 for mID in list(mechanisms.keys()) + list(prevMechanisms.keys()) 225 if mechanisms.get(mID) != prevMechanisms.get(mID) 226 ] 227 228 for capability in sorted(gained): 229 result += f" Gained capability '{capability}'\n" 230 231 for tokenType, net in gainedTokens: 232 if net > 0: 233 result += f" Gained {net} {tokenType}(s)\n" 234 else: 235 result += f" Lost {-net} {tokenType}(s)\n" 236 237 for mID in changed: 238 oldState = prevMechanisms.get(mID, base.DEFAULT_MECHANISM_STATE) 239 newState = mechanisms.get(mID, base.DEFAULT_MECHANISM_STATE) 240 241 details = graph.mechanismDetails(mID) 242 if details is None: 243 mName = "(unknown)" 244 else: 245 mName = details[1] 246 result += ( 247 f" Set mechanism {mID} ({mName}) to {newState} (was" 248 f" {oldState})" 249 ) 250 # TODO: Test this! 251 252 if isinstance(departedFrom, base.DecisionID): 253 # Print location info 254 if here != last: 255 if here is None: 256 result += "Without a position...\n" 257 elif isinstance(here, set): 258 result += f"With {len(here)} active decisions\n" 259 # TODO: List them using namesListing? 260 else: 261 result += f"At decision {graph.identityOf(here)}\n" 262 newZones = zonesHere - zones 263 for zone in sorted(newZones): 264 result += f" In zone {zone}\n" 265 newRegions = regionsHere - regions 266 for region in sorted(newRegions): 267 result += f" In region {region}\n" 268 269 elif isinstance(departedFrom, set): # active in spreading domain 270 spreadingDomain = graph.domainFor(list(departedFrom)[0]) 271 result += ( 272 f" In domain {spreadingDomain} with {len(departedFrom)}" 273 f" active decisions...\n" 274 ) 275 276 else: 277 assert departedFrom is None 278 279 # Describe new position/positions at start of this step 280 if len(newActive) > 1: 281 newListing = ', '.join( 282 sorted(graph.identityOf(n) for n in newActive) 283 ) 284 result += ( 285 f" There are {len(newActive)} new active decisions:" 286 f"\n {newListing}" 287 ) 288 289 elif len(newActive) == 1: 290 here = list(newActive)[0] 291 292 outgoing = graph.destinationsFrom(here) 293 294 transitions = {t: d for (t, d) in outgoing.items() if d != here} 295 actions = {t: d for (t, d) in outgoing.items() if d == here} 296 if transitions: 297 result += " There are transitions:\n" 298 for transition in sorted(transitions): 299 dest = transitions[transition] 300 if not graph.isConfirmed(dest): 301 destSpec = 'unconfirmed' 302 else: 303 destSpec = graph.identityOf(dest) 304 req = graph.getTransitionRequirement(here, transition) 305 rDesc = '' 306 if req != base.ReqNothing(): 307 rDesc = f"; requires {req.unparse()}" 308 cDesc = describeConsequence( 309 graph.getConsequence(here, transition) 310 ) 311 if cDesc: 312 cDesc = '; ' + cDesc 313 result += ( 314 f" {transition} to {destSpec}{rDesc}{cDesc}\n" 315 ) 316 317 if actions: 318 result += " There are actions:\n" 319 for action in sorted(actions): 320 req = graph.getTransitionRequirement(here, action) 321 rDesc = '' 322 if req != base.ReqNothing(): 323 rDesc = f"; requires {req.unparse()}" 324 cDesc = describeConsequence( 325 graph.getConsequence(here, action) 326 ) 327 if cDesc: 328 cDesc = '; ' + cDesc 329 if rDesc or cDesc: 330 desc = (rDesc + cDesc)[2:] # chop '; ' from either 331 result += f" {action} ({desc})\n" 332 else: 333 result += f" {action}\n" 334 335 # note annotations 336 if len(situation.annotations) > 0: 337 result += ( 338 f" {len(situation.annotations)} note(s) at this step\n" 339 ) 340 341 # Describe action taken 342 if situation.action is None and situation.type == "pending": 343 result += "Waiting for another action...\n" 344 else: 345 desc = base.describeExplorationAction(situation, situation.action) 346 desc = desc[0].capitalize() + desc[1:] 347 result += desc + '\n' 348 349 if i == len(exploration) - 1: 350 result += "End of the exploration.\n" 351 352 # Update state variables 353 oldActiveDecisions = activeDecisions 354 prevCapabilities = capabilities 355 prevMechanisms = mechanisms 356 regions = regionsHere 357 zones = zonesHere 358 if here is not None: 359 last = here 360 lastState = state 361 362 return result
Describes the progress of an exploration by noting each room/zone visited and explaining the options visible at each point plus which option was taken. Notes powers/tokens gained/lost along the way. Returns a string.
Example:
>>> from exploration import journal
>>> e = journal.convertJournal('''\
... S Start::pit
... A gain jump
... A gain attack
... n button check
... zz Wilds
... o up
... q _flight
... o left
... x left left_nook right
... a geo_rock
... At gain geo*15
... At deactivate
... o up
... q _tall_narrow
... t right
... o right
... q attack
... ''')
>>> for line in describeProgress(e).splitlines():
... print(line)
Start of the exploration
Start exploring domain main at 0 (Start::pit)
Gained capability 'attack'
Gained capability 'jump'
At decision 0 (Start::pit)
In zone Start
In region Wilds
There are transitions:
left to unconfirmed
up to unconfirmed; requires _flight
1 note(s) at this step
Explore left from decision 0 (Start::pit) to 2 (now Start::left_nook)
At decision 2 (Start::left_nook)
There are transitions:
right to 0 (Start::pit)
There are actions:
geo_rock
Do action geo_rock
Gained 15 geo(s)
Take right from decision 2 (Start::left_nook) to 0 (Start::pit)
At decision 0 (Start::pit)
There are transitions:
left to 2 (Start::left_nook)
right to unconfirmed; requires attack
up to unconfirmed; requires _flight
Waiting for another action...
End of the exploration.
The different kinds of analysis units we consider: per-step-per-decision, per-step-per-transition, per-step, per-final-decision, per-final-transition, and per-exploration (i.e. overall).
Analysis results are dictionaries that map analysis routine names to results from those routines, which can be of any type.
A specific transition is identified by its source decision ID and its transition name. Note that transitions which get renamed are treated as two separate transitions.
In contrast to a SpecificTransition
, an OverspecificTransition
includes the destination of the transition, which might help disambiguate
cases where a transition is created, then re-targeted or deleted and
re-created with a different destination. Transitions which get renamed
still are treated as two separate transitions.
Decision analysis results are stored per-decision, with a dictionary of
property-name → value associations. These properties either apply to
decisions across all steps of an exploration, or apply to decisions in a
particular core.DecisionGraph
.
Per-transition analysis results, similar to DecisionAnalyses
.
Per-exploration-step analysis results are stored in a list and indexed by exploration step integers.
Per-step-per-decision analysis results are stored as a list of decision analysis results.
Per-step-per-transition analysis results are stored as a list of transition analysis results.
Whole-exploration analyses are just a normal AnalysisResults
dictionary.
447class FullAnalysisResults(TypedDict): 448 """ 449 Full analysis results hold every kind of analysis result in one 450 dictionary. 451 """ 452 perDecision: DecisionAnalyses 453 perTransition: TransitionAnalyses 454 perStep: StepAnalyses 455 perStepDecision: StepwiseDecisionAnalyses 456 perStepTransition: StepwiseTransitionAnalyses 457 overall: ExplorationAnalyses
Full analysis results hold every kind of analysis result in one dictionary.
460def newFullAnalysisResults() -> FullAnalysisResults: 461 """ 462 Returns a new empty `FullAnalysisResults` dictionary. 463 """ 464 return { 465 'perDecision': {}, 466 'perTransition': {}, 467 'perStep': [], 468 'perStepDecision': [], 469 'perStepTransition': [], 470 'overall': {} 471 }
Returns a new empty FullAnalysisResults
dictionary.
Parameter specification variable for AnalysisFunction
definition.
478class AnalysisFunction(Protocol[Params]): 479 """ 480 Analysis functions are callable, but also have a `_unit` attribute 481 which is a string. 482 """ 483 _unit: AnalysisUnit 484 __name__: str 485 __doc__: str 486 def __call__( 487 self, 488 exploration: core.DiscreteExploration, 489 *args: Params.args, 490 **kwargs: Params.kwargs 491 ) -> Any: 492 ...
Analysis functions are callable, but also have a _unit
attribute
which is a string.
1953def _no_init_or_replace_init(self, *args, **kwargs): 1954 cls = type(self) 1955 1956 if cls._is_protocol: 1957 raise TypeError('Protocols cannot be instantiated') 1958 1959 # Already using a custom `__init__`. No need to calculate correct 1960 # `__init__` to call. This can lead to RecursionError. See bpo-45121. 1961 if cls.__init__ is not _no_init_or_replace_init: 1962 return 1963 1964 # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`. 1965 # The first instantiation of the subclass will call `_no_init_or_replace_init` which 1966 # searches for a proper new `__init__` in the MRO. The new `__init__` 1967 # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent 1968 # instantiation of the protocol subclass will thus use the new 1969 # `__init__` and no longer call `_no_init_or_replace_init`. 1970 for base in cls.__mro__: 1971 init = base.__dict__.get('__init__', _no_init_or_replace_init) 1972 if init is not _no_init_or_replace_init: 1973 cls.__init__ = init 1974 break 1975 else: 1976 # should not happen 1977 cls.__init__ = object.__init__ 1978 1979 cls.__init__(self, *args, **kwargs)
A step analyzer is a function which will receive a
core.DiscreteExploration
along with the step in that exploration being
considered. It can return any type of analysis result.
Like a StepAnalyzer
but also gets a decision ID to consider.
Like a StepAnalyzer
but also gets a source decision ID, a transition
name, and a destination decision ID to target.
A decision analyzer gets full analysis results to update plus an exploration and a particular decision ID to consider.
Like a DecisionAnalyzer
but gets a transition name as well.
Analyzes overall properties of an entire core.DiscreteExploration
.
Caches analysis results, keyed by the id
of the
core.DiscreteExploration
they're based on.
556class NotCached: 557 """ 558 Reference object for specifying that no cached value is available, 559 since `None` is a valid cached value. 560 """ 561 pass
Reference object for specifying that no cached value is available,
since None
is a valid cached value.
564def lookupAnalysisResult( 565 cache: FullAnalysisResults, 566 analyzer: AnalysisFunction, 567 argsInOrder: Sequence[Any] 568) -> Union[Type[NotCached], Any]: 569 """ 570 Looks up an analysis result for the given function in the given 571 cache. The function must have been decorated with `analyzer`. The 572 bound arguments must match the unit of analysis, for example, if the 573 unit is 'stepDecision', the arguments must be those for a 574 `StepDecisionAnalyzer`. The bound arguments should have had 575 `apply_defaults` called already to fill in default argument values. 576 Returns the special object `NotCached` if there is no cached value 577 for the specified arguments yet. 578 """ 579 unit = analyzer._unit 580 if unit == 'step': 581 whichStep = argsInOrder[1] 582 perStep = cache['perStep'] 583 while len(perStep) <= whichStep: 584 perStep.append({}) 585 return perStep[whichStep].get(analyzer.__name__, NotCached) 586 elif unit == 'stepDecision': 587 whichStep = argsInOrder[1] 588 whichDecision = argsInOrder[2] 589 perStepDecision = cache['perStepDecision'] 590 while len(perStepDecision) <= whichStep: 591 perStepDecision.append({}) 592 forThis = perStepDecision[whichStep].get(whichDecision) 593 if forThis is None: 594 return NotCached 595 return forThis.get(analyzer.__name__, NotCached) 596 elif unit == 'stepTransition': 597 whichStep = argsInOrder[1] 598 whichTransition = (argsInOrder[2], argsInOrder[3], argsInOrder[4]) 599 perStepTransition = cache['perStepTransition'] 600 while len(perStepTransition) <= whichStep: 601 perStepTransition.append({}) 602 forThis = perStepTransition[whichStep].get(whichTransition) 603 if forThis is None: 604 return NotCached 605 return forThis.get(analyzer.__name__, NotCached) 606 elif unit == 'decision': 607 whichDecision = argsInOrder[1] 608 perDecision = cache['perDecision'] 609 if whichDecision not in perDecision: 610 return NotCached 611 return perDecision[whichDecision].get(analyzer.__name__, NotCached) 612 elif unit == 'transition': 613 whichTransition = (argsInOrder[1], argsInOrder[2], argsInOrder[3]) 614 perTransition = cache['perTransition'] 615 if whichTransition not in perTransition: 616 return NotCached 617 return perTransition[whichTransition].get( 618 analyzer.__name__, 619 NotCached 620 ) 621 elif unit == 'exploration': 622 return cache['overall'].get(analyzer.__name__, NotCached) 623 else: 624 raise ValueError(f"Invalid analysis unit {unit!r}.")
Looks up an analysis result for the given function in the given
cache. The function must have been decorated with analyzer
. The
bound arguments must match the unit of analysis, for example, if the
unit is 'stepDecision', the arguments must be those for a
StepDecisionAnalyzer
. The bound arguments should have had
apply_defaults
called already to fill in default argument values.
Returns the special object NotCached
if there is no cached value
for the specified arguments yet.
627def saveAnalysisResult( 628 cache: FullAnalysisResults, 629 result: Any, 630 analyzer: AnalysisFunction, 631 argsInOrder: Sequence[Any] 632) -> None: 633 """ 634 Saves an analysis result in the specified cache. The bound arguments 635 must match the unit, for example, if the unit is 'stepDecision', the 636 arguments must be those for a `StepDecisionAnalyzer`. 637 """ 638 unit = analyzer._unit 639 if unit == 'step': 640 whichStep = argsInOrder[1] 641 perStep = cache['perStep'] 642 while len(perStep) <= whichStep: 643 perStep.append({}) 644 perStep[whichStep][analyzer.__name__] = result 645 elif unit == 'stepDecision': 646 whichStep = argsInOrder[1] 647 whichDecision = argsInOrder[2] 648 perStepDecision = cache['perStepDecision'] 649 while len(perStepDecision) <= whichStep: 650 perStepDecision.append({}) 651 forThis = perStepDecision[whichStep].setdefault(whichDecision, {}) 652 forThis[analyzer.__name__] = result 653 elif unit == 'stepTransition': 654 whichStep = argsInOrder[1] 655 whichTransition = (argsInOrder[2], argsInOrder[3], argsInOrder[4]) 656 perStepTransition = cache['perStepTransition'] 657 while len(perStepTransition) <= whichStep: 658 perStepTransition.append({}) 659 forThis = perStepTransition[whichStep].setdefault(whichTransition, {}) 660 forThis[analyzer.__name__] = result 661 elif unit == 'decision': 662 whichDecision = argsInOrder[1] 663 perDecision = cache['perDecision'] 664 perDecision.setdefault(whichDecision, {})[analyzer.__name__] = result 665 elif unit == 'transition': 666 whichTransition = (argsInOrder[1], argsInOrder[2], argsInOrder[3]) 667 perTransition = cache['perTransition'] 668 perTransition.setdefault( 669 whichTransition, 670 {} 671 )[analyzer.__name__] = result 672 elif unit == 'exploration': 673 cache['overall'][analyzer.__name__] = result 674 else: 675 raise ValueError(f"Invalid analysis unit {unit!r}.")
Saves an analysis result in the specified cache. The bound arguments
must match the unit, for example, if the unit is 'stepDecision', the
arguments must be those for a StepDecisionAnalyzer
.
Holds all analyzers indexed by name with the analysis unit plus function
as the value. The analyzer
decorator registers them.
Whether or not to record time spent by each analysis function.
691class AnalyzerPerf(TypedDict): 692 """ 693 Tracks performance of an analysis function, recording total calls, 694 non-cached calls, time spent looking up cached results, and time 695 spent in non-cached calls (including the failed cache lookup and 696 saving the result in the cache). 697 """ 698 calls: int 699 nonCached: int 700 lookupTime: float 701 analyzeTime: float
Tracks performance of an analysis function, recording total calls, non-cached calls, time spent looking up cached results, and time spent in non-cached calls (including the failed cache lookup and saving the result in the cache).
704def newAnalyzerPerf() -> AnalyzerPerf: 705 """ 706 Creates a new empty `AnalyzerPerf` dictionary. 707 """ 708 return { 709 "calls": 0, 710 "nonCached": 0, 711 "lookupTime": 0.0, 712 "analyzeTime": 0.0 713 }
Creates a new empty AnalyzerPerf
dictionary.
Records number-of-calls, number-of-non-cached calls, and time spent in
each analysis function, when RECORD_PROFILE
is set to True
.
Analyzers which should not be included in CSV output by default.
Per-step/step-decision/step-transition analyzers which should by default only be applied to the final step of an exploration to save time.
735def getArgsInOrder( 736 f: Callable, 737 *args: Any, 738 **kwargs: Any 739) -> List[Any]: 740 """ 741 Given a callable and some arguments, returns a list of argument and 742 some arguments, returns a list of argument values in the same order 743 as that function would accept them from the given arguments, 744 accounting for things like keyword arguments and default values. 745 746 For example: 747 748 >>> def f(a, /, b, *more, x=3, y=10, **kw): 749 ... pass 750 >>> sig = inspect.Signature.from_callable(f) 751 >>> getArgsInOrder(f, 1, 2) 752 [1, 2, 3, 10] 753 >>> getArgsInOrder(f, 4, 5, y=2, x=8) 754 [4, 5, 8, 2] 755 >>> getArgsInOrder(f, 4, y=2, x=8, b=3) 756 [4, 3, 8, 2] 757 >>> getArgsInOrder(f, 4, y=2, x=8, b=3) 758 [4, 3, 8, 2] 759 >>> getArgsInOrder(f, 1, 2, 3, 4) 760 [1, 2, 3, 4, 3, 10] 761 >>> getArgsInOrder(f, 1, 2, 3, 4, q=5, k=9) 762 [1, 2, 3, 4, 3, 10, 5, 9] 763 """ 764 sig = inspect.Signature.from_callable(f) 765 bound = sig.bind(*args, **kwargs) 766 bound.apply_defaults() 767 result = [] 768 for paramName in sig.parameters: 769 param = sig.parameters[paramName] 770 if param.kind in ( 771 inspect.Parameter.POSITIONAL_ONLY, 772 inspect.Parameter.POSITIONAL_OR_KEYWORD, 773 ): 774 result.append(bound.arguments[paramName]) 775 elif param.kind == inspect.Parameter.VAR_POSITIONAL: 776 result.extend(bound.arguments[paramName]) 777 elif param.kind == inspect.Parameter.KEYWORD_ONLY: 778 result.append(bound.arguments[paramName]) 779 elif param.kind == inspect.Parameter.VAR_KEYWORD: 780 result.extend(bound.arguments[paramName].values()) 781 782 return result
Given a callable and some arguments, returns a list of argument and some arguments, returns a list of argument values in the same order as that function would accept them from the given arguments, accounting for things like keyword arguments and default values.
For example:
>>> def f(a, /, b, *more, x=3, y=10, **kw):
... pass
>>> sig = inspect.Signature.from_callable(f)
>>> getArgsInOrder(f, 1, 2)
[1, 2, 3, 10]
>>> getArgsInOrder(f, 4, 5, y=2, x=8)
[4, 5, 8, 2]
>>> getArgsInOrder(f, 4, y=2, x=8, b=3)
[4, 3, 8, 2]
>>> getArgsInOrder(f, 4, y=2, x=8, b=3)
[4, 3, 8, 2]
>>> getArgsInOrder(f, 1, 2, 3, 4)
[1, 2, 3, 4, 3, 10]
>>> getArgsInOrder(f, 1, 2, 3, 4, q=5, k=9)
[1, 2, 3, 4, 3, 10, 5, 9]
785def analyzer(unit: AnalysisUnit) -> Callable[ 786 [Callable[Concatenate[core.DiscreteExploration, Params], Any]], 787 AnalysisFunction 788]: 789 ''' 790 Decorator which sets up caching for an analysis function in the 791 global `ANALYSIS_RESULTS` dictionary. Whenever the decorated function 792 is called, it will first check whether a cached result is available 793 for the same target exploration (by id) and additional target info 794 based on the analysis unit type. If so, the cached result will be 795 returned. This allows analysis functions to simply call each other 796 when they need results and themselves recursively if they need to 797 track things across steps/decisions, while avoiding tons of duplicate 798 work. 799 ''' 800 def makeCachedAnalyzer( 801 baseFunction: Callable[ 802 Concatenate[core.DiscreteExploration, Params], 803 Any 804 ] 805 ) -> AnalysisFunction: 806 """ 807 Decoration function which registers an analysis function with 808 pre-specified dependencies. 809 """ 810 analysisFunction = cast(AnalysisFunction, baseFunction) 811 analysisFunction._unit = unit 812 analyzerName= analysisFunction.__name__ 813 814 @functools.wraps(analysisFunction) 815 def cachingAnalyzer( 816 exploration: core.DiscreteExploration, 817 *args: Params.args, 818 **kwargs: Params.kwargs 819 ): 820 """ 821 This docstring will be replaced with the docstring of the 822 decorated function plus a note about caching. 823 """ 824 if RECORD_PROFILE: 825 ANALYSIS_TIME_SPENT.setdefault( 826 analyzerName, 827 newAnalyzerPerf() 828 ) 829 perf = ANALYSIS_TIME_SPENT[analyzerName] 830 perf["calls"] += 1 831 start = time.perf_counter() 832 cache = ANALYSIS_RESULTS.setdefault( 833 id(exploration), 834 newFullAnalysisResults() 835 ) 836 argsInOrder = getArgsInOrder( 837 baseFunction, 838 exploration, 839 *args, 840 **kwargs 841 ) 842 cachedResult = lookupAnalysisResult( 843 cache, 844 analysisFunction, 845 argsInOrder 846 ) 847 if cachedResult is not NotCached: 848 if RECORD_PROFILE: 849 perf["lookupTime"] += time.perf_counter() - start 850 return cachedResult 851 852 result = analysisFunction(exploration, *args, **kwargs) 853 saveAnalysisResult(cache, result, analysisFunction, argsInOrder) 854 if RECORD_PROFILE: 855 perf["nonCached"] += 1 856 perf["analyzeTime"] += time.perf_counter() - start 857 return result 858 859 cachingAnalyzer.__doc__ = ( 860 textwrap.dedent(analysisFunction.__doc__) 861 + """ 862 863This function's results are cached in the `ALL_ANALYZERS` dictionary, and 864it returns cached results when possible. Use `clearAnalysisCache` to 865clear the analysis cache. 866""" 867 ) 868 869 # Save caching version of analyzer 870 result = cast(AnalysisFunction, cachingAnalyzer) 871 ALL_ANALYZERS[analyzerName] = result 872 return result 873 874 return makeCachedAnalyzer
Decorator which sets up caching for an analysis function in the
global ANALYSIS_RESULTS
dictionary. Whenever the decorated function
is called, it will first check whether a cached result is available
for the same target exploration (by id) and additional target info
based on the analysis unit type. If so, the cached result will be
returned. This allows analysis functions to simply call each other
when they need results and themselves recursively if they need to
track things across steps/decisions, while avoiding tons of duplicate
work.
880def elide(analyzer: T) -> T: 881 """ 882 Returns the given analyzer after noting that its result should *not* 883 be included in CSV output by default. 884 """ 885 ELIDE.add(analyzer.__name__) 886 return analyzer
Returns the given analyzer after noting that its result should not be included in CSV output by default.
889def finalOnly(analyzer: T) -> T: 890 """ 891 Returns the given analyzer after noting that it should only be run on 892 the final exploration step by default. 893 """ 894 FINAL_ONLY.add(analyzer.__name__) 895 return analyzer
Returns the given analyzer after noting that it should only be run on the final exploration step by default.
Type var to forward through analyzer types.
907def registerCount(target: AnalyzerType, sizeName: str) -> AnalyzerType: 908 """ 909 Registers a new analysis routine which uses the same analysis unit as 910 the target routine but which returns the length of that routine's 911 result. Returns `None` if the target routine does. 912 913 Needs the target routine and the name to register the new analysis 914 routine under. 915 916 Returns the analysis function it creates. 917 """ 918 def countAnalyzer(*args, **kwargs): 919 'To be replaced' 920 result = target(*args, **kwargs) 921 if result is None: 922 return None 923 else: 924 return len(result) 925 926 countAnalyzer.__doc__ = ( 927 f"Measures count of the {target.__name__!r} result applied to" 928 f" {target._unit!r}." 929 ) 930 countAnalyzer.__name__ = sizeName 931 932 # Register the new function & return the result 933 return cast( 934 AnalyzerType, 935 analyzer(target._unit)(countAnalyzer) 936 )
Registers a new analysis routine which uses the same analysis unit as
the target routine but which returns the length of that routine's
result. Returns None
if the target routine does.
Needs the target routine and the name to register the new analysis routine under.
Returns the analysis function it creates.
Type variable for the result of a combiner function.
A combiner function which gets a dictionary of per-decision or
per-transition values along with an exploration object and a step index
and combines the values into a CombinerResult
that's specific to that
step.
A combiner function which gets a dictionary of per-decision,
per-transition, and/or per-step values along with an exploration object
and combines the values into a CombinerResult
.
973def registerStepCombined( 974 name: str, 975 resultName: str, 976 combiner: StepCombiner[CombinerResult] 977) -> StepAnalyzer: 978 """ 979 Registers a new analysis routine which combines results of another 980 routine either across all decisions/transitions at a step. The new 981 routine will have a 'step' analysis unit. 982 983 Needs the name of the target routine, the name to register the new 984 analysis routine under, and the function that will be called to 985 combine results, given a dictionary of results that maps 986 decisions/transitions to results for each. 987 988 Returns the analysis function it creates. 989 """ 990 # Target function 991 target = ALL_ANALYZERS[name] 992 # Analysis unit of the target function 993 targetUnit = target._unit 994 995 if targetUnit not in ('stepDecision', 'stepTransition'): 996 raise ValueError( 997 f"Target analysis routine {name!r} has incompatible analysis" 998 f" unit {targetUnit!r}." 999 ) 1000 1001 def analysisCombiner( 1002 exploration: core.DiscreteExploration, 1003 step: int 1004 ) -> CombinerResult: 1005 'To be replaced' 1006 # Declare data here as generic type 1007 data: Dict[ 1008 Union[base.DecisionID, OverspecificTransition, int], 1009 Any 1010 ] 1011 graph = exploration[step].graph 1012 if targetUnit == "stepDecision": 1013 analyzeStepDecision = cast(StepDecisionAnalyzer, target) 1014 data = { 1015 dID: analyzeStepDecision(exploration, step, dID) 1016 for dID in graph 1017 } 1018 elif targetUnit == "stepTransition": 1019 edges = graph.allEdges() 1020 analyzeStepTransition = cast(StepTransitionAnalyzer, target) 1021 data = { 1022 (src, transition, dst): analyzeStepTransition( 1023 exploration, 1024 step, 1025 src, 1026 transition, 1027 dst 1028 ) 1029 for (src, dst, transition) in edges 1030 } 1031 else: 1032 raise ValueError( 1033 f"Target analysis routine {name!r} has inconsistent" 1034 f" analysis unit {targetUnit!r} for 'step' result" 1035 f" unit." 1036 ) 1037 return combiner(data, exploration, step) 1038 1039 analysisCombiner.__doc__ = ( 1040 f"Computes {combiner.__name__} for the {name!r} result over all" 1041 f" {targetUnit}s at each step." 1042 ) 1043 analysisCombiner.__name__ = resultName 1044 1045 # Register the new function & return it 1046 return analyzer("step")(analysisCombiner)
Registers a new analysis routine which combines results of another routine either across all decisions/transitions at a step. The new routine will have a 'step' analysis unit.
Needs the name of the target routine, the name to register the new analysis routine under, and the function that will be called to combine results, given a dictionary of results that maps decisions/transitions to results for each.
Returns the analysis function it creates.
1049def registerFullCombined( 1050 name: str, 1051 resultName: str, 1052 combiner: OverallCombiner[CombinerResult] 1053) -> ExplorationAnalyzer: 1054 """ 1055 Works like `registerStepCombined` but combines results over 1056 decisions/transitions/steps across the entire exploration to get one 1057 result for the entire thing, not one result per step. May also 1058 target an existing `ExplorationAnalyzer` whose result is a 1059 dictionary, in which case it will combine that dictionary's values. 1060 1061 Needs the name of the target routine, the name to register the new 1062 analysis routine under, and the function that will be called to 1063 combine results, given a dictionary of results that maps 1064 decisions/transitions/steps to results for each. 1065 1066 Returns the analysis function it creates. 1067 """ 1068 # Target function 1069 target = ALL_ANALYZERS[name] 1070 # Analysis unit of the target function 1071 targetUnit = target._unit 1072 if targetUnit not in ('step', 'decision', 'transition', 'exploration'): 1073 raise ValueError( 1074 f"Target analysis routine {name!r} has incompatible analysis" 1075 f" unit {targetUnit!r}." 1076 ) 1077 1078 def analysisCombiner( # type: ignore 1079 exploration: core.DiscreteExploration, 1080 ) -> CombinerResult: 1081 'To be replaced' 1082 # Declare data here as generic type 1083 data: Dict[ 1084 Union[base.DecisionID, OverspecificTransition, int], 1085 Any 1086 ] 1087 if targetUnit == "step": 1088 analyzeStep = cast(StepAnalyzer, target) 1089 data = { 1090 step: analyzeStep(exploration, step) 1091 for step in range(len(exploration)) 1092 } 1093 elif targetUnit == "decision": 1094 analyzeDecision = cast(DecisionAnalyzer, target) 1095 data = { 1096 dID: analyzeDecision(exploration, dID) 1097 for dID in exploration.allDecisions() 1098 } 1099 elif targetUnit == "transition": 1100 analyzeTransition = cast(TransitionAnalyzer, target) 1101 data = { 1102 (src, transition, dst): analyzeTransition( 1103 exploration, 1104 src, 1105 transition, 1106 dst 1107 ) 1108 for (src, transition, dst) in exploration.allTransitions() 1109 } 1110 elif targetUnit == "exploration": 1111 analyzeExploration = cast(ExplorationAnalyzer, target) 1112 data = analyzeExploration(exploration) 1113 else: 1114 raise ValueError( 1115 f"Target analysis routine {name!r} has inconsistent" 1116 f" analysis unit {targetUnit!r} for 'step' result" 1117 f" unit." 1118 ) 1119 return combiner(data, exploration) 1120 1121 analysisCombiner.__doc__ = ( 1122 f"Computes {combiner.__name__} for the {name!r} result over all" 1123 f" {targetUnit}s." 1124 ) 1125 analysisCombiner.__name__ = resultName 1126 1127 # Register the new function & return it 1128 return analyzer("exploration")(analysisCombiner)
Works like registerStepCombined
but combines results over
decisions/transitions/steps across the entire exploration to get one
result for the entire thing, not one result per step. May also
target an existing ExplorationAnalyzer
whose result is a
dictionary, in which case it will combine that dictionary's values.
Needs the name of the target routine, the name to register the new analysis routine under, and the function that will be called to combine results, given a dictionary of results that maps decisions/transitions/steps to results for each.
Returns the analysis function it creates.
1131def sumCombiner(data, *_): 1132 """ 1133 Computes sum over numeric data as a "combiner" function to be used 1134 with `registerStepCombined` or `registerFullCombined`. 1135 1136 Only sums values which are `int`s, `float`s, or `complex`es, ignoring 1137 any other values. 1138 """ 1139 return sum( 1140 x for x in data.values() if isinstance(x, (int, float, complex)) 1141 )
Computes sum over numeric data as a "combiner" function to be used
with registerStepCombined
or registerFullCombined
.
Only sums values which are int
s, float
s, or complex
es, ignoring
any other values.
1144def meanCombiner(data, *_): 1145 """ 1146 Computes mean over numeric data as a "combiner" function to be used 1147 with `registerStepCombined` or `registerFullCombined`. 1148 1149 Only counts values which are `int`s, `float`s, or `complex`es, ignoring 1150 any other values. Uses `None` as the result when there are 0 numeric 1151 values. 1152 """ 1153 numeric = [ 1154 x for x in data.values() if isinstance(x, (int, float, complex)) 1155 ] 1156 if len(numeric) == 0: 1157 return None 1158 else: 1159 return sum(numeric) / len(numeric)
Computes mean over numeric data as a "combiner" function to be used
with registerStepCombined
or registerFullCombined
.
Only counts values which are int
s, float
s, or complex
es, ignoring
any other values. Uses None
as the result when there are 0 numeric
values.
1162def medianCombiner(data, *_): 1163 """ 1164 Computes median over numeric data as a "combiner" function to be used 1165 with `registerStepCombined` or `registerFullCombined`. 1166 1167 Only counts values which are `int`s, `float`s, or `complex`es, ignoring 1168 any other values. Uses `None` as the result when there are 0 numeric 1169 values. 1170 """ 1171 numeric = sorted( 1172 x for x in data.values() if isinstance(x, (int, float, complex)) 1173 ) 1174 if len(numeric) == 0: 1175 return None 1176 elif len(numeric) == 1: 1177 return numeric[0] 1178 else: 1179 half = len(numeric) // 2 1180 if len(numeric) % 2 == 0: 1181 return (numeric[half - 1] + numeric[half]) / 2 1182 else: 1183 return numeric[half]
Computes median over numeric data as a "combiner" function to be used
with registerStepCombined
or registerFullCombined
.
Only counts values which are int
s, float
s, or complex
es, ignoring
any other values. Uses None
as the result when there are 0 numeric
values.
1190@analyzer('decision') 1191def finalIdentity( 1192 exploration: core.DiscreteExploration, 1193 decision: base.DecisionID 1194) -> str: 1195 """ 1196 Returns the `identityOf` result for the specified decision in the 1197 last step in which that decision existed. 1198 """ 1199 for i in range(-1, -len(exploration) - 1, -1): 1200 situation = exploration.getSituation(i) 1201 try: 1202 return situation.graph.identityOf(decision) 1203 except core.MissingDecisionError: 1204 pass 1205 raise core.MissingDecisionError( 1206 f"Decision {decision!r} never existed." 1207 )
Returns the identityOf
result for the specified decision in the
last step in which that decision existed.
1210@analyzer('step') 1211def currentDecision( 1212 exploration: core.DiscreteExploration, 1213 step: int 1214) -> Optional[base.DecisionID]: 1215 """ 1216 Returns the `base.DecisionID` for the current decision in a given 1217 situation. 1218 """ 1219 return exploration[step].state['primaryDecision']
Returns the base.DecisionID
for the current decision in a given
situation.
1222@analyzer('step') 1223def currentDecisionIdentity( 1224 exploration: core.DiscreteExploration, 1225 step: int 1226) -> str: 1227 """ 1228 Returns the `identityOf` string for the current decision in a given 1229 situation. 1230 """ 1231 situation = exploration[step] 1232 return situation.graph.identityOf(situation.state['primaryDecision'])
Returns the identityOf
string for the current decision in a given
situation.
1235@analyzer('step') 1236def observedSoFar( 1237 exploration: core.DiscreteExploration, 1238 step: int 1239) -> Set[base.DecisionID]: 1240 """ 1241 Returns the set of all decision IDs observed so far. Note that some 1242 of them may no longer be present in the graph at the given step if 1243 they got merged or deleted. 1244 """ 1245 # Can't allow negative steps (caching would fail) 1246 if step < 0: 1247 raise IndexError(f"Invalid step (can't be negative): {step!r}") 1248 elif step == 0: 1249 result = set() 1250 else: 1251 result = observedSoFar(exploration, step - 1) 1252 result |= set(exploration[step].graph) 1253 return result
Returns the set of all decision IDs observed so far. Note that some of them may no longer be present in the graph at the given step if they got merged or deleted.
918 def countAnalyzer(*args, **kwargs): 919 'To be replaced' 920 result = target(*args, **kwargs) 921 if result is None: 922 return None 923 else: 924 return len(result)
Measures count of the 'observedSoFar' result applied to 'step'.
1259@analyzer('step') 1260def justObserved( 1261 exploration: core.DiscreteExploration, 1262 step: int 1263) -> Set[base.DecisionID]: 1264 """ 1265 Returns the set of new `base.DecisionID`s that first appeared at the 1266 given step. Will be empty for steps where no new decisions are 1267 observed. Note that this is about decisions whose existence becomes 1268 known, NOT decisions which get confirmed. 1269 """ 1270 if step == 0: 1271 return observedSoFar(exploration, step) 1272 else: 1273 return ( 1274 observedSoFar(exploration, step - 1) 1275 - observedSoFar(exploration, step) 1276 )
Returns the set of new base.DecisionID
s that first appeared at the
given step. Will be empty for steps where no new decisions are
observed. Note that this is about decisions whose existence becomes
known, NOT decisions which get confirmed.
918 def countAnalyzer(*args, **kwargs): 919 'To be replaced' 920 result = target(*args, **kwargs) 921 if result is None: 922 return None 923 else: 924 return len(result)
Measures count of the 'justObserved' result applied to 'step'.
1282@elide 1283@analyzer('stepDecision') 1284def hasBeenObserved( 1285 exploration: core.DiscreteExploration, 1286 step: int, 1287 dID: base.DecisionID 1288) -> bool: 1289 """ 1290 Whether or not the specified decision has been observed at or prior 1291 to the specified step. Note that it may or may not actually be a 1292 decision in the specified step (e.g., if it was previously observed 1293 but then deleted). 1294 """ 1295 return dID in observedSoFar(exploration, step)
Whether or not the specified decision has been observed at or prior to the specified step. Note that it may or may not actually be a decision in the specified step (e.g., if it was previously observed but then deleted).
1298@analyzer('exploration') 1299def stepsObserved( 1300 exploration: core.DiscreteExploration, 1301) -> Dict[base.DecisionID, int]: 1302 """ 1303 Returns a dictionary that holds the step at which each decision was 1304 first observed, keyed by decision ID. 1305 """ 1306 result = {} 1307 soFar: Set[base.DecisionID] = set() 1308 for step, situation in enumerate(exploration): 1309 new = set(situation.graph) - soFar 1310 for dID in new: 1311 result[dID] = step 1312 soFar |= new 1313 return result
Returns a dictionary that holds the step at which each decision was first observed, keyed by decision ID.
1316@analyzer('decision') 1317def stepObserved( 1318 exploration: core.DiscreteExploration, 1319 dID: base.DecisionID 1320) -> int: 1321 """ 1322 Returns the step at which the specified decision was first observed 1323 (NOT confirmed). 1324 """ 1325 try: 1326 return stepsObserved(exploration)[dID] 1327 except KeyError: 1328 raise core.MissingDecisionError( 1329 f"Decision {dID!r} was never observed." 1330 )
Returns the step at which the specified decision was first observed (NOT confirmed).
1333@analyzer('exploration') 1334def stepsConfirmed( 1335 exploration: core.DiscreteExploration, 1336) -> Dict[base.DecisionID, int]: 1337 """ 1338 Given an exploration, returns a dictionary mapping decision IDs to 1339 the step at which each was first confirmed. Decisions which were 1340 never confirmed will not be included in the dictionary. 1341 """ 1342 result = {} 1343 for i, situation in enumerate(exploration): 1344 for dID in situation.graph: 1345 if ( 1346 dID not in result 1347 and 'unconfirmed' not in situation.graph.decisionTags(dID) 1348 ): 1349 result[dID] = i 1350 return result
Given an exploration, returns a dictionary mapping decision IDs to the step at which each was first confirmed. Decisions which were never confirmed will not be included in the dictionary.
1353@analyzer('decision') 1354def stepConfirmed( 1355 exploration: core.DiscreteExploration, 1356 dID: base.DecisionID 1357) -> Optional[int]: 1358 """ 1359 Returns the step at which the specified decision was first confirmed, 1360 or `None` if it was never confirmed. Returns `None` for invalid 1361 decision IDs. 1362 """ 1363 return stepsConfirmed(exploration).get(dID)
Returns the step at which the specified decision was first confirmed,
or None
if it was never confirmed. Returns None
for invalid
decision IDs.
1366@analyzer('exploration') 1367def stepsVisited( 1368 exploration: core.DiscreteExploration, 1369) -> Dict[base.DecisionID, List[int]]: 1370 """ 1371 Given an exploration, returns a dictionary mapping decision IDs to 1372 the list of steps at which each was visited. Decisions which were 1373 never visited will not be included in the dictionary. 1374 """ 1375 result: Dict[base.DecisionID, List[int]] = {} 1376 for i, situation in enumerate(exploration): 1377 for dID in situation.graph: 1378 if dID in base.combinedDecisionSet(situation.state): 1379 result.setdefault(dID, []).append(i) 1380 return result
Given an exploration, returns a dictionary mapping decision IDs to the list of steps at which each was visited. Decisions which were never visited will not be included in the dictionary.
1383@finalOnly 1384@analyzer('stepDecision') 1385def hasBeenVisited( 1386 exploration: core.DiscreteExploration, 1387 step: int, 1388 dID: base.DecisionID 1389) -> bool: 1390 """ 1391 Whether or not the specified decision has been visited at or prior 1392 to the specified step. Note that it may or may not actually be a 1393 decision in the specified step (e.g., if it was previously observed 1394 but then deleted). 1395 """ 1396 visits = stepsVisited(exploration).get(dID, []) 1397 # No visits -> not visited yet 1398 if len(visits) == 0: 1399 return False 1400 else: 1401 # First visit was at or before this step 1402 return min(visits) <= step
Whether or not the specified decision has been visited at or prior to the specified step. Note that it may or may not actually be a decision in the specified step (e.g., if it was previously observed but then deleted).
1405@analyzer('decision') 1406def stepFirstVisited( 1407 exploration: core.DiscreteExploration, 1408 decision: base.DecisionID, 1409) -> Optional[int]: 1410 """ 1411 Returns the first step at which the given decision was visited, or 1412 `None` if the decision was never visited. 1413 """ 1414 vis = stepsVisited(exploration) 1415 if decision in vis: 1416 return min(vis[decision]) 1417 else: 1418 return None
Returns the first step at which the given decision was visited, or
None
if the decision was never visited.
1421@analyzer('decision') 1422def stepsActive( 1423 exploration: core.DiscreteExploration, 1424 decision: base.DecisionID, 1425) -> Optional[int]: 1426 """ 1427 Returns the total number of steps in which this decision was active. 1428 """ 1429 vis = stepsVisited(exploration) 1430 if decision in vis: 1431 return len(vis[decision]) 1432 else: 1433 return 0
Returns the total number of steps in which this decision was active.
1436@analyzer('exploration') 1437def stepsTransisionsObserved( 1438 exploration: core.DiscreteExploration 1439) -> Dict[OverspecificTransition, int]: 1440 """ 1441 Returns a dictionary that holds the step at which each transition was 1442 first observed, keyed by (source-decision, transition-name, 1443 destination-decision) triples. 1444 1445 Does NOT distinguish between cases where a once-deleted transition 1446 was later reinstated (unless it had a different destination in the 1447 end). 1448 """ 1449 result = {} 1450 for i, situation in enumerate(exploration): 1451 for dID in situation.graph: 1452 destinations = situation.graph.destinationsFrom(dID) 1453 for name, dest in destinations.items(): 1454 key = (dID, name, dest) 1455 if key not in result: 1456 result[key] = i 1457 return result
Returns a dictionary that holds the step at which each transition was first observed, keyed by (source-decision, transition-name, destination-decision) triples.
Does NOT distinguish between cases where a once-deleted transition was later reinstated (unless it had a different destination in the end).
1460@analyzer('transition') 1461def stepObservedTransition( 1462 exploration: core.DiscreteExploration, 1463 source: base.DecisionID, 1464 transition: base.Transition, 1465 destination: base.DecisionID 1466) -> Optional[int]: 1467 """ 1468 Returns the step within the exploration at which the specified 1469 transition was first observed. Note that transitions which get 1470 renamed do NOT preserve their identities, so a search for a renamed 1471 transition will return the step on which it was renamed (assuming it 1472 didn't change destination). 1473 1474 Returns `None` if the specified transition never existed in the 1475 exploration. 1476 """ 1477 obs = stepsTransisionsObserved(exploration) 1478 return obs.get((source, transition, destination))
Returns the step within the exploration at which the specified transition was first observed. Note that transitions which get renamed do NOT preserve their identities, so a search for a renamed transition will return the step on which it was renamed (assuming it didn't change destination).
Returns None
if the specified transition never existed in the
exploration.
1481@analyzer('step') 1482def transitionTaken( 1483 exploration: core.DiscreteExploration, 1484 step: int 1485) -> Optional[OverspecificTransition]: 1486 """ 1487 Returns the source decision Id, the name of the transition taken, and 1488 the destination decision ID at the given step. This is the transition 1489 chosen at that step whose consequences were triggered resulting in 1490 the next step. Returns `None` for steps where no transition was 1491 taken (e.g., wait, warp, etc.). 1492 1493 Note that in some cases due to e.g., a 'follow' effect, multiple 1494 transitions are taken at a step. In that case, this returns the name 1495 of the first transition taken (which would have triggered any 1496 others). 1497 1498 Also in some cases, there may be multiple starting nodes given, in 1499 which case the first such node (by ID order) which has a transition 1500 with the identified transition name will be returned, or None if none 1501 of them match. 1502 """ 1503 start, transition, end = exploration.movementAtStep(step) 1504 graph = exploration[step].graph 1505 if start is None or transition is None: 1506 return None 1507 if isinstance(start, set): 1508 for dID in sorted(start): 1509 destination = graph.getDestination(dID, transition) 1510 if destination is not None: 1511 return (dID, transition, destination) 1512 return None 1513 else: 1514 destination = graph.getDestination(start, transition) 1515 if destination is not None: 1516 return (start, transition, destination) 1517 else: 1518 return None
Returns the source decision Id, the name of the transition taken, and
the destination decision ID at the given step. This is the transition
chosen at that step whose consequences were triggered resulting in
the next step. Returns None
for steps where no transition was
taken (e.g., wait, warp, etc.).
Note that in some cases due to e.g., a 'follow' effect, multiple transitions are taken at a step. In that case, this returns the name of the first transition taken (which would have triggered any others).
Also in some cases, there may be multiple starting nodes given, in which case the first such node (by ID order) which has a transition with the identified transition name will be returned, or None if none of them match.
1521@analyzer('exploration') 1522def transitionStepsTaken( 1523 exploration: core.DiscreteExploration 1524) -> Dict[OverspecificTransition, List[int]]: 1525 """ 1526 Returns a dictionary mapping each specific transition that was taken 1527 at least once to the list of steps on which it was taken. Does NOT 1528 account for transitions elided by 'jaunt' warps, nor for transitions 1529 taken as a result of follow/bounce effects. 1530 1531 TODO: Account for those? 1532 """ 1533 result: Dict[OverspecificTransition, List[int]] = {} 1534 for i in range(len(exploration)): 1535 taken = transitionTaken(exploration, i) 1536 if taken is not None: 1537 if taken in result: 1538 result[taken].append(i) 1539 else: 1540 result[taken] = [i] 1541 1542 return result
Returns a dictionary mapping each specific transition that was taken at least once to the list of steps on which it was taken. Does NOT account for transitions elided by 'jaunt' warps, nor for transitions taken as a result of follow/bounce effects.
TODO: Account for those?
1545@analyzer('transition') 1546def stepsTaken( 1547 exploration: core.DiscreteExploration, 1548 source: base.DecisionID, 1549 transition: base.Transition, 1550 destination: base.DecisionID 1551) -> int: 1552 """ 1553 Returns the list of exploration steps on which a particular 1554 transition has been taken. Returns an empty list for transitions that 1555 were never taken. 1556 1557 Note that this does NOT account for times taken as a result of 1558 follow/bounce effects, and it does NOT account for all times a 1559 transition was taken when warp effects are used as shorthand for 1560 jaunts across the graph. 1561 1562 TODO: Try to account for those? 1563 """ 1564 return transitionStepsTaken(exploration).get( 1565 (source, transition, destination), 1566 [] 1567 )
Returns the list of exploration steps on which a particular transition has been taken. Returns an empty list for transitions that were never taken.
Note that this does NOT account for times taken as a result of follow/bounce effects, and it does NOT account for all times a transition was taken when warp effects are used as shorthand for jaunts across the graph.
TODO: Try to account for those?
1570@analyzer('transition') 1571def timesTaken( 1572 exploration: core.DiscreteExploration, 1573 source: base.DecisionID, 1574 transition: base.Transition, 1575 destination: base.DecisionID 1576) -> int: 1577 """ 1578 Returns the number of times a particular transition has been taken 1579 throughout the exploration. Returns 0 for transitions that were never 1580 taken. 1581 1582 Note that this does NOT account for times taken as a result of 1583 follow/bounce effects, and it does NOT account for all times a 1584 transition was taken when warp effects are used as shorthand for 1585 jaunts across the graph. 1586 1587 TODO: Try to account for those? 1588 """ 1589 return len(stepsTaken(exploration, source, transition, destination))
Returns the number of times a particular transition has been taken throughout the exploration. Returns 0 for transitions that were never taken.
Note that this does NOT account for times taken as a result of follow/bounce effects, and it does NOT account for all times a transition was taken when warp effects are used as shorthand for jaunts across the graph.
TODO: Try to account for those?
1596def unexploredBranches( 1597 graph: core.DecisionGraph, 1598 context: Optional[base.RequirementContext] = None 1599) -> List[SpecificTransition]: 1600 """ 1601 Returns a list of from-decision, transition-at-that-decision pairs 1602 which each identify an unexplored branch in the given graph. 1603 1604 When a `context` is provided it only counts options whose 1605 requirements are satisfied in that `RequirementContext`, and the 1606 'searchFrom' part of the context will be replaced by both ends of 1607 each transition tested. This doesn't perfectly map onto actually 1608 reachability since nodes between where the player is and where the 1609 option is might force changes in the game state that make it 1610 un-takeable. 1611 1612 TODO: add logic to detect trivially-unblocked edges? 1613 """ 1614 result = [] 1615 # TODO: Fix networkx type stubs for MultiDiGraph! 1616 for (src, dst, transition) in graph.allEdges(): 1617 req = graph.getTransitionRequirement(src, transition) 1618 localContext: Optional[base.RequirementContext] = None 1619 if context is not None: 1620 localContext = base.RequirementContext( 1621 state=context.state, 1622 graph=context.graph, 1623 searchFrom=graph.bothEnds(src, transition) 1624 ) 1625 # Check if this edge goes from a confirmed to an unconfirmed node 1626 if ( 1627 graph.isConfirmed(src) 1628 and not graph.isConfirmed(dst) 1629 and (localContext is None or req.satisfied(localContext)) 1630 ): 1631 result.append((src, transition)) 1632 return result
Returns a list of from-decision, transition-at-that-decision pairs which each identify an unexplored branch in the given graph.
When a context
is provided it only counts options whose
requirements are satisfied in that RequirementContext
, and the
'searchFrom' part of the context will be replaced by both ends of
each transition tested. This doesn't perfectly map onto actually
reachability since nodes between where the player is and where the
option is might force changes in the game state that make it
un-takeable.
TODO: add logic to detect trivially-unblocked edges?
1635@analyzer('step') 1636def allUnexploredBranches( 1637 exploration: core.DiscreteExploration, 1638 step: int 1639) -> List[SpecificTransition]: 1640 """ 1641 Returns the list of unexplored branches in the specified situation's 1642 graph, regardless of traversibility (see `unexploredBranches`). 1643 """ 1644 return unexploredBranches(exploration[step].graph)
Returns the list of unexplored branches in the specified situation's
graph, regardless of traversibility (see unexploredBranches
).
918 def countAnalyzer(*args, **kwargs): 919 'To be replaced' 920 result = target(*args, **kwargs) 921 if result is None: 922 return None 923 else: 924 return len(result)
Measures count of the 'allUnexploredBranches' result applied to 'step'.
1653@analyzer('step') 1654def traversableUnexploredBranches( 1655 exploration: core.DiscreteExploration, 1656 step: int 1657) -> List[SpecificTransition]: 1658 """ 1659 Returns the list of traversable unexplored branches in the specified 1660 situation's graph (see `unexploredBranches`). Does not perfectly 1661 account for all traversibility information, because it uses a single 1662 context from which to judge traversibility (TODO: Fix that). 1663 """ 1664 situation = exploration[step] 1665 context = base.genericContextForSituation( 1666 situation, 1667 base.combinedDecisionSet(situation.state) 1668 ) 1669 return unexploredBranches(situation.graph, context)
Returns the list of traversable unexplored branches in the specified
situation's graph (see unexploredBranches
). Does not perfectly
account for all traversibility information, because it uses a single
context from which to judge traversibility (TODO: Fix that).
918 def countAnalyzer(*args, **kwargs): 919 'To be replaced' 920 result = target(*args, **kwargs) 921 if result is None: 922 return None 923 else: 924 return len(result)
Measures count of the 'traversableUnexploredBranches' result applied to 'step'.
1678@finalOnly 1679@analyzer('stepDecision') 1680def actions( 1681 exploration: core.DiscreteExploration, 1682 step: int, 1683 decision: base.DecisionID 1684) -> Optional[Set[base.Transition]]: 1685 """ 1686 Given a particular decision at a particular step, returns the set of 1687 actions available at that decision in that step. Returns `None` if 1688 the specified decision does not exist. 1689 """ 1690 graph = exploration[step].graph 1691 if decision not in graph: 1692 return None 1693 return graph.decisionActions(decision)
Given a particular decision at a particular step, returns the set of
actions available at that decision in that step. Returns None
if
the specified decision does not exist.
918 def countAnalyzer(*args, **kwargs): 919 'To be replaced' 920 result = target(*args, **kwargs) 921 if result is None: 922 return None 923 else: 924 return len(result)
Measures count of the 'actions' result applied to 'stepDecision'.
1001 def analysisCombiner( 1002 exploration: core.DiscreteExploration, 1003 step: int 1004 ) -> CombinerResult: 1005 'To be replaced' 1006 # Declare data here as generic type 1007 data: Dict[ 1008 Union[base.DecisionID, OverspecificTransition, int], 1009 Any 1010 ] 1011 graph = exploration[step].graph 1012 if targetUnit == "stepDecision": 1013 analyzeStepDecision = cast(StepDecisionAnalyzer, target) 1014 data = { 1015 dID: analyzeStepDecision(exploration, step, dID) 1016 for dID in graph 1017 } 1018 elif targetUnit == "stepTransition": 1019 edges = graph.allEdges() 1020 analyzeStepTransition = cast(StepTransitionAnalyzer, target) 1021 data = { 1022 (src, transition, dst): analyzeStepTransition( 1023 exploration, 1024 step, 1025 src, 1026 transition, 1027 dst 1028 ) 1029 for (src, dst, transition) in edges 1030 } 1031 else: 1032 raise ValueError( 1033 f"Target analysis routine {name!r} has inconsistent" 1034 f" analysis unit {targetUnit!r} for 'step' result" 1035 f" unit." 1036 ) 1037 return combiner(data, exploration, step)
Computes sumCombiner for the 'actionCount' result over all stepDecisions at each step.
1001 def analysisCombiner( 1002 exploration: core.DiscreteExploration, 1003 step: int 1004 ) -> CombinerResult: 1005 'To be replaced' 1006 # Declare data here as generic type 1007 data: Dict[ 1008 Union[base.DecisionID, OverspecificTransition, int], 1009 Any 1010 ] 1011 graph = exploration[step].graph 1012 if targetUnit == "stepDecision": 1013 analyzeStepDecision = cast(StepDecisionAnalyzer, target) 1014 data = { 1015 dID: analyzeStepDecision(exploration, step, dID) 1016 for dID in graph 1017 } 1018 elif targetUnit == "stepTransition": 1019 edges = graph.allEdges() 1020 analyzeStepTransition = cast(StepTransitionAnalyzer, target) 1021 data = { 1022 (src, transition, dst): analyzeStepTransition( 1023 exploration, 1024 step, 1025 src, 1026 transition, 1027 dst 1028 ) 1029 for (src, dst, transition) in edges 1030 } 1031 else: 1032 raise ValueError( 1033 f"Target analysis routine {name!r} has inconsistent" 1034 f" analysis unit {targetUnit!r} for 'step' result" 1035 f" unit." 1036 ) 1037 return combiner(data, exploration, step)
Computes meanCombiner for the 'actionCount' result over all stepDecisions at each step.
1001 def analysisCombiner( 1002 exploration: core.DiscreteExploration, 1003 step: int 1004 ) -> CombinerResult: 1005 'To be replaced' 1006 # Declare data here as generic type 1007 data: Dict[ 1008 Union[base.DecisionID, OverspecificTransition, int], 1009 Any 1010 ] 1011 graph = exploration[step].graph 1012 if targetUnit == "stepDecision": 1013 analyzeStepDecision = cast(StepDecisionAnalyzer, target) 1014 data = { 1015 dID: analyzeStepDecision(exploration, step, dID) 1016 for dID in graph 1017 } 1018 elif targetUnit == "stepTransition": 1019 edges = graph.allEdges() 1020 analyzeStepTransition = cast(StepTransitionAnalyzer, target) 1021 data = { 1022 (src, transition, dst): analyzeStepTransition( 1023 exploration, 1024 step, 1025 src, 1026 transition, 1027 dst 1028 ) 1029 for (src, dst, transition) in edges 1030 } 1031 else: 1032 raise ValueError( 1033 f"Target analysis routine {name!r} has inconsistent" 1034 f" analysis unit {targetUnit!r} for 'step' result" 1035 f" unit." 1036 ) 1037 return combiner(data, exploration, step)
Computes medianCombiner for the 'actionCount' result over all stepDecisions at each step.
1721@finalOnly 1722@analyzer('stepDecision') 1723def branches( 1724 exploration: core.DiscreteExploration, 1725 step: int, 1726 decision: base.DecisionID 1727) -> Optional[int]: 1728 """ 1729 Computes the number of branches at a particular decision, not 1730 counting actions, but counting as separate branches multiple 1731 transitions which lead to the same decision as each other. Returns 1732 `None` for unconfirmed and nonexistent decisions so that they aren't 1733 counted as part of averages, even though unconfirmed decisions do 1734 have countable branches. 1735 """ 1736 graph = exploration[step].graph 1737 if decision not in graph or not graph.isConfirmed(decision): 1738 return None 1739 1740 dests = graph.destinationsFrom(decision) 1741 branches = 0 1742 for transition, dest in dests.items(): 1743 if dest != decision: 1744 branches += 1 1745 1746 return branches
Computes the number of branches at a particular decision, not
counting actions, but counting as separate branches multiple
transitions which lead to the same decision as each other. Returns
None
for unconfirmed and nonexistent decisions so that they aren't
counted as part of averages, even though unconfirmed decisions do
have countable branches.
1001 def analysisCombiner( 1002 exploration: core.DiscreteExploration, 1003 step: int 1004 ) -> CombinerResult: 1005 'To be replaced' 1006 # Declare data here as generic type 1007 data: Dict[ 1008 Union[base.DecisionID, OverspecificTransition, int], 1009 Any 1010 ] 1011 graph = exploration[step].graph 1012 if targetUnit == "stepDecision": 1013 analyzeStepDecision = cast(StepDecisionAnalyzer, target) 1014 data = { 1015 dID: analyzeStepDecision(exploration, step, dID) 1016 for dID in graph 1017 } 1018 elif targetUnit == "stepTransition": 1019 edges = graph.allEdges() 1020 analyzeStepTransition = cast(StepTransitionAnalyzer, target) 1021 data = { 1022 (src, transition, dst): analyzeStepTransition( 1023 exploration, 1024 step, 1025 src, 1026 transition, 1027 dst 1028 ) 1029 for (src, dst, transition) in edges 1030 } 1031 else: 1032 raise ValueError( 1033 f"Target analysis routine {name!r} has inconsistent" 1034 f" analysis unit {targetUnit!r} for 'step' result" 1035 f" unit." 1036 ) 1037 return combiner(data, exploration, step)
Computes sumCombiner for the 'branches' result over all stepDecisions at each step.
1001 def analysisCombiner( 1002 exploration: core.DiscreteExploration, 1003 step: int 1004 ) -> CombinerResult: 1005 'To be replaced' 1006 # Declare data here as generic type 1007 data: Dict[ 1008 Union[base.DecisionID, OverspecificTransition, int], 1009 Any 1010 ] 1011 graph = exploration[step].graph 1012 if targetUnit == "stepDecision": 1013 analyzeStepDecision = cast(StepDecisionAnalyzer, target) 1014 data = { 1015 dID: analyzeStepDecision(exploration, step, dID) 1016 for dID in graph 1017 } 1018 elif targetUnit == "stepTransition": 1019 edges = graph.allEdges() 1020 analyzeStepTransition = cast(StepTransitionAnalyzer, target) 1021 data = { 1022 (src, transition, dst): analyzeStepTransition( 1023 exploration, 1024 step, 1025 src, 1026 transition, 1027 dst 1028 ) 1029 for (src, dst, transition) in edges 1030 } 1031 else: 1032 raise ValueError( 1033 f"Target analysis routine {name!r} has inconsistent" 1034 f" analysis unit {targetUnit!r} for 'step' result" 1035 f" unit." 1036 ) 1037 return combiner(data, exploration, step)
Computes meanCombiner for the 'branches' result over all stepDecisions at each step.
1001 def analysisCombiner( 1002 exploration: core.DiscreteExploration, 1003 step: int 1004 ) -> CombinerResult: 1005 'To be replaced' 1006 # Declare data here as generic type 1007 data: Dict[ 1008 Union[base.DecisionID, OverspecificTransition, int], 1009 Any 1010 ] 1011 graph = exploration[step].graph 1012 if targetUnit == "stepDecision": 1013 analyzeStepDecision = cast(StepDecisionAnalyzer, target) 1014 data = { 1015 dID: analyzeStepDecision(exploration, step, dID) 1016 for dID in graph 1017 } 1018 elif targetUnit == "stepTransition": 1019 edges = graph.allEdges() 1020 analyzeStepTransition = cast(StepTransitionAnalyzer, target) 1021 data = { 1022 (src, transition, dst): analyzeStepTransition( 1023 exploration, 1024 step, 1025 src, 1026 transition, 1027 dst 1028 ) 1029 for (src, dst, transition) in edges 1030 } 1031 else: 1032 raise ValueError( 1033 f"Target analysis routine {name!r} has inconsistent" 1034 f" analysis unit {targetUnit!r} for 'step' result" 1035 f" unit." 1036 ) 1037 return combiner(data, exploration, step)
Computes medianCombiner for the 'branches' result over all stepDecisions at each step.
1771@analyzer('decision') 1772def arrivals( 1773 exploration: core.DiscreteExploration, 1774 decision: base.DecisionID 1775) -> int: 1776 """ 1777 Given an `DiscreteExploration` object and a particular `Decision` 1778 which exists at some point during that exploration, counts the number 1779 of times that decision was in the active decision set for a step 1780 after not being in that set the previous step. Effectively, counts 1781 how many times we arrived at that decision, ignoring steps where we 1782 remained at it due to a wait or an action or the like. 1783 1784 Returns 0 even for decisions that aren't part of the exploration. 1785 """ 1786 visits = stepsVisited(exploration) 1787 result = 0 1788 prev = -2 # won't be contiguous with step 0 1789 for step in visits.get(decision, []): 1790 # if previous visited step wasn't the prior step it's a revisit 1791 if prev != step - 1: 1792 result += 1 1793 prev = step 1794 1795 return result
Given an DiscreteExploration
object and a particular Decision
which exists at some point during that exploration, counts the number
of times that decision was in the active decision set for a step
after not being in that set the previous step. Effectively, counts
how many times we arrived at that decision, ignoring steps where we
remained at it due to a wait or an action or the like.
Returns 0 even for decisions that aren't part of the exploration.
1798@analyzer('decision') 1799def revisits( 1800 exploration: core.DiscreteExploration, 1801 decision: base.DecisionID 1802) -> int: 1803 """ 1804 Returns the number of times we revisited the target decision, which 1805 is just `arrivals` minus 1 for the first arrival, but not < 0. 1806 """ 1807 return max(0, arrivals(exploration, decision) - 1)
Returns the number of times we revisited the target decision, which
is just arrivals
minus 1 for the first arrival, but not < 0.
1078 def analysisCombiner( # type: ignore 1079 exploration: core.DiscreteExploration, 1080 ) -> CombinerResult: 1081 'To be replaced' 1082 # Declare data here as generic type 1083 data: Dict[ 1084 Union[base.DecisionID, OverspecificTransition, int], 1085 Any 1086 ] 1087 if targetUnit == "step": 1088 analyzeStep = cast(StepAnalyzer, target) 1089 data = { 1090 step: analyzeStep(exploration, step) 1091 for step in range(len(exploration)) 1092 } 1093 elif targetUnit == "decision": 1094 analyzeDecision = cast(DecisionAnalyzer, target) 1095 data = { 1096 dID: analyzeDecision(exploration, dID) 1097 for dID in exploration.allDecisions() 1098 } 1099 elif targetUnit == "transition": 1100 analyzeTransition = cast(TransitionAnalyzer, target) 1101 data = { 1102 (src, transition, dst): analyzeTransition( 1103 exploration, 1104 src, 1105 transition, 1106 dst 1107 ) 1108 for (src, transition, dst) in exploration.allTransitions() 1109 } 1110 elif targetUnit == "exploration": 1111 analyzeExploration = cast(ExplorationAnalyzer, target) 1112 data = analyzeExploration(exploration) 1113 else: 1114 raise ValueError( 1115 f"Target analysis routine {name!r} has inconsistent" 1116 f" analysis unit {targetUnit!r} for 'step' result" 1117 f" unit." 1118 ) 1119 return combiner(data, exploration)
Computes sumCombiner for the 'revisits' result over all decisions.
1078 def analysisCombiner( # type: ignore 1079 exploration: core.DiscreteExploration, 1080 ) -> CombinerResult: 1081 'To be replaced' 1082 # Declare data here as generic type 1083 data: Dict[ 1084 Union[base.DecisionID, OverspecificTransition, int], 1085 Any 1086 ] 1087 if targetUnit == "step": 1088 analyzeStep = cast(StepAnalyzer, target) 1089 data = { 1090 step: analyzeStep(exploration, step) 1091 for step in range(len(exploration)) 1092 } 1093 elif targetUnit == "decision": 1094 analyzeDecision = cast(DecisionAnalyzer, target) 1095 data = { 1096 dID: analyzeDecision(exploration, dID) 1097 for dID in exploration.allDecisions() 1098 } 1099 elif targetUnit == "transition": 1100 analyzeTransition = cast(TransitionAnalyzer, target) 1101 data = { 1102 (src, transition, dst): analyzeTransition( 1103 exploration, 1104 src, 1105 transition, 1106 dst 1107 ) 1108 for (src, transition, dst) in exploration.allTransitions() 1109 } 1110 elif targetUnit == "exploration": 1111 analyzeExploration = cast(ExplorationAnalyzer, target) 1112 data = analyzeExploration(exploration) 1113 else: 1114 raise ValueError( 1115 f"Target analysis routine {name!r} has inconsistent" 1116 f" analysis unit {targetUnit!r} for 'step' result" 1117 f" unit." 1118 ) 1119 return combiner(data, exploration)
Computes meanCombiner for the 'revisits' result over all decisions.
1078 def analysisCombiner( # type: ignore 1079 exploration: core.DiscreteExploration, 1080 ) -> CombinerResult: 1081 'To be replaced' 1082 # Declare data here as generic type 1083 data: Dict[ 1084 Union[base.DecisionID, OverspecificTransition, int], 1085 Any 1086 ] 1087 if targetUnit == "step": 1088 analyzeStep = cast(StepAnalyzer, target) 1089 data = { 1090 step: analyzeStep(exploration, step) 1091 for step in range(len(exploration)) 1092 } 1093 elif targetUnit == "decision": 1094 analyzeDecision = cast(DecisionAnalyzer, target) 1095 data = { 1096 dID: analyzeDecision(exploration, dID) 1097 for dID in exploration.allDecisions() 1098 } 1099 elif targetUnit == "transition": 1100 analyzeTransition = cast(TransitionAnalyzer, target) 1101 data = { 1102 (src, transition, dst): analyzeTransition( 1103 exploration, 1104 src, 1105 transition, 1106 dst 1107 ) 1108 for (src, transition, dst) in exploration.allTransitions() 1109 } 1110 elif targetUnit == "exploration": 1111 analyzeExploration = cast(ExplorationAnalyzer, target) 1112 data = analyzeExploration(exploration) 1113 else: 1114 raise ValueError( 1115 f"Target analysis routine {name!r} has inconsistent" 1116 f" analysis unit {targetUnit!r} for 'step' result" 1117 f" unit." 1118 ) 1119 return combiner(data, exploration)
Computes medianCombiner for the 'revisits' result over all decisions.
Records paths between decisions ignoring edge directions & requirements. Stores a list of decision IDs to traverse keyed by a decision ID pair where the smaller decision ID comes first (since paths are symmetric).
1843def hopDistance( 1844 hopPaths: HopPaths, 1845 src: base.DecisionID, 1846 dst: base.DecisionID 1847) -> Optional[int]: 1848 """ 1849 Returns the number of hops required to move from the given source to 1850 the given destination, ignoring edge directions & requirements. 1851 Looks that up in the given `HopPaths` dictionary. Returns 0 when 1852 source and destination are the same. 1853 1854 For example: 1855 1856 >>> e = core.DiscreteExploration.example() 1857 >>> hops = shortestHopPaths(e) 1858 >>> hopDistance(hops, 0, 1) 1859 1 1860 >>> hopDistance(hops, 1, 0) 1861 1 1862 >>> hopDistance(hops, 0, 0) 1863 0 1864 >>> hopDistance(hops, 0, 0) 1865 0 1866 >>> hopDistance(hops, 0, 4) is None 1867 True 1868 >>> hopDistance(hops, 4, 0) is None 1869 True 1870 >>> hopDistance(hops, 0, 5) 1871 2 1872 >>> hopDistance(hops, 5, 0) 1873 2 1874 >>> hopDistance(hops, 5, 1) 1875 3 1876 >>> hopDistance(hops, 1, 5) 1877 3 1878 >>> hopDistance(hops, 3, 5) 1879 1 1880 >>> hopDistance(hops, 5, 3) 1881 1 1882 >>> dIDs = list(e[-1].graph) 1883 >>> for i, src in enumerate(dIDs): 1884 ... for j in range(i + 1, len(dIDs)): 1885 ... dst = dIDs[j] 1886 ... assert ( 1887 ... hopDistance(hops, src, dst) == hopDistance(hops, dst, src) 1888 ... ) 1889 """ 1890 if src == dst: 1891 return 0 1892 elif src < dst: 1893 path = hopPaths.get((src, dst)) 1894 if path is None: 1895 return None 1896 else: 1897 return 1 + len(path) 1898 else: 1899 path = hopPaths.get((dst, src)) 1900 if path is None: 1901 return None 1902 else: 1903 return 1 + len(path)
Returns the number of hops required to move from the given source to
the given destination, ignoring edge directions & requirements.
Looks that up in the given HopPaths
dictionary. Returns 0 when
source and destination are the same.
For example:
>>> e = core.DiscreteExploration.example()
>>> hops = shortestHopPaths(e)
>>> hopDistance(hops, 0, 1)
1
>>> hopDistance(hops, 1, 0)
1
>>> hopDistance(hops, 0, 0)
0
>>> hopDistance(hops, 0, 0)
0
>>> hopDistance(hops, 0, 4) is None
True
>>> hopDistance(hops, 4, 0) is None
True
>>> hopDistance(hops, 0, 5)
2
>>> hopDistance(hops, 5, 0)
2
>>> hopDistance(hops, 5, 1)
3
>>> hopDistance(hops, 1, 5)
3
>>> hopDistance(hops, 3, 5)
1
>>> hopDistance(hops, 5, 3)
1
>>> dIDs = list(e[-1].graph)
>>> for i, src in enumerate(dIDs):
... for j in range(i + 1, len(dIDs)):
... dst = dIDs[j]
... assert (
... hopDistance(hops, src, dst) == hopDistance(hops, dst, src)
... )
1906@elide 1907@analyzer('exploration') 1908def shortestHopPaths( 1909 exploration: core.DiscreteExploration, 1910 edgeFilter: Optional[Callable[ 1911 [base.DecisionID, base.Transition, base.DecisionID, core.DecisionGraph], 1912 bool 1913 ]] = None 1914) -> HopPaths: 1915 """ 1916 Creates a dictionary that holds shortest paths between pairs of 1917 nodes, ignoring edge directions and requirements entirely. 1918 1919 If given an `edgeFilter`, that function is applied with source ID, 1920 transition name, destination ID, and full graph as arguments and 1921 edges for which it returns False are ignored when computing hops. 1922 Note that you have to filter out all edges in both directions between 1923 two nodes for there not to be a 1-hop path between them. 1924 1925 Keys in the dictionary are pairs of decision IDs, where the decision 1926 with the smaller ID always comes first (because shortest hop paths 1927 are symmetric so we don't store the reverse paths). Values are lists 1928 of decision IDs that can be traversed to get from the first decision 1929 to the second, with an empty list indicating adjacent decisions 1930 (note that these "hop paths" cannot always be traversed in the 1931 actual graph because they may go the "wrong way" across one-way 1932 connections). The number of hops required to get between the nodes 1933 is one more than the length of the path. Decision pairs which are 1934 not reachable from each other will not be included in the 1935 dictionary. Only decisions present in the final graph in the 1936 exploration will be included, and only edges present in that final 1937 graph will be considered. 1938 1939 Where there are multiple shortest hop paths, an arbitrary one is 1940 included in the result. 1941 1942 TODO: EXAMPLE 1943 >>> e = core.DiscreteExploration.example() 1944 >>> print(e[-1].graph.namesListing(e[-1].graph)) 1945 0 (House) 1946 1 (_u.0) 1947 2 (Cellar) 1948 3 (Yard) 1949 5 (Lane) 1950 <BLANKLINE> 1951 >>> shortest = dict(nx.all_pairs_shortest_path(e[-1].graph.connections())) 1952 >>> for src in shortest: 1953 ... print(f"{src} -> {shortest[src]}") 1954 0 -> {0: [0], 1: [0, 1], 2: [0, 2], 3: [0, 3], 5: [0, 3, 5]} 1955 1 -> {1: [1], 0: [1, 0], 2: [1, 0, 2], 3: [1, 0, 3], 5: [1, 0, 3, 5]} 1956 2 -> {2: [2], 0: [2, 0], 3: [2, 3], 1: [2, 0, 1], 5: [2, 3, 5]} 1957 3 -> {3: [3], 0: [3, 0], 2: [3, 2], 5: [3, 5], 1: [3, 0, 1]} 1958 5 -> {5: [5], 3: [5, 3], 0: [5, 3, 0], 2: [5, 3, 2], 1: [5, 3, 0, 1]} 1959 >>> hops = shortestHopPaths(e) 1960 >>> for src in hops: 1961 ... print(f"{src} -> {hops[src]}") 1962 (0, 1) -> [] 1963 (0, 2) -> [] 1964 (0, 3) -> [] 1965 (0, 5) -> [3] 1966 (1, 2) -> [0] 1967 (1, 3) -> [0] 1968 (1, 5) -> [0, 3] 1969 (2, 3) -> [] 1970 (2, 5) -> [3] 1971 (3, 5) -> [] 1972 """ 1973 graph = exploration[-1].graph 1974 allIDs = sorted(graph) 1975 connections = graph.connections(edgeFilter) 1976 shortest = dict(nx.all_pairs_shortest_path(connections)) 1977 1978 result = {} 1979 for i, src in enumerate(allIDs): 1980 for j in range(i + 1, len(allIDs)): 1981 dst = allIDs[j] 1982 path = shortest.get(src, {}).get(dst, None) 1983 if path is not None: 1984 result[(src, dst)] = path[1:-1] 1985 1986 return result
Creates a dictionary that holds shortest paths between pairs of nodes, ignoring edge directions and requirements entirely.
If given an edgeFilter
, that function is applied with source ID,
transition name, destination ID, and full graph as arguments and
edges for which it returns False are ignored when computing hops.
Note that you have to filter out all edges in both directions between
two nodes for there not to be a 1-hop path between them.
Keys in the dictionary are pairs of decision IDs, where the decision with the smaller ID always comes first (because shortest hop paths are symmetric so we don't store the reverse paths). Values are lists of decision IDs that can be traversed to get from the first decision to the second, with an empty list indicating adjacent decisions (note that these "hop paths" cannot always be traversed in the actual graph because they may go the "wrong way" across one-way connections). The number of hops required to get between the nodes is one more than the length of the path. Decision pairs which are not reachable from each other will not be included in the dictionary. Only decisions present in the final graph in the exploration will be included, and only edges present in that final graph will be considered.
Where there are multiple shortest hop paths, an arbitrary one is included in the result.
TODO: EXAMPLE
>>> e = core.DiscreteExploration.example()
>>> print(e[-1].graph.namesListing(e[-1].graph))
0 (House)
1 (_u.0)
2 (Cellar)
3 (Yard)
5 (Lane)
<BLANKLINE>
>>> shortest = dict(nx.all_pairs_shortest_path(e[-1].graph.connections()))
>>> for src in shortest:
... print(f"{src} -> {shortest[src]}")
0 -> {0: [0], 1: [0, 1], 2: [0, 2], 3: [0, 3], 5: [0, 3, 5]}
1 -> {1: [1], 0: [1, 0], 2: [1, 0, 2], 3: [1, 0, 3], 5: [1, 0, 3, 5]}
2 -> {2: [2], 0: [2, 0], 3: [2, 3], 1: [2, 0, 1], 5: [2, 3, 5]}
3 -> {3: [3], 0: [3, 0], 2: [3, 2], 5: [3, 5], 1: [3, 0, 1]}
5 -> {5: [5], 3: [5, 3], 0: [5, 3, 0], 2: [5, 3, 2], 1: [5, 3, 0, 1]}
>>> hops = shortestHopPaths(e)
>>> for src in hops:
... print(f"{src} -> {hops[src]}")
(0, 1) -> []
(0, 2) -> []
(0, 3) -> []
(0, 5) -> [3]
(1, 2) -> [0]
(1, 3) -> [0]
(1, 5) -> [0, 3]
(2, 3) -> []
(2, 5) -> [3]
(3, 5) -> []
1993def runFullAnalysis( 1994 exploration: core.DiscreteExploration, 1995 elide: Collection[str] = ELIDE, 1996 finalOnly: Collection[str] = FINAL_ONLY 1997) -> FullAnalysisResults: 1998 """ 1999 Runs every single analysis function on every valid target for that 2000 function in the given exploration, building up the cache of 2001 `FullAnalysisResults` in `ALL_ANALYZERS`. Returns the relevant 2002 `FullAnalysisResults` object. 2003 2004 Skips analyzers in the provided `elide` collection, which by default 2005 is the `ELIDE` global set containing functions explicitly decorated 2006 with `elide`. Analyzers in the `FINAL_ONLY` set are only applied to 2007 the final decision graph in the exploration (although note that they 2008 might call other analyzers which recursively need to analyze prior 2009 steps). `finalOnly` only has an effect for analyzers with 'step', 2010 'stepDecision', or 'stepTransition' units. 2011 """ 2012 for aName, analyzer in ALL_ANALYZERS.items(): 2013 # Skip this one if we're told to 2014 if aName in elide: 2015 continue 2016 # Split out cases for each unit & apply as appropriate 2017 unit = analyzer._unit 2018 if unit == 'step': 2019 sa = cast(StepAnalyzer, analyzer) 2020 if aName in finalOnly: 2021 sa(exploration, len(exploration) - 1) 2022 else: 2023 for step in range(len(exploration)): 2024 sa(exploration, step) 2025 elif unit == 'stepDecision': 2026 sda = cast(StepDecisionAnalyzer, analyzer) 2027 # Only apply to final graph if it's in finalOnly 2028 if aName in finalOnly: 2029 step = len(exploration) - 1 2030 for dID in exploration[step].graph: 2031 sda(exploration, step, dID) 2032 else: 2033 for step in range(len(exploration)): 2034 for dID in exploration[step].graph: 2035 sda(exploration, step, dID) 2036 elif unit == 'stepTransition': 2037 sta = cast(StepTransitionAnalyzer, analyzer) 2038 if aName in finalOnly: 2039 step = len(exploration) - 1 2040 edges = exploration[step].graph.allEdges() 2041 for (src, dst, transition) in edges: 2042 sta(exploration, step, src, transition, dst) 2043 else: 2044 for step in range(len(exploration)): 2045 edges = exploration[step].graph.allEdges() 2046 for (src, dst, transition) in edges: 2047 sta(exploration, step, src, transition, dst) 2048 elif unit == 'decision': 2049 da = cast(DecisionAnalyzer, analyzer) 2050 for dID in exploration.allDecisions(): 2051 da(exploration, dID) 2052 elif unit == 'transition': 2053 ta = cast(TransitionAnalyzer, analyzer) 2054 for (src, trans, dst) in exploration.allTransitions(): 2055 ta(exploration, src, trans, dst) 2056 elif unit == 'exploration': 2057 ea = cast(ExplorationAnalyzer, analyzer) 2058 ea(exploration) 2059 else: 2060 raise ValueError(f"Invalid analysis unit {unit!r}.") 2061 2062 return ANALYSIS_RESULTS[id(exploration)]
Runs every single analysis function on every valid target for that
function in the given exploration, building up the cache of
FullAnalysisResults
in ALL_ANALYZERS
. Returns the relevant
FullAnalysisResults
object.
Skips analyzers in the provided elide
collection, which by default
is the ELIDE
global set containing functions explicitly decorated
with elide
. Analyzers in the FINAL_ONLY
set are only applied to
the final decision graph in the exploration (although note that they
might call other analyzers which recursively need to analyze prior
steps). finalOnly
only has an effect for analyzers with 'step',
'stepDecision', or 'stepTransition' units.
2073def getDecisionAnalyses( 2074 exploration: core.DiscreteExploration, 2075 dID: base.DecisionID 2076) -> AnalysisResults: 2077 """ 2078 Retrieves all pre-computed all-step analysis results for the 2079 specified decision. Use `runFullAnalysis` or call specific analysis 2080 functions of interest first to populate these results. Does not 2081 include per-step decision analyses. 2082 2083 Returns the dictionary of `AnalysisResults`, which can be modified to 2084 update stored results if necessary (although it's better to write 2085 additional analysis routines using the `@analyzer` decorator). 2086 """ 2087 cached = ANALYSIS_RESULTS.setdefault( 2088 id(exploration), 2089 newFullAnalysisResults() 2090 ) 2091 return cached["perDecision"].setdefault(dID, {})
Retrieves all pre-computed all-step analysis results for the
specified decision. Use runFullAnalysis
or call specific analysis
functions of interest first to populate these results. Does not
include per-step decision analyses.
Returns the dictionary of AnalysisResults
, which can be modified to
update stored results if necessary (although it's better to write
additional analysis routines using the @analyzer
decorator).
2094def getTransitionAnalyses( 2095 exploration: core.DiscreteExploration, 2096 source: base.DecisionID, 2097 transition: base.Transition, 2098 destination: base.DecisionID 2099) -> AnalysisResults: 2100 """ 2101 Like `getDecisionAnalyses` but returns analyses for a transition 2102 instead of a decision. 2103 """ 2104 cached = ANALYSIS_RESULTS.setdefault( 2105 id(exploration), 2106 newFullAnalysisResults() 2107 ) 2108 return cached["perTransition"].setdefault( 2109 (source, transition, destination), 2110 {} 2111 )
Like getDecisionAnalyses
but returns analyses for a transition
instead of a decision.
2114def getStepDecisionAnalyses( 2115 exploration: core.DiscreteExploration, 2116 step: int, 2117 dID: base.DecisionID 2118) -> AnalysisResults: 2119 """ 2120 Like `getDecisionAnalyses` but for analyses applicable only to the 2121 specified exploration step. 2122 """ 2123 cached = ANALYSIS_RESULTS.setdefault( 2124 id(exploration), 2125 newFullAnalysisResults() 2126 ) 2127 stepwise = cached.setdefault("perStepDecision", []) 2128 while step >= len(stepwise): 2129 stepwise.append({}) 2130 return stepwise[step].setdefault(dID, {})
Like getDecisionAnalyses
but for analyses applicable only to the
specified exploration step.
2133def getStepTransitionAnalyses( 2134 exploration: core.DiscreteExploration, 2135 step: int, 2136 source: base.DecisionID, 2137 transition: base.Transition, 2138 destination: base.DecisionID 2139) -> AnalysisResults: 2140 """ 2141 Like `getStepDecisionAnalyses` but for a transition at a particular 2142 step, not a decision. 2143 """ 2144 cached = ANALYSIS_RESULTS.setdefault( 2145 id(exploration), 2146 newFullAnalysisResults() 2147 ) 2148 stepwise = cached.setdefault("perStepTransition", []) 2149 while step >= len(stepwise): 2150 stepwise.append({}) 2151 return stepwise[step].setdefault((source, transition, destination), {})
Like getStepDecisionAnalyses
but for a transition at a particular
step, not a decision.
2154def getStepAnalyses( 2155 exploration: core.DiscreteExploration, 2156 step: int 2157) -> AnalysisResults: 2158 """ 2159 Like `getDecisionAnalyses` but retrieves full-step analysis results 2160 for the specified exploration step. 2161 """ 2162 cached = ANALYSIS_RESULTS.setdefault( 2163 id(exploration), 2164 newFullAnalysisResults() 2165 ) 2166 stepwise = cached.setdefault("perStep", []) 2167 while step >= len(stepwise): 2168 stepwise.append({}) 2169 return stepwise[step]
Like getDecisionAnalyses
but retrieves full-step analysis results
for the specified exploration step.
2172def getExplorationAnalyses( 2173 exploration: core.DiscreteExploration 2174) -> AnalysisResults: 2175 """ 2176 Like `getDecisionAnalyses` but retrieves full-exploration analysis 2177 results. 2178 """ 2179 cached = ANALYSIS_RESULTS.setdefault( 2180 id(exploration), 2181 newFullAnalysisResults() 2182 ) 2183 return cached.setdefault("overall", {})
Like getDecisionAnalyses
but retrieves full-exploration analysis
results.
2186class AnalyzersByUnit(TypedDict): 2187 """ 2188 Holds lists of analyzers for each analysis unit type. 2189 """ 2190 step: List[StepAnalyzer] 2191 stepDecision: List[StepDecisionAnalyzer] 2192 stepTransition: List[StepTransitionAnalyzer] 2193 decision: List[DecisionAnalyzer] 2194 transition: List[TransitionAnalyzer] 2195 exploration: List[ExplorationAnalyzer]
Holds lists of analyzers for each analysis unit type.
2198def analyzersByUnit(onlyInclude: Optional[Set[str]] = None) -> AnalyzersByUnit: 2199 """ 2200 Returns an `AnalyzersByUnit` dictionary containing all analyzers 2201 from `ALL_ANALYZERS` which are in the given `onlyInclude` set (or 2202 just all of them if no set is specified). This will by default be all 2203 analyzers registered so far. 2204 """ 2205 byUnit: AnalyzersByUnit = { 2206 "step": [], 2207 "stepDecision": [], 2208 "stepTransition": [], 2209 "decision": [], 2210 "transition": [], 2211 "exploration": [] 2212 } 2213 for analyzerName in ALL_ANALYZERS: 2214 if onlyInclude is not None and analyzerName not in onlyInclude: 2215 continue 2216 analyzer = ALL_ANALYZERS[analyzerName] 2217 unit = analyzer._unit 2218 byUnit[unit].append(analyzer) # type: ignore 2219 # Mypy will just have to trust that We've put the correct unit 2220 # values on each analyzer. That relationship is type-checked in 2221 # the `analyzer` definition. 2222 2223 return byUnit
Returns an AnalyzersByUnit
dictionary containing all analyzers
from ALL_ANALYZERS
which are in the given onlyInclude
set (or
just all of them if no set is specified). This will by default be all
analyzers registered so far.