utility.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. # -*- test-case-name: twisted.words.test.test_xishutil -*-
  2. #
  3. # Copyright (c) Twisted Matrix Laboratories.
  4. # See LICENSE for details.
  5. """
  6. Event Dispatching and Callback utilities.
  7. """
  8. from __future__ import absolute_import, division
  9. from twisted.python import log
  10. from twisted.python.compat import iteritems
  11. from twisted.words.xish import xpath
  12. class _MethodWrapper(object):
  13. """
  14. Internal class for tracking method calls.
  15. """
  16. def __init__(self, method, *args, **kwargs):
  17. self.method = method
  18. self.args = args
  19. self.kwargs = kwargs
  20. def __call__(self, *args, **kwargs):
  21. nargs = self.args + args
  22. nkwargs = self.kwargs.copy()
  23. nkwargs.update(kwargs)
  24. self.method(*nargs, **nkwargs)
  25. class CallbackList:
  26. """
  27. Container for callbacks.
  28. Event queries are linked to lists of callables. When a matching event
  29. occurs, these callables are called in sequence. One-time callbacks
  30. are removed from the list after the first time the event was triggered.
  31. Arguments to callbacks are split spread across two sets. The first set,
  32. callback specific, is passed to C{addCallback} and is used for all
  33. subsequent event triggers. The second set is passed to C{callback} and is
  34. event specific. Positional arguments in the second set come after the
  35. positional arguments of the first set. Keyword arguments in the second set
  36. override those in the first set.
  37. @ivar callbacks: The registered callbacks as mapping from the callable to a
  38. tuple of a wrapper for that callable that keeps the
  39. callback specific arguments and a boolean that signifies
  40. if it is to be called only once.
  41. @type callbacks: C{dict}
  42. """
  43. def __init__(self):
  44. self.callbacks = {}
  45. def addCallback(self, onetime, method, *args, **kwargs):
  46. """
  47. Add callback.
  48. The arguments passed are used as callback specific arguments.
  49. @param onetime: If C{True}, this callback is called at most once.
  50. @type onetime: C{bool}
  51. @param method: The callback callable to be added.
  52. @param args: Positional arguments to the callable.
  53. @type args: C{list}
  54. @param kwargs: Keyword arguments to the callable.
  55. @type kwargs: C{dict}
  56. """
  57. if not method in self.callbacks:
  58. self.callbacks[method] = (_MethodWrapper(method, *args, **kwargs),
  59. onetime)
  60. def removeCallback(self, method):
  61. """
  62. Remove callback.
  63. @param method: The callable to be removed.
  64. """
  65. if method in self.callbacks:
  66. del self.callbacks[method]
  67. def callback(self, *args, **kwargs):
  68. """
  69. Call all registered callbacks.
  70. The passed arguments are event specific and augment and override
  71. the callback specific arguments as described above.
  72. @note: Exceptions raised by callbacks are trapped and logged. They will
  73. not propagate up to make sure other callbacks will still be
  74. called, and the event dispatching always succeeds.
  75. @param args: Positional arguments to the callable.
  76. @type args: C{list}
  77. @param kwargs: Keyword arguments to the callable.
  78. @type kwargs: C{dict}
  79. """
  80. for key, (methodwrapper, onetime) in list(self.callbacks.items()):
  81. try:
  82. methodwrapper(*args, **kwargs)
  83. except:
  84. log.err()
  85. if onetime:
  86. del self.callbacks[key]
  87. def isEmpty(self):
  88. """
  89. Return if list of registered callbacks is empty.
  90. @rtype: C{bool}
  91. """
  92. return len(self.callbacks) == 0
  93. class EventDispatcher:
  94. """
  95. Event dispatching service.
  96. The C{EventDispatcher} allows observers to be registered for certain events
  97. that are dispatched. There are two types of events: XPath events and Named
  98. events.
  99. Every dispatch is triggered by calling L{dispatch} with a data object and,
  100. for named events, the name of the event.
  101. When an XPath type event is dispatched, the associated object is assumed to
  102. be an L{Element<twisted.words.xish.domish.Element>} instance, which is
  103. matched against all registered XPath queries. For every match, the
  104. respective observer will be called with the data object.
  105. A named event will simply call each registered observer for that particular
  106. event name, with the data object. Unlike XPath type events, the data object
  107. is not restricted to L{Element<twisted.words.xish.domish.Element>}, but can
  108. be anything.
  109. When registering observers, the event that is to be observed is specified
  110. using an L{xpath.XPathQuery} instance or a string. In the latter case, the
  111. string can also contain the string representation of an XPath expression.
  112. To distinguish these from named events, each named event should start with
  113. a special prefix that is stored in C{self.prefix}. It defaults to
  114. C{//event/}.
  115. Observers registered using L{addObserver} are persistent: after the
  116. observer has been triggered by a dispatch, it remains registered for a
  117. possible next dispatch. If instead L{addOnetimeObserver} was used to
  118. observe an event, the observer is removed from the list of observers after
  119. the first observed event.
  120. Observers can also be prioritized, by providing an optional C{priority}
  121. parameter to the L{addObserver} and L{addOnetimeObserver} methods. Higher
  122. priority observers are then called before lower priority observers.
  123. Finally, observers can be unregistered by using L{removeObserver}.
  124. """
  125. def __init__(self, eventprefix="//event/"):
  126. self.prefix = eventprefix
  127. self._eventObservers = {}
  128. self._xpathObservers = {}
  129. self._dispatchDepth = 0 # Flag indicating levels of dispatching
  130. # in progress
  131. self._updateQueue = [] # Queued updates for observer ops
  132. def _getEventAndObservers(self, event):
  133. if isinstance(event, xpath.XPathQuery):
  134. # Treat as xpath
  135. observers = self._xpathObservers
  136. else:
  137. if self.prefix == event[:len(self.prefix)]:
  138. # Treat as event
  139. observers = self._eventObservers
  140. else:
  141. # Treat as xpath
  142. event = xpath.internQuery(event)
  143. observers = self._xpathObservers
  144. return event, observers
  145. def addOnetimeObserver(self, event, observerfn, priority=0, *args, **kwargs):
  146. """
  147. Register a one-time observer for an event.
  148. Like L{addObserver}, but is only triggered at most once. See there
  149. for a description of the parameters.
  150. """
  151. self._addObserver(True, event, observerfn, priority, *args, **kwargs)
  152. def addObserver(self, event, observerfn, priority=0, *args, **kwargs):
  153. """
  154. Register an observer for an event.
  155. Each observer will be registered with a certain priority. Higher
  156. priority observers get called before lower priority observers.
  157. @param event: Name or XPath query for the event to be monitored.
  158. @type event: C{str} or L{xpath.XPathQuery}.
  159. @param observerfn: Function to be called when the specified event
  160. has been triggered. This callable takes
  161. one parameter: the data object that triggered
  162. the event. When specified, the C{*args} and
  163. C{**kwargs} parameters to addObserver are being used
  164. as additional parameters to the registered observer
  165. callable.
  166. @param priority: (Optional) priority of this observer in relation to
  167. other observer that match the same event. Defaults to
  168. C{0}.
  169. @type priority: C{int}
  170. """
  171. self._addObserver(False, event, observerfn, priority, *args, **kwargs)
  172. def _addObserver(self, onetime, event, observerfn, priority, *args, **kwargs):
  173. # If this is happening in the middle of the dispatch, queue
  174. # it up for processing after the dispatch completes
  175. if self._dispatchDepth > 0:
  176. self._updateQueue.append(lambda:self._addObserver(onetime, event, observerfn, priority, *args, **kwargs))
  177. return
  178. event, observers = self._getEventAndObservers(event)
  179. if priority not in observers:
  180. cbl = CallbackList()
  181. observers[priority] = {event: cbl}
  182. else:
  183. priorityObservers = observers[priority]
  184. if event not in priorityObservers:
  185. cbl = CallbackList()
  186. observers[priority][event] = cbl
  187. else:
  188. cbl = priorityObservers[event]
  189. cbl.addCallback(onetime, observerfn, *args, **kwargs)
  190. def removeObserver(self, event, observerfn):
  191. """
  192. Remove callable as observer for an event.
  193. The observer callable is removed for all priority levels for the
  194. specified event.
  195. @param event: Event for which the observer callable was registered.
  196. @type event: C{str} or L{xpath.XPathQuery}
  197. @param observerfn: Observer callable to be unregistered.
  198. """
  199. # If this is happening in the middle of the dispatch, queue
  200. # it up for processing after the dispatch completes
  201. if self._dispatchDepth > 0:
  202. self._updateQueue.append(lambda:self.removeObserver(event, observerfn))
  203. return
  204. event, observers = self._getEventAndObservers(event)
  205. emptyLists = []
  206. for priority, priorityObservers in iteritems(observers):
  207. for query, callbacklist in iteritems(priorityObservers):
  208. if event == query:
  209. callbacklist.removeCallback(observerfn)
  210. if callbacklist.isEmpty():
  211. emptyLists.append((priority, query))
  212. for priority, query in emptyLists:
  213. del observers[priority][query]
  214. def dispatch(self, obj, event=None):
  215. """
  216. Dispatch an event.
  217. When C{event} is L{None}, an XPath type event is triggered, and
  218. C{obj} is assumed to be an instance of
  219. L{Element<twisted.words.xish.domish.Element>}. Otherwise, C{event}
  220. holds the name of the named event being triggered. In the latter case,
  221. C{obj} can be anything.
  222. @param obj: The object to be dispatched.
  223. @param event: Optional event name.
  224. @type event: C{str}
  225. """
  226. foundTarget = False
  227. self._dispatchDepth += 1
  228. if event != None:
  229. # Named event
  230. observers = self._eventObservers
  231. match = lambda query, obj: query == event
  232. else:
  233. # XPath event
  234. observers = self._xpathObservers
  235. match = lambda query, obj: query.matches(obj)
  236. priorities = list(observers.keys())
  237. priorities.sort()
  238. priorities.reverse()
  239. emptyLists = []
  240. for priority in priorities:
  241. for query, callbacklist in iteritems(observers[priority]):
  242. if match(query, obj):
  243. callbacklist.callback(obj)
  244. foundTarget = True
  245. if callbacklist.isEmpty():
  246. emptyLists.append((priority, query))
  247. for priority, query in emptyLists:
  248. del observers[priority][query]
  249. self._dispatchDepth -= 1
  250. # If this is a dispatch within a dispatch, don't
  251. # do anything with the updateQueue -- it needs to
  252. # wait until we've back all the way out of the stack
  253. if self._dispatchDepth == 0:
  254. # Deal with pending update operations
  255. for f in self._updateQueue:
  256. f()
  257. self._updateQueue = []
  258. return foundTarget
  259. class XmlPipe(object):
  260. """
  261. XML stream pipe.
  262. Connects two objects that communicate stanzas through an XML stream like
  263. interface. Each of the ends of the pipe (sink and source) can be used to
  264. send XML stanzas to the other side, or add observers to process XML stanzas
  265. that were sent from the other side.
  266. XML pipes are usually used in place of regular XML streams that are
  267. transported over TCP. This is the reason for the use of the names source
  268. and sink for both ends of the pipe. The source side corresponds with the
  269. entity that initiated the TCP connection, whereas the sink corresponds with
  270. the entity that accepts that connection. In this object, though, the source
  271. and sink are treated equally.
  272. Unlike Jabber
  273. L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>}s, the sink
  274. and source objects are assumed to represent an eternal connected and
  275. initialized XML stream. As such, events corresponding to connection,
  276. disconnection, initialization and stream errors are not dispatched or
  277. processed.
  278. @since: 8.2
  279. @ivar source: Source XML stream.
  280. @ivar sink: Sink XML stream.
  281. """
  282. def __init__(self):
  283. self.source = EventDispatcher()
  284. self.sink = EventDispatcher()
  285. self.source.send = lambda obj: self.sink.dispatch(obj)
  286. self.sink.send = lambda obj: self.source.dispatch(obj)