exploration.analysis

  • Authors: Peter Mawhorter
  • Consulted:
  • Date: 2022-10-24
  • Purpose: Analysis functions for decision graphs and explorations.
   1"""
   2- Authors: Peter Mawhorter
   3- Consulted:
   4- Date: 2022-10-24
   5- Purpose: Analysis functions for decision graphs and explorations.
   6"""
   7
   8from typing import (
   9    List, Dict, Tuple, Optional, TypeVar, Callable, Union, Any,
  10    ParamSpec, Concatenate, Set, cast, Type, TypeAlias, Literal,
  11    TypedDict, Protocol, Sequence, Callable, Collection
  12)
  13
  14from types import FunctionType
  15
  16from . import base, core, parsing
  17
  18import textwrap
  19import functools
  20import inspect
  21import time
  22
  23import networkx as nx
  24
  25
  26#-------------------#
  27# Text descriptions #
  28#-------------------#
  29
  30def describeConsequence(consequence: base.Consequence) -> str:
  31    """
  32    Returns a string which concisely describes a consequence list.
  33    Returns an empty string if given an empty consequence. Examples:
  34
  35    >>> describeConsequence([])
  36    ''
  37    >>> describeConsequence([
  38    ...     base.effect(gain=('gold', 5), delay=2, charges=3),
  39    ...     base.effect(lose='flight')
  40    ... ])
  41    'gain gold*5 ,2 =3; lose flight'
  42    >>> from . import commands
  43    >>> d = describeConsequence([
  44    ...     base.effect(edit=[
  45    ...         [
  46    ...             commands.command('val', '5'),
  47    ...             commands.command('empty', 'list'),
  48    ...             commands.command('append')
  49    ...         ],
  50    ...         [
  51    ...             commands.command('val', '11'),
  52    ...             commands.command('assign', 'var'),
  53    ...             commands.command('op', '+', '$var', '$var')
  54    ...         ],
  55    ...     ])
  56    ... ])
  57    >>> d
  58    'with consequences:\
  59\\n    edit {\
  60\\n      val 5;\
  61\\n      empty list;\
  62\\n      append $_;\
  63\\n    } {\
  64\\n      val 11;\
  65\\n      assign var $_;\
  66\\n      op + $var $var;\
  67\\n    }\
  68\\n'
  69    >>> for line in d.splitlines():
  70    ...     print(line)
  71    with consequences:
  72        edit {
  73          val 5;
  74          empty list;
  75          append $_;
  76        } {
  77          val 11;
  78          assign var $_;
  79          op + $var $var;
  80        }
  81    """
  82    edesc = ''
  83    pf = parsing.ParseFormat()
  84    if consequence:
  85        parts = []
  86        for item in consequence:
  87            # TODO: Challenges and Conditions here!
  88            if 'skills' in item:  # a Challenge
  89                item = cast(base.Challenge, item)
  90                parts.append(pf.unparseChallenge(item))
  91            elif 'value' in item:  # an Effect
  92                item = cast(base.Effect, item)
  93                parts.append(pf.unparseEffect(item))
  94            elif 'condition' in item:  # a Condition
  95                item = cast(base.Condition, item)
  96                parts.append(pf.unparseCondition(item))
  97            else:
  98                raise TypeError(
  99                    f"Invalid consequence item (no 'skills', 'value', or"
 100                    f" 'condition' key found):\n{repr(item)}"
 101                )
 102        edesc = '; '.join(parts)
 103        if len(edesc) > 60 or '\n' in edesc:
 104            edesc = 'with consequences:\n' + ';\n'.join(
 105                textwrap.indent(part, '    ')
 106                for part in parts
 107            ) + '\n'
 108
 109    return edesc
 110
 111
 112def describeProgress(exploration: core.DiscreteExploration) -> str:
 113    """
 114    Describes the progress of an exploration by noting each room/zone
 115    visited and explaining the options visible at each point plus which
 116    option was taken. Notes powers/tokens gained/lost along the way.
 117    Returns a string.
 118
 119    Example:
 120    >>> from exploration import journal
 121    >>> e = journal.convertJournal('''\\
 122    ... S Start::pit
 123    ... A gain jump
 124    ... A gain attack
 125    ... n button check
 126    ... zz Wilds
 127    ... o up
 128    ...   q _flight
 129    ... o left
 130    ... x left left_nook right
 131    ... a geo_rock
 132    ...   At gain geo*15
 133    ...   At deactivate
 134    ... o up
 135    ...   q _tall_narrow
 136    ... t right
 137    ... o right
 138    ...   q attack
 139    ... ''')
 140    >>> for line in describeProgress(e).splitlines():
 141    ...    print(line)
 142    Start of the exploration
 143    Start exploring domain main at 0 (Start::pit)
 144      Gained capability 'attack'
 145      Gained capability 'jump'
 146    At decision 0 (Start::pit)
 147      In zone Start
 148      In region Wilds
 149      There are transitions:
 150        left to unconfirmed
 151        up to unconfirmed; requires _flight
 152      1 note(s) at this step
 153    Explore left from decision 0 (Start::pit) to 2 (now Start::left_nook)
 154    At decision 2 (Start::left_nook)
 155      There are transitions:
 156        right to 0 (Start::pit)
 157      There are actions:
 158        geo_rock
 159    Do action geo_rock
 160      Gained 15 geo(s)
 161    Take right from decision 2 (Start::left_nook) to 0 (Start::pit)
 162    At decision 0 (Start::pit)
 163      There are transitions:
 164        left to 2 (Start::left_nook)
 165        right to unconfirmed; requires attack
 166        up to unconfirmed; requires _flight
 167    Waiting for another action...
 168    End of the exploration.
 169    """
 170    result = ''
 171
 172    regions: Set[base.Zone] = set()
 173    zones: Set[base.Zone] = set()
 174    last: Union[base.DecisionID, Set[base.DecisionID], None] = None
 175    lastState: base.State = base.emptyState()
 176    prevCapabilities = base.effectiveCapabilitySet(lastState)
 177    prevMechanisms = lastState['mechanisms']
 178    oldActiveDecisions: Set[base.DecisionID] = set()
 179    for i, situation in enumerate(exploration):
 180        if i == 0:
 181            result += "Start of the exploration\n"
 182
 183        # Extract info
 184        graph = situation.graph
 185        activeDecisions = exploration.getActiveDecisions(i)
 186        newActive = activeDecisions - oldActiveDecisions
 187        departedFrom = exploration.movementAtStep(i)[0]
 188        # TODO: use the other parts of this?
 189        nowZones: Set[base.Zone] = set()
 190        for active in activeDecisions:
 191            nowZones |= graph.zoneAncestors(active)
 192        regionsHere = set(
 193            z
 194            for z in nowZones
 195            if graph.zoneHierarchyLevel(z) == 1
 196        )
 197        zonesHere = set(
 198            z
 199            for z in nowZones
 200            if graph.zoneHierarchyLevel(z) == 0
 201        )
 202        here = departedFrom
 203        state = situation.state
 204        capabilities = base.effectiveCapabilitySet(state)
 205        mechanisms = state['mechanisms']
 206
 207        # Describe capabilities gained/lost relative to previous step
 208        # (i.e., as a result of the previous action)
 209        gained = (
 210            capabilities['capabilities']
 211          - prevCapabilities['capabilities']
 212        )
 213        gainedTokens = []
 214        for tokenType in capabilities['tokens']:
 215            net = (
 216                capabilities['tokens'][tokenType]
 217              - prevCapabilities['tokens'].get(tokenType, 0)
 218            )
 219            if net != 0:
 220                gainedTokens.append((tokenType, net))
 221        changed = [
 222            mID
 223            for mID in list(mechanisms.keys()) + list(prevMechanisms.keys())
 224            if mechanisms.get(mID) != prevMechanisms.get(mID)
 225        ]
 226
 227        for capability in sorted(gained):
 228            result += f"  Gained capability '{capability}'\n"
 229
 230        for tokenType, net in gainedTokens:
 231            if net > 0:
 232                result += f"  Gained {net} {tokenType}(s)\n"
 233            else:
 234                result += f"  Lost {-net} {tokenType}(s)\n"
 235
 236        for mID in changed:
 237            oldState = prevMechanisms.get(mID, base.DEFAULT_MECHANISM_STATE)
 238            newState = mechanisms.get(mID, base.DEFAULT_MECHANISM_STATE)
 239
 240            details = graph.mechanismDetails(mID)
 241            if details is None:
 242                mName = "(unknown)"
 243            else:
 244                mName = details[1]
 245            result += (
 246                f"  Set mechanism {mID} ({mName}) to {newState} (was"
 247                f" {oldState})"
 248            )
 249            # TODO: Test this!
 250
 251        if isinstance(departedFrom, base.DecisionID):
 252            # Print location info
 253            if here != last:
 254                if here is None:
 255                    result += "Without a position...\n"
 256                elif isinstance(here, set):
 257                    result += f"With {len(here)} active decisions\n"
 258                    # TODO: List them using namesListing?
 259                else:
 260                    result += f"At decision {graph.identityOf(here)}\n"
 261            newZones = zonesHere - zones
 262            for zone in sorted(newZones):
 263                result += f"  In zone {zone}\n"
 264            newRegions = regionsHere - regions
 265            for region in sorted(newRegions):
 266                result += f"  In region {region}\n"
 267
 268        elif isinstance(departedFrom, set):  # active in spreading domain
 269            spreadingDomain = graph.domainFor(list(departedFrom)[0])
 270            result += (
 271                f"  In domain {spreadingDomain} with {len(departedFrom)}"
 272                f" active decisions...\n"
 273            )
 274
 275        else:
 276            assert departedFrom is None
 277
 278        # Describe new position/positions at start of this step
 279        if len(newActive) > 1:
 280            newListing = ', '.join(
 281                sorted(graph.identityOf(n) for n in newActive)
 282            )
 283            result += (
 284                f"  There are {len(newActive)} new active decisions:"
 285                f"\n  {newListing}"
 286            )
 287
 288        elif len(newActive) == 1:
 289            here = list(newActive)[0]
 290
 291            outgoing = graph.destinationsFrom(here)
 292
 293            transitions = {t: d for (t, d) in outgoing.items() if d != here}
 294            actions = {t: d for (t, d) in outgoing.items() if d == here}
 295            if transitions:
 296                result += "  There are transitions:\n"
 297                for transition in sorted(transitions):
 298                    dest = transitions[transition]
 299                    if not graph.isConfirmed(dest):
 300                        destSpec = 'unconfirmed'
 301                    else:
 302                        destSpec = graph.identityOf(dest)
 303                    req = graph.getTransitionRequirement(here, transition)
 304                    rDesc = ''
 305                    if req != base.ReqNothing():
 306                        rDesc = f"; requires {req.unparse()}"
 307                    cDesc = describeConsequence(
 308                        graph.getConsequence(here, transition)
 309                    )
 310                    if cDesc:
 311                        cDesc = '; ' + cDesc
 312                    result += (
 313                        f"    {transition} to {destSpec}{rDesc}{cDesc}\n"
 314                    )
 315
 316            if actions:
 317                result += "  There are actions:\n"
 318                for action in sorted(actions):
 319                    req = graph.getTransitionRequirement(here, action)
 320                    rDesc = ''
 321                    if req != base.ReqNothing():
 322                        rDesc = f"; requires {req.unparse()}"
 323                    cDesc = describeConsequence(
 324                        graph.getConsequence(here, action)
 325                    )
 326                    if cDesc:
 327                        cDesc = '; ' + cDesc
 328                    if rDesc or cDesc:
 329                        desc = (rDesc + cDesc)[2:]  # chop '; ' from either
 330                        result += f"    {action} ({desc})\n"
 331                    else:
 332                        result += f"    {action}\n"
 333
 334        # note annotations
 335        if len(situation.annotations) > 0:
 336            result += (
 337                f"  {len(situation.annotations)} note(s) at this step\n"
 338            )
 339
 340        # Describe action taken
 341        if situation.action is None and situation.type == "pending":
 342            result += "Waiting for another action...\n"
 343        else:
 344            desc = base.describeExplorationAction(situation, situation.action)
 345            desc = desc[0].capitalize() + desc[1:]
 346            result += desc + '\n'
 347
 348        if i == len(exploration) - 1:
 349            result += "End of the exploration.\n"
 350
 351        # Update state variables
 352        oldActiveDecisions = activeDecisions
 353        prevCapabilities = capabilities
 354        prevMechanisms = mechanisms
 355        regions = regionsHere
 356        zones = zonesHere
 357        if here is not None:
 358            last = here
 359        lastState = state
 360
 361    return result
 362
 363
 364#-----------------------#
 365# Analysis result types #
 366#-----------------------#
 367
# Each analyzer declares one of these unit names (see the `_unit`
# attribute of `AnalysisFunction`); the unit determines which slot of a
# `FullAnalysisResults` dictionary its results are cached in.
AnalysisUnit: 'TypeAlias' = Literal[
    'step',
    'stepDecision',
    'stepTransition',
    'decision',
    'transition',
    'exploration',
]
"""
The different kinds of analysis units we consider: per-step-per-decision,
per-step-per-transition, per-step, per-final-decision,
per-final-transition, and per-exploration (i.e. overall).
"""


# Maps analysis routine name -> that routine's result.
AnalysisResults: 'TypeAlias' = Dict[str, Any]
"""
Analysis results are dictionaries that map analysis routine names to
results from those routines, which can be of any type.
"""


# (source decision ID, transition name)
SpecificTransition: 'TypeAlias' = Tuple[base.DecisionID, base.Transition]
"""
A specific transition is identified by its source decision ID and its
transition name. Note that transitions which get renamed are treated as
two separate transitions.
"""

# (source decision ID, transition name, destination decision ID)
OverspecificTransition: 'TypeAlias' = Tuple[
    base.DecisionID,
    base.Transition,
    base.DecisionID
]
"""
In contrast to a `SpecificTransition`, an `OverspecificTransition`
includes the destination of the transition, which might help disambiguate
cases where a transition is created, then re-targeted or deleted and
re-created with a different destination. Transitions which get renamed
still are treated as two separate transitions.
"""

DecisionAnalyses: 'TypeAlias' = Dict[base.DecisionID, AnalysisResults]
"""
Decision analysis results are stored per-decision, with a dictionary of
property-name → value associations. These properties either apply to
decisions across all steps of an exploration, or apply to decisions in a
particular `core.DecisionGraph`.
"""

# Note: keyed by the three-part `OverspecificTransition`, not by
# `SpecificTransition`.
TransitionAnalyses: 'TypeAlias' = Dict[OverspecificTransition, AnalysisResults]
"""
Per-transition analysis results, similar to `DecisionAnalyses`.
"""

StepAnalyses: 'TypeAlias' = List[AnalysisResults]
"""
Per-exploration-step analysis results are stored in a list and indexed by
exploration step integers.
"""

StepwiseDecisionAnalyses: 'TypeAlias' = List[DecisionAnalyses]
"""
Per-step-per-decision analysis results are stored as a list of decision
analysis results.
"""

StepwiseTransitionAnalyses: 'TypeAlias' = List[TransitionAnalyses]
"""
Per-step-per-transition analysis results are stored as a list of
transition analysis results.
"""

ExplorationAnalyses: 'TypeAlias' = AnalysisResults
"""
Whole-exploration analyses are just a normal `AnalysisResults` dictionary.
"""
 445
class FullAnalysisResults(TypedDict):
    """
    Full analysis results hold every kind of analysis result in one
    dictionary, with one entry per analysis unit. The step-based entries
    are lists indexed by exploration step; the others are dictionaries.
    """
    # Results per decision ID
    perDecision: DecisionAnalyses
    # Results per (source, transition name, destination) triple
    perTransition: TransitionAnalyses
    # One results dictionary per exploration step
    perStep: StepAnalyses
    # One per-decision results dictionary per exploration step
    perStepDecision: StepwiseDecisionAnalyses
    # One per-transition results dictionary per exploration step
    perStepTransition: StepwiseTransitionAnalyses
    # Results that apply to the exploration as a whole
    overall: ExplorationAnalyses
 457
 458
 459def newFullAnalysisResults() -> FullAnalysisResults:
 460    """
 461    Returns a new empty `FullAnalysisResults` dictionary.
 462    """
 463    return {
 464        'perDecision': {},
 465        'perTransition': {},
 466        'perStep': [],
 467        'perStepDecision': [],
 468        'perStepTransition': [],
 469        'overall': {}
 470    }
 471
 472
# Stands for the extra (non-exploration) parameters of an analysis
# function; see `AnalysisFunction.__call__`.
Params = ParamSpec('Params')
'Parameter specification variable for `AnalysisFunction` definition.'
 475
 476
class AnalysisFunction(Protocol[Params]):
    """
    Analysis functions are callable, but also have a `_unit` attribute
    which is a string (one of the `AnalysisUnit` values). They are
    called with a `core.DiscreteExploration` followed by whatever
    additional arguments the `Params` specification describes, and may
    return any type of result.
    """
    # Which analysis unit this function applies to
    _unit: AnalysisUnit
    # Declared so that the name and docstring can be accessed on any
    # analysis function
    __name__: str
    __doc__: str
    def __call__(
        self,
        exploration: core.DiscreteExploration,
        *args: Params.args,
        **kwargs: Params.kwargs
    ) -> Any:
        ...
 492
 493
# For each alias below, the bracketed list gives the arguments that
# follow the exploration argument of `AnalysisFunction.__call__`.
StepAnalyzer: 'TypeAlias' = AnalysisFunction[[int]]
'''
A step analyzer is a function which will receive a
`core.DiscreteExploration` along with the step in that exploration being
considered. It can return any type of analysis result.
'''

StepDecisionAnalyzer: 'TypeAlias' = AnalysisFunction[[int, base.DecisionID]]
'''
Like a `StepAnalyzer` but also gets a decision ID to consider.
'''

StepTransitionAnalyzer: 'TypeAlias' = AnalysisFunction[
    [int, base.DecisionID, base.Transition, base.DecisionID]
]
'''
Like a `StepAnalyzer` but also gets a source decision ID, a transition
name, and a destination decision ID to target.
'''


# NOTE(review): the description below mentions receiving "full analysis
# results to update", but `AnalysisFunction.__call__` only takes the
# exploration plus the listed arguments (here, a decision ID); the
# description looks stale and should be confirmed/refreshed.
DecisionAnalyzer: 'TypeAlias' = AnalysisFunction[[base.DecisionID]]
'''
A decision analyzer gets full analysis results to update plus an
exploration and a particular decision ID to consider.
'''

# Per the parameter list: source decision ID, transition name, and
# destination decision ID.
TransitionAnalyzer: 'TypeAlias' = AnalysisFunction[
    [base.DecisionID, base.Transition, base.DecisionID]
]
'''
Like a `DecisionAnalyzer` but gets a transition name as well.
'''

# Takes no arguments besides the exploration itself.
ExplorationAnalyzer: 'TypeAlias' = AnalysisFunction[[]]
'''
Analyzes overall properties of an entire `core.DiscreteExploration`.
'''


#--------------------------#
# Analysis caching support #
#--------------------------#

# Union of every analyzer flavor defined above.
AnyAnalyzer: 'TypeAlias' = Union[
    ExplorationAnalyzer,
    TransitionAnalyzer,
    DecisionAnalyzer,
    StepAnalyzer,
    StepDecisionAnalyzer,
    StepTransitionAnalyzer
]


# NOTE(review): keys are `id()` values, which Python may reuse once an
# object has been garbage-collected; presumably callers are expected to
# clear stale entries (the `analyzer` decorator's docs mention
# `clearAnalysisCache`) -- confirm.
ANALYSIS_RESULTS: Dict[int, FullAnalysisResults] = {}
"""
Caches analysis results, keyed by the `id` of the
`core.DiscreteExploration` they're based on.
"""
 549"""
 550Caches analysis results, keyed by the `id` of the
 551`core.DiscreteExploration` they're based on.
 552"""
 553
 554
 555class NotCached:
 556    """
 557    Reference object for specifying that no cached value is available,
 558    since `None` is a valid cached value.
 559    """
 560    pass
 561
 562
 563def lookupAnalysisResult(
 564    cache: FullAnalysisResults,
 565    analyzer: AnalysisFunction,
 566    argsInOrder: Sequence[Any]
 567) -> Union[Type[NotCached], Any]:
 568    """
 569    Looks up an analysis result for the given function in the given
 570    cache. The function must have been decorated with `analyzer`. The
 571    bound arguments must match the unit of analysis, for example, if the
 572    unit is 'stepDecision', the arguments must be those for a
 573    `StepDecisionAnalyzer`. The bound arguments should have had
 574    `apply_defaults` called already to fill in default argument values.
 575    Returns the special object `NotCached` if there is no cached value
 576    for the specified arguments yet.
 577    """
 578    unit = analyzer._unit
 579    if unit == 'step':
 580        whichStep = argsInOrder[1]
 581        perStep = cache['perStep']
 582        while len(perStep) <= whichStep:
 583            perStep.append({})
 584        return perStep[whichStep].get(analyzer.__name__, NotCached)
 585    elif unit == 'stepDecision':
 586        whichStep = argsInOrder[1]
 587        whichDecision = argsInOrder[2]
 588        perStepDecision = cache['perStepDecision']
 589        while len(perStepDecision) <= whichStep:
 590            perStepDecision.append({})
 591        forThis = perStepDecision[whichStep].get(whichDecision)
 592        if forThis is None:
 593            return NotCached
 594        return forThis.get(analyzer.__name__, NotCached)
 595    elif unit == 'stepTransition':
 596        whichStep = argsInOrder[1]
 597        whichTransition = (argsInOrder[2], argsInOrder[3], argsInOrder[4])
 598        perStepTransition = cache['perStepTransition']
 599        while len(perStepTransition) <= whichStep:
 600            perStepTransition.append({})
 601        forThis = perStepTransition[whichStep].get(whichTransition)
 602        if forThis is None:
 603            return NotCached
 604        return forThis.get(analyzer.__name__, NotCached)
 605    elif unit == 'decision':
 606        whichDecision = argsInOrder[1]
 607        perDecision = cache['perDecision']
 608        if whichDecision not in perDecision:
 609            return NotCached
 610        return perDecision[whichDecision].get(analyzer.__name__, NotCached)
 611    elif unit == 'transition':
 612        whichTransition = (argsInOrder[1], argsInOrder[2], argsInOrder[3])
 613        perTransition = cache['perTransition']
 614        if whichTransition not in perTransition:
 615            return NotCached
 616        return perTransition[whichTransition].get(
 617            analyzer.__name__,
 618            NotCached
 619        )
 620    elif unit == 'exploration':
 621        return cache['overall'].get(analyzer.__name__, NotCached)
 622    else:
 623        raise ValueError(f"Invalid analysis unit {unit!r}.")
 624
 625
 626def saveAnalysisResult(
 627    cache: FullAnalysisResults,
 628    result: Any,
 629    analyzer: AnalysisFunction,
 630    argsInOrder: Sequence[Any]
 631) -> None:
 632    """
 633    Saves an analysis result in the specified cache. The bound arguments
 634    must match the unit, for example, if the unit is 'stepDecision', the
 635    arguments must be those for a `StepDecisionAnalyzer`.
 636    """
 637    unit = analyzer._unit
 638    if unit == 'step':
 639        whichStep = argsInOrder[1]
 640        perStep = cache['perStep']
 641        while len(perStep) <= whichStep:
 642            perStep.append({})
 643        perStep[whichStep][analyzer.__name__] = result
 644    elif unit == 'stepDecision':
 645        whichStep = argsInOrder[1]
 646        whichDecision = argsInOrder[2]
 647        perStepDecision = cache['perStepDecision']
 648        while len(perStepDecision) <= whichStep:
 649            perStepDecision.append({})
 650        forThis = perStepDecision[whichStep].setdefault(whichDecision, {})
 651        forThis[analyzer.__name__] = result
 652    elif unit == 'stepTransition':
 653        whichStep = argsInOrder[1]
 654        whichTransition = (argsInOrder[2], argsInOrder[3], argsInOrder[4])
 655        perStepTransition = cache['perStepTransition']
 656        while len(perStepTransition) <= whichStep:
 657            perStepTransition.append({})
 658        forThis = perStepTransition[whichStep].setdefault(whichTransition, {})
 659        forThis[analyzer.__name__] = result
 660    elif unit == 'decision':
 661        whichDecision = argsInOrder[1]
 662        perDecision = cache['perDecision']
 663        perDecision.setdefault(whichDecision, {})[analyzer.__name__] = result
 664    elif unit == 'transition':
 665        whichTransition = (argsInOrder[1], argsInOrder[2], argsInOrder[3])
 666        perTransition = cache['perTransition']
 667        perTransition.setdefault(
 668            whichTransition,
 669            {}
 670        )[analyzer.__name__] = result
 671    elif unit == 'exploration':
 672        cache['overall'][analyzer.__name__] = result
 673    else:
 674        raise ValueError(f"Invalid analysis unit {unit!r}.")
 675
 676
