123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- from __future__ import annotations
- import collections
- import inspect
- from typing import Any, Iterator
- from twisted.python.modules import PythonAttribute, PythonModule, getModule
- from automat import MethodicalMachine
- from ._typed import TypeMachine, InputProtocol, Core
def isOriginalLocation(attr: PythonAttribute | PythonModule) -> bool:
    """
    Decide whether the object behind C{attr} is being seen in the module
    that L{inspect.getmodule} reports for it.

    @param attr: the wrapper whose loaded object should be checked.

    @return: C{True} when the name of the L{PythonModule} that ultimately
        contains C{attr} equals the C{__name__} of the module reported by
        L{inspect.getmodule}; C{False} when the names differ or when no
        module can be determined for the loaded object.
    """
    loaded = attr.load()
    definingModule = inspect.getmodule(loaded)
    if definingModule is None:
        return False

    # Climb from the attribute through its owners until a module wrapper
    # is reached; a PythonModule passed in directly is already that wrapper.
    container = attr
    while not isinstance(container, PythonModule):
        container = container.onObject

    return container.name == definingModule.__name__
def findMachinesViaWrapper(
    within: PythonModule | PythonAttribute,
) -> Iterator[tuple[str, MethodicalMachine | TypeMachine[InputProtocol, Core]]]:
    """
    Walk a L{PythonModule} or L{twisted.python.modules.PythonAttribute}
    wrapper and yield every L{MethodicalMachine} or L{TypeMachine} found
    beneath it, together with the name of the wrapper it was found on.

    A L{PythonModule} may stand for a package as well as a plain module.

    Machines are discovered when they are module-level attributes or
    class-level attributes reachable from module scope, including
    attributes of nested classes.  Machines that only exist as the return
    value of a function or method are not discovered.

    @type within: L{PythonModule} or L{twisted.python.modules.PythonAttribute}
    @param within: Where to start the search.

    @return: a generator which yields FQPN, L{MethodicalMachine} pairs.
    """
    seen: set[
        PythonModule
        | PythonAttribute
        | MethodicalMachine
        | TypeMachine[InputProtocol, Core]
        | type[Any]
    ] = set()
    pending = collections.deque([within])

    # Wrappers are added on the left and taken from the right, so they are
    # processed in the order in which they were discovered.
    while pending:
        wrapper = pending.pop()
        loaded = wrapper.load()

        # A machine: report it once, however many names refer to it.
        if isinstance(loaded, (MethodicalMachine, TypeMachine)) and loaded not in seen:
            seen.add(loaded)
            yield wrapper.name, loaded
            continue

        # A class: only descend into it from the module that defines it, so
        # re-exports of the same class are not searched.
        if (
            inspect.isclass(loaded)
            and isOriginalLocation(wrapper)
            and loaded not in seen
        ):
            seen.add(loaded)
            pending.extendleft(wrapper.iterAttributes())
            continue

        # A module or package: search its attributes and its submodules.
        if isinstance(wrapper, PythonModule) and loaded not in seen:
            seen.add(loaded)
            pending.extendleft(wrapper.iterAttributes())
            pending.extendleft(wrapper.iterModules())
class InvalidFQPN(Exception):
    """
    An FQPN could not be interpreted as a dot-separated sequence of Python
    object names.
    """
class NoModule(InvalidFQPN):
    """
    The leading part of an FQPN did not name an importable module or
    package.
    """
class NoObject(InvalidFQPN):
    """
    The trailing part of an FQPN did not name an accessible object.
    """
def wrapFQPN(fqpn: str) -> PythonModule | PythonAttribute:
    """
    Resolve an FQPN through the global Python module namespace and return
    the object it names, wrapped in a L{PythonModule} or a
    L{twisted.python.modules.PythonAttribute}.

    @param fqpn: the dotted name to resolve.

    @return: a L{PythonModule} when every component names a module or
        package, otherwise the wrapper for the attribute reached by
        following the remaining components.

    @raise InvalidFQPN: if C{fqpn} is empty or contains an empty component.
    @raise NoModule: if the first component cannot be found as a module.
    @raise NoObject: if a component after the deepest module does not match
        any attribute of the object reached so far.
    """
    # largely cribbed from t.p.reflect.namedAny
    if not fqpn:
        raise InvalidFQPN("FQPN was empty")

    remaining = collections.deque(fqpn.split("."))
    if "" in remaining:
        raise InvalidFQPN(
            "name must be a string giving a '.'-separated list of Python "
            "identifiers, not %r" % (fqpn,)
        )

    topLevelName = remaining.popleft()
    try:
        wrapper = getModule(topLevelName)
    except KeyError:
        raise NoModule(topLevelName)

    # Descend through submodules for as long as the next component names
    # one.  The component is only consumed once the lookup has succeeded, so
    # whatever is left afterwards is the attribute path.
    while remaining:
        try:
            submodule = wrapper[remaining[0]]
        except KeyError:
            break
        remaining.popleft()
        submodule.load()
        wrapper = submodule

    if not remaining:
        # Every component was a module or package.
        return wrapper

    # Follow the remaining components attribute by attribute, matching each
    # one against the last dotted segment of the child wrappers' names.
    current = wrapper
    for shortName in remaining:
        candidates = (
            child
            for child in current.iterAttributes()
            if child.name.rpartition(".")[2] == shortName
        )
        try:
            found = next(candidates)
        except StopIteration:
            raise NoObject("{}.{}".format(current.name, shortName))
        current = found

    return current
def findMachines(
    fqpn: str,
) -> Iterator[tuple[str, MethodicalMachine | TypeMachine[InputProtocol, Core]]]:
    """
    Recursively yield L{MethodicalMachine}s and their FQPNs in and under the
    Python object specified by an FQPN.

    The FQPN is first wrapped with L{wrapFQPN} and the search itself is
    carried out by L{findMachinesViaWrapper}.

    The discovery heuristic considers L{MethodicalMachine} instances that are
    module-level attributes or class-level attributes accessible from module
    scope.  Machines inside nested classes will be discovered, but those
    returned from functions or methods will not be.

    @param fqpn: a fully-qualified Python identifier (i.e. the dotted
        identifier of an object defined at module or class scope, including
        the package and module names); where to start the search.

    @return: a generator which yields (C{FQPN}, L{MethodicalMachine}) pairs.
    """
    startingPoint = wrapFQPN(fqpn)
    return findMachinesViaWrapper(startingPoint)
|