utility.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
# -*- test-case-name: twisted.words.test.test_xishutil -*-
#
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Event Dispatching and Callback utilities.
"""
  8. from twisted.python import log
  9. from twisted.words.xish import xpath
  10. class _MethodWrapper:
  11. """
  12. Internal class for tracking method calls.
  13. """
  14. def __init__(self, method, *args, **kwargs):
  15. self.method = method
  16. self.args = args
  17. self.kwargs = kwargs
  18. def __call__(self, *args, **kwargs):
  19. nargs = self.args + args
  20. nkwargs = self.kwargs.copy()
  21. nkwargs.update(kwargs)
  22. self.method(*nargs, **nkwargs)
  23. class CallbackList:
  24. """
  25. Container for callbacks.
  26. Event queries are linked to lists of callables. When a matching event
  27. occurs, these callables are called in sequence. One-time callbacks
  28. are removed from the list after the first time the event was triggered.
  29. Arguments to callbacks are split spread across two sets. The first set,
  30. callback specific, is passed to C{addCallback} and is used for all
  31. subsequent event triggers. The second set is passed to C{callback} and is
  32. event specific. Positional arguments in the second set come after the
  33. positional arguments of the first set. Keyword arguments in the second set
  34. override those in the first set.
  35. @ivar callbacks: The registered callbacks as mapping from the callable to a
  36. tuple of a wrapper for that callable that keeps the
  37. callback specific arguments and a boolean that signifies
  38. if it is to be called only once.
  39. @type callbacks: C{dict}
  40. """
  41. def __init__(self):
  42. self.callbacks = {}
  43. def addCallback(self, onetime, method, *args, **kwargs):
  44. """
  45. Add callback.
  46. The arguments passed are used as callback specific arguments.
  47. @param onetime: If C{True}, this callback is called at most once.
  48. @type onetime: C{bool}
  49. @param method: The callback callable to be added.
  50. @param args: Positional arguments to the callable.
  51. @type args: C{list}
  52. @param kwargs: Keyword arguments to the callable.
  53. @type kwargs: C{dict}
  54. """
  55. if method not in self.callbacks:
  56. self.callbacks[method] = (_MethodWrapper(method, *args, **kwargs), onetime)
  57. def removeCallback(self, method):
  58. """
  59. Remove callback.
  60. @param method: The callable to be removed.
  61. """
  62. if method in self.callbacks:
  63. del self.callbacks[method]
  64. def callback(self, *args, **kwargs):
  65. """
  66. Call all registered callbacks.
  67. The passed arguments are event specific and augment and override
  68. the callback specific arguments as described above.
  69. @note: Exceptions raised by callbacks are trapped and logged. They will
  70. not propagate up to make sure other callbacks will still be
  71. called, and the event dispatching always succeeds.
  72. @param args: Positional arguments to the callable.
  73. @type args: C{list}
  74. @param kwargs: Keyword arguments to the callable.
  75. @type kwargs: C{dict}
  76. """
  77. for key, (methodwrapper, onetime) in list(self.callbacks.items()):
  78. try:
  79. methodwrapper(*args, **kwargs)
  80. except BaseException:
  81. log.err()
  82. if onetime:
  83. del self.callbacks[key]
  84. def isEmpty(self):
  85. """
  86. Return if list of registered callbacks is empty.
  87. @rtype: C{bool}
  88. """
  89. return len(self.callbacks) == 0
  90. class EventDispatcher:
  91. """
  92. Event dispatching service.
  93. The C{EventDispatcher} allows observers to be registered for certain events
  94. that are dispatched. There are two types of events: XPath events and Named
  95. events.
  96. Every dispatch is triggered by calling L{dispatch} with a data object and,
  97. for named events, the name of the event.
  98. When an XPath type event is dispatched, the associated object is assumed to
  99. be an L{Element<twisted.words.xish.domish.Element>} instance, which is
  100. matched against all registered XPath queries. For every match, the
  101. respective observer will be called with the data object.
  102. A named event will simply call each registered observer for that particular
  103. event name, with the data object. Unlike XPath type events, the data object
  104. is not restricted to L{Element<twisted.words.xish.domish.Element>}, but can
  105. be anything.
  106. When registering observers, the event that is to be observed is specified
  107. using an L{xpath.XPathQuery} instance or a string. In the latter case, the
  108. string can also contain the string representation of an XPath expression.
  109. To distinguish these from named events, each named event should start with
  110. a special prefix that is stored in C{self.prefix}. It defaults to
  111. C{//event/}.
  112. Observers registered using L{addObserver} are persistent: after the
  113. observer has been triggered by a dispatch, it remains registered for a
  114. possible next dispatch. If instead L{addOnetimeObserver} was used to
  115. observe an event, the observer is removed from the list of observers after
  116. the first observed event.
  117. Observers can also be prioritized, by providing an optional C{priority}
  118. parameter to the L{addObserver} and L{addOnetimeObserver} methods. Higher
  119. priority observers are then called before lower priority observers.
  120. Finally, observers can be unregistered by using L{removeObserver}.
  121. """
  122. def __init__(self, eventprefix="//event/"):
  123. self.prefix = eventprefix
  124. self._eventObservers = {}
  125. self._xpathObservers = {}
  126. self._dispatchDepth = 0 # Flag indicating levels of dispatching
  127. # in progress
  128. self._updateQueue = [] # Queued updates for observer ops
  129. def _getEventAndObservers(self, event):
  130. if isinstance(event, xpath.XPathQuery):
  131. # Treat as xpath
  132. observers = self._xpathObservers
  133. else:
  134. if self.prefix == event[: len(self.prefix)]:
  135. # Treat as event
  136. observers = self._eventObservers
  137. else:
  138. # Treat as xpath
  139. event = xpath.internQuery(event)
  140. observers = self._xpathObservers
  141. return event, observers
  142. def addOnetimeObserver(self, event, observerfn, priority=0, *args, **kwargs):
  143. """
  144. Register a one-time observer for an event.
  145. Like L{addObserver}, but is only triggered at most once. See there
  146. for a description of the parameters.
  147. """
  148. self._addObserver(True, event, observerfn, priority, *args, **kwargs)
  149. def addObserver(self, event, observerfn, priority=0, *args, **kwargs):
  150. """
  151. Register an observer for an event.
  152. Each observer will be registered with a certain priority. Higher
  153. priority observers get called before lower priority observers.
  154. @param event: Name or XPath query for the event to be monitored.
  155. @type event: C{str} or L{xpath.XPathQuery}.
  156. @param observerfn: Function to be called when the specified event
  157. has been triggered. This callable takes
  158. one parameter: the data object that triggered
  159. the event. When specified, the C{*args} and
  160. C{**kwargs} parameters to addObserver are being used
  161. as additional parameters to the registered observer
  162. callable.
  163. @param priority: (Optional) priority of this observer in relation to
  164. other observer that match the same event. Defaults to
  165. C{0}.
  166. @type priority: C{int}
  167. """
  168. self._addObserver(False, event, observerfn, priority, *args, **kwargs)
  169. def _addObserver(self, onetime, event, observerfn, priority, *args, **kwargs):
  170. # If this is happening in the middle of the dispatch, queue
  171. # it up for processing after the dispatch completes
  172. if self._dispatchDepth > 0:
  173. self._updateQueue.append(
  174. lambda: self._addObserver(
  175. onetime, event, observerfn, priority, *args, **kwargs
  176. )
  177. )
  178. return
  179. event, observers = self._getEventAndObservers(event)
  180. if priority not in observers:
  181. cbl = CallbackList()
  182. observers[priority] = {event: cbl}
  183. else:
  184. priorityObservers = observers[priority]
  185. if event not in priorityObservers:
  186. cbl = CallbackList()
  187. observers[priority][event] = cbl
  188. else:
  189. cbl = priorityObservers[event]
  190. cbl.addCallback(onetime, observerfn, *args, **kwargs)
  191. def removeObserver(self, event, observerfn):
  192. """
  193. Remove callable as observer for an event.
  194. The observer callable is removed for all priority levels for the
  195. specified event.
  196. @param event: Event for which the observer callable was registered.
  197. @type event: C{str} or L{xpath.XPathQuery}
  198. @param observerfn: Observer callable to be unregistered.
  199. """
  200. # If this is happening in the middle of the dispatch, queue
  201. # it up for processing after the dispatch completes
  202. if self._dispatchDepth > 0:
  203. self._updateQueue.append(lambda: self.removeObserver(event, observerfn))
  204. return
  205. event, observers = self._getEventAndObservers(event)
  206. emptyLists = []
  207. for priority, priorityObservers in observers.items():
  208. for query, callbacklist in priorityObservers.items():
  209. if event == query:
  210. callbacklist.removeCallback(observerfn)
  211. if callbacklist.isEmpty():
  212. emptyLists.append((priority, query))
  213. for priority, query in emptyLists:
  214. del observers[priority][query]
  215. def dispatch(self, obj, event=None):
  216. """
  217. Dispatch an event.
  218. When C{event} is L{None}, an XPath type event is triggered, and
  219. C{obj} is assumed to be an instance of
  220. L{Element<twisted.words.xish.domish.Element>}. Otherwise, C{event}
  221. holds the name of the named event being triggered. In the latter case,
  222. C{obj} can be anything.
  223. @param obj: The object to be dispatched.
  224. @param event: Optional event name.
  225. @type event: C{str}
  226. """
  227. foundTarget = False
  228. self._dispatchDepth += 1
  229. if event != None:
  230. # Named event
  231. observers = self._eventObservers
  232. match = lambda query, obj: query == event
  233. else:
  234. # XPath event
  235. observers = self._xpathObservers
  236. match = lambda query, obj: query.matches(obj)
  237. priorities = list(observers.keys())
  238. priorities.sort()
  239. priorities.reverse()
  240. emptyLists = []
  241. for priority in priorities:
  242. for query, callbacklist in observers[priority].items():
  243. if match(query, obj):
  244. callbacklist.callback(obj)
  245. foundTarget = True
  246. if callbacklist.isEmpty():
  247. emptyLists.append((priority, query))
  248. for priority, query in emptyLists:
  249. del observers[priority][query]
  250. self._dispatchDepth -= 1
  251. # If this is a dispatch within a dispatch, don't
  252. # do anything with the updateQueue -- it needs to
  253. # wait until we've back all the way out of the stack
  254. if self._dispatchDepth == 0:
  255. # Deal with pending update operations
  256. for f in self._updateQueue:
  257. f()
  258. self._updateQueue = []
  259. return foundTarget
  260. class XmlPipe:
  261. """
  262. XML stream pipe.
  263. Connects two objects that communicate stanzas through an XML stream like
  264. interface. Each of the ends of the pipe (sink and source) can be used to
  265. send XML stanzas to the other side, or add observers to process XML stanzas
  266. that were sent from the other side.
  267. XML pipes are usually used in place of regular XML streams that are
  268. transported over TCP. This is the reason for the use of the names source
  269. and sink for both ends of the pipe. The source side corresponds with the
  270. entity that initiated the TCP connection, whereas the sink corresponds with
  271. the entity that accepts that connection. In this object, though, the source
  272. and sink are treated equally.
  273. Unlike Jabber
  274. L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>}s, the sink
  275. and source objects are assumed to represent an eternal connected and
  276. initialized XML stream. As such, events corresponding to connection,
  277. disconnection, initialization and stream errors are not dispatched or
  278. processed.
  279. @since: 8.2
  280. @ivar source: Source XML stream.
  281. @ivar sink: Sink XML stream.
  282. """
  283. def __init__(self):
  284. self.source = EventDispatcher()
  285. self.sink = EventDispatcher()
  286. self.source.send = lambda obj: self.sink.dispatch(obj)
  287. self.sink.send = lambda obj: self.source.dispatch(obj)