123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342 |
- # -*- test-case-name: twisted.test.test_internet -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Threaded select reactor
- The threadedselectreactor is a specialized reactor for integrating with
- arbitrary foreign event loop, such as those you find in GUI toolkits.
- There are three things you'll need to do to use this reactor.
- Install the reactor at the beginning of your program, before importing the rest
- of Twisted::
- | from twisted.internet import _threadedselect
- | _threadedselect.install()
- Interleave this reactor with your foreign event loop, at some point after your
- event loop is initialized::
- | from twisted.internet import reactor
- | reactor.interleave(foreignEventLoopWakerFunction)
- | self.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop)
- Instead of shutting down the foreign event loop directly, shut down the
- reactor::
- | from twisted.internet import reactor
- | reactor.stop()
- In order for Twisted to do its work in the main thread (the thread that
- interleave is called from), a waker function is necessary. The waker function
- will be called from a "background" thread with one argument: func. The waker
- function's purpose is to call func() from the main thread. Many GUI toolkits
- ship with appropriate waker functions. One example of this is wxPython's
- wx.callAfter (may be wxCallAfter in older versions of wxPython). These would
- be used in place of "foreignEventLoopWakerFunction" in the above example.
- The other integration point at which the foreign event loop and this reactor
- must integrate is shutdown. In order to ensure clean shutdown of Twisted, you
- must allow for Twisted to come to a complete stop before quitting the
- application. Typically, you will do this by setting up an after shutdown
- trigger to stop your foreign event loop, and call reactor.stop() where you
- would normally have initiated the shutdown procedure for the foreign event
- loop. Shutdown functions that could be used in place of "foreignEventloopStop"
- would be the ExitMainLoop method of the wxApp instance with wxPython.
- """
- from __future__ import annotations
- from errno import EBADF, EINTR
- from queue import Empty, Queue
- from threading import Thread
- from typing import Any, Callable
- from zope.interface import implementer
- from twisted._threads import ThreadWorker
- from twisted.internet import posixbase
- from twisted.internet.interfaces import IReactorFDSet, IReadDescriptor, IWriteDescriptor
- from twisted.internet.selectreactor import _preenDescriptors, _select
- from twisted.logger import Logger
- from twisted.python.log import callWithLogger as _callWithLogger
- _log = Logger()
- def raiseException(e):
- raise e
- def _threadsafeSelect(
- timeout: float | None,
- readmap: dict[int, IReadDescriptor],
- writemap: dict[int, IWriteDescriptor],
- handleResult: Callable[
- [
- list[int],
- list[int],
- dict[int, IReadDescriptor],
- dict[int, IWriteDescriptor],
- bool,
- ],
- None,
- ],
- ) -> None:
- """
- Invoke C{select}. This will be called in a non-main thread, so it is very
- careful to work only on integers and avoid calling any application code.
- """
- preen = False
- r = []
- w = []
- while 1:
- readints = readmap.keys()
- writeints = writemap.keys()
- try:
- result = _select(readints, writeints, [], timeout)
- except ValueError:
- # Possible problems with file descriptors that were passed:
- # ValueError may indicate that a file descriptor has gone negative.
- preen = True
- break
- except OSError as se:
- # The select() system call encountered an error.
- if se.args[0] == EINTR:
- # EINTR is hard to replicate in tests using an actual select(),
- # and I don't want to dedicate effort to testing this function
- # when it needs to be refactored with selectreactor.
- return # pragma: no cover
- elif se.args[0] == EBADF:
- preen = True
- break
- else:
- # OK, I really don't know what's going on. Blow up. Never
- # mind with the coverage here, since we are just trying to make
- # sure we don't swallow an exception.
- raise # pragma: no cover
- else:
- r, w, ignored = result
- break
- handleResult(r, w, readmap, writemap, preen)
- @implementer(IReactorFDSet)
- class ThreadedSelectReactor(posixbase.PosixReactorBase):
- """A threaded select() based reactor - runs on all POSIX platforms and on
- Win32.
- """
- def __init__(
- self, waker: Callable[[Callable[[], None]], None] | None = None
- ) -> None:
- self.reads: set[IReadDescriptor] = set()
- self.writes: set[IWriteDescriptor] = set()
- posixbase.PosixReactorBase.__init__(self)
- self._selectorThread: ThreadWorker | None = None
- self.mainWaker = waker
- self._iterationQueue: Queue[Callable[[], None]] | None = None
- def wakeUp(self):
- # we want to wake up from any thread
- self.waker.wakeUp()
- def callLater(self, *args, **kw):
- tple = posixbase.PosixReactorBase.callLater(self, *args, **kw)
- self.wakeUp()
- return tple
- def _doReadOrWrite(self, selectable: object, method: str) -> None:
- with _log.failuresHandled(
- "while handling selectable {sel}", sel=selectable
- ) as op:
- why = getattr(selectable, method)()
- if (fail := op.failure) is not None:
- why = fail.value
- if why:
- self._disconnectSelectable(selectable, why, method == "doRead")
- def _selectOnce(self, timeout: float | None, keepGoing: bool) -> None:
- reads: dict[int, Any] = {}
- writes: dict[int, Any] = {}
- for isRead, fdmap, d in [
- (True, self.reads, reads),
- (False, self.writes, writes),
- ]:
- for each in fdmap: # type:ignore[attr-defined]
- d[each.fileno()] = each
- mainWaker = self.mainWaker
- assert mainWaker is not None, (
- "neither .interleave() nor .mainLoop() / .run() called, "
- "but we are somehow running the reactor"
- )
- def callReadsAndWrites(
- r: list[int],
- w: list[int],
- readmap: dict[int, IReadDescriptor],
- writemap: dict[int, IWriteDescriptor],
- preen: bool,
- ) -> None:
- @mainWaker
- def onMainThread() -> None:
- if preen:
- _preenDescriptors(
- self.reads, self.writes, self._disconnectSelectable
- )
- return
- _drdw = self._doReadOrWrite
- for readable in r:
- rselectable = readmap[readable]
- if rselectable in self.reads:
- _callWithLogger(rselectable, _drdw, rselectable, "doRead")
- for writable in w:
- wselectable = writemap[writable]
- if wselectable in self.writes:
- _callWithLogger(wselectable, _drdw, wselectable, "doWrite")
- self.runUntilCurrent()
- if self._started and keepGoing:
- # see coverage note in .interleave()
- self._selectOnce(self.timeout(), True) # pragma: no cover
- else:
- self._cleanUpThread()
- if self._selectorThread is None:
- self._selectorThread = ThreadWorker(
- lambda target: Thread(target=target).start(), Queue()
- )
- self._selectorThread.do(
- lambda: _threadsafeSelect(timeout, reads, writes, callReadsAndWrites)
- )
- def _cleanUpThread(self) -> None:
- """
- Ensure that the selector thread is stopped.
- """
- oldThread, self._selectorThread = self._selectorThread, None
- if oldThread is not None:
- oldThread.quit()
- def interleave(
- self,
- waker: Callable[[Callable[[], None]], None],
- installSignalHandlers: bool = True,
- ) -> None:
- """
- interleave(waker) interleaves this reactor with the current application
- by moving the blocking parts of the reactor (select() in this case) to
- a separate thread. This is typically useful for integration with GUI
- applications which have their own event loop already running.
- See the module docstring for more information.
- """
- # TODO: This method is excluded from coverage because it only happens
- # in the case where we are actually running on a foreign event loop,
- # and twisted's test suite isn't set up that way. It would be nice to
- # add some dedicated tests for ThreadedSelectReactor that covered this
- # case.
- self.mainWaker = waker # pragma: no cover
- self.startRunning(installSignalHandlers) # pragma: no cover
- self._selectOnce(0.0, True) # pragma: no cover
- def addReader(self, reader: IReadDescriptor) -> None:
- """Add a FileDescriptor for notification of data available to read."""
- self.reads.add(reader)
- self.wakeUp()
- def addWriter(self, writer: IWriteDescriptor) -> None:
- """Add a FileDescriptor for notification of data available to write."""
- self.writes.add(writer)
- self.wakeUp()
- def removeReader(self, reader: IReadDescriptor) -> None:
- """Remove a Selectable for notification of data available to read."""
- if reader in self.reads:
- self.reads.remove(reader)
- def removeWriter(self, writer: IWriteDescriptor) -> None:
- """Remove a Selectable for notification of data available to write."""
- if writer in self.writes:
- self.writes.remove(writer)
- def removeAll(self) -> list[IReadDescriptor | IWriteDescriptor]:
- return self._removeAll(self.reads, self.writes) # type:ignore[no-any-return]
- def getReaders(self) -> list[IReadDescriptor]:
- return list(self.reads)
- def getWriters(self) -> list[IWriteDescriptor]:
- return list(self.writes)
- def stop(self):
- """
- Extend the base stop implementation to also wake up the select thread so
- that C{runUntilCurrent} notices the reactor should stop.
- """
- posixbase.PosixReactorBase.stop(self)
- self.wakeUp()
- def crash(self):
- posixbase.PosixReactorBase.crash(self)
- self.wakeUp()
- # The following methods are mostly for test-suite support, to make
- # ThreadedSelectReactor behave like another reactor you might call run()
- # on.
- def _testMainLoopSetup(self) -> None:
- """
- Mostly for compliance with L{IReactorCore} and usability with the
- tests, set up a fake blocking main-loop; make the "foreign" main loop
- we are interfacing with be C{self.mainLoop()}, that is reading from a
- basic Queue.
- """
- self._iterationQueue = Queue()
- self.mainWaker = self._iterationQueue.put
- def _uninstallHandler(self) -> None:
- """
- Handle uninstallation to ensure that cleanup is properly performed by
- ReactorBuilder tests.
- """
- super()._uninstallHandler()
- self._cleanUpThread()
- def iterate(self, timeout: float = 0.0) -> None:
- if self._iterationQueue is None and self.mainWaker is None: # pragma: no branch
- self._testMainLoopSetup()
- self.wakeUp()
- super().iterate(timeout)
- def doIteration(self, timeout: float | None) -> None:
- assert self._iterationQueue is not None
- self._selectOnce(timeout, False)
- try:
- work = self._iterationQueue.get(timeout=timeout)
- except Empty:
- return
- work()
- def mainLoop(self) -> None:
- """
- This should not normally be run.
- """
- self._testMainLoopSetup()
- super().mainLoop()
- def install():
- """Configure the twisted mainloop to be run using the select() reactor."""
- reactor = ThreadedSelectReactor()
- from twisted.internet.main import installReactor
- installReactor(reactor)
- return reactor
- __all__ = ["install"]
|