_discover.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. from __future__ import annotations
  2. import collections
  3. import inspect
  4. from typing import Any, Iterator
  5. from twisted.python.modules import PythonAttribute, PythonModule, getModule
  6. from automat import MethodicalMachine
  7. from ._typed import TypeMachine, InputProtocol, Core
  8. def isOriginalLocation(attr: PythonAttribute | PythonModule) -> bool:
  9. """
  10. Attempt to discover if this appearance of a PythonAttribute
  11. representing a class refers to the module where that class was
  12. defined.
  13. """
  14. sourceModule = inspect.getmodule(attr.load())
  15. if sourceModule is None:
  16. return False
  17. currentModule = attr
  18. while not isinstance(currentModule, PythonModule):
  19. currentModule = currentModule.onObject
  20. return currentModule.name == sourceModule.__name__
  21. def findMachinesViaWrapper(
  22. within: PythonModule | PythonAttribute,
  23. ) -> Iterator[tuple[str, MethodicalMachine | TypeMachine[InputProtocol, Core]]]:
  24. """
  25. Recursively yield L{MethodicalMachine}s and their FQPNs within a
  26. L{PythonModule} or a L{twisted.python.modules.PythonAttribute}
  27. wrapper object.
  28. Note that L{PythonModule}s may refer to packages, as well.
  29. The discovery heuristic considers L{MethodicalMachine} instances
  30. that are module-level attributes or class-level attributes
  31. accessible from module scope. Machines inside nested classes will
  32. be discovered, but those returned from functions or methods will not be.
  33. @type within: L{PythonModule} or L{twisted.python.modules.PythonAttribute}
  34. @param within: Where to start the search.
  35. @return: a generator which yields FQPN, L{MethodicalMachine} pairs.
  36. """
  37. queue = collections.deque([within])
  38. visited: set[
  39. PythonModule
  40. | PythonAttribute
  41. | MethodicalMachine
  42. | TypeMachine[InputProtocol, Core]
  43. | type[Any]
  44. ] = set()
  45. while queue:
  46. attr = queue.pop()
  47. value = attr.load()
  48. if (
  49. isinstance(value, MethodicalMachine) or isinstance(value, TypeMachine)
  50. ) and value not in visited:
  51. visited.add(value)
  52. yield attr.name, value
  53. elif (
  54. inspect.isclass(value) and isOriginalLocation(attr) and value not in visited
  55. ):
  56. visited.add(value)
  57. queue.extendleft(attr.iterAttributes())
  58. elif isinstance(attr, PythonModule) and value not in visited:
  59. visited.add(value)
  60. queue.extendleft(attr.iterAttributes())
  61. queue.extendleft(attr.iterModules())
  62. class InvalidFQPN(Exception):
  63. """
  64. The given FQPN was not a dot-separated list of Python objects.
  65. """
  66. class NoModule(InvalidFQPN):
  67. """
  68. A prefix of the FQPN was not an importable module or package.
  69. """
  70. class NoObject(InvalidFQPN):
  71. """
  72. A suffix of the FQPN was not an accessible object
  73. """
  74. def wrapFQPN(fqpn: str) -> PythonModule | PythonAttribute:
  75. """
  76. Given an FQPN, retrieve the object via the global Python module
  77. namespace and wrap it with a L{PythonModule} or a
  78. L{twisted.python.modules.PythonAttribute}.
  79. """
  80. # largely cribbed from t.p.reflect.namedAny
  81. if not fqpn:
  82. raise InvalidFQPN("FQPN was empty")
  83. components = collections.deque(fqpn.split("."))
  84. if "" in components:
  85. raise InvalidFQPN(
  86. "name must be a string giving a '.'-separated list of Python "
  87. "identifiers, not %r" % (fqpn,)
  88. )
  89. component = components.popleft()
  90. try:
  91. module = getModule(component)
  92. except KeyError:
  93. raise NoModule(component)
  94. # find the bottom-most module
  95. while components:
  96. component = components.popleft()
  97. try:
  98. module = module[component]
  99. except KeyError:
  100. components.appendleft(component)
  101. break
  102. else:
  103. module.load()
  104. else:
  105. return module
  106. # find the bottom-most attribute
  107. attribute = module
  108. for component in components:
  109. try:
  110. attribute = next(
  111. child
  112. for child in attribute.iterAttributes()
  113. if child.name.rsplit(".", 1)[-1] == component
  114. )
  115. except StopIteration:
  116. raise NoObject("{}.{}".format(attribute.name, component))
  117. return attribute
  118. def findMachines(
  119. fqpn: str,
  120. ) -> Iterator[tuple[str, MethodicalMachine | TypeMachine[InputProtocol, Core]]]:
  121. """
  122. Recursively yield L{MethodicalMachine}s and their FQPNs in and under the a
  123. Python object specified by an FQPN.
  124. The discovery heuristic considers L{MethodicalMachine} instances that are
  125. module-level attributes or class-level attributes accessible from module
  126. scope. Machines inside nested classes will be discovered, but those
  127. returned from functions or methods will not be.
  128. @param fqpn: a fully-qualified Python identifier (i.e. the dotted
  129. identifier of an object defined at module or class scope, including the
  130. package and modele names); where to start the search.
  131. @return: a generator which yields (C{FQPN}, L{MethodicalMachine}) pairs.
  132. """
  133. return findMachinesViaWrapper(wrapFQPN(fqpn))