_threadedselect.py 12 KB


  1. # -*- test-case-name: twisted.test.test_internet -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Threaded select reactor
The threadedselectreactor is a specialized reactor for integrating with
an arbitrary foreign event loop, such as those you find in GUI toolkits.
  8. There are three things you'll need to do to use this reactor.
  9. Install the reactor at the beginning of your program, before importing the rest
  10. of Twisted::
  11. | from twisted.internet import _threadedselect
  12. | _threadedselect.install()
  13. Interleave this reactor with your foreign event loop, at some point after your
  14. event loop is initialized::
  15. | from twisted.internet import reactor
  16. | reactor.interleave(foreignEventLoopWakerFunction)
    | reactor.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop)
  18. Instead of shutting down the foreign event loop directly, shut down the
  19. reactor::
  20. | from twisted.internet import reactor
  21. | reactor.stop()
  22. In order for Twisted to do its work in the main thread (the thread that
  23. interleave is called from), a waker function is necessary. The waker function
  24. will be called from a "background" thread with one argument: func. The waker
  25. function's purpose is to call func() from the main thread. Many GUI toolkits
ship with appropriate waker functions. One example of this is wxPython's
wx.CallAfter (may be wxCallAfter in older versions of wxPython). These would
  28. be used in place of "foreignEventLoopWakerFunction" in the above example.
  29. The other integration point at which the foreign event loop and this reactor
  30. must integrate is shutdown. In order to ensure clean shutdown of Twisted, you
  31. must allow for Twisted to come to a complete stop before quitting the
  32. application. Typically, you will do this by setting up an after shutdown
  33. trigger to stop your foreign event loop, and call reactor.stop() where you
  34. would normally have initiated the shutdown procedure for the foreign event