# Each value is the caching wrapper created by the `analyzer` decorator;
# its analysis unit is available as its `_unit` attribute.
ALL_ANALYZERS: Dict[str, AnyAnalyzer] = {}
"""
Holds all analyzers indexed by name with the analysis unit plus function
as the value. The `analyzer` decorator registers them.
"""


# Checked by the caching wrapper on each analyzer call; timings are
# accumulated in `ANALYSIS_TIME_SPENT`.
RECORD_PROFILE: bool = False
"""
Whether or not to record time spent by each analysis function.
"""
 688
 689
class AnalyzerPerf(TypedDict):
    """
    Tracks performance of an analysis function, recording total calls,
    non-cached calls, time spent looking up cached results, and time
    spent in non-cached calls (including the failed cache lookup and
    saving the result in the cache).
    """
    # Total number of calls, cached or not
    calls: int
    # Number of calls that had to compute a result
    nonCached: int
    # Accumulated `time.perf_counter` time for calls answered from cache
    lookupTime: float
    # Accumulated `time.perf_counter` time for calls that were not cached
    analyzeTime: float
 701
 702
 703def newAnalyzerPerf() -> AnalyzerPerf:
 704    """
 705    Creates a new empty `AnalyzerPerf` dictionary.
 706    """
 707    return {
 708        "calls": 0,
 709        "nonCached": 0,
 710        "lookupTime": 0.0,
 711        "analyzeTime": 0.0
 712    }
 713
 714
# Keyed by analyzer name; entries are created on an analyzer's first
# profiled call.
ANALYSIS_TIME_SPENT: Dict[str, AnalyzerPerf] = {}
"""
Records number-of-calls, number-of-non-cached calls, and time spent in
each analysis function, when `RECORD_PROFILE` is set to `True`.
"""


# Holds analyzer names; populated by the `elide` function.
ELIDE: Set[str] = set()
"""
Analyzers which should not be included in CSV output by default.
"""

# Holds analyzer names; populated by the `finalOnly` function.
FINAL_ONLY: Set[str] = set()
"""
Per-step/step-decision/step-transition analyzers which should by default
only be applied to the final step of an exploration to save time.
"""
 732
 733
 734def getArgsInOrder(
 735    f: Callable,
 736    *args: Any,
 737    **kwargs: Any
 738) -> List[Any]:
 739    """
 740    Given a callable and some arguments, returns a list of argument and
 741    some arguments, returns a list of argument values in the same order
 742    as that function would accept them from the given arguments,
 743    accounting for things like keyword arguments and default values.
 744
 745    For example:
 746
 747    >>> def f(a, /, b, *more, x=3, y=10, **kw):
 748    ...     pass
 749    >>> sig = inspect.Signature.from_callable(f)
 750    >>> getArgsInOrder(f, 1, 2)
 751    [1, 2, 3, 10]
 752    >>> getArgsInOrder(f, 4, 5, y=2, x=8)
 753    [4, 5, 8, 2]
 754    >>> getArgsInOrder(f, 4, y=2, x=8, b=3)
 755    [4, 3, 8, 2]
 756    >>> getArgsInOrder(f, 4, y=2, x=8, b=3)
 757    [4, 3, 8, 2]
 758    >>> getArgsInOrder(f, 1, 2, 3, 4)
 759    [1, 2, 3, 4, 3, 10]
 760    >>> getArgsInOrder(f, 1, 2, 3, 4, q=5, k=9)
 761    [1, 2, 3, 4, 3, 10, 5, 9]
 762    """
 763    sig = inspect.Signature.from_callable(f)
 764    bound = sig.bind(*args, **kwargs)
 765    bound.apply_defaults()
 766    result = []
 767    for paramName in sig.parameters:
 768        param = sig.parameters[paramName]
 769        if param.kind in (
 770            inspect.Parameter.POSITIONAL_ONLY,
 771            inspect.Parameter.POSITIONAL_OR_KEYWORD,
 772        ):
 773            result.append(bound.arguments[paramName])
 774        elif param.kind == inspect.Parameter.VAR_POSITIONAL:
 775            result.extend(bound.arguments[paramName])
 776        elif param.kind == inspect.Parameter.KEYWORD_ONLY:
 777            result.append(bound.arguments[paramName])
 778        elif param.kind == inspect.Parameter.VAR_KEYWORD:
 779            result.extend(bound.arguments[paramName].values())
 780
 781    return result
 782
 783
 784def analyzer(unit: AnalysisUnit) -> Callable[
 785    [Callable[Concatenate[core.DiscreteExploration, Params], Any]],
 786    AnalysisFunction
 787]:
 788    '''
 789    Decorator which sets up caching for an analysis function in the
 790    global `ANALYSIS_RESULTS` dictionary. Whenever the decorated function
 791    is called, it will first check whether a cached result is available
 792    for the same target exploration (by id) and additional target info
 793    based on the analysis unit type. If so, the cached result will be
 794    returned. This allows analysis functions to simply call each other
 795    when they need results and themselves recursively if they need to
 796    track things across steps/decisions, while avoiding tons of duplicate
 797    work.
 798    '''
 799    def makeCachedAnalyzer(
 800        baseFunction: Callable[
 801            Concatenate[core.DiscreteExploration, Params],
 802            Any
 803        ]
 804    ) -> AnalysisFunction:
 805        """
 806        Decoration function which registers an analysis function with
 807        pre-specified dependencies.
 808        """
 809        analysisFunction = cast(AnalysisFunction, baseFunction)
 810        analysisFunction._unit = unit
 811        analyzerName= analysisFunction.__name__
 812
 813        @functools.wraps(analysisFunction)
 814        def cachingAnalyzer(
 815            exploration: core.DiscreteExploration,
 816            *args: Params.args,
 817            **kwargs: Params.kwargs
 818        ):
 819            """
 820            This docstring will be replaced with the docstring of the
 821            decorated function plus a note about caching.
 822            """
 823            if RECORD_PROFILE:
 824                ANALYSIS_TIME_SPENT.setdefault(
 825                    analyzerName,
 826                    newAnalyzerPerf()
 827                )
 828                perf = ANALYSIS_TIME_SPENT[analyzerName]
 829                perf["calls"] += 1
 830                start = time.perf_counter()
 831            cache = ANALYSIS_RESULTS.setdefault(
 832                id(exploration),
 833                newFullAnalysisResults()
 834            )
 835            argsInOrder = getArgsInOrder(
 836                baseFunction,
 837                exploration,
 838                *args,
 839                **kwargs
 840            )
 841            cachedResult = lookupAnalysisResult(
 842                cache,
 843                analysisFunction,
 844                argsInOrder
 845            )
 846            if cachedResult is not NotCached:
 847                if RECORD_PROFILE:
 848                    perf["lookupTime"] += time.perf_counter() - start
 849                return cachedResult
 850
 851            result = analysisFunction(exploration, *args, **kwargs)
 852            saveAnalysisResult(cache, result, analysisFunction, argsInOrder)
 853            if RECORD_PROFILE:
 854                perf["nonCached"] += 1
 855                perf["analyzeTime"] += time.perf_counter() - start
 856            return result
 857
 858        cachingAnalyzer.__doc__ = (
 859            textwrap.dedent(analysisFunction.__doc__)
 860          + """
 861
 862This function's results are cached in the `ALL_ANALYZERS` dictionary, and
 863it returns cached results when possible. Use `clearAnalysisCache` to
 864clear the analysis cache.
 865"""
 866        )
 867
 868        # Save caching version of analyzer
 869        result = cast(AnalysisFunction, cachingAnalyzer)
 870        ALL_ANALYZERS[analyzerName] = result
 871        return result
 872
 873    return makeCachedAnalyzer
 874
 875
# Lets `elide` and `finalOnly` declare that they return the same type of
# analyzer they were given.
T = TypeVar('T', bound=AnalysisFunction)
'Type variable for `elide` and `finalOnly`.'
 878
 879def elide(analyzer: T) -> T:
 880    """
 881    Returns the given analyzer after noting that its result should *not*
 882    be included in CSV output by default.
 883    """
 884    ELIDE.add(analyzer.__name__)
 885    return analyzer
 886
 887
 888def finalOnly(analyzer: T) -> T:
 889    """
 890    Returns the given analyzer after noting that it should only be run on
 891    the final exploration step by default.
 892    """
 893    FINAL_ONLY.add(analyzer.__name__)
 894    return analyzer
 895
 896
 897#-----------------------#
 898# Generalizer Functions #
 899#-----------------------#
 900
# Used by `registerCount` to annotate its return value with the same
# analyzer type as the target it was given.
AnalyzerType = TypeVar('AnalyzerType', bound=AnyAnalyzer)
"""
Type var to forward through analyzer types.
"""
 905
 906def registerCount(target: AnalyzerType, sizeName: str) -> AnalyzerType:
 907    """
 908    Registers a new analysis routine which uses the same analysis unit as
 909    the target routine but which returns the length of that routine's
 910    result. Returns `None` if the target routine does.
 911
 912    Needs the target routine and the name to register the new analysis
 913    routine under.
 914
 915    Returns the analysis function it creates.
 916    """
 917    def countAnalyzer(*args, **kwargs):
 918        'To be replaced'
 919        result = target(*args, **kwargs)
 920        if result is None:
 921            return None
 922        else:
 923            return len(result)
 924
 925    countAnalyzer.__doc__ = (
 926        f"Measures count of the {target.__name__!r} result applied to"
 927        f" {target._unit!r}."
 928    )
 929    countAnalyzer.__name__ = sizeName
 930
 931    # Register the new function & return the result
 932    return cast(
 933        AnalyzerType,
 934        analyzer(target._unit)(countAnalyzer)
 935    )
 936
 937
# Generic result type; the `StepCombiner` and `OverallCombiner` aliases
# below are both parameterized by it.
CombinerResult = TypeVar('CombinerResult')
"""
Type variable for the result of a combiner function.
"""

# Positional arguments, in order: the per-item value dictionary, the
# exploration, and the step index.
StepCombiner: 'TypeAlias' = Callable[
    [
        Dict[Union[base.DecisionID, OverspecificTransition], Any],
        core.DiscreteExploration,
        int
    ],
    CombinerResult
]
"""
A combiner function which gets a dictionary of per-decision or
per-transition values along with an exploration object and a step index
and combines the values into a `CombinerResult` that's specific to that
step.
"""

# Like `StepCombiner` but without the step argument; the dictionary keys
# may additionally be plain `int` step indices.
OverallCombiner: 'TypeAlias' = Callable[
    [
        Dict[Union[base.DecisionID, OverspecificTransition, int], Any],
        core.DiscreteExploration
    ],
    CombinerResult
]
"""
A combiner function which gets a dictionary of per-decision,
per-transition, and/or per-step values along with an exploration object
and combines the values into a `CombinerResult`.
"""
 970
 971
 972def registerStepCombined(
 973    name: str,
 974    resultName: str,
 975    combiner: StepCombiner[CombinerResult]
 976) -> StepAnalyzer:
 977    """
 978    Registers a new analysis routine which combines results of another
 979    routine either across all decisions/transitions at a step. The new
 980    routine will have a 'step' analysis unit.
 981
 982    Needs the name of the target routine, the name to register the new
 983    analysis routine under, and the function that will be called to
 984    combine results, given a dictionary of results that maps
 985    decisions/transitions to results for each.
 986
 987    Returns the analysis function it creates.
 988    """
 989    # Target function
 990    target = ALL_ANALYZERS[name]
 991    # Analysis unit of the target function
 992    targetUnit = target._unit
 993
 994    if targetUnit not in ('stepDecision', 'stepTransition'):
 995        raise ValueError(
 996            f"Target analysis routine {name!r} has incompatible analysis"
 997            f" unit {targetUnit!r}."
 998        )
 999
1000    def analysisCombiner(
1001        exploration: core.DiscreteExploration,
1002        step: int
1003    ) -> CombinerResult:
1004        'To be replaced'
1005        # Declare data here as generic type
1006        data: Dict[
1007            Union[base.DecisionID, OverspecificTransition, int],
1008            Any
1009        ]
1010        graph = exploration[step].graph
1011        if targetUnit == "stepDecision":
1012            analyzeStepDecision = cast(StepDecisionAnalyzer, target)
1013            data = {
1014                dID: analyzeStepDecision(exploration, step, dID)
1015                for dID in graph
1016            }
1017        elif targetUnit == "stepTransition":
1018            edges = graph.allEdges()
1019            analyzeStepTransition = cast(StepTransitionAnalyzer, target)
1020            data = {
1021                (src, transition, dst): analyzeStepTransition(
1022                    exploration,
1023                    step,
1024                    src,
1025                    transition,
1026                    dst
1027                )
1028                for (src, dst, transition) in edges
1029            }
1030        else:
1031            raise ValueError(
1032                f"Target analysis routine {name!r} has inconsistent"
1033                f" analysis unit {targetUnit!r} for 'step' result"
1034                f" unit."
1035            )
1036        return combiner(data, exploration, step)
1037
1038    analysisCombiner.__doc__ = (
1039        f"Computes {combiner.__name__} for the {name!r} result over all"
1040        f" {targetUnit}s at each step."
1041    )
1042    analysisCombiner.__name__ = resultName
1043
1044    # Register the new function & return it
1045    return analyzer("step")(analysisCombiner)
1046
1047
def registerFullCombined(
    name: str,
    resultName: str,
    combiner: OverallCombiner[CombinerResult]
) -> ExplorationAnalyzer:
    """
    Works like `registerStepCombined` but combines results over
    decisions/transitions/steps across the entire exploration to get one
    result for the entire thing, not one result per step. May also
    target an existing `ExplorationAnalyzer` whose result is a
    dictionary, in which case that dictionary is handed to the combiner
    as-is.

    Parameters:

    - `name`: The name of the target routine (looked up in
        `ALL_ANALYZERS`). Its analysis unit must be one of 'step',
        'decision', 'transition', or 'exploration'; otherwise a
        `ValueError` is raised.
    - `resultName`: The name to register the new routine under.
    - `combiner`: Called with a dictionary of results that maps
        decisions/transitions/steps to results for each, plus the
        exploration; its return value is the result of the new routine.

    Returns the analysis function it creates, which has an 'exploration'
    analysis unit.
    """
    # Target function
    target = ALL_ANALYZERS[name]
    # Analysis unit of the target function
    targetUnit = target._unit
    if targetUnit not in ('step', 'decision', 'transition', 'exploration'):
        raise ValueError(
            f"Target analysis routine {name!r} has incompatible analysis"
            f" unit {targetUnit!r}."
        )

    def analysisCombiner(  # type: ignore
        exploration: core.DiscreteExploration,
    ) -> CombinerResult:
        'To be replaced'
        # Declare data here as generic type
        data: Dict[
            Union[base.DecisionID, OverspecificTransition, int],
            Any
        ]
        if targetUnit == "step":
            # One entry per step index
            analyzeStep = cast(StepAnalyzer, target)
            data = {
                step: analyzeStep(exploration, step)
                for step in range(len(exploration))
            }
        elif targetUnit == "decision":
            # One entry per decision ID
            analyzeDecision = cast(DecisionAnalyzer, target)
            data = {
                dID: analyzeDecision(exploration, dID)
                for dID in exploration.allDecisions()
            }
        elif targetUnit == "transition":
            # One entry per (source, transition, destination) triple
            analyzeTransition = cast(TransitionAnalyzer, target)
            data = {
                (src, transition, dst): analyzeTransition(
                    exploration,
                    src,
                    transition,
                    dst
                )
                for (src, transition, dst) in exploration.allTransitions()
            }
        elif targetUnit == "exploration":
            # Target result is used directly as the data dictionary
            analyzeExploration = cast(ExplorationAnalyzer, target)
            data = analyzeExploration(exploration)
        else:
            # Not reachable via the check above; kept as a safeguard
            raise ValueError(
                f"Target analysis routine {name!r} has inconsistent"
                f" analysis unit {targetUnit!r} for 'exploration' result"
                f" unit."
            )
        return combiner(data, exploration)

    analysisCombiner.__doc__ = (
        f"Computes {combiner.__name__} for the {name!r} result over all"
        f" {targetUnit}s."
    )
    analysisCombiner.__name__ = resultName

    # Register the new function & return it
    return analyzer("exploration")(analysisCombiner)
1128
1129
def sumCombiner(data, *_):
    """
    A "combiner" function for use with `registerStepCombined` or
    `registerFullCombined` which adds up the numeric values of the given
    dictionary. Extra positional arguments are accepted and ignored.

    Values which are not `int`s, `float`s, or `complex`es are skipped.
    The result is 0 when there are no numeric values.
    """
    total = 0
    for value in data.values():
        if isinstance(value, (int, float, complex)):
            total += value
    return total
1141
1142
def meanCombiner(data, *_):
    """
    A "combiner" function for use with `registerStepCombined` or
    `registerFullCombined` which averages the numeric values of the
    given dictionary. Extra positional arguments are accepted and
    ignored.

    Values which are not `int`s, `float`s, or `complex`es are skipped,
    and do not count towards the denominator. The result is `None` when
    there are no numeric values at all.
    """
    numeric = []
    for value in data.values():
        if isinstance(value, (int, float, complex)):
            numeric.append(value)

    if not numeric:
        return None
    return sum(numeric) / len(numeric)
1159
1160
def medianCombiner(data, *_):
    """
    Computes median over numeric data as a "combiner" function to be used
    with `registerStepCombined` or `registerFullCombined`. Extra
    positional arguments are accepted and ignored.

    Only counts values which are `int`s or `float`s, ignoring any other
    values. Unlike `sumCombiner` and `meanCombiner`, `complex` values
    are also ignored, because they cannot be ordered and so have no
    median. Uses `None` as the result when there are 0 countable values.

    With an even number of values, the result is the mean of the two
    middle values.
    """
    # complex is deliberately excluded here: sorting complex numbers
    # raises a TypeError
    numeric = sorted(
        x for x in data.values() if isinstance(x, (int, float))
    )
    if not numeric:
        return None

    half = len(numeric) // 2
    if len(numeric) % 2 == 0:
        return (numeric[half - 1] + numeric[half]) / 2
    return numeric[half]
1183
1184
1185#---------------------------#
1186# Simple property functions #
1187#---------------------------#
1188
@analyzer('decision')
def finalIdentity(
    exploration: core.DiscreteExploration,
    decision: base.DecisionID
) -> str:
    """
    Looks backwards through the exploration, starting from the last
    step, and returns the `identityOf` result for the specified decision
    from the most recent step whose graph contains it.

    Raises a `core.MissingDecisionError` if no step contains that
    decision.
    """
    totalSteps = len(exploration)
    # Offsets 1..totalSteps correspond to indices -1..-totalSteps
    for back in range(1, totalSteps + 1):
        situation = exploration.getSituation(-back)
        try:
            return situation.graph.identityOf(decision)
        except core.MissingDecisionError:
            # Not present at this step; keep looking further back
            continue

    raise core.MissingDecisionError(
        f"Decision {decision!r} never existed."
    )
1207
1208
@analyzer('step')
def currentDecision(
    exploration: core.DiscreteExploration,
    step: int
) -> Optional[base.DecisionID]:
    """
    Returns the `base.DecisionID` stored as the 'primaryDecision' of the
    state at the given step.
    """
    situation = exploration[step]
    return situation.state['primaryDecision']
1219
1220
@analyzer('step')
def currentDecisionIdentity(
    exploration: core.DiscreteExploration,
    step: int
) -> str:
    """
    Returns the `identityOf` string, according to the graph at the given
    step, for that step's 'primaryDecision'.
    """
    here = exploration[step]
    graph = here.graph
    primary = here.state['primaryDecision']
    return graph.identityOf(primary)
1232
1233
@analyzer('step')
def observedSoFar(
    exploration: core.DiscreteExploration,
    step: int
) -> Set[base.DecisionID]:
    """
    Returns the set of all decision IDs observed so far. Note that some
    of them may no longer be present in the graph at the given step if
    they got merged or deleted.

    The result for a step is built from the result for the previous
    step plus the decisions in that step's graph. Each step gets its
    own set object.

    Raises an `IndexError` if the step is negative.
    """
    # Can't allow negative steps (caching would fail)
    if step < 0:
        raise IndexError(f"Invalid step (can't be negative): {step!r}")

    result: Set[base.DecisionID]
    if step == 0:
        result = set()
    else:
        # Copy the previous step's result rather than updating it in
        # place: the analyzer cache appears to hand back the very object
        # it stored, so an in-place update would add this step's
        # decisions to the previous step's result as well
        result = set(observedSoFar(exploration, step - 1))
    result |= set(exploration[step].graph)
    return result
1253
1254
# Companion analyzer reporting the size of the `observedSoFar` result.
totalDecisionsSoFar = registerCount(observedSoFar, 'totalDecisionsSoFar')
1256
1257
@analyzer('step')
def justObserved(
    exploration: core.DiscreteExploration,
    step: int
) -> Set[base.DecisionID]:
    """
    Returns the set of new `base.DecisionID`s that first appeared at the
    given step. Will be empty for steps where no new decisions are
    observed. Note that this is about decisions whose existence becomes
    known, NOT decisions which get confirmed.

    At step 0 this is everything observed at that step; for later steps
    it is whatever `observedSoFar` includes at this step but did not
    include at the previous step.
    """
    if step == 0:
        return observedSoFar(exploration, step)

    # New decisions are those in the current cumulative set which are
    # missing from the previous one (not the other way around, which
    # would always be empty since the cumulative set only grows)
    return (
        observedSoFar(exploration, step)
      - observedSoFar(exploration, step - 1)
    )
1276
1277
# Companion analyzer reporting the size of the `justObserved` result.
newDecisionCount = registerCount(justObserved, 'newDecisionCount')
1279
1280
@elide
@analyzer('stepDecision')
def hasBeenObserved(
    exploration: core.DiscreteExploration,
    step: int,
    dID: base.DecisionID
) -> bool:
    """
    Returns `True` if the specified decision is part of the
    `observedSoFar` result for the specified step, i.e., if it was
    observed at or before that step. The decision doesn't have to
    still exist at that step (it might have been observed earlier and
    deleted since).
    """
    seen = observedSoFar(exploration, step)
    return dID in seen
1295
1296
@analyzer('exploration')
def stepsObserved(
    exploration: core.DiscreteExploration,
) -> Dict[base.DecisionID, int]:
    """
    Builds a dictionary, keyed by decision ID, whose values are the
    index of the first step at which each decision appeared in the
    graph.
    """
    firstSeen = {}
    known: Set[base.DecisionID] = set()
    for index, situation in enumerate(exploration):
        # Decisions in this step's graph that no earlier step had
        fresh = set(situation.graph).difference(known)
        for decisionID in fresh:
            firstSeen[decisionID] = index
        known.update(fresh)
    return firstSeen
1313
1314
@analyzer('decision')
def stepObserved(
    exploration: core.DiscreteExploration,
    dID: base.DecisionID
) -> int:
    """
    Looks up the specified decision in the `stepsObserved` result and
    returns the step at which it was first observed (which is NOT
    necessarily when it was confirmed).

    Raises a `core.MissingDecisionError` if the decision isn't in that
    result.
    """
    try:
        firstSeen = stepsObserved(exploration)[dID]
    except KeyError:
        raise core.MissingDecisionError(
            f"Decision {dID!r} was never observed."
        )
    else:
        return firstSeen
