123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- # -*- test-case-name: twisted.internet.test.test_iocp -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Reactor that uses IO completion ports
- """
- import warnings, socket, sys
- from zope.interface import implementer
- from twisted.internet import base, interfaces, main, error
- from twisted.python import log, failure
- from twisted.internet._dumbwin32proc import Process
- from twisted.internet.win32eventreactor import _ThreadedWin32EventsMixin
- from twisted.internet.iocpreactor import iocpsupport as _iocp
- from twisted.internet.iocpreactor.const import WAIT_TIMEOUT
- from twisted.internet.iocpreactor import tcp, udp
# Optional TLS support: the reactor only advertises IReactorSSL when the
# TLS memory-BIO factory can be imported.
try:
    from twisted.protocols.tls import TLSMemoryBIOFactory
except ImportError:
    # Either pyOpenSSL isn't installed, or it is too old for this code to
    # work.  The reactor won't provide IReactorSSL.
    _extraInterfaces = ()
    TLSMemoryBIOFactory = None
    warnings.warn(
        "pyOpenSSL 0.10 or newer is required for SSL support in iocpreactor. "
        "It is missing, so the reactor will not support SSL APIs.")
else:
    _extraInterfaces = (interfaces.IReactorSSL,)
# Upper bound, in milliseconds, on a single wait for a completion event
# (2 seconds); see doIteration for the explanation.
MAX_TIMEOUT = 2000

# Maximum number of events dispatched by one doIteration call.
# XXX: what's a good value here?
EVENTS_PER_LOOP = 1000

# Completion keys used to tell normal I/O events from waker events.
KEY_NORMAL = 0
KEY_WAKEUP = 1

# Shared exception instances describing an owner whose handle is unusable.
_NO_GETHANDLE = error.ConnectionFdescWentAway(
    'Handler has no getFileHandle method')
_NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
@implementer(interfaces.IReactorTCP, interfaces.IReactorUDP,
             interfaces.IReactorMulticast, interfaces.IReactorProcess,
             *_extraInterfaces)
class IOCPReactor(base._SignalReactorMixin, base.ReactorBase,
                  _ThreadedWin32EventsMixin):
    """
    Reactor that polls an IO completion port for events.

    @ivar port: the completion port polled by L{doIteration}; C{None} at
        class level and replaced by a real port in L{__init__}.
    @ivar handles: set of active handles maintained through
        L{addActiveHandle}, L{removeActiveHandle} and L{removeAll}.
    """

    port = None

    def __init__(self):
        """
        Initialise the base reactor, then create the completion port and the
        (initially empty) set of active handles.
        """
        base.ReactorBase.__init__(self)
        self.port = _iocp.CompletionPort()
        self.handles = set()

    def addActiveHandle(self, handle):
        """
        Record C{handle} as active.
        """
        self.handles.add(handle)

    def removeActiveHandle(self, handle):
        """
        Forget C{handle}; unknown handles are silently ignored.
        """
        self.handles.discard(handle)

    def doIteration(self, timeout):
        """
        Poll the IO completion port for new events.

        @param timeout: how long to wait for the first event, in seconds, or
            C{None}.  The wait is capped at C{MAX_TIMEOUT} milliseconds.
        """
        # This function sits and waits for an IO completion event.
        #
        # There are two requirements: process IO events as soon as they
        # arrive and process ctrl-break from the user in a reasonable amount
        # of time.
        #
        # There are three kinds of waiting.
        # 1) GetQueuedCompletionStatus (self.port.getEvent) to wait for IO
        #    events only.
        # 2) Msg* family of wait functions that can stop waiting when
        #    ctrl-break is detected (then, I think, Python converts it into a
        #    KeyboardInterrupt)
        # 3) *Ex family of wait functions that put the thread into an
        #    "alertable" wait state which is supposedly triggered by IO
        #    completion
        #
        # 2) and 3) can be combined. Trouble is, my IO completion is not
        # causing 3) to trigger, possibly because I do not use an IO
        # completion callback. Windows is weird.
        # There are two ways to handle this. I could use
        # MsgWaitForSingleObject here and GetQueuedCompletionStatus in a
        # thread. Or I could poll with a reasonable interval. Guess what!
        # Threads are hard.
        if timeout is None:
            waitMillis = MAX_TIMEOUT
        else:
            waitMillis = min(MAX_TIMEOUT, int(1000 * timeout))

        dispatched = 0
        # Only the first wait may block; later polls use a zero timeout so
        # that already-queued events are drained without waiting again.
        rc, numBytes, key, evt = self.port.getEvent(waitMillis)
        while rc != WAIT_TIMEOUT:
            if key != KEY_WAKEUP:
                # Wake-up events carry no work; anything else must be a
                # normal IO event.
                assert key == KEY_NORMAL
                log.callWithLogger(evt.owner, self._callEventCallback,
                                   rc, numBytes, evt)
                dispatched += 1
            if dispatched >= EVENTS_PER_LOOP:
                break
            rc, numBytes, key, evt = self.port.getEvent(0)

    def _callEventCallback(self, rc, numBytes, evt):
        """
        Run the callback of a completed event and drop the owner's connection
        if the callback raised.

        @param rc: result code reported for the event.
        @param numBytes: byte count reported for the event.
        @param evt: the event; its C{callback} is invoked and its C{owner} is
            the object whose connection may be lost.
        """
        owner = evt.owner
        why = None
        try:
            evt.callback(rc, numBytes, evt)
            getHandle = getattr(owner, 'getFileHandle', None)
            if not getHandle:
                why = _NO_GETHANDLE
            elif getHandle() == -1:
                why = _NO_FILEDESC
            if why:
                # ignore handles that were closed
                return
        except:
            # Deliberately catches everything: the failure is logged and
            # then reported to the owner below.
            why = sys.exc_info()[1]
            log.err()
        if why:
            owner.loseConnection(failure.Failure(why))

    def installWaker(self):
        """
        Do nothing; L{wakeUp} posts directly to the completion port.
        """
        pass

    def wakeUp(self):
        """
        Post an event keyed C{KEY_WAKEUP} to the completion port.
        """
        self.port.postEvent(0, KEY_WAKEUP, None)

    def registerHandle(self, handle):
        """
        Add C{handle} to the completion port under C{KEY_NORMAL}.
        """
        self.port.addHandle(handle, KEY_NORMAL)

    def createSocket(self, af, stype):
        """
        Create a socket and register its file descriptor with the port.

        @param af: address family passed to C{socket.socket}.
        @param stype: socket type passed to C{socket.socket}.
        @return: the new socket object.
        """
        sock = socket.socket(af, stype)
        self.registerHandle(sock.fileno())
        return sock

    def listenTCP(self, port, factory, backlog=50, interface=''):
        """
        @see: twisted.internet.interfaces.IReactorTCP.listenTCP
        """
        listener = tcp.Port(port, factory, backlog, interface, self)
        listener.startListening()
        return listener

    def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
        """
        @see: twisted.internet.interfaces.IReactorTCP.connectTCP
        """
        connector = tcp.Connector(host, port, factory, timeout, bindAddress,
                                  self)
        connector.connect()
        return connector

    if TLSMemoryBIOFactory is not None:
        def listenSSL(self, port, factory, contextFactory, backlog=50,
                      interface=''):
            """
            @see: twisted.internet.interfaces.IReactorSSL.listenSSL
            """
            # TLS is layered over a plain TCP port by wrapping the factory.
            wrapped = TLSMemoryBIOFactory(contextFactory, False, factory)
            tlsPort = self.listenTCP(port, wrapped, backlog, interface)
            tlsPort._type = 'TLS'
            return tlsPort

        def connectSSL(self, host, port, factory, contextFactory, timeout=30,
                       bindAddress=None):
            """
            @see: twisted.internet.interfaces.IReactorSSL.connectSSL
            """
            wrapped = TLSMemoryBIOFactory(contextFactory, True, factory)
            return self.connectTCP(host, port, wrapped, timeout, bindAddress)
    else:
        def listenSSL(self, port, factory, contextFactory, backlog=50,
                      interface=''):
            """
            Non-implementation of L{IReactorSSL.listenSSL}.  Some dependency
            is not satisfied.  This implementation always raises
            L{NotImplementedError}.
            """
            raise NotImplementedError(
                "pyOpenSSL 0.10 or newer is required for SSL support in "
                "iocpreactor. It is missing, so the reactor does not support "
                "SSL APIs.")

        def connectSSL(self, host, port, factory, contextFactory, timeout=30,
                       bindAddress=None):
            """
            Non-implementation of L{IReactorSSL.connectSSL}.  Some dependency
            is not satisfied.  This implementation always raises
            L{NotImplementedError}.
            """
            raise NotImplementedError(
                "pyOpenSSL 0.10 or newer is required for SSL support in "
                "iocpreactor. It is missing, so the reactor does not support "
                "SSL APIs.")

    def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
        """
        Connects a given L{DatagramProtocol} to the given numeric UDP port.

        @returns: object conforming to L{IListeningPort}.
        """
        listener = udp.Port(port, protocol, interface, maxPacketSize, self)
        listener.startListening()
        return listener

    def listenMulticast(self, port, protocol, interface='', maxPacketSize=8192,
                        listenMultiple=False):
        """
        Connects a given DatagramProtocol to the given numeric UDP port.

        EXPERIMENTAL.

        @returns: object conforming to IListeningPort.
        """
        listener = udp.MulticastPort(port, protocol, interface, maxPacketSize,
                                     self, listenMultiple)
        listener.startListening()
        return listener

    def spawnProcess(self, processProtocol, executable, args=(), env={},
                     path=None, uid=None, gid=None, usePTY=0, childFDs=None):
        """
        Spawn a process.

        @raise ValueError: if C{uid}, C{gid}, C{usePTY} or C{childFDs} is
            given; none of them is supported by this reactor.
        @return: the L{Process} created for the child.
        """
        # Checked in this order so the first unsupported option named by the
        # caller decides which error is raised.
        unsupportedOptions = (
            (uid is not None,
             "Setting UID is unsupported on this platform."),
            (gid is not None,
             "Setting GID is unsupported on this platform."),
            (usePTY,
             "PTYs are unsupported on this platform."),
            (childFDs is not None,
             "Custom child file descriptor mappings are unsupported on "
             "this platform."),
        )
        for requested, message in unsupportedOptions:
            if requested:
                raise ValueError(message)
        args, env = self._checkProcessArgs(args, env)
        return Process(self, processProtocol, executable, args, env, path)

    def removeAll(self):
        """
        Empty the set of active handles.

        @return: a list of the handles that were active before the call.
        """
        active = list(self.handles)
        # Cleared in place so the same set object keeps being used.
        self.handles.clear()
        return active
def install():
    """
    Create an L{IOCPReactor} and install it with C{main.installReactor}.
    """
    main.installReactor(IOCPReactor())
# Names exported by this module.
__all__ = [
    'IOCPReactor',
    'install',
]
|