loop. A shutdown function that could be used in place of
"foreignEventLoopStop" is the ExitMainLoop method of the wxApp instance in
wxPython.
  37. """
  38. from __future__ import annotations
  39. from errno import EBADF, EINTR
  40. from queue import Empty, Queue
  41. from threading import Thread
  42. from typing import Any, Callable
  43. from zope.interface import implementer
  44. from twisted._threads import ThreadWorker
  45. from twisted.internet import posixbase
  46. from twisted.internet.interfaces import IReactorFDSet, IReadDescriptor, IWriteDescriptor
  47. from twisted.internet.selectreactor import _preenDescriptors, _select
  48. from twisted.logger import Logger
  49. from twisted.python.log import callWithLogger as _callWithLogger
# Module-level logger; used by ThreadedSelectReactor._doReadOrWrite to capture
# and report failures raised while handling a selectable.
_log = Logger()
  51. def raiseException(e):
  52. raise e
  53. def _threadsafeSelect(
  54. timeout: float | None,
  55. readmap: dict[int, IReadDescriptor],
  56. writemap: dict[int, IWriteDescriptor],
  57. handleResult: Callable[
  58. [
  59. list[int],
  60. list[int],
  61. dict[int, IReadDescriptor],
  62. dict[int, IWriteDescriptor],
  63. bool,
  64. ],
  65. None,
  66. ],
  67. ) -> None:
  68. """
  69. Invoke C{select}. This will be called in a non-main thread, so it is very
  70. careful to work only on integers and avoid calling any application code.
  71. """
  72. preen = False
  73. r = []
  74. w = []
  75. while 1:
  76. readints = readmap.keys()
  77. writeints = writemap.keys()
  78. try:
  79. result = _select(readints, writeints, [], timeout)
  80. except ValueError:
  81. # Possible problems with file descriptors that were passed:
  82. # ValueError may indicate that a file descriptor has gone negative.
  83. preen = True
  84. break
  85. except OSError as se:
  86. # The select() system call encountered an error.
  87. if se.args[0] == EINTR:
  88. # EINTR is hard to replicate in tests using an actual select(),
  89. # and I don't want to dedicate effort to testing this function
  90. # when it needs to be refactored with selectreactor.
  91. return # pragma: no cover
  92. elif se.args[0] == EBADF:
  93. preen = True
  94. break
  95. else:
  96. # OK, I really don't know what's going on. Blow up. Never
  97. # mind with the coverage here, since we are just trying to make
  98. # sure we don't swallow an exception.
  99. raise # pragma: no cover
  100. else:
  101. r, w, ignored = result
  102. break
  103. handleResult(r, w, readmap, writemap, preen)
  104. @implementer(IReactorFDSet)
  105. class ThreadedSelectReactor(posixbase.PosixReactorBase):
  106. """A threaded select() based reactor - runs on all POSIX platforms and on
  107. Win32.
  108. """
  109. def __init__(
  110. self, waker: Callable[[Callable[[], None]], None] | None = None
  111. ) -> None:
  112. self.reads: set[IReadDescriptor] = set()
  113. self.writes: set[IWriteDescriptor] = set()
  114. posixbase.PosixReactorBase.__init__(self)
  115. self._selectorThread: ThreadWorker | None = None
  116. self.mainWaker = waker
  117. self._iterationQueue: Queue[Callable[[], None]] | None = None
  118. def wakeUp(self):
  119. # we want to wake up from any thread
  120. self.waker.wakeUp()
  121. def callLater(self, *args, **kw):
  122. tple = posixbase.PosixReactorBase.callLater(self, *args, **kw)
  123. self.wakeUp()
  124. return tple
  125. def _doReadOrWrite(self, selectable: object, method: str) -> None:
  126. with _log.failuresHandled(
  127. "while handling selectable {sel}", sel=selectable
  128. ) as op:
  129. why = getattr(selectable, method)()
  130. if (fail := op.failure) is not None:
  131. why = fail.value
  132. if why:
  133. self._disconnectSelectable(selectable, why, method == "doRead")
  134. def _selectOnce(self, timeout: float | None, keepGoing: bool) -> None:
  135. reads: dict[int, Any] = {}
  136. writes: dict[int, Any] = {}
  137. for isRead, fdmap, d in [
  138. (True, self.reads, reads),
  139. (False, self.writes, writes),
  140. ]:
  141. for each in fdmap: # type:ignore[attr-defined]
  142. d[each.fileno()] = each
  143. mainWaker = self.mainWaker
  144. assert mainWaker is not None, (
  145. "neither .interleave() nor .mainLoop() / .run() called, "
  146. "but we are somehow running the reactor"
  147. )
  148. def callReadsAndWrites(
  149. r: list[int],
  150. w: list[int],
  151. readmap: dict[int, IReadDescriptor],
  152. writemap: dict[int, IWriteDescriptor],
  153. preen: bool,
  154. ) -> None:
  155. @mainWaker
  156. def onMainThread() -> None:
  157. if preen:
  158. _preenDescriptors(
  159. self.reads, self.writes, self._disconnectSelectable
  160. )
  161. return
  162. _drdw = self._doReadOrWrite
  163. for readable in r:
  164. rselectable = readmap[readable]
  165. if rselectable in self.reads:
  166. _callWithLogger(rselectable, _drdw, rselectable, "doRead")
  167. for writable in w:
  168. wselectable = writemap[writable]
  169. if wselectable in self.writes:
  170. _callWithLogger(wselectable, _drdw, wselectable, "doWrite")
  171. self.runUntilCurrent()
  172. if self._started and keepGoing:
  173. # see coverage note in .interleave()
  174. self._selectOnce(self.timeout(), True) # pragma: no cover
  175. else:
  176. self._cleanUpThread()
  177. if self._selectorThread is None:
  178. self._selectorThread = ThreadWorker(
  179. lambda target: Thread(target=target).start(), Queue()
  180. )
  181. self._selectorThread.do(
  182. lambda: _threadsafeSelect(timeout, reads, writes, callReadsAndWrites)
  183. )
  184. def _cleanUpThread(self) -> None:
  185. """
  186. Ensure that the selector thread is stopped.
  187. """
  188. oldThread, self._selectorThread = self._selectorThread, None
  189. if oldThread is not None:
  190. oldThread.quit()
  191. def interleave(
  192. self,
  193. waker: Callable[[Callable[[], None]], None],
  194. installSignalHandlers: bool = True,
  195. ) -> None:
  196. """
  197. interleave(waker) interleaves this reactor with the current application
  198. by moving the blocking parts of the reactor (select() in this case) to
  199. a separate thread. This is typically useful for integration with GUI
  200. applications which have their own event loop already running.
  201. See the module docstring for more information.
  202. """
  203. # TODO: This method is excluded from coverage because it only happens
  204. # in the case where we are actually running on a foreign event loop,
  205. # and twisted's test suite isn't set up that way. It would be nice to
  206. # add some dedicated tests for ThreadedSelectReactor that covered this
  207. # case.
  208. self.mainWaker = waker # pragma: no cover
  209. self.startRunning(installSignalHandlers) # pragma: no cover
  210. self._selectOnce(0.0, True) # pragma: no cover
  211. def addReader(self, reader: IReadDescriptor) -> None:
  212. """Add a FileDescriptor for notification of data available to read."""
  213. self.reads.add(reader)
  214. self.wakeUp()
  215. def addWriter(self, writer: IWriteDescriptor) -> None:
  216. """Add a FileDescriptor for notification of data available to write."""
  217. self.writes.add(writer)
  218. self.wakeUp()
  219. def removeReader(self, reader: IReadDescriptor) -> None:
  220. """Remove a Selectable for notification of data available to read."""
  221. if reader in self.reads:
  222. self.reads.remove(reader)
  223. def removeWriter(self, writer: IWriteDescriptor) -> None:
  224. """Remove a Selectable for notification of data available to write."""
  225. if writer in self.writes:
  226. self.writes.remove(writer)
  227. def removeAll(self) -> list[IReadDescriptor | IWriteDescriptor]:
  228. return self._removeAll(self.reads, self.writes) # type:ignore[no-any-return]
  229. def getReaders(self) -> list[IReadDescriptor]:
  230. return list(self.reads)
  231. def getWriters(self) -> list[IWriteDescriptor]:
  232. return list(self.writes)
  233. def stop(self):
  234. """
  235. Extend the base stop implementation to also wake up the select thread so
  236. that C{runUntilCurrent} notices the reactor should stop.
  237. """
  238. posixbase.PosixReactorBase.stop(self)
  239. self.wakeUp()
  240. def crash(self):
  241. posixbase.PosixReactorBase.crash(self)
  242. self.wakeUp()
  243. # The following methods are mostly for test-suite support, to make
  244. # ThreadedSelectReactor behave like another reactor you might call run()
  245. # on.
  246. def _testMainLoopSetup(self) -> None:
  247. """
  248. Mostly for compliance with L{IReactorCore} and usability with the
  249. tests, set up a fake blocking main-loop; make the "foreign" main loop
  250. we are interfacing with be C{self.mainLoop()}, that is reading from a
  251. basic Queue.
  252. """
  253. self._iterationQueue = Queue()
  254. self.mainWaker = self._iterationQueue.put
  255. def _uninstallHandler(self) -> None:
  256. """
  257. Handle uninstallation to ensure that cleanup is properly performed by
  258. ReactorBuilder tests.
  259. """
  260. super()._uninstallHandler()
  261. self._cleanUpThread()
  262. def iterate(self, timeout: float = 0.0) -> None:
  263. if self._iterationQueue is None and self.mainWaker is None: # pragma: no branch
  264. self._testMainLoopSetup()
  265. self.wakeUp()
  266. super().iterate(timeout)
  267. def doIteration(self, timeout: float | None) -> None:
  268. assert self._iterationQueue is not None
  269. self._selectOnce(timeout, False)
  270. try:
  271. work = self._iterationQueue.get(timeout=timeout)
  272. except Empty:
  273. return
  274. work()
  275. def mainLoop(self) -> None:
  276. """
  277. This should not normally be run.
  278. """
  279. self._testMainLoopSetup()
  280. super().mainLoop()
  281. def install():
  282. """Configure the twisted mainloop to be run using the select() reactor."""
  283. reactor = ThreadedSelectReactor()
  284. from twisted.internet.main import installReactor
  285. installReactor(reactor)
  286. return reactor
# Only install() is exported by a star-import of this module.
__all__ = ["install"]