1330
1331
@analyzer('exploration')
def stepsConfirmed(
    exploration: core.DiscreteExploration,
) -> Dict[base.DecisionID, int]:
    """
    Builds a dictionary mapping decision IDs to the index of the first
    step at which each one lacked the 'unconfirmed' tag. Decisions
    which carry that tag at every step where they exist are left out.
    """
    confirmedAt = {}
    for index, situation in enumerate(exploration):
        graph = situation.graph
        for decisionID in graph:
            # Only the first confirmation counts
            if decisionID in confirmedAt:
                continue
            if 'unconfirmed' in graph.decisionTags(decisionID):
                continue
            confirmedAt[decisionID] = index
    return confirmedAt
1350
1351
@analyzer('decision')
def stepConfirmed(
    exploration: core.DiscreteExploration,
    dID: base.DecisionID
) -> Optional[int]:
    """
    Looks up the specified decision in the `stepsConfirmed` result and
    returns the step at which it was first confirmed. The result is
    `None` when the decision has no entry there, which covers both
    never-confirmed decisions and invalid decision IDs.
    """
    confirmedAt = stepsConfirmed(exploration)
    return confirmedAt.get(dID)
1363
1364
@analyzer('exploration')
def stepsVisited(
    exploration: core.DiscreteExploration,
) -> Dict[base.DecisionID, List[int]]:
    """
    Given an exploration, returns a dictionary mapping decision IDs to
    the list of steps at which each was visited. Decisions which were
    never visited will not be included in the dictionary.

    A decision counts as visited at a step when it is both in that
    step's graph and in the `base.combinedDecisionSet` of that step's
    state. Each list of steps is in increasing order.
    """
    result: Dict[base.DecisionID, List[int]] = {}
    for i, situation in enumerate(exploration):
        # Compute the active set once per step rather than once per
        # decision (assumes `combinedDecisionSet` depends only on the
        # state it is given)
        active = base.combinedDecisionSet(situation.state)
        for dID in situation.graph:
            if dID in active:
                result.setdefault(dID, []).append(i)
    return result
1380
1381
@finalOnly
@analyzer('stepDecision')
def hasBeenVisited(
    exploration: core.DiscreteExploration,
    step: int,
    dID: base.DecisionID
) -> bool:
    """
    Returns `True` if the `stepsVisited` result lists a visit to the
    specified decision at or before the specified step, and `False`
    otherwise (including when it lists no visits at all). The decision
    doesn't have to still exist at that step.
    """
    visitSteps = stepsVisited(exploration).get(dID, [])
    # Visited iff there's at least one visit & the earliest one isn't
    # after this step
    return len(visitSteps) > 0 and min(visitSteps) <= step
1402
1403
@analyzer('decision')
def stepFirstVisited(
    exploration: core.DiscreteExploration,
    decision: base.DecisionID,
) -> Optional[int]:
    """
    Returns the smallest step listed for the given decision in the
    `stepsVisited` result, or `None` if the decision has no entry
    there (meaning it was never visited).
    """
    visitSteps = stepsVisited(exploration).get(decision)
    return None if visitSteps is None else min(visitSteps)
1418
1419
@analyzer('decision')
def stepsActive(
    exploration: core.DiscreteExploration,
    decision: base.DecisionID,
) -> Optional[int]:
    """
    Counts the steps listed for the given decision in the
    `stepsVisited` result, i.e., the total number of steps at which it
    was active. The result is 0 for decisions with no entry there.
    """
    visitSteps = stepsVisited(exploration).get(decision, [])
    return len(visitSteps)
1433
1434
@analyzer('exploration')
def stepsTransisionsObserved(
    exploration: core.DiscreteExploration
) -> Dict[OverspecificTransition, int]:
    """
    Builds a dictionary keyed by (source-decision, transition-name,
    destination-decision) triples whose values are the index of the
    first step at which each such triple existed in the graph.

    Since only the first appearance is recorded, a transition which was
    deleted and later reinstated with the same destination just keeps
    its original step.
    """
    firstSeen = {}
    for index, situation in enumerate(exploration):
        graph = situation.graph
        for sourceID in graph:
            outgoing = graph.destinationsFrom(sourceID)
            for transitionName, destinationID in outgoing.items():
                # setdefault leaves earlier (smaller) step values alone
                firstSeen.setdefault(
                    (sourceID, transitionName, destinationID),
                    index
                )
    return firstSeen
1457
1458
@analyzer('transition')
def stepObservedTransition(
    exploration: core.DiscreteExploration,
    source: base.DecisionID,
    transition: base.Transition,
    destination: base.DecisionID
) -> Optional[int]:
    """
    Looks up the (source, transition, destination) triple in the
    `stepsTransisionsObserved` result and returns the step at which
    that transition was first observed, or `None` if the triple has no
    entry there.

    Because the lookup is by name and endpoints, a renamed transition
    counts as a new transition: searching for the new name yields the
    step at which the rename happened (assuming the destination stayed
    the same).
    """
    key = (source, transition, destination)
    return stepsTransisionsObserved(exploration).get(key)
1478
1479
@analyzer('step')
def transitionTaken(
    exploration: core.DiscreteExploration,
    step: int
) -> Optional[OverspecificTransition]:
    """
    Returns a (source decision ID, transition name, destination decision
    ID) triple describing the transition taken at the given step, based
    on `movementAtStep` and the graph at that step. Returns `None` when
    `movementAtStep` reports no start or no transition (e.g., for a
    wait or warp).

    When several transitions are taken at one step (e.g., due to a
    'follow' effect), only the first one, which triggered the others,
    is reported.

    When the movement has a set of starting decisions, they're tried in
    sorted ID order and the first one which has a destination for the
    transition name is used. Returns `None` if no starting decision has
    a destination for that transition.
    """
    start, transition, _end = exploration.movementAtStep(step)
    graph = exploration[step].graph
    if start is None or transition is None:
        return None

    # Treat a single start the same way as a set of starts
    candidates = sorted(start) if isinstance(start, set) else [start]
    for origin in candidates:
        destination = graph.getDestination(origin, transition)
        if destination is not None:
            return (origin, transition, destination)

    return None
1518
1519
@analyzer('exploration')
def transitionStepsTaken(
    exploration: core.DiscreteExploration
) -> Dict[OverspecificTransition, List[int]]:
    """
    Builds a dictionary mapping each `transitionTaken` result which
    occurs at least once to the list of steps (in increasing order) at
    which it occurred. Steps where `transitionTaken` gives `None` are
    skipped.

    Does NOT account for transitions elided by 'jaunt' warps, nor for
    transitions taken as a result of follow/bounce effects.

    TODO: Account for those?
    """
    stepsByTransition: Dict[OverspecificTransition, List[int]] = {}
    for index in range(len(exploration)):
        taken = transitionTaken(exploration, index)
        if taken is None:
            continue
        stepsByTransition.setdefault(taken, []).append(index)

    return stepsByTransition
1542
1543
@analyzer('transition')
def stepsTaken(
    exploration: core.DiscreteExploration,
    source: base.DecisionID,
    transition: base.Transition,
    destination: base.DecisionID
) -> List[int]:
    """
    Returns the list of exploration steps on which a particular
    transition has been taken. Returns an empty list for transitions that
    were never taken.

    The list comes from the `transitionStepsTaken` result for the
    (source, transition, destination) triple.

    Note that this does NOT account for times taken as a result of
    follow/bounce effects, and it does NOT account for all times a
    transition was taken when warp effects are used as shorthand for
    jaunts across the graph.

    TODO: Try to account for those?
    """
    return transitionStepsTaken(exploration).get(
        (source, transition, destination),
        []
    )
1567
1568
@analyzer('transition')
def timesTaken(
    exploration: core.DiscreteExploration,
    source: base.DecisionID,
    transition: base.Transition,
    destination: base.DecisionID
) -> int:
    """
    Counts how many times a particular transition was taken over the
    course of the exploration, which is the length of the `stepsTaken`
    result for it. The count is 0 for transitions that were never
    taken.

    As with `stepsTaken`, this does NOT include times the transition
    was taken because of follow/bounce effects, and it misses
    transitions skipped over when warp effects stand in for jaunts
    across the graph.

    TODO: Try to account for those?
    """
    takenAt = stepsTaken(exploration, source, transition, destination)
    return len(takenAt)
1589
1590
1591#--------------------#
1592# Analysis functions #
1593#--------------------#
1594
def unexploredBranches(
    graph: core.DecisionGraph,
    context: Optional[base.RequirementContext] = None
) -> List[SpecificTransition]:
    """
    Finds the unexplored branches of the given graph: edges which lead
    from a confirmed decision to a decision that is not confirmed. Each
    one is reported as a (from-decision, transition-at-that-decision)
    pair.

    If a `context` is given, a branch is only included when its
    transition requirement is satisfied in a `RequirementContext` that
    has the same state and graph as `context` but whose 'searchFrom'
    is replaced by both ends of the transition being tested. This
    is only an approximation of true reachability, because decisions
    between the player's position and the branch might force game
    state changes that make the branch un-takeable.

    TODO: add logic to detect trivially-unblocked edges?
    """
    found = []
    # TODO: Fix networkx type stubs for MultiDiGraph!
    for (fromID, toID, name) in graph.allEdges():
        requirement = graph.getTransitionRequirement(fromID, name)

        # Per-edge context (only when filtering by requirements)
        edgeContext: Optional[base.RequirementContext] = (
            None
            if context is None
            else base.RequirementContext(
                state=context.state,
                graph=context.graph,
                searchFrom=graph.bothEnds(fromID, name)
            )
        )

        # Must go from a confirmed to an unconfirmed node
        if not graph.isConfirmed(fromID):
            continue
        if graph.isConfirmed(toID):
            continue
        # Must be traversable when a context is in use
        if edgeContext is not None and not requirement.satisfied(
            edgeContext
        ):
            continue

        found.append((fromID, name))
    return found
1632
1633
@analyzer('step')
def allUnexploredBranches(
    exploration: core.DiscreteExploration,
    step: int
) -> List[SpecificTransition]:
    """
    Applies `unexploredBranches` to the graph at the specified step
    without any requirement context, so branches are listed whether or
    not they are traversable.
    """
    stepGraph = exploration[step].graph
    return unexploredBranches(stepGraph)
1644
1645
# Companion analyzer reporting the length of the `allUnexploredBranches`
# result.
unexploredBranchCount = registerCount(
    allUnexploredBranches,
    'unexploredBranchCount'
)
1650
1651
@analyzer('step')
def traversableUnexploredBranches(
    exploration: core.DiscreteExploration,
    step: int
) -> List[SpecificTransition]:
    """
    Applies `unexploredBranches` to the graph at the specified step,
    using a generic context for that step's situation built from the
    combined decision set of its state, so that only traversable
    branches are listed. Since a single context is used to judge every
    branch, traversibility is not perfectly accounted for (TODO: Fix
    that).
    """
    here = exploration[step]
    activeDecisions = base.combinedDecisionSet(here.state)
    requirementContext = base.genericContextForSituation(
        here,
        activeDecisions
    )
    return unexploredBranches(here.graph, requirementContext)
1669
1670
# Companion analyzer reporting the length of the
# `traversableUnexploredBranches` result.
traversableUnexploredCount = registerCount(
    traversableUnexploredBranches,
    'traversableUnexploredCount'
)
1675
1676
@finalOnly
@analyzer('stepDecision')
def actions(
    exploration: core.DiscreteExploration,
    step: int,
    decision: base.DecisionID
) -> Optional[Set[base.Transition]]:
    """
    Returns the `decisionActions` result for the given decision in the
    graph at the given step, i.e., the set of actions available there.
    The result is `None` when that graph doesn't contain the decision.
    """
    stepGraph = exploration[step].graph
    if decision in stepGraph:
        return stepGraph.decisionActions(decision)
    return None
1693
1694
# Per-decision count of available actions (`None` where `actions` gives
# `None`), marked as final-step-only like `actions` itself.
actionCount = registerCount(actions, 'actionCount')
finalOnly(actionCount)

# Per-step sum, mean, and median of 'actionCount' over the decisions in
# each step's graph, each also marked as final-step-only.
totalActions = registerStepCombined(
    'actionCount',
    'totalActions',
    sumCombiner
)
finalOnly(totalActions)

meanActions = registerStepCombined(
    'actionCount',
    'meanActions',
    meanCombiner
)
finalOnly(meanActions)

medianActions = registerStepCombined(
    'actionCount',
    'medianActions',
    medianCombiner
)
finalOnly(medianActions)
1718
1719
@finalOnly
@analyzer('stepDecision')
def branches(
    exploration: core.DiscreteExploration,
    step: int,
    decision: base.DecisionID
) -> Optional[int]:
    """
    Counts the branches at a particular decision in the graph at a
    particular step: the outgoing transitions whose destination is a
    different decision. Transitions which lead back to the decision
    itself (actions) are not counted, but multiple transitions which
    share a destination each count separately.

    The result is `None` for decisions which aren't in the graph and
    for decisions which aren't confirmed, so that they don't get
    included in averages (even though unconfirmed decisions do have
    branches that could be counted).
    """
    stepGraph = exploration[step].graph
    if decision not in stepGraph or not stepGraph.isConfirmed(decision):
        return None

    outgoing = stepGraph.destinationsFrom(decision)
    return sum(
        1
        for destination in outgoing.values()
        if destination != decision
    )
1746
1747
# Per-step sum, mean, and median of the 'branches' result over the
# decisions in each step's graph, each marked as final-step-only. The
# combiners skip the `None` values that `branches` produces.
totalBranches = registerStepCombined(
    'branches',
    'totalBranches',
    sumCombiner
)
finalOnly(totalBranches)

meanBranches = registerStepCombined(
    'branches',
    'meanBranches',
    meanCombiner
)
finalOnly(meanBranches)

medianBranches = registerStepCombined(
    'branches',
    'medianBranches',
    medianCombiner
)
finalOnly(medianBranches)
1768
1769
@analyzer('decision')
def arrivals(
    exploration: core.DiscreteExploration,
    decision: base.DecisionID
) -> int:
    """
    Counts how many times the given decision was arrived at during the
    exploration. Based on the `stepsVisited` result, an arrival is a
    step at which the decision was visited without also having been
    visited at the immediately preceding step. Consecutive steps spent
    at the decision (e.g., because of a wait or an action) therefore
    count as just one arrival.

    The result is 0 for decisions with no visits, including decisions
    that aren't part of the exploration at all.
    """
    visitSteps = stepsVisited(exploration).get(decision, [])
    # Pair each visited step with the visited step before it; -2 stands
    # in before the first visit since it can't be adjacent to step 0
    priors = [-2] + visitSteps[:-1]
    return sum(
        1
        for before, now in zip(priors, visitSteps)
        if before != now - 1
    )
1795
1796
@analyzer('decision')
def revisits(
    exploration: core.DiscreteExploration,
    decision: base.DecisionID
) -> int:
    """
    Counts how many times the target decision was revisited: the
    number of `arrivals` beyond the first one. Never less than 0, even
    for decisions with no arrivals.
    """
    beyondFirst = arrivals(exploration, decision) - 1
    return beyondFirst if beyondFirst > 0 else 0
1807
1808
# Whole-exploration sum, mean, and median of the per-decision 'revisits'
# result, taken over `exploration.allDecisions()`.
totalRevisits = registerFullCombined(
    'revisits',
    'totalRevisits',
    sumCombiner
)

meanRevisits = registerFullCombined(
    'revisits',
    'meanRevisits',
    meanCombiner
)

medianRevisits = registerFullCombined(
    'revisits',
    'medianRevisits',
    medianCombiner
)
1826
1827
1828#-------------------#
1829# Paths & distances #
1830#-------------------#
1831
HopPaths: 'TypeAlias' = Dict[
    Tuple[base.DecisionID, base.DecisionID],
    Optional[List[base.DecisionID]]
]
"""
Records paths between decisions ignoring edge directions & requirements.
Stores a list of decision IDs to traverse keyed by a decision ID pair
where the smaller decision ID comes first (since paths are symmetric).
As built by `shortestHopPaths`, each list holds only the decisions
strictly between the two endpoints, so adjacent decisions map to an
empty list and `hopDistance` reports one more than the list's length.
"""
1841
def hopDistance(
    hopPaths: HopPaths,
    src: base.DecisionID,
    dst: base.DecisionID
) -> Optional[int]:
    """
    Looks up, in the given `HopPaths` dictionary, how many hops it takes
    to get from the source decision to the destination decision when
    edge directions & requirements are ignored. The answer is one more
    than the number of intermediate decisions stored for the pair.
    Returns 0 when source and destination are the same, and `None` when
    the dictionary holds no path for the pair.

    For example:

    >>> e = core.DiscreteExploration.example()
    >>> hops = shortestHopPaths(e)
    >>> hopDistance(hops, 0, 1)
    1
    >>> hopDistance(hops, 1, 0)
    1
    >>> hopDistance(hops, 0, 0)
    0
    >>> hopDistance(hops, 3, 3)
    0
    >>> hopDistance(hops, 0, 4) is None
    True
    >>> hopDistance(hops, 4, 0) is None
    True
    >>> hopDistance(hops, 0, 5)
    2
    >>> hopDistance(hops, 5, 0)
    2
    >>> hopDistance(hops, 5, 1)
    3
    >>> hopDistance(hops, 1, 5)
    3
    >>> hopDistance(hops, 3, 5)
    1
    >>> hopDistance(hops, 5, 3)
    1
    >>> dIDs = list(e[-1].graph)
    >>> for i, src in enumerate(dIDs):
    ...     for j in range(i + 1, len(dIDs)):
    ...         dst = dIDs[j]
    ...         assert (
    ...             hopDistance(hops, src, dst) == hopDistance(hops, dst, src)
    ...         )
    """
    if src == dst:
        return 0

    # Paths are stored once per pair, keyed with the smaller ID first
    pairKey = (src, dst) if src < dst else (dst, src)
    between = hopPaths.get(pairKey)
    return None if between is None else len(between) + 1
1903
1904
@elide
@analyzer('exploration')
def shortestHopPaths(
    exploration: core.DiscreteExploration,
    edgeFilter: Optional[Callable[
        [base.DecisionID, base.Transition, base.DecisionID, core.DecisionGraph],
        bool
    ]] = None
) -> HopPaths:
    """
    Builds a dictionary of shortest paths between pairs of decisions in
    the final graph of the exploration, where edge directions and
    requirements are ignored entirely.

    An `edgeFilter` may be supplied; it is applied with source ID,
    transition name, destination ID, and full graph as arguments, and
    any edge for which it returns False is left out when computing
    hops. Note that you have to filter out all edges in both directions
    between two nodes for there not to be a 1-hop path between them.

    Each key is a pair of decision IDs with the smaller ID first
    (shortest hop paths are symmetric, so reverse paths are not
    stored). Each value is the list of decision IDs to pass through on
    the way from the first decision to the second; adjacent decisions
    get an empty list. These "hop paths" cannot always be traversed in
    the actual graph because they may go the "wrong way" across one-way
    connections. The number of hops between two decisions is one more
    than the length of their path. Pairs that cannot reach each other
    are absent from the dictionary. Only decisions and edges present in
    the final graph of the exploration are considered.

    Where there are multiple shortest hop paths, an arbitrary one is
    included in the result.

    >>> e = core.DiscreteExploration.example()
    >>> print(e[-1].graph.namesListing(e[-1].graph))
      0 (House)
      1 (_u.0)
      2 (Cellar)
      3 (Yard)
      5 (Lane)
    <BLANKLINE>
    >>> shortest = dict(nx.all_pairs_shortest_path(e[-1].graph.connections()))
    >>> for src in shortest:
    ...    print(f"{src} -> {shortest[src]}")
    0 -> {0: [0], 1: [0, 1], 2: [0, 2], 3: [0, 3], 5: [0, 3, 5]}
    1 -> {1: [1], 0: [1, 0], 2: [1, 0, 2], 3: [1, 0, 3], 5: [1, 0, 3, 5]}
    2 -> {2: [2], 0: [2, 0], 3: [2, 3], 1: [2, 0, 1], 5: [2, 3, 5]}
    3 -> {3: [3], 0: [3, 0], 2: [3, 2], 5: [3, 5], 1: [3, 0, 1]}
    5 -> {5: [5], 3: [5, 3], 0: [5, 3, 0], 2: [5, 3, 2], 1: [5, 3, 0, 1]}
    >>> hops = shortestHopPaths(e)
    >>> for src in hops:
    ...     print(f"{src} -> {hops[src]}")
    (0, 1) -> []
    (0, 2) -> []
    (0, 3) -> []
    (0, 5) -> [3]
    (1, 2) -> [0]
    (1, 3) -> [0]
    (1, 5) -> [0, 3]
    (2, 3) -> []
    (2, 5) -> [3]
    (3, 5) -> []
    """
    finalGraph = exploration[-1].graph
    orderedIDs = sorted(finalGraph)
    pathsFrom = dict(
        nx.all_pairs_shortest_path(finalGraph.connections(edgeFilter))
    )

    hops = {}
    for position, lower in enumerate(orderedIDs):
        # Only pair each decision with the larger IDs that follow it
        for higher in orderedIDs[position + 1:]:
            fullPath = pathsFrom.get(lower, {}).get(higher)
            if fullPath is None:
                continue
            # Strip both endpoints; only intermediate decisions are kept
            hops[(lower, higher)] = fullPath[1:-1]

    return hops
1986
1987
1988#---------------#
1989# Full analysis #
1990#---------------#
1991
def runFullAnalysis(
    exploration: core.DiscreteExploration,
    elide: Collection[str] = ELIDE,
    finalOnly: Collection[str] = FINAL_ONLY
) -> FullAnalysisResults:
    """
    Runs every analysis function registered in `ALL_ANALYZERS` on every
    valid target for that function in the given exploration, building
    up the cache of `FullAnalysisResults` in `ANALYSIS_RESULTS`. Returns
    the relevant `FullAnalysisResults` object. If nothing has been
    cached for the exploration (for example because every analyzer was
    elided), a new empty results object is stored and returned, the
    same way the `get*Analyses` accessor functions behave.

    Skips analyzers whose names are in the provided `elide` collection,
    which by default is the `ELIDE` global set containing functions
    explicitly decorated with `elide`. Analyzers whose names are in the
    `finalOnly` collection (by default the `FINAL_ONLY` set) are only
    applied to the final decision graph in the exploration (although
    note that they might call other analyzers which recursively need to
    analyze prior steps). `finalOnly` only has an effect for analyzers
    with 'step', 'stepDecision', or 'stepTransition' units.

    Raises a `ValueError` if an analyzer has an unrecognized unit.
    """
    for aName, analyzer in ALL_ANALYZERS.items():
        # Skip this one if we're told to
        if aName in elide:
            continue

        # Which steps the per-step units get applied to: just the last
        # step for final-only analyzers, otherwise every step. Other
        # units don't use this.
        steps: Sequence[int]
        if aName in finalOnly:
            steps = [len(exploration) - 1]
        else:
            steps = range(len(exploration))

        # Split out cases for each unit & apply as appropriate
        unit = analyzer._unit
        if unit == 'step':
            sa = cast(StepAnalyzer, analyzer)
            for step in steps:
                sa(exploration, step)
        elif unit == 'stepDecision':
            sda = cast(StepDecisionAnalyzer, analyzer)
            for step in steps:
                for dID in exploration[step].graph:
                    sda(exploration, step, dID)
        elif unit == 'stepTransition':
            sta = cast(StepTransitionAnalyzer, analyzer)
            for step in steps:
                edges = exploration[step].graph.allEdges()
                # Edges list the destination before the transition
                # name, but the analyzer takes the name first
                for (src, dst, transition) in edges:
                    sta(exploration, step, src, transition, dst)
        elif unit == 'decision':
            da = cast(DecisionAnalyzer, analyzer)
            for dID in exploration.allDecisions():
                da(exploration, dID)
        elif unit == 'transition':
            ta = cast(TransitionAnalyzer, analyzer)
            for (src, trans, dst) in exploration.allTransitions():
                ta(exploration, src, trans, dst)
        elif unit == 'exploration':
            ea = cast(ExplorationAnalyzer, analyzer)
            ea(exploration)
        else:
            raise ValueError(f"Invalid analysis unit {unit!r}.")

    # Don't index directly: there's no entry yet if no analyzer stored
    # a result for this exploration.
    return ANALYSIS_RESULTS.setdefault(
        id(exploration),
        newFullAnalysisResults()
    )
