123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- # -*- test-case-name: twisted.test.test_internet -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Select reactor
- """
- from __future__ import annotations
- import select
- import sys
- from errno import EBADF, EINTR
- from time import sleep
- from typing import Callable, Type, TypeVar
- from zope.interface import implementer
- from twisted.internet import posixbase
- from twisted.internet.interfaces import IReactorFDSet, IReadDescriptor, IWriteDescriptor
- from twisted.python import log
- from twisted.python.runtime import platformType
def win32select(r, w, e, timeout=None):
    """
    Wrapper around C{select.select} for use on Win32.

    @param r: The objects to check for readability.
    @param w: The objects to check for writability.  These are also handed
        to C{select.select} as the exceptional set, and anything reported
        there is appended to the writable result.
    @param e: Ignored; the incoming value is never read.
    @param timeout: Maximum number of seconds to wait, or L{None}.  When
        there is something to wait on, the wait is capped at 0.5 seconds.

    @return: A 3-tuple C{(readable, writable, [])}; the third element is
        always an empty list.
    """
    if r or w:
        # windows doesn't process 'signals' inside select(), so the wait is
        # capped or ctrl-c would never be recognized.
        capped = 0.5 if (timeout is None or timeout > 0.5) else timeout
        # NOTE(review): writers are watched as the exceptional set too,
        # presumably because Windows reports some write-side events (such as
        # a failed connect) there -- confirm against the platform docs.
        readable, writable, exceptional = select.select(r, w, w, capped)
        return readable, writable + exceptional, []

    # windows select() exits immediately when no sockets are given, so
    # sleep briefly instead of calling it.
    pause = 0.01 if timeout is None else min(timeout, 0.001)
    sleep(pause)
    return [], [], []
# Choose the select implementation once, at import time: the Win32 wrapper
# when running on win32, the plain standard library select() otherwise.
_select = win32select if platformType == "win32" else select.select
# Pick the extra base class that SelectReactor inherits from (in addition to
# PosixReactorBase).  If the win32eventreactor mixin cannot be imported, fall
# back to plain ``object`` so the class statement below still works.
# NOTE(review): presumably the import only succeeds on Windows -- confirm.
try:
    from twisted.internet.win32eventreactor import _ThreadedWin32EventsMixin
except ImportError:
    # No mixin available: use a base that adds nothing.
    _extraBase: Type[object] = object
else:
    # Mixin imported successfully: use it as the extra base.
    _extraBase = _ThreadedWin32EventsMixin
# Type of the selectables handled by _onePreen; the list, the set and the
# disconnect callback must all agree on it.
_T = TypeVar("_T")


def _onePreen(
    toPreen: list[_T],
    preenInto: set[_T],
    disconnect: Callable[[_T, Exception, bool], None],
) -> None:
    """
    Rebuild C{preenInto} so that it only holds selectables which
    C{select.select} accepts.

    C{preenInto} is emptied first.  Each element of C{toPreen} is then
    probed on its own with a zero-timeout C{select.select} call: elements
    that pass are added back to C{preenInto}; elements for which the probe
    raises are logged and handed to C{disconnect}.

    @param toPreen: The selectables to probe.
    @param preenInto: The set to clear and refill with the selectables that
        pass the probe.
    @param disconnect: Called as C{disconnect(selectable, exception, False)}
        for every selectable whose probe raised.
    """
    preenInto.clear()
    for selectable in toPreen:
        try:
            # Zero timeout: only validate the descriptor, never block.
            select.select([selectable], [selectable], [selectable], 0)
        except Exception as e:
            # Wrap in a 1-tuple so that a bad selectable which happens to be
            # a tuple is formatted as a single value instead of being
            # unpacked as the format arguments (which would raise from
            # inside this handler and abort the preening).
            log.msg("bad descriptor %s" % (selectable,))
            disconnect(selectable, e, False)
        else:
            preenInto.add(selectable)
def _preenDescriptors(
    reads: set[IReadDescriptor],
    writes: set[IWriteDescriptor],
    disconnect: Callable[[IReadDescriptor | IWriteDescriptor, Exception, bool], None],
) -> None:
    """
    Preen both descriptor sets, readers first and then writers.

    @param reads: The set of readers; preened in place.
    @param writes: The set of writers; preened in place.
    @param disconnect: The callback passed on to L{_onePreen} for each set.
    """
    log.msg("Malformed file descriptor found. Preening lists.")
    # Copy both sets up front, before either one is preened, so that the
    # writer snapshot is not affected by whatever happens while the readers
    # are being processed.
    snapshots = (list(reads), reads), (list(writes), writes)
    for candidates, survivors in snapshots:
        _onePreen(candidates, survivors, disconnect)
@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):  # type: ignore[misc,valid-type]
    """
    A reactor driven by C{select()} - runs on all POSIX platforms and on
    Win32.

    @ivar _reads: The set of L{FileDescriptor} instances that are checked
        for read events.

    @ivar _writes: The set of L{FileDescriptor} instances that are checked
        for writability.
    """

    def __init__(self) -> None:
        """
        Create the empty reader and writer sets, then initialize the base
        class.
        """
        self._reads: set[IReadDescriptor] = set()
        self._writes: set[IWriteDescriptor] = set()
        posixbase.PosixReactorBase.__init__(self)

    def _preenDescriptors(self) -> None:
        """
        Preen this reactor's reader and writer sets, disconnecting the
        rejected selectables through C{self._disconnectSelectable}.
        """
        _preenDescriptors(self._reads, self._writes, self._disconnectSelectable)

    def doSelect(self, timeout):
        """
        Perform a single pass of the I/O monitor loop.

        Every selectable reported ready for reading or writing, and still
        present in the matching set, gets its C{doRead} or C{doWrite} run.

        @param timeout: The timeout handed to the underlying select call.
        """
        try:
            readable, writable, _ = _select(self._reads, self._writes, [], timeout)
        except ValueError:
            # Possibly a file descriptor has gone negative?
            self._preenDescriptors()
            return
        except TypeError:
            # Something *totally* invalid (object w/o fileno, non-integral
            # result) was passed
            log.err()
            self._preenDescriptors()
            return
        except OSError as selectError:
            # select(2) encountered an error, perhaps while calling the
            # fileno() method of a socket.
            code = selectError.args[0]
            if code in (0, 2):
                # windows does this if it got an empty list; anything else
                # with these codes is unexpected and propagates.
                if self._reads or self._writes:
                    raise
                return
            if code == EINTR:
                # Interrupted: nothing to do on this pass.
                return
            if code == EBADF:
                # A bad descriptor is registered; weed it out.
                self._preenDescriptors()
                return
            # OK, I really don't know what's going on.  Blow up.
            raise

        dispatch = self._doReadOrWrite
        runWithLogger = log.callWithLogger
        plan = (
            (readable, "doRead", self._reads),
            (writable, "doWrite", self._writes),
        )
        for ready, methodName, registered in plan:
            for selectable in ready:
                # Only dispatch to selectables that are still registered in
                # the matching set; one may have been removed since select
                # returned.
                if selectable in registered:  # type:ignore[operator]
                    runWithLogger(selectable, dispatch, selectable, methodName)

    doIteration = doSelect

    def _doReadOrWrite(self, selectable, method):
        """
        Call the method named C{method} on C{selectable}; disconnect the
        selectable if the call raises or returns a true value.

        @param selectable: The object to call into.
        @param method: The name of the method to call; C{"doRead"} marks the
            disconnection as read-side.
        """
        try:
            outcome = getattr(selectable, method)()
        except BaseException as failure:
            # The exception itself becomes the reason for disconnecting.
            outcome = failure
            log.err()
        if outcome:
            self._disconnectSelectable(selectable, outcome, method == "doRead")

    def addReader(self, reader):
        """
        Register a FileDescriptor for notification of data available to
        read.
        """
        self._reads.add(reader)

    def addWriter(self, writer):
        """
        Register a FileDescriptor for notification of data available to
        write.
        """
        self._writes.add(writer)

    def removeReader(self, reader):
        """
        Stop notifying a Selectable of data available to read.  Removing a
        reader that is not registered is a no-op.
        """
        self._reads.discard(reader)

    def removeWriter(self, writer):
        """
        Stop notifying a Selectable of data available to write.  Removing a
        writer that is not registered is a no-op.
        """
        self._writes.discard(writer)

    def removeAll(self):
        """
        Delegate to the inherited C{_removeAll} with this reactor's reader
        and writer sets, returning whatever it returns.
        """
        return self._removeAll(self._reads, self._writes)

    def getReaders(self):
        """
        Return a new list holding the currently registered readers.
        """
        return list(self._reads)

    def getWriters(self):
        """
        Return a new list holding the currently registered writers.
        """
        return list(self._writes)
def install():
    """
    Configure the twisted mainloop to be run using the select() reactor.

    A new L{SelectReactor} is created and handed to C{installReactor}.
    """
    selectReactor = SelectReactor()
    # Imported here, after the reactor has been constructed, rather than at
    # module import time.
    from twisted.internet.main import installReactor

    installReactor(selectReactor)


# Only the installer is part of this module's public API.
__all__ = ["install"]
|