2062
2063
2064#--------------------#
2065# Analysis accessors #
2066#--------------------#
2067
2068# These functions access pre-computed analysis results. Call
2069# `runFullAnalysis` first to populate those.
2070
2071
def getDecisionAnalyses(
    exploration: core.DiscreteExploration,
    dID: base.DecisionID
) -> AnalysisResults:
    """
    Looks up the pre-computed all-step analysis results for the
    specified decision. Per-step decision analyses are not included.
    Use `runFullAnalysis` or call specific analysis functions of
    interest first to populate these results; if nothing has been
    stored yet, empty entries are created and an empty dictionary is
    returned.

    The returned `AnalysisResults` dictionary is the stored one, so
    modifying it updates the stored results (although it's better to
    write additional analysis routines using the `@analyzer`
    decorator).
    """
    fullResults = ANALYSIS_RESULTS.setdefault(
        id(exploration),
        newFullAnalysisResults()
    )
    perDecision = fullResults["perDecision"]
    return perDecision.setdefault(dID, {})
2091
2092
def getTransitionAnalyses(
    exploration: core.DiscreteExploration,
    source: base.DecisionID,
    transition: base.Transition,
    destination: base.DecisionID
) -> AnalysisResults:
    """
    Works like `getDecisionAnalyses`, except that the results returned
    are those stored for a transition, identified by its source
    decision, transition name, and destination decision.
    """
    fullResults = ANALYSIS_RESULTS.setdefault(
        id(exploration),
        newFullAnalysisResults()
    )
    transitionKey = (source, transition, destination)
    perTransition = fullResults["perTransition"]
    return perTransition.setdefault(transitionKey, {})
2111
2112
def getStepDecisionAnalyses(
    exploration: core.DiscreteExploration,
    step: int,
    dID: base.DecisionID
) -> AnalysisResults:
    """
    Works like `getDecisionAnalyses`, but returns the results stored
    for the given decision at one specific exploration step. The
    per-step list is padded with empty entries as needed so that it
    reaches the requested step.
    """
    fullResults = ANALYSIS_RESULTS.setdefault(
        id(exploration),
        newFullAnalysisResults()
    )
    bySteps = fullResults.setdefault("perStepDecision", [])
    # Grow the list until index `step` exists
    for _ in range(len(bySteps), step + 1):
        bySteps.append({})
    return bySteps[step].setdefault(dID, {})
2130
2131
def getStepTransitionAnalyses(
    exploration: core.DiscreteExploration,
    step: int,
    source: base.DecisionID,
    transition: base.Transition,
    destination: base.DecisionID
) -> AnalysisResults:
    """
    Works like `getStepDecisionAnalyses`, but the results returned are
    those stored at the given step for a transition (identified by
    source decision, transition name, and destination decision) rather
    than for a decision.
    """
    fullResults = ANALYSIS_RESULTS.setdefault(
        id(exploration),
        newFullAnalysisResults()
    )
    bySteps = fullResults.setdefault("perStepTransition", [])
    # Grow the list until index `step` exists
    for _ in range(len(bySteps), step + 1):
        bySteps.append({})
    transitionKey = (source, transition, destination)
    return bySteps[step].setdefault(transitionKey, {})
2151
2152
def getStepAnalyses(
    exploration: core.DiscreteExploration,
    step: int
) -> AnalysisResults:
    """
    Works like `getDecisionAnalyses`, but returns the full-step
    analysis results stored for the given exploration step. The
    per-step list is padded with empty dictionaries as needed so that
    it reaches the requested step.
    """
    fullResults = ANALYSIS_RESULTS.setdefault(
        id(exploration),
        newFullAnalysisResults()
    )
    bySteps = fullResults.setdefault("perStep", [])
    # Grow the list until index `step` exists
    for _ in range(len(bySteps), step + 1):
        bySteps.append({})
    return bySteps[step]
2169
2170
def getExplorationAnalyses(
    exploration: core.DiscreteExploration
) -> AnalysisResults:
    """
    Works like `getDecisionAnalyses`, but returns the analysis results
    stored for the exploration as a whole.
    """
    fullResults = ANALYSIS_RESULTS.setdefault(
        id(exploration),
        newFullAnalysisResults()
    )
    overall = fullResults.setdefault("overall", {})
    return overall
2183
2184
class AnalyzersByUnit(TypedDict):
    """
    Holds lists of analyzers for each analysis unit type. Each key is
    one of the unit names that analyzers carry in their `_unit`
    attribute, and maps to the list of analyzers with that unit (see
    `analyzersByUnit`, which builds these dictionaries).
    """
    # Analyzers whose unit is 'step'
    step: List[StepAnalyzer]
    # Analyzers whose unit is 'stepDecision'
    stepDecision: List[StepDecisionAnalyzer]
    # Analyzers whose unit is 'stepTransition'
    stepTransition: List[StepTransitionAnalyzer]
    # Analyzers whose unit is 'decision'
    decision: List[DecisionAnalyzer]
    # Analyzers whose unit is 'transition'
    transition: List[TransitionAnalyzer]
    # Analyzers whose unit is 'exploration'
    exploration: List[ExplorationAnalyzer]
2195
2196
def analyzersByUnit(onlyInclude: Optional[Set[str]] = None) -> AnalyzersByUnit:
    """
    Groups the analyzers in `ALL_ANALYZERS` by their `_unit` and returns
    the result as an `AnalyzersByUnit` dictionary. When `onlyInclude`
    is given, only analyzers whose names are in that set are included;
    otherwise every analyzer registered so far is included. Units with
    no matching analyzers map to empty lists.
    """
    grouped: AnalyzersByUnit = {
        "step": [],
        "stepDecision": [],
        "stepTransition": [],
        "decision": [],
        "transition": [],
        "exploration": []
    }
    for name, registered in ALL_ANALYZERS.items():
        if onlyInclude is None or name in onlyInclude:
            # Mypy will just have to trust that we've put the correct
            # unit values on each analyzer. That relationship is
            # type-checked in the `analyzer` definition.
            grouped[registered._unit].append(registered)  # type: ignore

    return grouped
def describeConsequence( consequence: List[Union[exploration.base.Challenge, exploration.base.Effect, exploration.base.Condition]]) -> str:
 31def describeConsequence(consequence: base.Consequence) -> str:
 32    """
 33    Returns a string which concisely describes a consequence list.
 34    Returns an empty string if given an empty consequence. Examples:
 35
 36    >>> describeConsequence([])
 37    ''
 38    >>> describeConsequence([
 39    ...     base.effect(gain=('gold', 5), delay=2, charges=3),
 40    ...     base.effect(lose='flight')
 41    ... ])
 42    'gain gold*5 ,2 =3; lose flight'
 43    >>> from . import commands
 44    >>> d = describeConsequence([
 45    ...     base.effect(edit=[
 46    ...         [
 47    ...             commands.command('val', '5'),
 48    ...             commands.command('empty', 'list'),
 49    ...             commands.command('append')
 50    ...         ],
 51    ...         [
 52    ...             commands.command('val', '11'),
 53    ...             commands.command('assign', 'var'),
 54    ...             commands.command('op', '+', '$var', '$var')
 55    ...         ],
 56    ...     ])
 57    ... ])
 58    >>> d
 59    'with consequences:\
 60\\n    edit {\
 61\\n      val 5;\
 62\\n      empty list;\
 63\\n      append $_;\
 64\\n    } {\
 65\\n      val 11;\
 66\\n      assign var $_;\
 67\\n      op + $var $var;\
 68\\n    }\
 69\\n'
 70    >>> for line in d.splitlines():
 71    ...     print(line)
 72    with consequences:
 73        edit {
 74          val 5;
 75          empty list;
 76          append $_;
 77        } {
 78          val 11;
 79          assign var $_;
 80          op + $var $var;
 81        }
 82    """
 83    edesc = ''
 84    pf = parsing.ParseFormat()
 85    if consequence:
 86        parts = []
 87        for item in consequence:
 88            # TODO: Challenges and Conditions here!
 89            if 'skills' in item:  # a Challenge
 90                item = cast(base.Challenge, item)
 91                parts.append(pf.unparseChallenge(item))
 92            elif 'value' in item:  # an Effect
 93                item = cast(base.Effect, item)
 94                parts.append(pf.unparseEffect(item))
 95            elif 'condition' in item:  # a Condition
 96                item = cast(base.Condition, item)
 97                parts.append(pf.unparseCondition(item))
 98            else:
 99                raise TypeError(
100                    f"Invalid consequence item (no 'skills', 'value', or"
101                    f" 'condition' key found):\n{repr(item)}"
102                )
103        edesc = '; '.join(parts)
104        if len(edesc) > 60 or '\n' in edesc:
105            edesc = 'with consequences:\n' + ';\n'.join(
106                textwrap.indent(part, '    ')
107                for part in parts
108            ) + '\n'
109
110    return edesc

Returns a string which concisely describes a consequence list. Returns an empty string if given an empty consequence. Examples:

>>> describeConsequence([])
''
>>> describeConsequence([
...     base.effect(gain=('gold', 5), delay=2, charges=3),
...     base.effect(lose='flight')
... ])
'gain gold*5 ,2 =3; lose flight'
>>> from . import commands
>>> d = describeConsequence([
...     base.effect(edit=[
...         [
...             commands.command('val', '5'),
...             commands.command('empty', 'list'),
...             commands.command('append')
...         ],
...         [
...             commands.command('val', '11'),
...             commands.command('assign', 'var'),
...             commands.command('op', '+', '$var', '$var')
...         ],
...     ])
... ])
>>> d
'with consequences:\n    edit {\n      val 5;\n      empty list;\n      append $_;\n    } {\n      val 11;\n      assign var $_;\n      op + $var $var;\n    }\n'
>>> for line in d.splitlines():
...     print(line)
with consequences:
    edit {
      val 5;
      empty list;
      append $_;
    } {
      val 11;
      assign var $_;
      op + $var $var;
    }
def describeProgress(exploration: exploration.core.DiscreteExploration) -> str:
def describeProgress(exploration: core.DiscreteExploration) -> str:
    """
    Describes the progress of an exploration by noting each room/zone
    visited and explaining the options visible at each point plus which
    option was taken. Notes powers/tokens gained/lost along the way, as
    well as mechanisms whose state changed (each changed mechanism is
    reported once per step, on its own line). Returns a string.

    Example:
    >>> from exploration import journal
    >>> e = journal.convertJournal('''\\
    ... S Start::pit
    ... A gain jump
    ... A gain attack
    ... n button check
    ... zz Wilds
    ... o up
    ...   q _flight
    ... o left
    ... x left left_nook right
    ... a geo_rock
    ...   At gain geo*15
    ...   At deactivate
    ... o up
    ...   q _tall_narrow
    ... t right
    ... o right
    ...   q attack
    ... ''')
    >>> for line in describeProgress(e).splitlines():
    ...    print(line)
    Start of the exploration
    Start exploring domain main at 0 (Start::pit)
      Gained capability 'attack'
      Gained capability 'jump'
    At decision 0 (Start::pit)
      In zone Start
      In region Wilds
      There are transitions:
        left to unconfirmed
        up to unconfirmed; requires _flight
      1 note(s) at this step
    Explore left from decision 0 (Start::pit) to 2 (now Start::left_nook)
    At decision 2 (Start::left_nook)
      There are transitions:
        right to 0 (Start::pit)
      There are actions:
        geo_rock
    Do action geo_rock
      Gained 15 geo(s)
    Take right from decision 2 (Start::left_nook) to 0 (Start::pit)
    At decision 0 (Start::pit)
      There are transitions:
        left to 2 (Start::left_nook)
        right to unconfirmed; requires attack
        up to unconfirmed; requires _flight
    Waiting for another action...
    End of the exploration.
    """
    result = ''

    regions: Set[base.Zone] = set()
    zones: Set[base.Zone] = set()
    last: Union[base.DecisionID, Set[base.DecisionID], None] = None
    # Comparisons for the first step are made against an empty state
    initialState: base.State = base.emptyState()
    prevCapabilities = base.effectiveCapabilitySet(initialState)
    prevMechanisms = initialState['mechanisms']
    oldActiveDecisions: Set[base.DecisionID] = set()
    for i, situation in enumerate(exploration):
        if i == 0:
            result += "Start of the exploration\n"

        # Extract info
        graph = situation.graph
        activeDecisions = exploration.getActiveDecisions(i)
        newActive = activeDecisions - oldActiveDecisions
        departedFrom = exploration.movementAtStep(i)[0]
        # TODO: use the other parts of this?
        nowZones: Set[base.Zone] = set()
        for active in activeDecisions:
            nowZones |= graph.zoneAncestors(active)
        regionsHere = set(
            z
            for z in nowZones
            if graph.zoneHierarchyLevel(z) == 1
        )
        zonesHere = set(
            z
            for z in nowZones
            if graph.zoneHierarchyLevel(z) == 0
        )
        here = departedFrom
        state = situation.state
        capabilities = base.effectiveCapabilitySet(state)
        mechanisms = state['mechanisms']

        # Describe capabilities gained/lost relative to previous step
        # (i.e., as a result of the previous action)
        # NOTE(review): capabilities that were lost, and token types
        # that are absent from the current token dictionary, are not
        # reported here.
        gained = (
            capabilities['capabilities']
            - prevCapabilities['capabilities']
        )
        gainedTokens = []
        for tokenType in capabilities['tokens']:
            net = (
                capabilities['tokens'][tokenType]
                - prevCapabilities['tokens'].get(tokenType, 0)
            )
            if net != 0:
                gainedTokens.append((tokenType, net))
        # Consider each mechanism ID once, even when it appears in both
        # the current and previous mechanism states; otherwise a
        # mechanism whose state changed would be reported twice.
        changed = [
            mID
            for mID in dict.fromkeys(
                list(mechanisms.keys()) + list(prevMechanisms.keys())
            )
            if mechanisms.get(mID) != prevMechanisms.get(mID)
        ]

        for capability in sorted(gained):
            result += f"  Gained capability '{capability}'\n"

        for tokenType, net in gainedTokens:
            if net > 0:
                result += f"  Gained {net} {tokenType}(s)\n"
            else:
                result += f"  Lost {-net} {tokenType}(s)\n"

        for mID in changed:
            oldState = prevMechanisms.get(mID, base.DEFAULT_MECHANISM_STATE)
            newState = mechanisms.get(mID, base.DEFAULT_MECHANISM_STATE)

            details = graph.mechanismDetails(mID)
            if details is None:
                mName = "(unknown)"
            else:
                mName = details[1]
            # Ends with a newline like every other report line, so that
            # the next line doesn't get fused onto this one
            result += (
                f"  Set mechanism {mID} ({mName}) to {newState} (was"
                f" {oldState})\n"
            )
            # TODO: Test this!

        if isinstance(departedFrom, base.DecisionID):
            # Print location info
            if here != last:
                if here is None:
                    result += "Without a position...\n"
                elif isinstance(here, set):
                    result += f"With {len(here)} active decisions\n"
                    # TODO: List them using namesListing?
                else:
                    result += f"At decision {graph.identityOf(here)}\n"
            newZones = zonesHere - zones
            for zone in sorted(newZones):
                result += f"  In zone {zone}\n"
            newRegions = regionsHere - regions
            for region in sorted(newRegions):
                result += f"  In region {region}\n"

        elif isinstance(departedFrom, set):  # active in spreading domain
            spreadingDomain = graph.domainFor(list(departedFrom)[0])
            result += (
                f"  In domain {spreadingDomain} with {len(departedFrom)}"
                f" active decisions...\n"
            )

        else:
            assert departedFrom is None

        # Describe new position/positions at start of this step
        if len(newActive) > 1:
            newListing = ', '.join(
                sorted(graph.identityOf(n) for n in newActive)
            )
            # The listing line is newline-terminated so that whatever
            # comes next starts on a fresh line
            result += (
                f"  There are {len(newActive)} new active decisions:"
                f"\n  {newListing}\n"
            )

        elif len(newActive) == 1:
            here = list(newActive)[0]

            outgoing = graph.destinationsFrom(here)

            # Transitions that lead back to the same decision are
            # listed as actions instead
            transitions = {t: d for (t, d) in outgoing.items() if d != here}
            actions = {t: d for (t, d) in outgoing.items() if d == here}
            if transitions:
                result += "  There are transitions:\n"
                for transition in sorted(transitions):
                    dest = transitions[transition]
                    if not graph.isConfirmed(dest):
                        destSpec = 'unconfirmed'
                    else:
                        destSpec = graph.identityOf(dest)
                    req = graph.getTransitionRequirement(here, transition)
                    rDesc = ''
                    if req != base.ReqNothing():
                        rDesc = f"; requires {req.unparse()}"
                    cDesc = describeConsequence(
                        graph.getConsequence(here, transition)
                    )
                    if cDesc:
                        cDesc = '; ' + cDesc
                    result += (
                        f"    {transition} to {destSpec}{rDesc}{cDesc}\n"
                    )

            if actions:
                result += "  There are actions:\n"
                for action in sorted(actions):
                    req = graph.getTransitionRequirement(here, action)
                    rDesc = ''
                    if req != base.ReqNothing():
                        rDesc = f"; requires {req.unparse()}"
                    cDesc = describeConsequence(
                        graph.getConsequence(here, action)
                    )
                    if cDesc:
                        cDesc = '; ' + cDesc
                    if rDesc or cDesc:
                        desc = (rDesc + cDesc)[2:]  # chop '; ' from either
                        result += f"    {action} ({desc})\n"
                    else:
                        result += f"    {action}\n"

        # note annotations
        if len(situation.annotations) > 0:
            result += (
                f"  {len(situation.annotations)} note(s) at this step\n"
            )

        # Describe action taken
        if situation.action is None and situation.type == "pending":
            result += "Waiting for another action...\n"
        else:
            desc = base.describeExplorationAction(situation, situation.action)
            # Slicing (rather than indexing) keeps this from failing on
            # an empty description
            desc = desc[:1].capitalize() + desc[1:]
            result += desc + '\n'

        if i == len(exploration) - 1:
            result += "End of the exploration.\n"

        # Update state variables
        oldActiveDecisions = activeDecisions
        prevCapabilities = capabilities
        prevMechanisms = mechanisms
        regions = regionsHere
        zones = zonesHere
        if here is not None:
            last = here

    return result

Describes the progress of an exploration by noting each room/zone visited and explaining the options visible at each point plus which option was taken. Notes powers/tokens gained/lost along the way. Returns a string.

Example:

>>> from exploration import journal
>>> e = journal.convertJournal('''\
... S Start::pit
... A gain jump
... A gain attack
... n button check
... zz Wilds
... o up
...   q _flight
... o left
... x left left_nook right
... a geo_rock
...   At gain geo*15
...   At deactivate
... o up
...   q _tall_narrow
... t right
... o right
...   q attack
... ''')
>>> for line in describeProgress(e).splitlines():
...    print(line)
Start of the exploration
Start exploring domain main at 0 (Start::pit)
  Gained capability 'attack'
  Gained capability 'jump'
At decision 0 (Start::pit)
  In zone Start
  In region Wilds
  There are transitions:
    left to unconfirmed
    up to unconfirmed; requires _flight
  1 note(s) at this step
Explore left from decision 0 (Start::pit) to 2 (now Start::left_nook)
At decision 2 (Start::left_nook)
  There are transitions:
    right to 0 (Start::pit)
  There are actions:
    geo_rock
Do action geo_rock
  Gained 15 geo(s)
Take right from decision 2 (Start::left_nook) to 0 (Start::pit)
At decision 0 (Start::pit)
  There are transitions:
    left to 2 (Start::left_nook)
    right to unconfirmed; requires attack
    up to unconfirmed; requires _flight
Waiting for another action...
End of the exploration.
AnalysisUnit: TypeAlias = Literal['step', 'stepDecision', 'stepTransition', 'decision', 'transition', 'exploration']

The different kinds of analysis units we consider: per-step-per-decision, per-step-per-transition, per-step, per-final-decision, per-final-transition, and per-exploration (i.e. overall).

AnalysisResults: TypeAlias = Dict[str, Any]

Analysis results are dictionaries that map analysis routine names to results from those routines, which can be of any type.

SpecificTransition: TypeAlias = Tuple[int, str]

A specific transition is identified by its source decision ID and its transition name. Note that transitions which get renamed are treated as two separate transitions.

OverspecificTransition: TypeAlias = Tuple[int, str, int]

In contrast to a SpecificTransition, an OverspecificTransition includes the destination of the transition, which might help disambiguate cases where a transition is created, then re-targeted or deleted and re-created with a different destination. Transitions which get renamed are still treated as two separate transitions.

DecisionAnalyses: TypeAlias = Dict[int, Dict[str, Any]]

Decision analysis results are stored per-decision, with a dictionary of property-name → value associations. These properties either apply to decisions across all steps of an exploration, or apply to decisions in a particular core.DecisionGraph.

TransitionAnalyses: TypeAlias = Dict[Tuple[int, str, int], Dict[str, Any]]

Per-transition analysis results, similar to DecisionAnalyses.

StepAnalyses: TypeAlias = List[Dict[str, Any]]

Per-exploration-step analysis results are stored in a list and indexed by exploration step integers.

StepwiseDecisionAnalyses: TypeAlias = List[Dict[int, Dict[str, Any]]]

Per-step-per-decision analysis results are stored as a list of decision analysis results.

StepwiseTransitionAnalyses: TypeAlias = List[Dict[Tuple[int, str, int], Dict[str, Any]]]

Per-step-per-transition analysis results are stored as a list of transition analysis results.

ExplorationAnalyses: TypeAlias = Dict[str, Any]

Whole-exploration analyses are just a normal AnalysisResults dictionary.

class FullAnalysisResults(typing.TypedDict):
class FullAnalysisResults(TypedDict):
    """
    Full analysis results hold every kind of analysis result in one
    dictionary. There is one slot per kind of result; `analyzer` keeps
    one of these dictionaries per exploration in `ANALYSIS_RESULTS`.
    """
    # Decision ID -> {analyzer name -> result}
    perDecision: DecisionAnalyses
    # (source ID, transition name, destination ID) -> {analyzer name ->
    # result}
    perTransition: TransitionAnalyses
    # Indexed by step; each entry is {analyzer name -> result}
    perStep: StepAnalyses
    # Indexed by step; each entry maps decision ID -> {analyzer name ->
    # result}
    perStepDecision: StepwiseDecisionAnalyses
    # Indexed by step; each entry maps (source ID, transition name,
    # destination ID) -> {analyzer name -> result}
    perStepTransition: StepwiseTransitionAnalyses
    # {analyzer name -> result} for whole-exploration analyzers
    overall: ExplorationAnalyses

Full analysis results hold every kind of analysis result in one dictionary.

perDecision: Dict[int, Dict[str, Any]]
perTransition: Dict[Tuple[int, str, int], Dict[str, Any]]
perStep: List[Dict[str, Any]]
perStepDecision: List[Dict[int, Dict[str, Any]]]
perStepTransition: List[Dict[Tuple[int, str, int], Dict[str, Any]]]
overall: Dict[str, Any]
def newFullAnalysisResults() -> FullAnalysisResults:
def newFullAnalysisResults() -> FullAnalysisResults:
    """
    Builds a brand-new `FullAnalysisResults` dictionary in which every
    slot is present but empty (dictionary slots are `{}` and per-step
    slots are `[]`). Each call creates fresh containers, so the result
    shares nothing with previously returned dictionaries.
    """
    fresh: FullAnalysisResults = {
        'perDecision': {},
        'perTransition': {},
        'perStep': [],
        'perStepDecision': [],
        'perStepTransition': [],
        'overall': {},
    }
    return fresh

Returns a new empty FullAnalysisResults dictionary.

Params = ~Params

Parameter specification variable for AnalysisFunction definition.

class AnalysisFunction(typing.Protocol[~Params]):
class AnalysisFunction(Protocol[Params]):
    """
    Analysis functions are callable, but also have a `_unit` attribute
    which is a string.

    When called, an analysis function receives a
    `core.DiscreteExploration` as its first argument, followed by
    whatever extra arguments the `Params` specification describes. It
    may return any type of result.
    """
    # The unit of analysis; `lookupAnalysisResult` and
    # `saveAnalysisResult` branch on this value to decide where in the
    # cache a result lives.
    _unit: AnalysisUnit
    # The function's name is the key under which its results are cached.
    __name__: str
    # The docstring is read by the `analyzer` decorator when it builds
    # the docstring of the caching wrapper.
    __doc__: str
    def __call__(
        self,
        exploration: core.DiscreteExploration,
        *args: Params.args,
        **kwargs: Params.kwargs
    ) -> Any:
        ...

Analysis functions are callable, but also have a _unit attribute which is a string.

AnalysisFunction(*args, **kwargs)
def _no_init_or_replace_init(self, *args, **kwargs):
    """
    Placeholder `__init__` which refuses to instantiate protocol classes
    and, for other classes, swaps itself out for the first real
    `__init__` found in the class's MRO before calling it.

    Raises `TypeError` when the class of `self` has a true
    `_is_protocol` attribute.

    NOTE(review): this listing appears to be the standard library
    `typing` implementation pulled in by the documentation generator
    (as the constructor of `AnalysisFunction`), not code from this
    module -- confirm before editing.
    """
    cls = type(self)

    if cls._is_protocol:
        raise TypeError('Protocols cannot be instantiated')

    # Already using a custom `__init__`. No need to calculate correct
    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
    if cls.__init__ is not _no_init_or_replace_init:
        return

    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
    # searches for a proper new `__init__` in the MRO. The new `__init__`
    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
    # instantiation of the protocol subclass will thus use the new
    # `__init__` and no longer call `_no_init_or_replace_init`.
    for base in cls.__mro__:
        init = base.__dict__.get('__init__', _no_init_or_replace_init)
        if init is not _no_init_or_replace_init:
            cls.__init__ = init
            break
    else:
        # should not happen
        cls.__init__ = object.__init__

    # Delegate to whichever `__init__` was just installed on the class.
    cls.__init__(self, *args, **kwargs)
StepAnalyzer: TypeAlias = AnalysisFunction[(<class 'int'>,)]

A step analyzer is a function which will receive a core.DiscreteExploration along with the step in that exploration being considered. It can return any type of analysis result.

StepDecisionAnalyzer: TypeAlias = AnalysisFunction[(<class 'int'>, <class 'int'>)]

Like a StepAnalyzer but also gets a decision ID to consider.

StepTransitionAnalyzer: TypeAlias = AnalysisFunction[(<class 'int'>, <class 'int'>, <class 'str'>, <class 'int'>)]

Like a StepAnalyzer but also gets a source decision ID, a transition name, and a destination decision ID to target.

DecisionAnalyzer: TypeAlias = AnalysisFunction[(<class 'int'>,)]

A decision analyzer gets an exploration and a particular decision ID to consider. It can return any type of analysis result.

TransitionAnalyzer: TypeAlias = AnalysisFunction[(<class 'int'>, <class 'str'>, <class 'int'>)]

Like a DecisionAnalyzer but instead of a decision ID it gets a source decision ID, a transition name, and a destination decision ID to target.

ExplorationAnalyzer: TypeAlias = AnalysisFunction[()]

Analyzes overall properties of an entire core.DiscreteExploration.

AnyAnalyzer: TypeAlias = Union[AnalysisFunction[()], AnalysisFunction[(<class 'int'>, <class 'str'>, <class 'int'>)], AnalysisFunction[(<class 'int'>,)], AnalysisFunction[(<class 'int'>, <class 'int'>)], AnalysisFunction[(<class 'int'>, <class 'int'>, <class 'str'>, <class 'int'>)]]
ANALYSIS_RESULTS: Dict[int, FullAnalysisResults] = {}

Caches analysis results, keyed by the id of the core.DiscreteExploration they're based on.

class NotCached:
class NotCached:
    """
    Sentinel meaning "there is no cached value". The class object itself
    (never an instance) is used as the marker, because `None` is a
    legitimate cached result and so cannot signal a cache miss.
    """

Reference object for specifying that no cached value is available, since None is a valid cached value.

def lookupAnalysisResult( cache: FullAnalysisResults, analyzer: AnalysisFunction, argsInOrder: Sequence[Any]) -> Union[Type[NotCached], Any]:
def lookupAnalysisResult(
    cache: FullAnalysisResults,
    analyzer: AnalysisFunction,
    argsInOrder: Sequence[Any]
) -> Union[Type[NotCached], Any]:
    """
    Fetches the cached result of the given analysis function from the
    given cache, or returns the special `NotCached` object when nothing
    has been stored yet for the specified arguments.

    The function must carry a `_unit` attribute (the `analyzer`
    decorator sets it) and its `__name__` is used as the cache key.
    `argsInOrder` holds the call's arguments in declaration order with
    the exploration at index 0 (see `getArgsInOrder`); the entries from
    index 1 onwards must match the unit of analysis. For example, for
    the 'stepDecision' unit they are the step and then the decision ID.

    For the per-step units, the relevant list in the cache is padded
    with empty dictionaries up to the requested step as a side effect.

    Raises a `ValueError` if the analyzer's unit is not recognized.
    """
    def entryForStep(perStepList, upTo):
        # Pad with empty entries so that indexing by step is safe
        while len(perStepList) <= upTo:
            perStepList.append({})
        return perStepList[upTo]

    def cachedIn(bucket):
        # Results are stored under the analyzer's name
        return bucket.get(analyzer.__name__, NotCached)

    unit = analyzer._unit

    if unit == 'exploration':
        return cachedIn(cache['overall'])

    if unit == 'step':
        step = argsInOrder[1]
        return cachedIn(entryForStep(cache['perStep'], step))

    if unit == 'stepDecision':
        step = argsInOrder[1]
        decision = argsInOrder[2]
        bucket = entryForStep(cache['perStepDecision'], step).get(decision)
        return NotCached if bucket is None else cachedIn(bucket)

    if unit == 'stepTransition':
        step = argsInOrder[1]
        transition = (argsInOrder[2], argsInOrder[3], argsInOrder[4])
        bucket = entryForStep(
            cache['perStepTransition'],
            step
        ).get(transition)
        return NotCached if bucket is None else cachedIn(bucket)

    if unit == 'decision':
        decision = argsInOrder[1]
        byDecision = cache['perDecision']
        if decision in byDecision:
            return cachedIn(byDecision[decision])
        return NotCached

    if unit == 'transition':
        transition = (argsInOrder[1], argsInOrder[2], argsInOrder[3])
        byTransition = cache['perTransition']
        if transition in byTransition:
            return cachedIn(byTransition[transition])
        return NotCached

    raise ValueError(f"Invalid analysis unit {unit!r}.")

Looks up an analysis result for the given function in the given cache. The function must have been decorated with analyzer. The bound arguments must match the unit of analysis, for example, if the unit is 'stepDecision', the arguments must be those for a StepDecisionAnalyzer. The bound arguments should have had apply_defaults called already to fill in default argument values. Returns the special object NotCached if there is no cached value for the specified arguments yet.

def saveAnalysisResult( cache: FullAnalysisResults, result: Any, analyzer: AnalysisFunction, argsInOrder: Sequence[Any]) -> None:
def saveAnalysisResult(
    cache: FullAnalysisResults,
    result: Any,
    analyzer: AnalysisFunction,
    argsInOrder: Sequence[Any]
) -> None:
    """
    Stores an analysis result in the specified cache under the
    analyzer's `__name__`, in the slot that corresponds to the
    analyzer's `_unit`.

    `argsInOrder` holds the call's arguments in declaration order with
    the exploration at index 0; the entries from index 1 onwards must
    match the unit. For example, for the 'stepDecision' unit they are
    the step and then the decision ID. Per-step lists are padded with
    empty dictionaries as needed, and per-decision/per-transition
    entries are created on demand.

    Raises a `ValueError` if the analyzer's unit is not recognized.
    """
    def entryForStep(perStepList, upTo):
        # Pad with empty entries so that indexing by step is safe
        while len(perStepList) <= upTo:
            perStepList.append({})
        return perStepList[upTo]

    unit = analyzer._unit

    # First find the {analyzer name -> result} dictionary to write into
    if unit == 'exploration':
        slot = cache['overall']
    elif unit == 'step':
        step = argsInOrder[1]
        slot = entryForStep(cache['perStep'], step)
    elif unit == 'stepDecision':
        step = argsInOrder[1]
        decision = argsInOrder[2]
        slot = entryForStep(
            cache['perStepDecision'],
            step
        ).setdefault(decision, {})
    elif unit == 'stepTransition':
        step = argsInOrder[1]
        transition = (argsInOrder[2], argsInOrder[3], argsInOrder[4])
        slot = entryForStep(
            cache['perStepTransition'],
            step
        ).setdefault(transition, {})
    elif unit == 'decision':
        decision = argsInOrder[1]
        slot = cache['perDecision'].setdefault(decision, {})
    elif unit == 'transition':
        transition = (argsInOrder[1], argsInOrder[2], argsInOrder[3])
        slot = cache['perTransition'].setdefault(transition, {})
    else:
        raise ValueError(f"Invalid analysis unit {unit!r}.")

    # ...then record the result there
    slot[analyzer.__name__] = result

Saves an analysis result in the specified cache. The bound arguments must match the unit, for example, if the unit is 'stepDecision', the arguments must be those for a StepDecisionAnalyzer.

ALL_ANALYZERS: Dict[str, Union[AnalysisFunction[()], AnalysisFunction[(<class 'int'>, <class 'str'>, <class 'int'>)], AnalysisFunction[(<class 'int'>,)], AnalysisFunction[(<class 'int'>, <class 'int'>)], AnalysisFunction[(<class 'int'>, <class 'int'>, <class 'str'>, <class 'int'>)]]] = {'finalIdentity': <function finalIdentity>, 'currentDecision': <function currentDecision>, 'currentDecisionIdentity': <function currentDecisionIdentity>, 'observedSoFar': <function observedSoFar>, 'totalDecisionsSoFar': <function registerCount.<locals>.countAnalyzer>, 'justObserved': <function justObserved>, 'newDecisionCount': <function registerCount.<locals>.countAnalyzer>, 'hasBeenObserved': <function hasBeenObserved>, 'stepsObserved': <function stepsObserved>, 'stepObserved': <function stepObserved>, 'stepsConfirmed': <function stepsConfirmed>, 'stepConfirmed': <function stepConfirmed>, 'stepsVisited': <function stepsVisited>, 'hasBeenVisited': <function hasBeenVisited>, 'stepFirstVisited': <function stepFirstVisited>, 'stepsActive': <function stepsActive>, 'stepsTransisionsObserved': <function stepsTransisionsObserved>, 'stepObservedTransition': <function stepObservedTransition>, 'transitionTaken': <function transitionTaken>, 'transitionStepsTaken': <function transitionStepsTaken>, 'stepsTaken': <function stepsTaken>, 'timesTaken': <function timesTaken>, 'allUnexploredBranches': <function allUnexploredBranches>, 'unexploredBranchCount': <function registerCount.<locals>.countAnalyzer>, 'traversableUnexploredBranches': <function traversableUnexploredBranches>, 'traversableUnexploredCount': <function registerCount.<locals>.countAnalyzer>, 'actions': <function actions>, 'actionCount': <function registerCount.<locals>.countAnalyzer>, 'totalActions': <function registerStepCombined.<locals>.analysisCombiner>, 'meanActions': <function registerStepCombined.<locals>.analysisCombiner>, 'medianActions': <function registerStepCombined.<locals>.analysisCombiner>, 'branches': <function 
branches>, 'totalBranches': <function registerStepCombined.<locals>.analysisCombiner>, 'meanBranches': <function registerStepCombined.<locals>.analysisCombiner>, 'medianBranches': <function registerStepCombined.<locals>.analysisCombiner>, 'arrivals': <function arrivals>, 'revisits': <function revisits>, 'totalRevisits': <function registerFullCombined.<locals>.analysisCombiner>, 'meanRevisits': <function registerFullCombined.<locals>.analysisCombiner>, 'medianRevisits': <function registerFullCombined.<locals>.analysisCombiner>, 'shortestHopPaths': <function shortestHopPaths>}

Holds all analyzers indexed by name, with the analysis function as the value (each function carries its analysis unit as its _unit attribute). The analyzer decorator registers them.

RECORD_PROFILE: bool = False

Whether or not to record time spent by each analysis function.

class AnalyzerPerf(typing.TypedDict):
class AnalyzerPerf(TypedDict):
    """
    Tracks performance of an analysis function, recording total calls,
    non-cached calls, time spent looking up cached results, and time
    spent in non-cached calls (including the failed cache lookup and
    saving the result in the cache).
    """
    # Total number of calls, whether or not the result was cached
    calls: int
    # Number of calls where the result had to be computed
    nonCached: int
    # Accumulated `time.perf_counter` time for calls answered from the
    # cache
    lookupTime: float
    # Accumulated `time.perf_counter` time for calls that had to compute
    # their result
    analyzeTime: float

Tracks performance of an analysis function, recording total calls, non-cached calls, time spent looking up cached results, and time spent in non-cached calls (including the failed cache lookup and saving the result in the cache).

calls: int
nonCached: int
lookupTime: float
analyzeTime: float
def newAnalyzerPerf() -> AnalyzerPerf:
def newAnalyzerPerf() -> AnalyzerPerf:
    """
    Builds a fresh `AnalyzerPerf` dictionary with both counters at zero
    and both accumulated times at `0.0`.
    """
    zeroed: AnalyzerPerf = {
        "calls": 0,
        "nonCached": 0,
        "lookupTime": 0.0,
        "analyzeTime": 0.0,
    }
    return zeroed

Creates a new empty AnalyzerPerf dictionary.

ANALYSIS_TIME_SPENT: Dict[str, AnalyzerPerf] = {}

Records number-of-calls, number-of-non-cached calls, and time spent in each analysis function, when RECORD_PROFILE is set to True.

ELIDE: Set[str] = {'hasBeenObserved', 'shortestHopPaths'}

Analyzers which should not be included in CSV output by default.

FINAL_ONLY: Set[str] = {'meanActions', 'actionCount', 'hasBeenVisited', 'actions', 'meanBranches', 'totalActions', 'branches', 'medianActions', 'totalBranches', 'medianBranches'}

Per-step/step-decision/step-transition analyzers which should by default only be applied to the final step of an exploration to save time.

def getArgsInOrder(f: Callable, *args: Any, **kwargs: Any) -> List[Any]:
def getArgsInOrder(
    f: Callable,
    *args: Any,
    **kwargs: Any
) -> List[Any]:
    """
    Given a callable and some arguments for it, returns a flat list of
    argument values arranged in the order in which the callable declares
    its parameters, no matter whether each value was supplied
    positionally, by keyword, or via a default. Extra positional
    arguments are spliced in where the `*args`-style parameter is
    declared, and the values of extra keyword arguments where the
    `**kwargs`-style parameter is declared.

    Raises a `TypeError` (from `inspect.Signature.bind`) if the
    arguments don't fit the callable's signature.

    For example:

    >>> def f(a, /, b, *more, x=3, y=10, **kw):
    ...     pass
    >>> getArgsInOrder(f, 1, 2)
    [1, 2, 3, 10]
    >>> getArgsInOrder(f, 4, 5, y=2, x=8)
    [4, 5, 8, 2]
    >>> getArgsInOrder(f, 4, y=2, x=8, b=3)
    [4, 3, 8, 2]
    >>> getArgsInOrder(f, 1, 2, 3, 4)
    [1, 2, 3, 4, 3, 10]
    >>> getArgsInOrder(f, 1, 2, 3, 4, q=5, k=9)
    [1, 2, 3, 4, 3, 10, 5, 9]
    """
    signature = inspect.Signature.from_callable(f)
    binding = signature.bind(*args, **kwargs)
    # After this every parameter has an entry in `binding.arguments`
    binding.apply_defaults()
    values = binding.arguments

    ordered: List[Any] = []
    for name, param in signature.parameters.items():
        if param.kind is inspect.Parameter.VAR_POSITIONAL:
            # A tuple of extra positional values
            ordered.extend(values[name])
        elif param.kind is inspect.Parameter.VAR_KEYWORD:
            # A dictionary of extra keyword values
            ordered.extend(values[name].values())
        else:
            # Positional-only, positional-or-keyword, or keyword-only
            ordered.append(values[name])

    return ordered

Given a callable and some arguments, returns a list of argument values in the same order as that function would accept them from the given arguments, accounting for things like keyword arguments and default values.

For example:

>>> def f(a, /, b, *more, x=3, y=10, **kw):
...     pass
>>> sig = inspect.Signature.from_callable(f)
>>> getArgsInOrder(f, 1, 2)
[1, 2, 3, 10]
>>> getArgsInOrder(f, 4, 5, y=2, x=8)
[4, 5, 8, 2]
>>> getArgsInOrder(f, 4, y=2, x=8, b=3)
[4, 3, 8, 2]
>>> getArgsInOrder(f, 4, y=2, x=8, b=3)
[4, 3, 8, 2]
>>> getArgsInOrder(f, 1, 2, 3, 4)
[1, 2, 3, 4, 3, 10]
>>> getArgsInOrder(f, 1, 2, 3, 4, q=5, k=9)
[1, 2, 3, 4, 3, 10, 5, 9]
def analyzer( unit: Literal['step', 'stepDecision', 'stepTransition', 'decision', 'transition', 'exploration']) -> Callable[[Callable[Concatenate[exploration.core.DiscreteExploration, ~Params], Any]], AnalysisFunction]:
def analyzer(unit: AnalysisUnit) -> Callable[
    [Callable[Concatenate[core.DiscreteExploration, Params], Any]],
    AnalysisFunction
]:
    '''
    Decorator which sets up caching for an analysis function in the
    global `ANALYSIS_RESULTS` dictionary. Whenever the decorated function
    is called, it will first check whether a cached result is available
    for the same target exploration (by id) and additional target info
    based on the analysis unit type. If so, the cached result will be
    returned. This allows analysis functions to simply call each other
    when they need results and themselves recursively if they need to
    track things across steps/decisions, while avoiding tons of duplicate
    work.

    The `unit` argument is stored on the decorated function as `_unit`
    and determines how `lookupAnalysisResult` and `saveAnalysisResult`
    interpret the function's arguments. The caching version of the
    function is registered in `ALL_ANALYZERS` under the decorated
    function's name, and is what the decorator returns.

    While `RECORD_PROFILE` is true, call counts and timings for each
    analyzer are accumulated in `ANALYSIS_TIME_SPENT`.
    '''
    def makeCachedAnalyzer(
        baseFunction: Callable[
            Concatenate[core.DiscreteExploration, Params],
            Any
        ]
    ) -> AnalysisFunction:
        """
        Decoration function which wraps an analysis function in a
        caching layer and registers the wrapped version.
        """
        analysisFunction = cast(AnalysisFunction, baseFunction)
        analysisFunction._unit = unit
        analyzerName = analysisFunction.__name__

        @functools.wraps(analysisFunction)
        def cachingAnalyzer(
            exploration: core.DiscreteExploration,
            *args: Params.args,
            **kwargs: Params.kwargs
        ):
            """
            This docstring will be replaced with the docstring of the
            decorated function plus a note about caching.
            """
            # Read the flag once, so that toggling `RECORD_PROFILE`
            # while the analysis runs can't leave `perf`/`start` unset
            profiling = RECORD_PROFILE
            if profiling:
                perf = ANALYSIS_TIME_SPENT.setdefault(
                    analyzerName,
                    newAnalyzerPerf()
                )
                perf["calls"] += 1
                start = time.perf_counter()

            # NOTE: the cache is keyed by `id`, so an entry outlives its
            # exploration object unless the cache is cleared, and could
            # then be picked up by a new object that reuses the same id.
            cacheKey = id(exploration)
            cache = ANALYSIS_RESULTS.get(cacheKey)
            if cache is None:
                # Only build a new results dictionary when one is needed
                cache = newFullAnalysisResults()
                ANALYSIS_RESULTS[cacheKey] = cache

            # Arguments in declaration order, exploration first, so that
            # keyword and positional calls share cache entries
            argsInOrder = getArgsInOrder(
                baseFunction,
                exploration,
                *args,
                **kwargs
            )
            cachedResult = lookupAnalysisResult(
                cache,
                analysisFunction,
                argsInOrder
            )
            if cachedResult is not NotCached:
                if profiling:
                    perf["lookupTime"] += time.perf_counter() - start
                return cachedResult

            result = analysisFunction(exploration, *args, **kwargs)
            saveAnalysisResult(cache, result, analysisFunction, argsInOrder)
            if profiling:
                perf["nonCached"] += 1
                perf["analyzeTime"] += time.perf_counter() - start
            return result

        # The decorated function may have no docstring at all (always
        # the case under `python -OO`), which `textwrap.dedent` rejects
        cachingAnalyzer.__doc__ = (
            textwrap.dedent(analysisFunction.__doc__ or '')
          + """

This function's results are cached in the `ANALYSIS_RESULTS` dictionary,
and it returns cached results when possible. Use `clearAnalysisCache` to
clear the analysis cache.
"""
        )

        # Save caching version of analyzer
        result = cast(AnalysisFunction, cachingAnalyzer)
        ALL_ANALYZERS[analyzerName] = result
        return result

    return makeCachedAnalyzer

Decorator which sets up caching for an analysis function in the global ANALYSIS_RESULTS dictionary. Whenever the decorated function is called, it will first check whether a cached result is available for the same target exploration (by id) and additional target info based on the analysis unit type. If so, the cached result will be returned. This allows analysis functions to simply call each other when they need results and themselves recursively if they need to track things across steps/decisions, while avoiding tons of duplicate work.

T = ~T

Type variable for elide and finalOnly.

def elide(analyzer: ~T) -> ~T:
def elide(analyzer: T) -> T:
    """
    Marks the given analyzer as one whose result should be left out of
    CSV output by default, by adding its name to `ELIDE`. Hands back the
    same analyzer unchanged, so this can be used as a decorator.
    """
    analyzerName = analyzer.__name__
    ELIDE.add(analyzerName)
    return analyzer

Returns the given analyzer after noting that its result should not be included in CSV output by default.

def finalOnly(analyzer: ~T) -> ~T:
def finalOnly(analyzer: T) -> T:
    """
    Marks the given analyzer as one which should by default be run only
    on the final exploration step, by adding its name to `FINAL_ONLY`.
    Hands back the same analyzer unchanged, so this can be used as a
    decorator.
    """
    analyzerName = analyzer.__name__
    FINAL_ONLY.add(analyzerName)
    return analyzer

Returns the given analyzer after noting that it should only be run on the final exploration step by default.

AnalyzerType = ~AnalyzerType

Type var to forward through analyzer types.

def registerCount(target: ~AnalyzerType, sizeName: str) -> ~AnalyzerType:
def registerCount(target: AnalyzerType, sizeName: str) -> AnalyzerType:
    """
    Registers a new analysis routine which uses the same analysis unit as
    the target routine but which returns the length of that routine's
    result. Returns `None` if the target routine does.

    Needs the target routine and the name to register the new analysis
    routine under.

    The new routine advertises the same signature as the target, so it
    accepts the same positional and keyword arguments and its results
    are cached under the same step/decision/transition as the target's.

    Returns the analysis function it creates.
    """
    def countAnalyzer(*args, **kwargs):
        'To be replaced'
        result = target(*args, **kwargs)
        if result is None:
            return None
        else:
            return len(result)

    countAnalyzer.__doc__ = (
        f"Measures count of the {target.__name__!r} result applied to"
        f" {target._unit!r}."
    )
    countAnalyzer.__name__ = sizeName
    # With a bare `(*args, **kwargs)` signature, `getArgsInOrder` would
    # order keyword arguments by how the caller happened to write them,
    # so e.g. `f(exploration, decision=5, step=2)` would be cached as
    # step 5/decision 2. Presenting the target's signature makes the
    # arguments get ordered by declaration instead.
    countAnalyzer.__signature__ = (  # type: ignore
        inspect.signature(target)
    )

    # Register the new function & return the result
    return cast(
        AnalyzerType,
        analyzer(target._unit)(countAnalyzer)
    )

Registers a new analysis routine which uses the same analysis unit as the target routine but which returns the length of that routine's result. Returns None if the target routine does.

Needs the target routine and the name to register the new analysis routine under.

Returns the analysis function it creates.

CombinerResult = ~CombinerResult

Type variable for the result of a combiner function.

StepCombiner: TypeAlias = Callable[[Dict[Union[int, Tuple[int, str, int]], Any], exploration.core.DiscreteExploration, int], ~CombinerResult]

A combiner function which gets a dictionary of per-decision or per-transition values along with an exploration object and a step index and combines the values into a CombinerResult that's specific to that step.

OverallCombiner: TypeAlias = Callable[[Dict[Union[int, Tuple[int, str, int]], Any], exploration.core.DiscreteExploration], ~CombinerResult]

A combiner function which gets a dictionary of per-decision, per-transition, and/or per-step values along with an exploration object and combines the values into a CombinerResult.

def registerStepCombined( name: str, resultName: str, combiner: Callable[[Dict[Union[int, Tuple[int, str, int]], Any], exploration.core.DiscreteExploration, int], ~CombinerResult]) -> AnalysisFunction[(<class 'int'>,)]:
 973def registerStepCombined(
 974    name: str,
 975    resultName: str,
 976    combiner: StepCombiner[CombinerResult]
 977) -> StepAnalyzer:
 978    """
 979    Registers a new analysis routine which combines results of another
 980    routine either across all decisions/transitions at a step. The new
 981    routine will have a 'step' analysis unit.
 982
 983    Needs the name of the target routine, the name to register the new
 984    analysis routine under, and the function that will be called to
 985    combine results, given a dictionary of results that maps
 986    decisions/transitions to results for each.
 987
 988    Returns the analysis function it creates.
 989    """
 990    # Target function
 991    target = ALL_ANALYZERS[name]
 992    # Analysis unit of the target function
 993    targetUnit = target._unit
 994
 995    if targetUnit not in ('stepDecision', 'stepTransition'):
 996        raise ValueError(
 997            f"Target analysis routine {name!r} has incompatible analysis"
 998            f" unit {targetUnit!r}."
 999        )
1000
1001    def analysisCombiner(
1002        exploration: core.DiscreteExploration,
1003        step: int
1004    ) -> CombinerResult:
1005        'To be replaced'
1006        # Declare data here as generic type
1007        data: Dict[
1008            Union[base.DecisionID, OverspecificTransition, int],
1009            Any
1010        ]
1011        graph = exploration[step].graph
1012        if targetUnit == "stepDecision":
1013            analyzeStepDecision = cast(StepDecisionAnalyzer, target)
1014            data = {
1015                dID: analyzeStepDecision(exploration, step, dID)
1016                for dID in graph
1017            }
1018        elif targetUnit == "stepTransition":
1019            edges = graph.allEdges()
1020            analyzeStepTransition = cast(StepTransitionAnalyzer, target)
1021            data = {
1022                (src, transition, dst): analyzeStepTransition(
1023                    exploration,
1024                    step,
1025                    src,
1026                    transition,
1027                    dst
1028                )
1029                for (src, dst, transition) in edges
1030            }
1031        else:
1032            raise ValueError(
1033                f"Target analysis routine {name!r} has inconsistent"
1034                f" analysis unit {targetUnit!r} for 'step' result"
1035                f" unit."
1036            )
1037        return combiner(data, exploration, step)
1038
1039    analysisCombiner.__doc__ = (
1040        f"Computes {combiner.__name__} for the {name!r} result over all"
1041        f" {targetUnit}s at each step."
1042    )
1043    analysisCombiner.__name__ = resultName
1044
1045    # Register the new function & return it
1046    return analyzer("step")(analysisCombiner)

Registers a new analysis routine which combines results of another routine across all decisions or all transitions at a step. The new routine will have a 'step' analysis unit.

Needs the name of the target routine, the name to register the new analysis routine under, and the function that will be called to combine results, given a dictionary of results that maps decisions/transitions to results for each.

Returns the analysis function it creates.

def registerFullCombined( name: str, resultName: str, combiner: Callable[[Dict[Union[int, Tuple[int, str, int]], Any], exploration.core.DiscreteExploration], ~CombinerResult]) -> AnalysisFunction[()]:
def registerFullCombined(
    name: str,
    resultName: str,
    combiner: OverallCombiner[CombinerResult]
) -> ExplorationAnalyzer:
    """
    Works like `registerStepCombined` but combines results over
    decisions/transitions/steps across the entire exploration to get one
    result for the entire thing, not one result per step. May also
    target an existing `ExplorationAnalyzer` whose result is a
    dictionary, in which case it will combine that dictionary's values.

    Needs the name of the target routine, the name to register the new
    analysis routine under, and the function that will be called to
    combine results, given a dictionary of results that maps
    decisions/transitions/steps to results for each (plus the
    exploration).

    Raises a `KeyError` if no analysis routine has been registered under
    the target name, and a `ValueError` if the target's analysis unit is
    not one of 'step', 'decision', 'transition', or 'exploration'.

    Returns the analysis function it creates.
    """
    # Target function
    target = ALL_ANALYZERS[name]
    # Analysis unit of the target function
    targetUnit = target._unit
    if targetUnit not in ('step', 'decision', 'transition', 'exploration'):
        raise ValueError(
            f"Target analysis routine {name!r} has incompatible analysis"
            f" unit {targetUnit!r}."
        )

    def analysisCombiner(  # type: ignore
        exploration: core.DiscreteExploration,
    ) -> CombinerResult:
        'To be replaced'
        # Declare data here as generic type
        data: Dict[
            Union[base.DecisionID, OverspecificTransition, int],
            Any
        ]
        if targetUnit == "step":
            analyzeStep = cast(StepAnalyzer, target)
            data = {
                step: analyzeStep(exploration, step)
                for step in range(len(exploration))
            }
        elif targetUnit == "decision":
            analyzeDecision = cast(DecisionAnalyzer, target)
            data = {
                dID: analyzeDecision(exploration, dID)
                for dID in exploration.allDecisions()
            }
        elif targetUnit == "transition":
            analyzeTransition = cast(TransitionAnalyzer, target)
            data = {
                (src, transition, dst): analyzeTransition(
                    exploration,
                    src,
                    transition,
                    dst
                )
                for (src, transition, dst) in exploration.allTransitions()
            }
        elif targetUnit == "exploration":
            # The target's own result is used as the data to combine
            analyzeExploration = cast(ExplorationAnalyzer, target)
            data = analyzeExploration(exploration)
        else:
            # Defensive: the unit was already checked at registration
            raise ValueError(
                f"Target analysis routine {name!r} has inconsistent"
                f" analysis unit {targetUnit!r} for 'exploration' result"
                f" unit."
            )
        return combiner(data, exploration)

    # Not every callable has a `__name__` (e.g., `functools.partial`
    # objects don't), so fall back on the combiner's repr
    combinerName = getattr(combiner, '__name__', repr(combiner))
    analysisCombiner.__doc__ = (
        f"Computes {combinerName} for the {name!r} result over all"
        f" {targetUnit}s."
    )
    analysisCombiner.__name__ = resultName

    # Register the new function & return it
    return analyzer("exploration")(analysisCombiner)

Works like registerStepCombined but combines results over decisions/transitions/steps across the entire exploration to get one result for the entire thing, not one result per step. May also target an existing ExplorationAnalyzer whose result is a dictionary, in which case it will combine that dictionary's values.

Needs the name of the target routine, the name to register the new analysis routine under, and the function that will be called to combine results, given a dictionary of results that maps decisions/transitions/steps to results for each.

Returns the analysis function it creates.

def sumCombiner(data, *_):
def sumCombiner(data, *_):
    """
    A "combiner" function (usable with `registerStepCombined` or
    `registerFullCombined`) which adds up the numeric values of the
    given `data` dictionary.

    Values which are not `int`s, `float`s, or `complex`es are skipped.
    Any extra positional arguments are accepted but ignored.
    """
    numeric = [
        value
        for value in data.values()
        if isinstance(value, (int, float, complex))
    ]
    return sum(numeric)

Computes sum over numeric data as a "combiner" function to be used with registerStepCombined or registerFullCombined.

Only sums values which are ints, floats, or complexes, ignoring any other values.

def meanCombiner(data, *_):
def meanCombiner(data, *_):
    """
    A "combiner" function (usable with `registerStepCombined` or
    `registerFullCombined`) which averages the numeric values of the
    given `data` dictionary.

    Values which are not `int`s, `float`s, or `complex`es are skipped.
    The result is `None` if no numeric values remain. Any extra
    positional arguments are accepted but ignored.
    """
    kept = [
        value
        for value in data.values()
        if isinstance(value, (int, float, complex))
    ]
    # Nothing to average -> no meaningful mean
    if not kept:
        return None
    return sum(kept) / len(kept)

Computes mean over numeric data as a "combiner" function to be used with registerStepCombined or registerFullCombined.

Only counts values which are ints, floats, or complexes, ignoring any other values. Uses None as the result when there are 0 numeric values.

def medianCombiner(data, *_):
def medianCombiner(data, *_):
    """
    A "combiner" function (usable with `registerStepCombined` or
    `registerFullCombined`) which finds the median of the numeric
    values of the given `data` dictionary.

    Values which are not `int`s, `float`s, or `complex`es are skipped.
    The result is `None` if no numeric values remain. With an even
    number of values, the two middle values are averaged. Note that the
    retained values get sorted, so they must be mutually comparable.
    """
    ordered = [
        value
        for value in data.values()
        if isinstance(value, (int, float, complex))
    ]
    ordered.sort()
    count = len(ordered)
    if count == 0:
        return None
    middle, isOdd = divmod(count, 2)
    if isOdd:
        # Covers the single-value case too (middle is 0 then)
        return ordered[middle]
    return (ordered[middle - 1] + ordered[middle]) / 2

Computes median over numeric data as a "combiner" function to be used with registerStepCombined or registerFullCombined.

Only counts values which are ints, floats, or complexes, ignoring any other values. Uses None as the result when there are 0 numeric values.

@analyzer('decision')
def finalIdentity(exploration: exploration.core.DiscreteExploration, decision: int) -> str:
@analyzer('decision')
def finalIdentity(
    exploration: core.DiscreteExploration,
    decision: base.DecisionID
) -> str:
    """
    Looks through the exploration's situations from last to first and
    returns the `identityOf` result for the specified decision from the
    latest step whose graph can identify it. Raises a
    `core.MissingDecisionError` if no step's graph can.
    """
    offset = -1
    earliest = -len(exploration)
    # Negative indices walk backwards from the final situation
    while offset >= earliest:
        situation = exploration.getSituation(offset)
        try:
            return situation.graph.identityOf(decision)
        except core.MissingDecisionError:
            # Not present at this step; try the one before it
            offset -= 1
    raise core.MissingDecisionError(
        f"Decision {decision!r} never existed."
    )

Returns the identityOf result for the specified decision in the last step in which that decision existed.

@analyzer('step')
def currentDecision( exploration: exploration.core.DiscreteExploration, step: int) -> Optional[int]:
@analyzer('step')
def currentDecision(
    exploration: core.DiscreteExploration,
    step: int
) -> Optional[base.DecisionID]:
    """
    Looks up the situation at the given step and returns the
    `base.DecisionID` stored as its state's 'primaryDecision'.
    """
    situation = exploration[step]
    return situation.state['primaryDecision']

Returns the base.DecisionID for the current decision in a given situation.

@analyzer('step')
def currentDecisionIdentity(exploration: exploration.core.DiscreteExploration, step: int) -> str:
@analyzer('step')
def currentDecisionIdentity(
    exploration: core.DiscreteExploration,
    step: int
) -> str:
    """
    Returns the `identityOf` string (according to the graph at the given
    step) for that step's 'primaryDecision'.
    """
    now = exploration[step]
    primary = now.state['primaryDecision']
    return now.graph.identityOf(primary)

Returns the identityOf string for the current decision in a given situation.

@analyzer('step')
def observedSoFar(exploration: exploration.core.DiscreteExploration, step: int) -> Set[int]:
@analyzer('step')
def observedSoFar(
    exploration: core.DiscreteExploration,
    step: int
) -> Set[base.DecisionID]:
    """
    Returns the set of all decision IDs observed so far. Note that some
    of them may no longer be present in the graph at the given step if
    they got merged or deleted.

    The result for each step is a fresh set, built from the result for
    the previous step plus the decisions in the graph at this step.
    Raises an `IndexError` for negative steps.
    """
    # Can't allow negative steps (caching would fail)
    if step < 0:
        raise IndexError(f"Invalid step (can't be negative): {step!r}")

    result: Set[base.DecisionID]
    if step == 0:
        result = set()
    else:
        # Copy the previous step's result before adding to it: updating
        # it in place would also change the set handed out for the
        # earlier step (and, presumably, whatever copy of it the
        # analysis cache holds), making earlier steps appear to have
        # observed decisions from later steps.
        result = set(observedSoFar(exploration, step - 1))
    result |= set(exploration[step].graph)
    return result

Returns the set of all decision IDs observed so far. Note that some of them may no longer be present in the graph at the given step if they got merged or deleted.

def totalDecisionsSoFar(*args, **kwargs):
918    def countAnalyzer(*args, **kwargs):
919        'To be replaced'
920        result = target(*args, **kwargs)
921        if result is None:
922            return None
923        else:
924            return len(result)

Measures count of the 'observedSoFar' result applied to 'step'.

@analyzer('step')
def justObserved(exploration: exploration.core.DiscreteExploration, step: int) -> Set[int]:
@analyzer('step')
def justObserved(
    exploration: core.DiscreteExploration,
    step: int
) -> Set[base.DecisionID]:
    """
    Returns the set of new `base.DecisionID`s that first appeared at the
    given step. Will be empty for steps where no new decisions are
    observed. Note that this is about decisions whose existence becomes
    known, NOT decisions which get confirmed.

    At step 0 everything observed counts as new. At later steps the
    result is whatever `observedSoFar` includes at this step but did not
    include at the previous step.
    """
    if step == 0:
        return observedSoFar(exploration, step)
    else:
        # New decisions = (seen up to now) - (seen up to previous step).
        # The reverse difference would always be empty, because the
        # so-far set only ever grows.
        return (
            observedSoFar(exploration, step)
          - observedSoFar(exploration, step - 1)
        )

Returns the set of new base.DecisionIDs that first appeared at the given step. Will be empty for steps where no new decisions are observed. Note that this is about decisions whose existence becomes known, NOT decisions which get confirmed.

def newDecisionCount(*args, **kwargs):
918    def countAnalyzer(*args, **kwargs):
919        'To be replaced'
920        result = target(*args, **kwargs)
921        if result is None:
922            return None
923        else:
924            return len(result)

Measures count of the 'justObserved' result applied to 'step'.

@elide
@analyzer('stepDecision')
def hasBeenObserved( exploration: exploration.core.DiscreteExploration, step: int, dID: int) -> bool:
@elide
@analyzer('stepDecision')
def hasBeenObserved(
    exploration: core.DiscreteExploration,
    step: int,
    dID: base.DecisionID
) -> bool:
    """
    Checks whether the specified decision is among those observed at or
    before the specified step (see `observedSoFar`). The decision need
    not still exist at that step (e.g., it may have been observed
    earlier and then deleted).
    """
    seen = observedSoFar(exploration, step)
    return dID in seen

Whether or not the specified decision has been observed at or prior to the specified step. Note that it may or may not actually be a decision in the specified step (e.g., if it was previously observed but then deleted).

@analyzer('exploration')
def stepsObserved(exploration: exploration.core.DiscreteExploration) -> Dict[int, int]:
@analyzer('exploration')
def stepsObserved(
    exploration: core.DiscreteExploration,
) -> Dict[base.DecisionID, int]:
    """
    Builds a dictionary keyed by decision ID whose values are the step
    at which each decision was first observed (i.e., first appeared in
    a situation's graph).
    """
    firstSeen = {}
    known: Set[base.DecisionID] = set()
    for step, situation in enumerate(exploration):
        # Only decisions we haven't already recorded count as new
        fresh = set(situation.graph).difference(known)
        firstSeen.update(dict.fromkeys(fresh, step))
        known.update(fresh)
    return firstSeen

Returns a dictionary that holds the step at which each decision was first observed, keyed by decision ID.

@analyzer('decision')
def stepObserved(exploration: exploration.core.DiscreteExploration, dID: int) -> int:
@analyzer('decision')
def stepObserved(
    exploration: core.DiscreteExploration,
    dID: base.DecisionID
) -> int:
    """
    Looks up the step at which the specified decision was first
    observed (NOT confirmed) using `stepsObserved`. Raises a
    `core.MissingDecisionError` if the decision has no entry there.
    """
    try:
        firstSeenAt = stepsObserved(exploration)[dID]
    except KeyError:
        raise core.MissingDecisionError(
            f"Decision {dID!r} was never observed."
        )
    return firstSeenAt

Returns the step at which the specified decision was first observed (NOT confirmed).

@analyzer('exploration')
def stepsConfirmed(exploration: exploration.core.DiscreteExploration) -> Dict[int, int]:
@analyzer('exploration')
def stepsConfirmed(
    exploration: core.DiscreteExploration,
) -> Dict[base.DecisionID, int]:
    """
    Builds a dictionary mapping decision IDs to the first step at which
    each decision appears in the graph without the 'unconfirmed' tag.
    Decisions that always carry that tag are left out of the result.
    """
    confirmedAt = {}
    for step, situation in enumerate(exploration):
        for dID in situation.graph:
            # Keep the earliest confirmation only
            if dID in confirmedAt:
                continue
            if 'unconfirmed' in situation.graph.decisionTags(dID):
                continue
            confirmedAt[dID] = step
    return confirmedAt

Given an exploration, returns a dictionary mapping decision IDs to the step at which each was first confirmed. Decisions which were never confirmed will not be included in the dictionary.

@analyzer('decision')
def stepConfirmed( exploration: exploration.core.DiscreteExploration, dID: int) -> Optional[int]:
@analyzer('decision')
def stepConfirmed(
    exploration: core.DiscreteExploration,
    dID: base.DecisionID
) -> Optional[int]:
    """
    Looks up the step at which the specified decision was first
    confirmed using `stepsConfirmed`. The result is `None` when there is
    no entry for the decision, which covers both never-confirmed
    decisions and invalid decision IDs.
    """
    confirmations = stepsConfirmed(exploration)
    return confirmations.get(dID)

Returns the step at which the specified decision was first confirmed, or None if it was never confirmed. Returns None for invalid decision IDs.

@analyzer('exploration')
def stepsVisited( exploration: exploration.core.DiscreteExploration) -> Dict[int, List[int]]:
@analyzer('exploration')
def stepsVisited(
    exploration: core.DiscreteExploration,
) -> Dict[base.DecisionID, List[int]]:
    """
    Given an exploration, returns a dictionary mapping decision IDs to
    the list of steps at which each was visited. Decisions which were
    never visited will not be included in the dictionary.

    A decision counts as visited at a step when it is both in that
    step's graph and in the `base.combinedDecisionSet` of that step's
    state. Steps are listed in increasing order.
    """
    result: Dict[base.DecisionID, List[int]] = {}
    for i, situation in enumerate(exploration):
        # This set depends only on the situation's state, so compute it
        # once per step instead of once per decision in the graph.
        active = base.combinedDecisionSet(situation.state)
        for dID in situation.graph:
            if dID in active:
                result.setdefault(dID, []).append(i)
    return result

Given an exploration, returns a dictionary mapping decision IDs to the list of steps at which each was visited. Decisions which were never visited will not be included in the dictionary.

@finalOnly
@analyzer('stepDecision')
def hasBeenVisited( exploration: exploration.core.DiscreteExploration, step: int, dID: int) -> bool:
@finalOnly
@analyzer('stepDecision')
def hasBeenVisited(
    exploration: core.DiscreteExploration,
    step: int,
    dID: base.DecisionID
) -> bool:
    """
    Checks whether the specified decision has been visited at or before
    the specified step, based on `stepsVisited`. The decision need not
    still exist at that step (e.g., it may have been observed earlier
    and then deleted).
    """
    visits = stepsVisited(exploration).get(dID, [])
    # Visited-so-far means there is at least one visit, and the earliest
    # one happened no later than this step
    return len(visits) > 0 and min(visits) <= step

Whether or not the specified decision has been visited at or prior to the specified step. Note that it may or may not actually be a decision in the specified step (e.g., if it was previously observed but then deleted).

@analyzer('decision')
def stepFirstVisited( exploration: exploration.core.DiscreteExploration, decision: int) -> Optional[int]:
@analyzer('decision')
def stepFirstVisited(
    exploration: core.DiscreteExploration,
    decision: base.DecisionID,
) -> Optional[int]:
    """
    Finds the earliest step at which the given decision was visited
    according to `stepsVisited`. The result is `None` for a decision
    that was never visited.
    """
    visits = stepsVisited(exploration).get(decision)
    if visits is None:
        return None
    return min(visits)

Returns the first step at which the given decision was visited, or None if the decision was never visited.

@analyzer('decision')
def stepsActive( exploration: exploration.core.DiscreteExploration, decision: int) -> Optional[int]:
@analyzer('decision')
def stepsActive(
    exploration: core.DiscreteExploration,
    decision: base.DecisionID,
) -> Optional[int]:
    """
    Counts the steps in which this decision was active, i.e., the
    number of entries it has in `stepsVisited`. The count is 0 for a
    decision with no entry there.
    """
    visits = stepsVisited(exploration).get(decision, ())
    return len(visits)

Returns the total number of steps in which this decision was active.

@analyzer('exploration')
def stepsTransisionsObserved( exploration: exploration.core.DiscreteExploration) -> Dict[Tuple[int, str, int], int]:
@analyzer('exploration')
def stepsTransisionsObserved(
    exploration: core.DiscreteExploration
) -> Dict[OverspecificTransition, int]:
    """
    Builds a dictionary keyed by (source-decision, transition-name,
    destination-decision) triples whose values are the step at which
    each such transition was first observed.

    Only the first appearance of each triple is recorded, so a
    transition that is deleted and later reinstated is NOT treated
    specially (unless it ends up with a different destination, which
    makes it a different triple).
    """
    firstSeen = {}
    for step, situation in enumerate(exploration):
        graph = situation.graph
        for source in graph:
            outgoing = graph.destinationsFrom(source)
            for name, dest in outgoing.items():
                # Leaves any earlier-recorded step untouched
                firstSeen.setdefault((source, name, dest), step)
    return firstSeen

Returns a dictionary that holds the step at which each transition was first observed, keyed by (source-decision, transition-name, destination-decision) triples.

Does NOT distinguish between cases where a once-deleted transition was later reinstated (unless it had a different destination in the end).

@analyzer('transition')
def stepObservedTransition( exploration: exploration.core.DiscreteExploration, source: int, transition: str, destination: int) -> Optional[int]:
@analyzer('transition')
def stepObservedTransition(
    exploration: core.DiscreteExploration,
    source: base.DecisionID,
    transition: base.Transition,
    destination: base.DecisionID
) -> Optional[int]:
    """
    Looks up the step at which the specified transition was first
    observed, using `stepsTransisionsObserved`. Since that lookup is
    keyed by (source, name, destination), a renamed transition does NOT
    keep its identity: searching for the new name gives the step on
    which the rename happened (assuming the destination didn't change).

    The result is `None` if the specified transition never existed in
    the exploration.
    """
    key = (source, transition, destination)
    return stepsTransisionsObserved(exploration).get(key)

Returns the step within the exploration at which the specified transition was first observed. Note that transitions which get renamed do NOT preserve their identities, so a search for a renamed transition will return the step on which it was renamed (assuming it didn't change destination).

Returns None if the specified transition never existed in the exploration.

@analyzer('step')
def transitionTaken( exploration: exploration.core.DiscreteExploration, step: int) -> Optional[Tuple[int, str, int]]:
@analyzer('step')
def transitionTaken(
    exploration: core.DiscreteExploration,
    step: int
) -> Optional[OverspecificTransition]:
    """
    Returns a (source decision ID, transition name, destination decision
    ID) triple for the transition taken at the given step: the one
    chosen at that step whose consequences were triggered resulting in
    the next step. Returns `None` for steps where no transition was
    taken (e.g., wait, warp, etc.).

    When multiple transitions are taken at one step (e.g., because of a
    'follow' effect) only the first one (which would have triggered the
    others) is reported.

    When the movement has a set of starting decisions, they are tried in
    sorted ID order and the first one which has a destination for the
    transition name is used; the result is `None` if none of them do.
    """
    start, transition, _ = exploration.movementAtStep(step)
    graph = exploration[step].graph
    if start is None or transition is None:
        return None

    # Treat a single start as a one-element list of candidates
    candidates = sorted(start) if isinstance(start, set) else [start]
    for dID in candidates:
        destination = graph.getDestination(dID, transition)
        if destination is not None:
            return (dID, transition, destination)
    return None

Returns the source decision ID, the name of the transition taken, and the destination decision ID at the given step. This is the transition chosen at that step whose consequences were triggered resulting in the next step. Returns None for steps where no transition was taken (e.g., wait, warp, etc.).

Note that in some cases due to e.g., a 'follow' effect, multiple transitions are taken at a step. In that case, this returns the name of the first transition taken (which would have triggered any others).

Also in some cases, there may be multiple starting nodes given, in which case the first such node (by ID order) which has a transition with the identified transition name will be returned, or None if none of them match.

@analyzer('exploration')
def transitionStepsTaken( exploration: exploration.core.DiscreteExploration) -> Dict[Tuple[int, str, int], List[int]]:
@analyzer('exploration')
def transitionStepsTaken(
    exploration: core.DiscreteExploration
) -> Dict[OverspecificTransition, List[int]]:
    """
    Builds a dictionary mapping each specific transition that was taken
    at least once (per `transitionTaken`) to the list of steps on which
    it was taken. Does NOT account for transitions elided by 'jaunt'
    warps, nor for transitions taken as a result of follow/bounce
    effects.

    TODO: Account for those?
    """
    stepsByTransition: Dict[OverspecificTransition, List[int]] = {}
    for step in range(len(exploration)):
        taken = transitionTaken(exploration, step)
        if taken is None:
            # Nothing was taken at this step
            continue
        stepsByTransition.setdefault(taken, []).append(step)

    return stepsByTransition

Returns a dictionary mapping each specific transition that was taken at least once to the list of steps on which it was taken. Does NOT account for transitions elided by 'jaunt' warps, nor for transitions taken as a result of follow/bounce effects.

TODO: Account for those?

@analyzer('transition')
def stepsTaken( exploration: exploration.core.DiscreteExploration, source: int, transition: str, destination: int) -> int:
@analyzer('transition')
def stepsTaken(
    exploration: core.DiscreteExploration,
    source: base.DecisionID,
    transition: base.Transition,
    destination: base.DecisionID
) -> List[int]:
    """
    Returns the list of exploration steps on which a particular
    transition has been taken. Returns an empty list for transitions that
    were never taken.

    The transition is identified by its source decision, its name, and
    its destination decision, and the steps come from
    `transitionStepsTaken`.

    Note that this does NOT account for times taken as a result of
    follow/bounce effects, and it does NOT account for all times a
    transition was taken when warp effects are used as shorthand for
    jaunts across the graph.

    TODO: Try to account for those?
    """
    # The result is a list of steps (not a count; see `timesTaken` for
    # that), so the return annotation is `List[int]`.
    return transitionStepsTaken(exploration).get(
        (source, transition, destination),
        []
    )

Returns the list of exploration steps on which a particular transition has been taken. Returns an empty list for transitions that were never taken.

Note that this does NOT account for times taken as a result of follow/bounce effects, and it does NOT account for all times a transition was taken when warp effects are used as shorthand for jaunts across the graph.

TODO: Try to account for those?

@analyzer('transition')
def timesTaken( exploration: exploration.core.DiscreteExploration, source: int, transition: str, destination: int) -> int:
@analyzer('transition')
def timesTaken(
    exploration: core.DiscreteExploration,
    source: base.DecisionID,
    transition: base.Transition,
    destination: base.DecisionID
) -> int:
    """
    Counts how many times a particular transition has been taken
    throughout the exploration, as the length of the `stepsTaken`
    result. The count is 0 for transitions that were never taken.

    Note that this does NOT account for times taken as a result of
    follow/bounce effects, and it does NOT account for all times a
    transition was taken when warp effects are used as shorthand for
    jaunts across the graph.

    TODO: Try to account for those?
    """
    takenAt = stepsTaken(exploration, source, transition, destination)
    return len(takenAt)

Returns the number of times a particular transition has been taken throughout the exploration. Returns 0 for transitions that were never taken.

Note that this does NOT account for times taken as a result of follow/bounce effects, and it does NOT account for all times a transition was taken when warp effects are used as shorthand for jaunts across the graph.

TODO: Try to account for those?

def unexploredBranches( graph: exploration.core.DecisionGraph, context: Optional[exploration.base.RequirementContext] = None) -> List[Tuple[int, str]]:
def unexploredBranches(
    graph: core.DecisionGraph,
    context: Optional[base.RequirementContext] = None
) -> List[SpecificTransition]:
    """
    Finds the unexplored branches in the given graph and returns them as
    a list of (from-decision, transition-at-that-decision) pairs. A
    branch is an edge leading from a confirmed decision to one which is
    not confirmed.

    If a `context` is given, only edges whose requirements are satisfied
    in that `RequirementContext` are included, with the 'searchFrom'
    part of the context replaced by both ends of each transition tested.
    This doesn't perfectly map onto actual reachability since nodes
    between where the player is and where the option is might force
    changes in the game state that make it un-takeable.

    TODO: add logic to detect trivially-unblocked edges?
    """
    branches = []
    # TODO: Fix networkx type stubs for MultiDiGraph!
    for (src, dst, transition) in graph.allEdges():
        req = graph.getTransitionRequirement(src, transition)
        localContext: Optional[base.RequirementContext]
        if context is None:
            localContext = None
        else:
            # Same state & graph, but search from this edge's endpoints
            localContext = base.RequirementContext(
                state=context.state,
                graph=context.graph,
                searchFrom=graph.bothEnds(src, transition)
            )
        # Only edges going from a confirmed to an unconfirmed node count
        if not graph.isConfirmed(src):
            continue
        if graph.isConfirmed(dst):
            continue
        # With a context, the requirement must also be satisfied
        if localContext is not None and not req.satisfied(localContext):
            continue
        branches.append((src, transition))
    return branches

Returns a list of from-decision, transition-at-that-decision pairs which each identify an unexplored branch in the given graph.

When a context is provided it only counts options whose requirements are satisfied in that RequirementContext, and the 'searchFrom' part of the context will be replaced by both ends of each transition tested. This doesn't perfectly map onto actual reachability since nodes between where the player is and where the option is might force changes in the game state that make it un-takeable.

TODO: add logic to detect trivially-unblocked edges?

@analyzer('step')
def allUnexploredBranches( exploration: exploration.core.DiscreteExploration, step: int) -> List[Tuple[int, str]]:
@analyzer('step')
def allUnexploredBranches(
    exploration: core.DiscreteExploration,
    step: int
) -> List[SpecificTransition]:
    """
    Lists the unexplored branches in the graph of the situation at the
    specified step. No context is supplied, so traversability is not
    considered (see `unexploredBranches`).
    """
    graphNow = exploration[step].graph
    return unexploredBranches(graphNow)

Returns the list of unexplored branches in the specified situation's graph, regardless of traversability (see unexploredBranches).

def unexploredBranchCount(*args, **kwargs):
918    def countAnalyzer(*args, **kwargs):
919        'To be replaced'
920        result = target(*args, **kwargs)
921        if result is None:
922            return None
923        else:
924            return len(result)

Measures count of the 'allUnexploredBranches' result applied to 'step'.

@analyzer('step')
def traversableUnexploredBranches( exploration: exploration.core.DiscreteExploration, step: int) -> List[Tuple[int, str]]:
@analyzer('step')
def traversableUnexploredBranches(
    exploration: core.DiscreteExploration,
    step: int
) -> List[SpecificTransition]:
    """
    Lists the traversable unexplored branches in the graph of the
    situation at the specified step (see `unexploredBranches`). A single
    context, built from the situation and its combined decision set, is
    used to judge traversability, so not all traversability information
    is perfectly accounted for (TODO: Fix that).
    """
    situation = exploration[step]
    active = base.combinedDecisionSet(situation.state)
    context = base.genericContextForSituation(situation, active)
    return unexploredBranches(situation.graph, context)

Returns the list of traversable unexplored branches in the specified situation's graph (see unexploredBranches). Does not perfectly account for all traversability information, because it uses a single context from which to judge traversability (TODO: Fix that).

def traversableUnexploredCount(*args, **kwargs):
918    def countAnalyzer(*args, **kwargs):
919        'To be replaced'
920        result = target(*args, **kwargs)
921        if result is None:
922            return None
923        else:
924            return len(result)

Measures count of the 'traversableUnexploredBranches' result applied to 'step'.

@finalOnly
@analyzer('stepDecision')
def actions( exploration: exploration.core.DiscreteExploration, step: int, decision: int) -> Optional[Set[str]]:
@finalOnly
@analyzer('stepDecision')
def actions(
    exploration: core.DiscreteExploration,
    step: int,
    decision: base.DecisionID
) -> Optional[Set[base.Transition]]:
    """
    Returns the set of actions available at the given decision in the
    graph at the given step (per `decisionActions`). The result is
    `None` if the decision is not in that graph.
    """
    graph = exploration[step].graph
    return graph.decisionActions(decision) if decision in graph else None

Given a particular decision at a particular step, returns the set of actions available at that decision in that step. Returns None if the specified decision does not exist.

def actionCount(*args, **kwargs):
918    def countAnalyzer(*args, **kwargs):
919        'To be replaced'
920        result = target(*args, **kwargs)
921        if result is None:
922            return None
923        else:
924            return len(result)

Measures count of the 'actions' result applied to 'stepDecision'.

def totalActions( exploration: exploration.core.DiscreteExploration, step: int) -> ~CombinerResult:
1001    def analysisCombiner(
1002        exploration: core.DiscreteExploration,
1003        step: int
1004    ) -> CombinerResult:
1005        'To be replaced'
1006        # Declare data here as generic type
1007        data: Dict[
1008            Union[base.DecisionID, OverspecificTransition, int],
1009            Any
1010        ]
1011        graph = exploration[step].graph
1012        if targetUnit == "stepDecision":
1013            analyzeStepDecision = cast(StepDecisionAnalyzer, target)
1014            data = {
1015                dID: analyzeStepDecision(exploration, step, dID)
1016                for dID in graph
1017            }
1018        elif targetUnit == "stepTransition":
1019            edges = graph.allEdges()
1020            analyzeStepTransition = cast(StepTransitionAnalyzer, target)
1021            data = {
1022                (src, transition, dst): analyzeStepTransition(
1023                    exploration,
1024                    step,
1025                    src,
1026                    transition,
1027                    dst
1028                )
1029                for (src, dst, transition) in edges
1030            }
1031        else:
1032            raise ValueError(
1033                f"Target analysis routine {name!r} has inconsistent"
1034                f" analysis unit {targetUnit!r} for 'step' result"
1035                f" unit."
1036            )
1037        return combiner(data, exploration, step)

Computes sumCombiner for the 'actionCount' result over all stepDecisions at each step.

def meanActions( exploration: exploration.core.DiscreteExploration, step: int) -> ~CombinerResult:
1001    def analysisCombiner(
1002        exploration: core.DiscreteExploration,
1003        step: int
1004    ) -> CombinerResult:
1005        'To be replaced'
1006        # Declare data here as generic type
1007        data: Dict[
1008            Union[base.DecisionID, OverspecificTransition, int],
1009            Any
1010        ]
1011        graph = exploration[step].graph
1012        if targetUnit == "stepDecision":
1013            analyzeStepDecision = cast(StepDecisionAnalyzer, target)
1014            data = {
1015                dID: analyzeStepDecision(exploration, step, dID)
1016                for dID in graph
1017            }
1018        elif targetUnit == "stepTransition":
1019            edges = graph.allEdges()
1020            analyzeStepTransition = cast(StepTransitionAnalyzer, target)
1021            data = {
1022                (src, transition, dst): analyzeStepTransition(
1023                    exploration,
1024                    step,
1025                    src,
1026                    transition,
1027                    dst
1028                )
1029                for (src, dst, transition) in edges
1030            }
1031        else:
1032            raise ValueError(
1033                f"Target analysis routine {name!r} has inconsistent"
1034                f" analysis unit {targetUnit!r} for 'step' result"
1035                f" unit."
1036            )
1037        return combiner(data, exploration, step)

Computes meanCombiner for the 'actionCount' result over all stepDecisions at each step.

def medianActions( exploration: exploration.core.DiscreteExploration, step: int) -> ~CombinerResult:
1001    def analysisCombiner(
1002        exploration: core.DiscreteExploration,
1003        step: int
1004    ) -> CombinerResult:
1005        'To be replaced'
1006        # Declare data here as generic type
1007        data: Dict[
1008            Union[base.DecisionID, OverspecificTransition, int],
1009            Any
1010        ]
1011        graph = exploration[step].graph
1012        if targetUnit == "stepDecision":
1013            analyzeStepDecision = cast(StepDecisionAnalyzer, target)
1014            data = {
1015                dID: analyzeStepDecision(exploration, step, dID)
1016                for dID in graph
1017            }
1018        elif targetUnit == "stepTransition":
1019            edges = graph.allEdges()
1020            analyzeStepTransition = cast(StepTransitionAnalyzer, target)
1021            data = {
1022                (src, transition, dst): analyzeStepTransition(
1023                    exploration,
1024                    step,
1025                    src,
1026                    transition,
1027                    dst
1028                )
1029                for (src, dst, transition) in edges
1030            }
1031        else:
1032            raise ValueError(
1033                f"Target analysis routine {name!r} has inconsistent"
1034                f" analysis unit {targetUnit!r} for 'step' result"
1035                f" unit."
1036            )
1037        return combiner(data, exploration, step)

Computes medianCombiner for the 'actionCount' result over all stepDecisions at each step.

@finalOnly
@analyzer('stepDecision')
def branches( exploration: exploration.core.DiscreteExploration, step: int, decision: int) -> Optional[int]:
@finalOnly
@analyzer('stepDecision')
def branches(
    exploration: core.DiscreteExploration,
    step: int,
    decision: base.DecisionID
) -> Optional[int]:
    """
    Counts the outgoing transitions of the given decision, in the graph
    at the given step, whose destination is some other decision.
    Transitions that lead back to the same decision (actions) are not
    counted, but several transitions that all lead to the same other
    decision each count as a separate branch.

    Returns `None` when the decision is absent from that graph or is not
    confirmed in it, so that such decisions are left out of averages
    (even though unconfirmed decisions do have countable branches).
    """
    stepGraph = exploration[step].graph
    if decision in stepGraph and stepGraph.isConfirmed(decision):
        outgoing = stepGraph.destinationsFrom(decision)
        # Only destinations matter here; transition names are irrelevant
        return sum(1 for dest in outgoing.values() if dest != decision)

    return None

Computes the number of branches at a particular decision, not counting actions, but counting as separate branches multiple transitions which lead to the same decision as each other. Returns None for unconfirmed and nonexistent decisions so that they aren't counted as part of averages, even though unconfirmed decisions do have countable branches.

def totalBranches( exploration: exploration.core.DiscreteExploration, step: int) -> ~CombinerResult:
1001    def analysisCombiner(
1002        exploration: core.DiscreteExploration,
1003        step: int
1004    ) -> CombinerResult:
1005        'To be replaced'
1006        # Declare data here as generic type
1007        data: Dict[
1008            Union[base.DecisionID, OverspecificTransition, int],
1009            Any
1010        ]
1011        graph = exploration[step].graph
1012        if targetUnit == "stepDecision":
1013            analyzeStepDecision = cast(StepDecisionAnalyzer, target)
1014            data = {
1015                dID: analyzeStepDecision(exploration, step, dID)
1016                for dID in graph
1017            }
1018        elif targetUnit == "stepTransition":
1019            edges = graph.allEdges()
1020            analyzeStepTransition = cast(StepTransitionAnalyzer, target)
1021            data = {
1022                (src, transition, dst): analyzeStepTransition(
1023                    exploration,
1024                    step,
1025                    src,
1026                    transition,
1027                    dst
1028                )
1029                for (src, dst, transition) in edges
1030            }
1031        else:
1032            raise ValueError(
1033                f"Target analysis routine {name!r} has inconsistent"
1034                f" analysis unit {targetUnit!r} for 'step' result"
1035                f" unit."
1036            )
1037        return combiner(data, exploration, step)

Computes sumCombiner for the 'branches' result over all stepDecisions at each step.

def meanBranches( exploration: exploration.core.DiscreteExploration, step: int) -> ~CombinerResult:
1001    def analysisCombiner(
1002        exploration: core.DiscreteExploration,
1003        step: int
1004    ) -> CombinerResult:
1005        'To be replaced'
1006        # Declare data here as generic type
1007        data: Dict[
1008            Union[base.DecisionID, OverspecificTransition, int],
1009            Any
1010        ]
1011        graph = exploration[step].graph
1012        if targetUnit == "stepDecision":
1013            analyzeStepDecision = cast(StepDecisionAnalyzer, target)
1014            data = {
1015                dID: analyzeStepDecision(exploration, step, dID)
1016                for dID in graph
1017            }
1018        elif targetUnit == "stepTransition":
1019            edges = graph.allEdges()
1020            analyzeStepTransition = cast(StepTransitionAnalyzer, target)
1021            data = {
1022                (src, transition, dst): analyzeStepTransition(
1023                    exploration,
1024                    step,
1025                    src,
1026                    transition,
1027                    dst
1028                )
1029                for (src, dst, transition) in edges
1030            }
1031        else:
1032            raise ValueError(
1033                f"Target analysis routine {name!r} has inconsistent"
1034                f" analysis unit {targetUnit!r} for 'step' result"
1035                f" unit."
1036            )
1037        return combiner(data, exploration, step)

Computes meanCombiner for the 'branches' result over all stepDecisions at each step.

def medianBranches( exploration: exploration.core.DiscreteExploration, step: int) -> ~CombinerResult:
1001    def analysisCombiner(
1002        exploration: core.DiscreteExploration,
1003        step: int
1004    ) -> CombinerResult:
1005        'To be replaced'
1006        # Declare data here as generic type
1007        data: Dict[
1008            Union[base.DecisionID, OverspecificTransition, int],
1009            Any
1010        ]
1011        graph = exploration[step].graph
1012        if targetUnit == "stepDecision":
1013            analyzeStepDecision = cast(StepDecisionAnalyzer, target)
1014            data = {
1015                dID: analyzeStepDecision(exploration, step, dID)
1016                for dID in graph
1017            }
1018        elif targetUnit == "stepTransition":
1019            edges = graph.allEdges()
1020            analyzeStepTransition = cast(StepTransitionAnalyzer, target)
1021            data = {
1022                (src, transition, dst): analyzeStepTransition(
1023                    exploration,
1024                    step,
1025                    src,
1026                    transition,
1027                    dst
1028                )
1029                for (src, dst, transition) in edges
1030            }
1031        else:
1032            raise ValueError(
1033                f"Target analysis routine {name!r} has inconsistent"
1034                f" analysis unit {targetUnit!r} for 'step' result"
1035                f" unit."
1036            )
1037        return combiner(data, exploration, step)

Computes medianCombiner for the 'branches' result over all stepDecisions at each step.

@analyzer('decision')
def arrivals(exploration: exploration.core.DiscreteExploration, decision: int) -> int:
@analyzer('decision')
def arrivals(
    exploration: core.DiscreteExploration,
    decision: base.DecisionID
) -> int:
    """
    Given a `DiscreteExploration` object and a particular `Decision`
    which exists at some point during that exploration, counts how many
    times we arrived at that decision: each step listed for it by
    `stepsVisited` which does not directly follow another such step
    counts as one arrival. Steps where we merely remained at the
    decision (due to a wait or an action or the like) add nothing.

    Returns 0 even for decisions that aren't part of the exploration.
    """
    visitedSteps = stepsVisited(exploration).get(decision, [])
    count = 0
    lastStep = -2  # deliberately not adjacent to step 0
    for step in visitedSteps:
        # A gap since the last visited step means this is a new arrival
        if step - 1 != lastStep:
            count += 1
        lastStep = step

    return count

Given a DiscreteExploration object and a particular Decision which exists at some point during that exploration, counts the number of times that decision was in the active decision set for a step after not being in that set the previous step. Effectively, counts how many times we arrived at that decision, ignoring steps where we remained at it due to a wait or an action or the like.

Returns 0 even for decisions that aren't part of the exploration.

@analyzer('decision')
def revisits(exploration: exploration.core.DiscreteExploration, decision: int) -> int:
@analyzer('decision')
def revisits(
    exploration: core.DiscreteExploration,
    decision: base.DecisionID
) -> int:
    """
    Returns how many times the target decision was revisited: the
    `arrivals` count with the first arrival subtracted, clamped so that
    the result is never below 0.
    """
    afterFirst = arrivals(exploration, decision) - 1
    return afterFirst if afterFirst > 0 else 0

Returns the number of times we revisited the target decision, which is just arrivals minus 1 for the first arrival, but not < 0.

def totalRevisits(exploration: exploration.core.DiscreteExploration) -> ~CombinerResult:
1078    def analysisCombiner(  # type: ignore
1079        exploration: core.DiscreteExploration,
1080    ) -> CombinerResult:
1081        'To be replaced'
1082        # Declare data here as generic type
1083        data: Dict[
1084            Union[base.DecisionID, OverspecificTransition, int],
1085            Any
1086        ]
1087        if targetUnit == "step":
1088            analyzeStep = cast(StepAnalyzer, target)
1089            data = {
1090                step: analyzeStep(exploration, step)
1091                for step in range(len(exploration))
1092            }
1093        elif targetUnit == "decision":
1094            analyzeDecision = cast(DecisionAnalyzer, target)
1095            data = {
1096                dID: analyzeDecision(exploration, dID)
1097                for dID in exploration.allDecisions()
1098            }
1099        elif targetUnit == "transition":
1100            analyzeTransition = cast(TransitionAnalyzer, target)
1101            data = {
1102                (src, transition, dst): analyzeTransition(
1103                    exploration,
1104                    src,
1105                    transition,
1106                    dst
1107                )
1108                for (src, transition, dst) in exploration.allTransitions()
1109            }
1110        elif targetUnit == "exploration":
1111            analyzeExploration = cast(ExplorationAnalyzer, target)
1112            data = analyzeExploration(exploration)
1113        else:
1114            raise ValueError(
1115                f"Target analysis routine {name!r} has inconsistent"
1116                f" analysis unit {targetUnit!r} for 'step' result"
1117                f" unit."
1118            )
1119        return combiner(data, exploration)

Computes sumCombiner for the 'revisits' result over all decisions.

def meanRevisits(exploration: exploration.core.DiscreteExploration) -> ~CombinerResult:
1078    def analysisCombiner(  # type: ignore
1079        exploration: core.DiscreteExploration,
1080    ) -> CombinerResult:
1081        'To be replaced'
1082        # Declare data here as generic type
1083        data: Dict[
1084            Union[base.DecisionID, OverspecificTransition, int],
1085            Any
1086        ]
1087        if targetUnit == "step":
1088            analyzeStep = cast(StepAnalyzer, target)
1089            data = {
1090                step: analyzeStep(exploration, step)
1091                for step in range(len(exploration))
1092            }
1093        elif targetUnit == "decision":
1094            analyzeDecision = cast(DecisionAnalyzer, target)
1095            data = {
1096                dID: analyzeDecision(exploration, dID)
1097                for dID in exploration.allDecisions()
1098            }
1099        elif targetUnit == "transition":
1100            analyzeTransition = cast(TransitionAnalyzer, target)
1101            data = {
1102                (src, transition, dst): analyzeTransition(
1103                    exploration,
1104                    src,
1105                    transition,
1106                    dst
1107                )
1108                for (src, transition, dst) in exploration.allTransitions()
1109            }
1110        elif targetUnit == "exploration":
1111            analyzeExploration = cast(ExplorationAnalyzer, target)
1112            data = analyzeExploration(exploration)
1113        else:
1114            raise ValueError(
1115                f"Target analysis routine {name!r} has inconsistent"
1116                f" analysis unit {targetUnit!r} for 'step' result"
1117                f" unit."
1118            )
1119        return combiner(data, exploration)

Computes meanCombiner for the 'revisits' result over all decisions.

def medianRevisits(exploration: exploration.core.DiscreteExploration) -> ~CombinerResult:
1078    def analysisCombiner(  # type: ignore
1079        exploration: core.DiscreteExploration,
1080    ) -> CombinerResult:
1081        'To be replaced'
1082        # Declare data here as generic type
1083        data: Dict[
1084            Union[base.DecisionID, OverspecificTransition, int],
1085            Any
1086        ]
1087        if targetUnit == "step":
1088            analyzeStep = cast(StepAnalyzer, target)
1089            data = {
1090                step: analyzeStep(exploration, step)
1091                for step in range(len(exploration))
1092            }
1093        elif targetUnit == "decision":
1094            analyzeDecision = cast(DecisionAnalyzer, target)
1095            data = {
1096                dID: analyzeDecision(exploration, dID)
1097                for dID in exploration.allDecisions()
1098            }
1099        elif targetUnit == "transition":
1100            analyzeTransition = cast(TransitionAnalyzer, target)
1101            data = {
1102                (src, transition, dst): analyzeTransition(
1103                    exploration,
1104                    src,
1105                    transition,
1106                    dst
1107                )
1108                for (src, transition, dst) in exploration.allTransitions()
1109            }
1110        elif targetUnit == "exploration":
1111            analyzeExploration = cast(ExplorationAnalyzer, target)
1112            data = analyzeExploration(exploration)
1113        else:
1114            raise ValueError(
1115                f"Target analysis routine {name!r} has inconsistent"
1116                f" analysis unit {targetUnit!r} for 'step' result"
1117                f" unit."
1118            )
1119        return combiner(data, exploration)

Computes medianCombiner for the 'revisits' result over all decisions.

HopPaths: TypeAlias = Dict[Tuple[int, int], Optional[List[int]]]

Records paths between decisions ignoring edge directions & requirements. Stores a list of decision IDs to traverse keyed by a decision ID pair where the smaller decision ID comes first (since paths are symmetric).

def hopDistance( hopPaths: Dict[Tuple[int, int], Optional[List[int]]], src: int, dst: int) -> Optional[int]:
def hopDistance(
    hopPaths: HopPaths,
    src: base.DecisionID,
    dst: base.DecisionID
) -> Optional[int]:
    """
    Looks up, in the given `HopPaths` dictionary, how many hops it takes
    to get from the source decision to the destination decision when
    edge directions & requirements are ignored. The answer is 0 when
    source and destination are the same, and `None` when the dictionary
    holds no path for the pair. Because `HopPaths` keys always put the
    smaller ID first, the arguments may be given in either order.

    For example:

    >>> e = core.DiscreteExploration.example()
    >>> hops = shortestHopPaths(e)
    >>> hopDistance(hops, 0, 1)
    1
    >>> hopDistance(hops, 1, 0)
    1
    >>> hopDistance(hops, 0, 0)
    0
    >>> hopDistance(hops, 0, 0)
    0
    >>> hopDistance(hops, 0, 4) is None
    True
    >>> hopDistance(hops, 4, 0) is None
    True
    >>> hopDistance(hops, 0, 5)
    2
    >>> hopDistance(hops, 5, 0)
    2
    >>> hopDistance(hops, 5, 1)
    3
    >>> hopDistance(hops, 1, 5)
    3
    >>> hopDistance(hops, 3, 5)
    1
    >>> hopDistance(hops, 5, 3)
    1
    >>> dIDs = list(e[-1].graph)
    >>> for i, src in enumerate(dIDs):
    ...     for j in range(i + 1, len(dIDs)):
    ...         dst = dIDs[j]
    ...         assert (
    ...             hopDistance(hops, src, dst) == hopDistance(hops, dst, src)
    ...         )
    """
    if src == dst:
        return 0

    # Normalize the pair so that the smaller ID comes first
    key = (src, dst) if src < dst else (dst, src)
    between = hopPaths.get(key)
    if between is None:
        return None

    # Stored paths hold only the intermediate decisions, so the hop
    # count is one more than the path length
    return len(between) + 1

Returns the number of hops required to move from the given source to the given destination, ignoring edge directions & requirements. Looks that up in the given HopPaths dictionary. Returns 0 when source and destination are the same.

For example:

>>> e = core.DiscreteExploration.example()
>>> hops = shortestHopPaths(e)
>>> hopDistance(hops, 0, 1)
1
>>> hopDistance(hops, 1, 0)
1
>>> hopDistance(hops, 0, 0)
0
>>> hopDistance(hops, 0, 0)
0
>>> hopDistance(hops, 0, 4) is None
True
>>> hopDistance(hops, 4, 0) is None
True
>>> hopDistance(hops, 0, 5)
2
>>> hopDistance(hops, 5, 0)
2
>>> hopDistance(hops, 5, 1)
3
>>> hopDistance(hops, 1, 5)
3
>>> hopDistance(hops, 3, 5)
1
>>> hopDistance(hops, 5, 3)
1
>>> dIDs = list(e[-1].graph)
>>> for i, src in enumerate(dIDs):
...     for j in range(i + 1, len(dIDs)):
...         dst = dIDs[j]
...         assert (
...             hopDistance(hops, src, dst) == hopDistance(hops, dst, src)
...         )
@elide
@analyzer('exploration')
def shortestHopPaths( exploration: exploration.core.DiscreteExploration, edgeFilter: Optional[Callable[[int, str, int, exploration.core.DecisionGraph], bool]] = None) -> Dict[Tuple[int, int], Optional[List[int]]]:
@elide
@analyzer('exploration')
def shortestHopPaths(
    exploration: core.DiscreteExploration,
    edgeFilter: Optional[Callable[
        [base.DecisionID, base.Transition, base.DecisionID, core.DecisionGraph],
        bool
    ]] = None
) -> HopPaths:
    """
    Builds a dictionary of shortest paths between pairs of decisions in
    the final graph of the exploration, ignoring edge directions and
    requirements entirely.

    An optional `edgeFilter` is passed along when building the
    connections graph: it is applied with source ID, transition name,
    destination ID, and full graph as arguments, and edges for which it
    returns False are ignored when computing hops. Note that you have to
    filter out all edges in both directions between two nodes for there
    not to be a 1-hop path between them.

    Each key is a pair of decision IDs with the smaller ID first (hop
    paths are symmetric, so reverse paths aren't stored). Each value is
    the list of decision IDs traversed between the first decision and
    the second, excluding both endpoints, so an empty list indicates
    adjacent decisions and the number of hops is one more than the
    length of the list. These "hop paths" cannot always be traversed in
    the actual graph because they may go the "wrong way" across one-way
    connections. Pairs which are not reachable from each other are left
    out of the dictionary. Only decisions and edges present in the final
    graph in the exploration are considered.

    Where there are multiple shortest hop paths, an arbitrary one is
    included in the result.

    TODO: EXAMPLE
    >>> e = core.DiscreteExploration.example()
    >>> print(e[-1].graph.namesListing(e[-1].graph))
      0 (House)
      1 (_u.0)
      2 (Cellar)
      3 (Yard)
      5 (Lane)
    <BLANKLINE>
    >>> shortest = dict(nx.all_pairs_shortest_path(e[-1].graph.connections()))
    >>> for src in shortest:
    ...    print(f"{src} -> {shortest[src]}")
    0 -> {0: [0], 1: [0, 1], 2: [0, 2], 3: [0, 3], 5: [0, 3, 5]}
    1 -> {1: [1], 0: [1, 0], 2: [1, 0, 2], 3: [1, 0, 3], 5: [1, 0, 3, 5]}
    2 -> {2: [2], 0: [2, 0], 3: [2, 3], 1: [2, 0, 1], 5: [2, 3, 5]}
    3 -> {3: [3], 0: [3, 0], 2: [3, 2], 5: [3, 5], 1: [3, 0, 1]}
    5 -> {5: [5], 3: [5, 3], 0: [5, 3, 0], 2: [5, 3, 2], 1: [5, 3, 0, 1]}
    >>> hops = shortestHopPaths(e)
    >>> for src in hops:
    ...     print(f"{src} -> {hops[src]}")
    (0, 1) -> []
    (0, 2) -> []
    (0, 3) -> []
    (0, 5) -> [3]
    (1, 2) -> [0]
    (1, 3) -> [0]
    (1, 5) -> [0, 3]
    (2, 3) -> []
    (2, 5) -> [3]
    (3, 5) -> []
    """
    finalGraph = exploration[-1].graph
    orderedIDs = sorted(finalGraph)
    linkGraph = finalGraph.connections(edgeFilter)
    allPaths = dict(nx.all_pairs_shortest_path(linkGraph))

    result = {}
    for index, src in enumerate(orderedIDs):
        fromSrc = allPaths.get(src, {})
        # Only pair each ID with the larger IDs that follow it, so every
        # key has the smaller ID first
        for dst in orderedIDs[index + 1:]:
            fullPath = fromSrc.get(dst, None)
            if fullPath is not None:
                # Strip both endpoints, keeping just the decisions between
                result[(src, dst)] = fullPath[1:-1]

    return result

Creates a dictionary that holds shortest paths between pairs of nodes, ignoring edge directions and requirements entirely.

If given an edgeFilter, that function is applied with source ID, transition name, destination ID, and full graph as arguments and edges for which it returns False are ignored when computing hops. Note that you have to filter out all edges in both directions between two nodes for there not to be a 1-hop path between them.

Keys in the dictionary are pairs of decision IDs, where the decision with the smaller ID always comes first (because shortest hop paths are symmetric so we don't store the reverse paths). Values are lists of decision IDs that can be traversed to get from the first decision to the second, with an empty list indicating adjacent decisions (note that these "hop paths" cannot always be traversed in the actual graph because they may go the "wrong way" across one-way connections). The number of hops required to get between the nodes is one more than the length of the path. Decision pairs which are not reachable from each other will not be included in the dictionary. Only decisions present in the final graph in the exploration will be included, and only edges present in that final graph will be considered.

Where there are multiple shortest hop paths, an arbitrary one is included in the result.

TODO: EXAMPLE

>>> e = core.DiscreteExploration.example()
>>> print(e[-1].graph.namesListing(e[-1].graph))
  0 (House)
  1 (_u.0)
  2 (Cellar)
  3 (Yard)
  5 (Lane)
<BLANKLINE>
>>> shortest = dict(nx.all_pairs_shortest_path(e[-1].graph.connections()))
>>> for src in shortest:
...    print(f"{src} -> {shortest[src]}")
0 -> {0: [0], 1: [0, 1], 2: [0, 2], 3: [0, 3], 5: [0, 3, 5]}
1 -> {1: [1], 0: [1, 0], 2: [1, 0, 2], 3: [1, 0, 3], 5: [1, 0, 3, 5]}
2 -> {2: [2], 0: [2, 0], 3: [2, 3], 1: [2, 0, 1], 5: [2, 3, 5]}
3 -> {3: [3], 0: [3, 0], 2: [3, 2], 5: [3, 5], 1: [3, 0, 1]}
5 -> {5: [5], 3: [5, 3], 0: [5, 3, 0], 2: [5, 3, 2], 1: [5, 3, 0, 1]}
>>> hops = shortestHopPaths(e)
>>> for src in hops:
...     print(f"{src} -> {hops[src]}")
(0, 1) -> []
(0, 2) -> []
(0, 3) -> []
(0, 5) -> [3]
(1, 2) -> [0]
(1, 3) -> [0]
(1, 5) -> [0, 3]
(2, 3) -> []
(2, 5) -> [3]
(3, 5) -> []
def runFullAnalysis( exploration: exploration.core.DiscreteExploration, elide: Collection[str] = {'hasBeenObserved', 'shortestHopPaths'}, finalOnly: Collection[str] = {'meanActions', 'actionCount', 'hasBeenVisited', 'actions', 'meanBranches', 'totalActions', 'branches', 'medianActions', 'totalBranches', 'medianBranches'}) -> FullAnalysisResults:
def runFullAnalysis(
    exploration: core.DiscreteExploration,
    elide: Collection[str] = ELIDE,
    finalOnly: Collection[str] = FINAL_ONLY
) -> FullAnalysisResults:
    """
    Runs every analysis function registered in `ALL_ANALYZERS` on every
    valid target for that function in the given exploration, building up
    the cache of `FullAnalysisResults` in `ANALYSIS_RESULTS`. Returns
    the `FullAnalysisResults` object cached for this exploration; if no
    analyzer ended up storing anything for it (for example because every
    analyzer was elided), a fresh empty results object is cached and
    returned instead of raising a `KeyError`.

    Skips analyzers whose names are in the provided `elide` collection,
    which by default is the `ELIDE` global set containing functions
    explicitly decorated with `elide`. Analyzers named in `finalOnly`
    (by default the `FINAL_ONLY` set) are only applied to the final step
    of the exploration (although note that they might call other
    analyzers which recursively need to analyze prior steps).
    `finalOnly` only has an effect for analyzers with 'step',
    'stepDecision', or 'stepTransition' units.

    Raises a `ValueError` if an analyzer has an unrecognized unit.
    """
    for aName, analyzer in ALL_ANALYZERS.items():
        # Skip this one if we're told to
        if aName in elide:
            continue

        # Split out cases for each unit & apply as appropriate
        unit = analyzer._unit
        if unit in ('step', 'stepDecision', 'stepTransition'):
            # Per-step units run on just the last step when final-only,
            # and on every step otherwise
            steps: Sequence[int]
            if aName in finalOnly:
                steps = [len(exploration) - 1]
            else:
                steps = range(len(exploration))

            for step in steps:
                if unit == 'step':
                    sa = cast(StepAnalyzer, analyzer)
                    sa(exploration, step)
                elif unit == 'stepDecision':
                    sda = cast(StepDecisionAnalyzer, analyzer)
                    for dID in exploration[step].graph:
                        sda(exploration, step, dID)
                else:
                    sta = cast(StepTransitionAnalyzer, analyzer)
                    # Edges unpack as (source, destination, transition)
                    # but the analyzer takes the transition in the middle
                    edges = exploration[step].graph.allEdges()
                    for (src, dst, transition) in edges:
                        sta(exploration, step, src, transition, dst)
        elif unit == 'decision':
            da = cast(DecisionAnalyzer, analyzer)
            for dID in exploration.allDecisions():
                da(exploration, dID)
        elif unit == 'transition':
            ta = cast(TransitionAnalyzer, analyzer)
            for (src, trans, dst) in exploration.allTransitions():
                ta(exploration, src, trans, dst)
        elif unit == 'exploration':
            ea = cast(ExplorationAnalyzer, analyzer)
            ea(exploration)
        else:
            raise ValueError(f"Invalid analysis unit {unit!r}.")

    # Results are cached per exploration object identity; make sure an
    # entry exists even if nothing was stored above
    key = id(exploration)
    if key not in ANALYSIS_RESULTS:
        ANALYSIS_RESULTS[key] = newFullAnalysisResults()
    return ANALYSIS_RESULTS[key]

Runs every single analysis function on every valid target for that function in the given exploration, building up the cache of FullAnalysisResults in ANALYSIS_RESULTS. Returns the relevant FullAnalysisResults object.

Skips analyzers in the provided elide collection, which by default is the ELIDE global set containing functions explicitly decorated with elide. Analyzers in the FINAL_ONLY set are only applied to the final decision graph in the exploration (although note that they might call other analyzers which recursively need to analyze prior steps). finalOnly only has an effect for analyzers with 'step', 'stepDecision', or 'stepTransition' units.

def getDecisionAnalyses( exploration: exploration.core.DiscreteExploration, dID: int) -> Dict[str, Any]:
def getDecisionAnalyses(
    exploration: core.DiscreteExploration,
    dID: base.DecisionID
) -> AnalysisResults:
    """
    Fetches the stored all-step analysis results for the specified
    decision, creating empty cache entries for the exploration and/or
    the decision if none exist yet. Use `runFullAnalysis` or call
    specific analysis functions of interest first to populate these
    results. Per-step decision analyses are not included.

    The returned `AnalysisResults` dictionary is the stored one, so
    modifying it updates the stored results (although it's better to
    write additional analysis routines using the `@analyzer` decorator).
    """
    # The cache is keyed by the identity of the exploration object
    allResults = ANALYSIS_RESULTS.setdefault(
        id(exploration),
        newFullAnalysisResults()
    )
    byDecision = allResults["perDecision"]
    return byDecision.setdefault(dID, {})

Retrieves all pre-computed all-step analysis results for the specified decision. Use runFullAnalysis or call specific analysis functions of interest first to populate these results. Does not include per-step decision analyses.

Returns the dictionary of AnalysisResults, which can be modified to update stored results if necessary (although it's better to write additional analysis routines using the @analyzer decorator).

def getTransitionAnalyses( exploration: exploration.core.DiscreteExploration, source: int, transition: str, destination: int) -> Dict[str, Any]:
def getTransitionAnalyses(
    exploration: core.DiscreteExploration,
    source: base.DecisionID,
    transition: base.Transition,
    destination: base.DecisionID
) -> AnalysisResults:
    """
    Transition counterpart of `getDecisionAnalyses`: returns the stored
    all-step analysis results for the transition identified by the
    `(source, transition, destination)` triple, creating an empty
    results dictionary for it if none exists yet. The returned
    dictionary is the cached object itself, not a copy.
    """
    transitionKey = (source, transition, destination)
    allResults = ANALYSIS_RESULTS.setdefault(
        id(exploration),
        newFullAnalysisResults()
    )
    transitionTable = allResults["perTransition"]
    return transitionTable.setdefault(transitionKey, {})

Like getDecisionAnalyses but returns analyses for a transition instead of a decision.

def getStepDecisionAnalyses( exploration: exploration.core.DiscreteExploration, step: int, dID: int) -> Dict[str, Any]:
def getStepDecisionAnalyses(
    exploration: core.DiscreteExploration,
    step: int,
    dID: base.DecisionID
) -> AnalysisResults:
    """
    Per-step counterpart of `getDecisionAnalyses`: returns the stored
    analysis results for decision `dID` which apply only at the given
    exploration `step`. The per-step list is padded with empty
    dictionaries as needed so that index `step` exists, and an empty
    results dictionary is created for the decision if there isn't one
    yet. The returned dictionary is the cached object itself.
    """
    allResults = ANALYSIS_RESULTS.setdefault(
        id(exploration),
        newFullAnalysisResults()
    )
    perStep = allResults.setdefault("perStepDecision", [])
    # Grow the list just far enough that `step` is a valid index
    for _ in range(len(perStep), step + 1):
        perStep.append({})
    decisionsAtStep = perStep[step]
    return decisionsAtStep.setdefault(dID, {})

Like getDecisionAnalyses but for analyses applicable only to the specified exploration step.

def getStepTransitionAnalyses( exploration: exploration.core.DiscreteExploration, step: int, source: int, transition: str, destination: int) -> Dict[str, Any]:
def getStepTransitionAnalyses(
    exploration: core.DiscreteExploration,
    step: int,
    source: base.DecisionID,
    transition: base.Transition,
    destination: base.DecisionID
) -> AnalysisResults:
    """
    Same idea as `getStepDecisionAnalyses`, except that the results
    returned are for the transition identified by the
    `(source, transition, destination)` triple at the given `step`,
    rather than for a decision. Missing list slots and missing
    per-transition dictionaries are created empty on demand.
    """
    transitionKey = (source, transition, destination)
    allResults = ANALYSIS_RESULTS.setdefault(
        id(exploration),
        newFullAnalysisResults()
    )
    perStep = allResults.setdefault("perStepTransition", [])
    # Grow the list just far enough that `step` is a valid index
    for _ in range(len(perStep), step + 1):
        perStep.append({})
    transitionsAtStep = perStep[step]
    return transitionsAtStep.setdefault(transitionKey, {})

Like getStepDecisionAnalyses but for a transition at a particular step, not a decision.

def getStepAnalyses( exploration: exploration.core.DiscreteExploration, step: int) -> Dict[str, Any]:
def getStepAnalyses(
    exploration: core.DiscreteExploration,
    step: int
) -> AnalysisResults:
    """
    Counterpart of `getDecisionAnalyses` for whole-step results: returns
    the stored analysis results that apply to the given exploration
    `step` as a whole. The per-step list is padded with empty
    dictionaries as needed so that index `step` exists. The returned
    dictionary is the cached object itself.
    """
    allResults = ANALYSIS_RESULTS.setdefault(
        id(exploration),
        newFullAnalysisResults()
    )
    perStep = allResults.setdefault("perStep", [])
    # Grow the list just far enough that `step` is a valid index
    for _ in range(len(perStep), step + 1):
        perStep.append({})
    return perStep[step]

Like getDecisionAnalyses but retrieves full-step analysis results for the specified exploration step.

def getExplorationAnalyses(exploration: exploration.core.DiscreteExploration) -> Dict[str, Any]:
def getExplorationAnalyses(
    exploration: core.DiscreteExploration
) -> AnalysisResults:
    """
    Counterpart of `getDecisionAnalyses` for whole-exploration results:
    returns the stored analysis results that apply to the exploration
    as a whole (the "overall" slot of its cache entry), creating that
    slot empty if it is missing. The returned dictionary is the cached
    object itself.
    """
    explorationKey = id(exploration)
    allResults = ANALYSIS_RESULTS.setdefault(
        explorationKey,
        newFullAnalysisResults()
    )
    return allResults.setdefault("overall", {})

Like getDecisionAnalyses but retrieves full-exploration analysis results.

class AnalyzersByUnit(typing.TypedDict):
class AnalyzersByUnit(TypedDict):
    """
    Holds lists of analyzers for each analysis unit type. There is one
    key per unit, and each value is the list of analyzers for that
    unit. `analyzersByUnit` builds one of these by filing each
    registered analyzer under the key named by its `_unit` attribute.
    """
    # Analyzers filed under the 'step' unit
    step: List[StepAnalyzer]
    # Analyzers filed under the 'stepDecision' unit
    stepDecision: List[StepDecisionAnalyzer]
    # Analyzers filed under the 'stepTransition' unit
    stepTransition: List[StepTransitionAnalyzer]
    # Analyzers filed under the 'decision' unit
    decision: List[DecisionAnalyzer]
    # Analyzers filed under the 'transition' unit
    transition: List[TransitionAnalyzer]
    # Analyzers filed under the 'exploration' unit
    exploration: List[ExplorationAnalyzer]

Holds lists of analyzers for each analysis unit type.

step: List[AnalysisFunction[(<class 'int'>,)]]
stepDecision: List[AnalysisFunction[(<class 'int'>, <class 'int'>)]]
stepTransition: List[AnalysisFunction[(<class 'int'>, <class 'int'>, <class 'str'>, <class 'int'>)]]
decision: List[AnalysisFunction[(<class 'int'>,)]]
transition: List[AnalysisFunction[(<class 'int'>, <class 'str'>, <class 'int'>)]]
exploration: List[AnalysisFunction[()]]
def analyzersByUnit( onlyInclude: Optional[Set[str]] = None) -> AnalyzersByUnit:
def analyzersByUnit(onlyInclude: Optional[Set[str]] = None) -> AnalyzersByUnit:
    """
    Groups the analyzers registered in `ALL_ANALYZERS` by their analysis
    unit, returning an `AnalyzersByUnit` dictionary. If `onlyInclude` is
    given, only analyzers whose registered names are in that set are
    included; when it is `None` (the default) every analyzer registered
    so far is included. Every unit key is always present in the result,
    with an empty list if no analyzer was filed under it.
    """
    grouped: AnalyzersByUnit = {
        "step": [],
        "stepDecision": [],
        "stepTransition": [],
        "decision": [],
        "transition": [],
        "exploration": []
    }
    for name, analyzer in ALL_ANALYZERS.items():
        if onlyInclude is None or name in onlyInclude:
            # Mypy will just have to trust that we've put the correct
            # unit values on each analyzer. That relationship is
            # type-checked in the `analyzer` definition.
            grouped[analyzer._unit].append(analyzer)  # type: ignore

    return grouped

Returns an AnalyzersByUnit dictionary containing all analyzers from ALL_ANALYZERS which are in the given onlyInclude set (or just all of them if no set is specified). This will by default be all analyzers registered so far.