12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304 |
- # -*- test-case-name: twisted.test.test_internet,twisted.internet.test.test_core -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Very basic functionality for a Reactor implementation.
- """
- from __future__ import division, absolute_import
- import socket # needed only for sync-dns
- from zope.interface import implementer, classImplements
- import sys
- import warnings
- from heapq import heappush, heappop, heapify
- import traceback
- from twisted.internet.interfaces import (
- IReactorCore, IReactorTime, IReactorThreads, IResolverSimple,
- IReactorPluggableResolver, IReactorPluggableNameResolver, IConnector,
- IDelayedCall, _ISupportsExitSignalCapturing
- )
- from twisted.internet import fdesc, main, error, abstract, defer, threads
- from twisted.internet._resolver import (
- GAIResolver as _GAIResolver,
- ComplexResolverSimplifier as _ComplexResolverSimplifier,
- SimpleResolverComplexifier as _SimpleResolverComplexifier,
- )
- from twisted.python import log, failure, reflect
- from twisted.python.compat import unicode, iteritems
- from twisted.python.runtime import seconds as runtimeSeconds, platform
- from twisted.internet.defer import Deferred, DeferredList
- from twisted.python._oldstyle import _oldStyle
- # This import is for side-effects! Even if you don't see any code using it
- # in this module, don't delete it.
- from twisted.python import threadable
- @implementer(IDelayedCall)
- @_oldStyle
- class DelayedCall:
- # enable .debug to record creator call stack, and it will be logged if
- # an exception occurs while the function is being run
- debug = False
- _repr = None
- def __init__(self, time, func, args, kw, cancel, reset,
- seconds=runtimeSeconds):
- """
- @param time: Seconds from the epoch at which to call C{func}.
- @param func: The callable to call.
- @param args: The positional arguments to pass to the callable.
- @param kw: The keyword arguments to pass to the callable.
- @param cancel: A callable which will be called with this
- DelayedCall before cancellation.
- @param reset: A callable which will be called with this
- DelayedCall after changing this DelayedCall's scheduled
- execution time. The callable should adjust any necessary
- scheduling details to ensure this DelayedCall is invoked
- at the new appropriate time.
- @param seconds: If provided, a no-argument callable which will be
- used to determine the current time any time that information is
- needed.
- """
- self.time, self.func, self.args, self.kw = time, func, args, kw
- self.resetter = reset
- self.canceller = cancel
- self.seconds = seconds
- self.cancelled = self.called = 0
- self.delayed_time = 0
- if self.debug:
- self.creator = traceback.format_stack()[:-2]
- def getTime(self):
- """Return the time at which this call will fire
- @rtype: C{float}
- @return: The number of seconds after the epoch at which this call is
- scheduled to be made.
- """
- return self.time + self.delayed_time
- def cancel(self):
- """Unschedule this call
- @raise AlreadyCancelled: Raised if this call has already been
- unscheduled.
- @raise AlreadyCalled: Raised if this call has already been made.
- """
- if self.cancelled:
- raise error.AlreadyCancelled
- elif self.called:
- raise error.AlreadyCalled
- else:
- self.canceller(self)
- self.cancelled = 1
- if self.debug:
- self._repr = repr(self)
- del self.func, self.args, self.kw
- def reset(self, secondsFromNow):
- """Reschedule this call for a different time
- @type secondsFromNow: C{float}
- @param secondsFromNow: The number of seconds from the time of the
- C{reset} call at which this call will be scheduled.
- @raise AlreadyCancelled: Raised if this call has been cancelled.
- @raise AlreadyCalled: Raised if this call has already been made.
- """
- if self.cancelled:
- raise error.AlreadyCancelled
- elif self.called:
- raise error.AlreadyCalled
- else:
- newTime = self.seconds() + secondsFromNow
- if newTime < self.time:
- self.delayed_time = 0
- self.time = newTime
- self.resetter(self)
- else:
- self.delayed_time = newTime - self.time
- def delay(self, secondsLater):
- """Reschedule this call for a later time
- @type secondsLater: C{float}
- @param secondsLater: The number of seconds after the originally
- scheduled time for which to reschedule this call.
- @raise AlreadyCancelled: Raised if this call has been cancelled.
- @raise AlreadyCalled: Raised if this call has already been made.
- """
- if self.cancelled:
- raise error.AlreadyCancelled
- elif self.called:
- raise error.AlreadyCalled
- else:
- self.delayed_time += secondsLater
- if self.delayed_time < 0:
- self.activate_delay()
- self.resetter(self)
- def activate_delay(self):
- self.time += self.delayed_time
- self.delayed_time = 0
- def active(self):
- """Determine whether this call is still pending
- @rtype: C{bool}
- @return: True if this call has not yet been made or cancelled,
- False otherwise.
- """
- return not (self.cancelled or self.called)
- def __le__(self, other):
- """
- Implement C{<=} operator between two L{DelayedCall} instances.
- Comparison is based on the C{time} attribute (unadjusted by the
- delayed time).
- """
- return self.time <= other.time
- def __lt__(self, other):
- """
- Implement C{<} operator between two L{DelayedCall} instances.
- Comparison is based on the C{time} attribute (unadjusted by the
- delayed time).
- """
- return self.time < other.time
- def __repr__(self):
- """
- Implement C{repr()} for L{DelayedCall} instances.
- @rtype: C{str}
- @returns: String containing details of the L{DelayedCall}.
- """
- if self._repr is not None:
- return self._repr
- if hasattr(self, 'func'):
- # This code should be replaced by a utility function in reflect;
- # see ticket #6066:
- if hasattr(self.func, '__qualname__'):
- func = self.func.__qualname__
- elif hasattr(self.func, '__name__'):
- func = self.func.func_name
- if hasattr(self.func, 'im_class'):
- func = self.func.im_class.__name__ + '.' + func
- else:
- func = reflect.safe_repr(self.func)
- else:
- func = None
- now = self.seconds()
- L = ["<DelayedCall 0x%x [%ss] called=%s cancelled=%s" % (
- id(self), self.time - now, self.called,
- self.cancelled)]
- if func is not None:
- L.extend((" ", func, "("))
- if self.args:
- L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
- if self.kw:
- L.append(", ")
- if self.kw:
- L.append(", ".join(['%s=%s' % (k, reflect.safe_repr(v)) for (k, v) in self.kw.items()]))
- L.append(")")
- if self.debug:
- L.append("\n\ntraceback at creation: \n\n%s" % (' '.join(self.creator)))
- L.append('>')
- return "".join(L)
- @implementer(IResolverSimple)
- class ThreadedResolver(object):
- """
- L{ThreadedResolver} uses a reactor, a threadpool, and
- L{socket.gethostbyname} to perform name lookups without blocking the
- reactor thread. It also supports timeouts indepedently from whatever
- timeout logic L{socket.gethostbyname} might have.
- @ivar reactor: The reactor the threadpool of which will be used to call
- L{socket.gethostbyname} and the I/O thread of which the result will be
- delivered.
- """
- def __init__(self, reactor):
- self.reactor = reactor
- self._runningQueries = {}
- def _fail(self, name, err):
- err = error.DNSLookupError("address %r not found: %s" % (name, err))
- return failure.Failure(err)
- def _cleanup(self, name, lookupDeferred):
- userDeferred, cancelCall = self._runningQueries[lookupDeferred]
- del self._runningQueries[lookupDeferred]
- userDeferred.errback(self._fail(name, "timeout error"))
- def _checkTimeout(self, result, name, lookupDeferred):
- try:
- userDeferred, cancelCall = self._runningQueries[lookupDeferred]
- except KeyError:
- pass
- else:
- del self._runningQueries[lookupDeferred]
- cancelCall.cancel()
- if isinstance(result, failure.Failure):
- userDeferred.errback(self._fail(name, result.getErrorMessage()))
- else:
- userDeferred.callback(result)
- def getHostByName(self, name, timeout = (1, 3, 11, 45)):
- """
- See L{twisted.internet.interfaces.IResolverSimple.getHostByName}.
- Note that the elements of C{timeout} are summed and the result is used
- as a timeout for the lookup. Any intermediate timeout or retry logic
- is left up to the platform via L{socket.gethostbyname}.
- """
- if timeout:
- timeoutDelay = sum(timeout)
- else:
- timeoutDelay = 60
- userDeferred = defer.Deferred()
- lookupDeferred = threads.deferToThreadPool(
- self.reactor, self.reactor.getThreadPool(),
- socket.gethostbyname, name)
- cancelCall = self.reactor.callLater(
- timeoutDelay, self._cleanup, name, lookupDeferred)
- self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
- lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
- return userDeferred
- @implementer(IResolverSimple)
- @_oldStyle
- class BlockingResolver:
- def getHostByName(self, name, timeout = (1, 3, 11, 45)):
- try:
- address = socket.gethostbyname(name)
- except socket.error:
- msg = "address %r not found" % (name,)
- err = error.DNSLookupError(msg)
- return defer.fail(err)
- else:
- return defer.succeed(address)
- class _ThreePhaseEvent(object):
- """
- Collection of callables (with arguments) which can be invoked as a group in
- a particular order.
- This provides the underlying implementation for the reactor's system event
- triggers. An instance of this class tracks triggers for all phases of a
- single type of event.
- @ivar before: A list of the before-phase triggers containing three-tuples
- of a callable, a tuple of positional arguments, and a dict of keyword
- arguments
- @ivar finishedBefore: A list of the before-phase triggers which have
- already been executed. This is only populated in the C{'BEFORE'} state.
- @ivar during: A list of the during-phase triggers containing three-tuples
- of a callable, a tuple of positional arguments, and a dict of keyword
- arguments
- @ivar after: A list of the after-phase triggers containing three-tuples
- of a callable, a tuple of positional arguments, and a dict of keyword
- arguments
- @ivar state: A string indicating what is currently going on with this
- object. One of C{'BASE'} (for when nothing in particular is happening;
- this is the initial value), C{'BEFORE'} (when the before-phase triggers
- are in the process of being executed).
- """
- def __init__(self):
- self.before = []
- self.during = []
- self.after = []
- self.state = 'BASE'
- def addTrigger(self, phase, callable, *args, **kwargs):
- """
- Add a trigger to the indicate phase.
- @param phase: One of C{'before'}, C{'during'}, or C{'after'}.
- @param callable: An object to be called when this event is triggered.
- @param *args: Positional arguments to pass to C{callable}.
- @param **kwargs: Keyword arguments to pass to C{callable}.
- @return: An opaque handle which may be passed to L{removeTrigger} to
- reverse the effects of calling this method.
- """
- if phase not in ('before', 'during', 'after'):
- raise KeyError("invalid phase")
- getattr(self, phase).append((callable, args, kwargs))
- return phase, callable, args, kwargs
- def removeTrigger(self, handle):
- """
- Remove a previously added trigger callable.
- @param handle: An object previously returned by L{addTrigger}. The
- trigger added by that call will be removed.
- @raise ValueError: If the trigger associated with C{handle} has already
- been removed or if C{handle} is not a valid handle.
- """
- return getattr(self, 'removeTrigger_' + self.state)(handle)
- def removeTrigger_BASE(self, handle):
- """
- Just try to remove the trigger.
- @see: removeTrigger
- """
- try:
- phase, callable, args, kwargs = handle
- except (TypeError, ValueError):
- raise ValueError("invalid trigger handle")
- else:
- if phase not in ('before', 'during', 'after'):
- raise KeyError("invalid phase")
- getattr(self, phase).remove((callable, args, kwargs))
- def removeTrigger_BEFORE(self, handle):
- """
- Remove the trigger if it has yet to be executed, otherwise emit a
- warning that in the future an exception will be raised when removing an
- already-executed trigger.
- @see: removeTrigger
- """
- phase, callable, args, kwargs = handle
- if phase != 'before':
- return self.removeTrigger_BASE(handle)
- if (callable, args, kwargs) in self.finishedBefore:
- warnings.warn(
- "Removing already-fired system event triggers will raise an "
- "exception in a future version of Twisted.",
- category=DeprecationWarning,
- stacklevel=3)
- else:
- self.removeTrigger_BASE(handle)
- def fireEvent(self):
- """
- Call the triggers added to this event.
- """
- self.state = 'BEFORE'
- self.finishedBefore = []
- beforeResults = []
- while self.before:
- callable, args, kwargs = self.before.pop(0)
- self.finishedBefore.append((callable, args, kwargs))
- try:
- result = callable(*args, **kwargs)
- except:
- log.err()
- else:
- if isinstance(result, Deferred):
- beforeResults.append(result)
- DeferredList(beforeResults).addCallback(self._continueFiring)
- def _continueFiring(self, ignored):
- """
- Call the during and after phase triggers for this event.
- """
- self.state = 'BASE'
- self.finishedBefore = []
- for phase in self.during, self.after:
- while phase:
- callable, args, kwargs = phase.pop(0)
- try:
- callable(*args, **kwargs)
- except:
- log.err()
- @implementer(IReactorPluggableNameResolver, IReactorPluggableResolver)
- class PluggableResolverMixin(object):
- """
- A mixin which implements the pluggable resolver reactor interfaces.
- @ivar resolver: The installed L{IResolverSimple}.
- @ivar _nameResolver: The installed L{IHostnameResolver}.
- """
- resolver = BlockingResolver()
- _nameResolver = _SimpleResolverComplexifier(resolver)
- # IReactorPluggableResolver
- def installResolver(self, resolver):
- """
- See L{IReactorPluggableResolver}.
- @param resolver: see L{IReactorPluggableResolver}.
- @return: see L{IReactorPluggableResolver}.
- """
- assert IResolverSimple.providedBy(resolver)
- oldResolver = self.resolver
- self.resolver = resolver
- self._nameResolver = _SimpleResolverComplexifier(resolver)
- return oldResolver
- # IReactorPluggableNameResolver
- def installNameResolver(self, resolver):
- """
- See L{IReactorPluggableNameResolver}.
- @param resolver: See L{IReactorPluggableNameResolver}.
- @return: see L{IReactorPluggableNameResolver}.
- """
- previousNameResolver = self._nameResolver
- self._nameResolver = resolver
- self.resolver = _ComplexResolverSimplifier(resolver)
- return previousNameResolver
- @property
- def nameResolver(self):
- """
- Implementation of read-only
- L{IReactorPluggableNameResolver.nameResolver}.
- """
- return self._nameResolver
- @implementer(IReactorCore, IReactorTime, _ISupportsExitSignalCapturing)
- class ReactorBase(PluggableResolverMixin):
- """
- Default base class for Reactors.
- @type _stopped: C{bool}
- @ivar _stopped: A flag which is true between paired calls to C{reactor.run}
- and C{reactor.stop}. This should be replaced with an explicit state
- machine.
- @type _justStopped: C{bool}
- @ivar _justStopped: A flag which is true between the time C{reactor.stop}
- is called and the time the shutdown system event is fired. This is
- used to determine whether that event should be fired after each
- iteration through the mainloop. This should be replaced with an
- explicit state machine.
- @type _started: C{bool}
- @ivar _started: A flag which is true from the time C{reactor.run} is called
- until the time C{reactor.run} returns. This is used to prevent calls
- to C{reactor.run} on a running reactor. This should be replaced with
- an explicit state machine.
- @ivar running: See L{IReactorCore.running}
- @ivar _registerAsIOThread: A flag controlling whether the reactor will
- register the thread it is running in as the I/O thread when it starts.
- If C{True}, registration will be done, otherwise it will not be.
- @ivar _exitSignal: See L{_ISupportsExitSignalCapturing._exitSignal}
- """
- _registerAsIOThread = True
- _stopped = True
- installed = False
- usingThreads = False
- _exitSignal = None
- __name__ = "twisted.internet.reactor"
- def __init__(self):
- super(ReactorBase, self).__init__()
- self.threadCallQueue = []
- self._eventTriggers = {}
- self._pendingTimedCalls = []
- self._newTimedCalls = []
- self._cancellations = 0
- self.running = False
- self._started = False
- self._justStopped = False
- self._startedBefore = False
- # reactor internal readers, e.g. the waker.
- self._internalReaders = set()
- self.waker = None
- # Arrange for the running attribute to change to True at the right time
- # and let a subclass possibly do other things at that time (eg install
- # signal handlers).
- self.addSystemEventTrigger(
- 'during', 'startup', self._reallyStartRunning)
- self.addSystemEventTrigger('during', 'shutdown', self.crash)
- self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
- if platform.supportsThreads():
- self._initThreads()
- self.installWaker()
- # override in subclasses
- _lock = None
- def installWaker(self):
- raise NotImplementedError(
- reflect.qual(self.__class__) + " did not implement installWaker")
- def wakeUp(self):
- """
- Wake up the event loop.
- """
- if self.waker:
- self.waker.wakeUp()
- # if the waker isn't installed, the reactor isn't running, and
- # therefore doesn't need to be woken up
- def doIteration(self, delay):
- """
- Do one iteration over the readers and writers which have been added.
- """
- raise NotImplementedError(
- reflect.qual(self.__class__) + " did not implement doIteration")
- def addReader(self, reader):
- raise NotImplementedError(
- reflect.qual(self.__class__) + " did not implement addReader")
- def addWriter(self, writer):
- raise NotImplementedError(
- reflect.qual(self.__class__) + " did not implement addWriter")
- def removeReader(self, reader):
- raise NotImplementedError(
- reflect.qual(self.__class__) + " did not implement removeReader")
- def removeWriter(self, writer):
- raise NotImplementedError(
- reflect.qual(self.__class__) + " did not implement removeWriter")
- def removeAll(self):
- raise NotImplementedError(
- reflect.qual(self.__class__) + " did not implement removeAll")
- def getReaders(self):
- raise NotImplementedError(
- reflect.qual(self.__class__) + " did not implement getReaders")
- def getWriters(self):
- raise NotImplementedError(
- reflect.qual(self.__class__) + " did not implement getWriters")
- # IReactorCore
- def resolve(self, name, timeout=(1, 3, 11, 45)):
- """Return a Deferred that will resolve a hostname.
- """
- if not name:
- # XXX - This is *less than* '::', and will screw up IPv6 servers
- return defer.succeed('0.0.0.0')
- if abstract.isIPAddress(name):
- return defer.succeed(name)
- return self.resolver.getHostByName(name, timeout)
- def stop(self):
- """
- See twisted.internet.interfaces.IReactorCore.stop.
- """
- if self._stopped:
- raise error.ReactorNotRunning(
- "Can't stop reactor that isn't running.")
- self._stopped = True
- self._justStopped = True
- self._startedBefore = True
- def crash(self):
- """
- See twisted.internet.interfaces.IReactorCore.crash.
- Reset reactor state tracking attributes and re-initialize certain
- state-transition helpers which were set up in C{__init__} but later
- destroyed (through use).
- """
- self._started = False
- self.running = False
- self.addSystemEventTrigger(
- 'during', 'startup', self._reallyStartRunning)
- def sigInt(self, *args):
- """
- Handle a SIGINT interrupt.
- @param args: See handler specification in L{signal.signal}
- """
- log.msg("Received SIGINT, shutting down.")
- self.callFromThread(self.stop)
- self._exitSignal = args[0]
- def sigBreak(self, *args):
- """
- Handle a SIGBREAK interrupt.
- @param args: See handler specification in L{signal.signal}
- """
- log.msg("Received SIGBREAK, shutting down.")
- self.callFromThread(self.stop)
- self._exitSignal = args[0]
- def sigTerm(self, *args):
- """
- Handle a SIGTERM interrupt.
- @param args: See handler specification in L{signal.signal}
- """
- log.msg("Received SIGTERM, shutting down.")
- self.callFromThread(self.stop)
- self._exitSignal = args[0]
- def disconnectAll(self):
- """Disconnect every reader, and writer in the system.
- """
- selectables = self.removeAll()
- for reader in selectables:
- log.callWithLogger(reader,
- reader.connectionLost,
- failure.Failure(main.CONNECTION_LOST))
- def iterate(self, delay=0):
- """See twisted.internet.interfaces.IReactorCore.iterate.
- """
- self.runUntilCurrent()
- self.doIteration(delay)
- def fireSystemEvent(self, eventType):
- """See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
- """
- event = self._eventTriggers.get(eventType)
- if event is not None:
- event.fireEvent()
- def addSystemEventTrigger(self, _phase, _eventType, _f, *args, **kw):
- """See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.
- """
- assert callable(_f), "%s is not callable" % _f
- if _eventType not in self._eventTriggers:
- self._eventTriggers[_eventType] = _ThreePhaseEvent()
- return (_eventType, self._eventTriggers[_eventType].addTrigger(
- _phase, _f, *args, **kw))
- def removeSystemEventTrigger(self, triggerID):
- """See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger.
- """
- eventType, handle = triggerID
- self._eventTriggers[eventType].removeTrigger(handle)
- def callWhenRunning(self, _callable, *args, **kw):
- """See twisted.internet.interfaces.IReactorCore.callWhenRunning.
- """
- if self.running:
- _callable(*args, **kw)
- else:
- return self.addSystemEventTrigger('after', 'startup',
- _callable, *args, **kw)
- def startRunning(self):
- """
- Method called when reactor starts: do some initialization and fire
- startup events.
- Don't call this directly, call reactor.run() instead: it should take
- care of calling this.
- This method is somewhat misnamed. The reactor will not necessarily be
- in the running state by the time this method returns. The only
- guarantee is that it will be on its way to the running state.
- """
- if self._started:
- raise error.ReactorAlreadyRunning()
- if self._startedBefore:
- raise error.ReactorNotRestartable()
- self._started = True
- self._stopped = False
- if self._registerAsIOThread:
- threadable.registerAsIOThread()
- self.fireSystemEvent('startup')
- def _reallyStartRunning(self):
- """
- Method called to transition to the running state. This should happen
- in the I{during startup} event trigger phase.
- """
- self.running = True
- # IReactorTime
- seconds = staticmethod(runtimeSeconds)
- def callLater(self, _seconds, _f, *args, **kw):
- """See twisted.internet.interfaces.IReactorTime.callLater.
- """
- assert callable(_f), "%s is not callable" % _f
- assert _seconds >= 0, \
- "%s is not greater than or equal to 0 seconds" % (_seconds,)
- tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
- self._cancelCallLater,
- self._moveCallLaterSooner,
- seconds=self.seconds)
- self._newTimedCalls.append(tple)
- return tple
- def _moveCallLaterSooner(self, tple):
- # Linear time find: slow.
- heap = self._pendingTimedCalls
- try:
- pos = heap.index(tple)
- # Move elt up the heap until it rests at the right place.
- elt = heap[pos]
- while pos != 0:
- parent = (pos-1) // 2
- if heap[parent] <= elt:
- break
- # move parent down
- heap[pos] = heap[parent]
- pos = parent
- heap[pos] = elt
- except ValueError:
- # element was not found in heap - oh well...
- pass
- def _cancelCallLater(self, tple):
- self._cancellations+=1
- def getDelayedCalls(self):
- """
- Return all the outstanding delayed calls in the system.
- They are returned in no particular order.
- This method is not efficient -- it is really only meant for
- test cases.
- @return: A list of outstanding delayed calls.
- @type: L{list} of L{DelayedCall}
- """
- return [x for x in (self._pendingTimedCalls + self._newTimedCalls) if not x.cancelled]
- def _insertNewDelayedCalls(self):
- for call in self._newTimedCalls:
- if call.cancelled:
- self._cancellations-=1
- else:
- call.activate_delay()
- heappush(self._pendingTimedCalls, call)
- self._newTimedCalls = []
- def timeout(self):
- """
- Determine the longest time the reactor may sleep (waiting on I/O
- notification, perhaps) before it must wake up to service a time-related
- event.
- @return: The maximum number of seconds the reactor may sleep.
- @rtype: L{float}
- """
- # insert new delayed calls to make sure to include them in timeout value
- self._insertNewDelayedCalls()
- if not self._pendingTimedCalls:
- return None
- delay = self._pendingTimedCalls[0].time - self.seconds()
- # Pick a somewhat arbitrary maximum possible value for the timeout.
- # This value is 2 ** 31 / 1000, which is the number of seconds which can
- # be represented as an integer number of milliseconds in a signed 32 bit
- # integer. This particular limit is imposed by the epoll_wait(3)
- # interface which accepts a timeout as a C "int" type and treats it as
- # representing a number of milliseconds.
- longest = 2147483
- # Don't let the delay be in the past (negative) or exceed a plausible
- # maximum (platform-imposed) interval.
- return max(0, min(longest, delay))
- def runUntilCurrent(self):
- """
- Run all pending timed calls.
- """
- if self.threadCallQueue:
- # Keep track of how many calls we actually make, as we're
- # making them, in case another call is added to the queue
- # while we're in this loop.
- count = 0
- total = len(self.threadCallQueue)
- for (f, a, kw) in self.threadCallQueue:
- try:
- f(*a, **kw)
- except:
- log.err()
- count += 1
- if count == total:
- break
- del self.threadCallQueue[:count]
- if self.threadCallQueue:
- self.wakeUp()
- # insert new delayed calls now
- self._insertNewDelayedCalls()
- now = self.seconds()
- while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
- call = heappop(self._pendingTimedCalls)
- if call.cancelled:
- self._cancellations-=1
- continue
- if call.delayed_time > 0:
- call.activate_delay()
- heappush(self._pendingTimedCalls, call)
- continue
- try:
- call.called = 1
- call.func(*call.args, **call.kw)
- except:
- log.deferr()
- if hasattr(call, "creator"):
- e = "\n"
- e += " C: previous exception occurred in " + \
- "a DelayedCall created here:\n"
- e += " C:"
- e += "".join(call.creator).rstrip().replace("\n","\n C:")
- e += "\n"
- log.msg(e)
- if (self._cancellations > 50 and
- self._cancellations > len(self._pendingTimedCalls) >> 1):
- self._cancellations = 0
- self._pendingTimedCalls = [x for x in self._pendingTimedCalls
- if not x.cancelled]
- heapify(self._pendingTimedCalls)
- if self._justStopped:
- self._justStopped = False
- self.fireSystemEvent("shutdown")
- # IReactorProcess
- def _checkProcessArgs(self, args, env):
- """
- Check for valid arguments and environment to spawnProcess.
- @return: A two element tuple giving values to use when creating the
- process. The first element of the tuple is a C{list} of C{bytes}
- giving the values for argv of the child process. The second element
- of the tuple is either L{None} if C{env} was L{None} or a C{dict}
- mapping C{bytes} environment keys to C{bytes} environment values.
- """
- # Any unicode string which Python would successfully implicitly
- # encode to a byte string would have worked before these explicit
- # checks were added. Anything which would have failed with a
- # UnicodeEncodeError during that implicit encoding step would have
- # raised an exception in the child process and that would have been
- # a pain in the butt to debug.
- #
- # So, we will explicitly attempt the same encoding which Python
- # would implicitly do later. If it fails, we will report an error
- # without ever spawning a child process. If it succeeds, we'll save
- # the result so that Python doesn't need to do it implicitly later.
- #
- # -exarkun
- defaultEncoding = sys.getfilesystemencoding()
- # Common check function
- def argChecker(arg):
- """
- Return either L{bytes} or L{None}. If the given value is not
- allowable for some reason, L{None} is returned. Otherwise, a
- possibly different object which should be used in place of arg is
- returned. This forces unicode encoding to happen now, rather than
- implicitly later.
- """
- if isinstance(arg, unicode):
- try:
- arg = arg.encode(defaultEncoding)
- except UnicodeEncodeError:
- return None
- if isinstance(arg, bytes) and b'\0' not in arg:
- return arg
- return None
- # Make a few tests to check input validity
- if not isinstance(args, (tuple, list)):
- raise TypeError("Arguments must be a tuple or list")
- outputArgs = []
- for arg in args:
- arg = argChecker(arg)
- if arg is None:
- raise TypeError("Arguments contain a non-string value")
- else:
- outputArgs.append(arg)
- outputEnv = None
- if env is not None:
- outputEnv = {}
- for key, val in iteritems(env):
- key = argChecker(key)
- if key is None:
- raise TypeError("Environment contains a non-string key")
- val = argChecker(val)
- if val is None:
- raise TypeError("Environment contains a non-string value")
- outputEnv[key] = val
- return outputArgs, outputEnv
- # IReactorThreads
- if platform.supportsThreads():
- threadpool = None
- # ID of the trigger starting the threadpool
- _threadpoolStartupID = None
- # ID of the trigger stopping the threadpool
- threadpoolShutdownID = None
- def _initThreads(self):
- self.installNameResolver(_GAIResolver(self, self.getThreadPool))
- self.usingThreads = True
- def callFromThread(self, f, *args, **kw):
- """
- See
- L{twisted.internet.interfaces.IReactorFromThreads.callFromThread}.
- """
- assert callable(f), "%s is not callable" % (f,)
- # lists are thread-safe in CPython, but not in Jython
- # this is probably a bug in Jython, but until fixed this code
- # won't work in Jython.
- self.threadCallQueue.append((f, args, kw))
- self.wakeUp()
- def _initThreadPool(self):
- """
- Create the threadpool accessible with callFromThread.
- """
- from twisted.python import threadpool
- self.threadpool = threadpool.ThreadPool(
- 0, 10, 'twisted.internet.reactor')
- self._threadpoolStartupID = self.callWhenRunning(
- self.threadpool.start)
- self.threadpoolShutdownID = self.addSystemEventTrigger(
- 'during', 'shutdown', self._stopThreadPool)
- def _uninstallHandler(self):
- pass
- def _stopThreadPool(self):
- """
- Stop the reactor threadpool. This method is only valid if there
- is currently a threadpool (created by L{_initThreadPool}). It
- is not intended to be called directly; instead, it will be
- called by a shutdown trigger created in L{_initThreadPool}.
- """
- triggers = [self._threadpoolStartupID, self.threadpoolShutdownID]
- for trigger in filter(None, triggers):
- try:
- self.removeSystemEventTrigger(trigger)
- except ValueError:
- pass
- self._threadpoolStartupID = None
- self.threadpoolShutdownID = None
- self.threadpool.stop()
- self.threadpool = None
- def getThreadPool(self):
- """
- See L{twisted.internet.interfaces.IReactorThreads.getThreadPool}.
- """
- if self.threadpool is None:
- self._initThreadPool()
- return self.threadpool
- def callInThread(self, _callable, *args, **kwargs):
- """
- See L{twisted.internet.interfaces.IReactorInThreads.callInThread}.
- """
- self.getThreadPool().callInThread(_callable, *args, **kwargs)
- def suggestThreadPoolSize(self, size):
- """
- See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize}.
- """
- self.getThreadPool().adjustPoolsize(maxthreads=size)
- else:
- # This is for signal handlers.
- def callFromThread(self, f, *args, **kw):
- assert callable(f), "%s is not callable" % (f,)
- # See comment in the other callFromThread implementation.
- self.threadCallQueue.append((f, args, kw))
- if platform.supportsThreads():
- classImplements(ReactorBase, IReactorThreads)
- @implementer(IConnector)
- @_oldStyle
- class BaseConnector:
- """Basic implementation of connector.
- State can be: "connecting", "connected", "disconnected"
- """
- timeoutID = None
- factoryStarted = 0
- def __init__(self, factory, timeout, reactor):
- self.state = "disconnected"
- self.reactor = reactor
- self.factory = factory
- self.timeout = timeout
- def disconnect(self):
- """Disconnect whatever our state is."""
- if self.state == 'connecting':
- self.stopConnecting()
- elif self.state == 'connected':
- self.transport.loseConnection()
- def connect(self):
- """Start connection to remote server."""
- if self.state != "disconnected":
- raise RuntimeError("can't connect in this state")
- self.state = "connecting"
- if not self.factoryStarted:
- self.factory.doStart()
- self.factoryStarted = 1
- self.transport = transport = self._makeTransport()
- if self.timeout is not None:
- self.timeoutID = self.reactor.callLater(self.timeout, transport.failIfNotConnected, error.TimeoutError())
- self.factory.startedConnecting(self)
- def stopConnecting(self):
- """Stop attempting to connect."""
- if self.state != "connecting":
- raise error.NotConnectingError("we're not trying to connect")
- self.state = "disconnected"
- self.transport.failIfNotConnected(error.UserError())
- del self.transport
- def cancelTimeout(self):
- if self.timeoutID is not None:
- try:
- self.timeoutID.cancel()
- except ValueError:
- pass
- del self.timeoutID
- def buildProtocol(self, addr):
- self.state = "connected"
- self.cancelTimeout()
- return self.factory.buildProtocol(addr)
- def connectionFailed(self, reason):
- self.cancelTimeout()
- self.transport = None
- self.state = "disconnected"
- self.factory.clientConnectionFailed(self, reason)
- if self.state == "disconnected":
- # factory hasn't called our connect() method
- self.factory.doStop()
- self.factoryStarted = 0
- def connectionLost(self, reason):
- self.state = "disconnected"
- self.factory.clientConnectionLost(self, reason)
- if self.state == "disconnected":
- # factory hasn't called our connect() method
- self.factory.doStop()
- self.factoryStarted = 0
- def getDestination(self):
- raise NotImplementedError(
- reflect.qual(self.__class__) + " did not implement "
- "getDestination")
- def __repr__(self):
- return "<%s instance at 0x%x %s %s>" % (
- reflect.qual(self.__class__), id(self), self.state,
- self.getDestination())
- class BasePort(abstract.FileDescriptor):
- """Basic implementation of a ListeningPort.
- Note: This does not actually implement IListeningPort.
- """
- addressFamily = None
- socketType = None
- def createInternetSocket(self):
- s = socket.socket(self.addressFamily, self.socketType)
- s.setblocking(0)
- fdesc._setCloseOnExec(s.fileno())
- return s
- def doWrite(self):
- """Raises a RuntimeError"""
- raise RuntimeError(
- "doWrite called on a %s" % reflect.qual(self.__class__))
- class _SignalReactorMixin(object):
- """
- Private mixin to manage signals: it installs signal handlers at start time,
- and define run method.
- It can only be used mixed in with L{ReactorBase}, and has to be defined
- first in the inheritance (so that method resolution order finds
- startRunning first).
- @type _installSignalHandlers: C{bool}
- @ivar _installSignalHandlers: A flag which indicates whether any signal
- handlers will be installed during startup. This includes handlers for
- SIGCHLD to monitor child processes, and SIGINT, SIGTERM, and SIGBREAK
- to stop the reactor.
- """
- _installSignalHandlers = False
- def _handleSignals(self):
- """
- Install the signal handlers for the Twisted event loop.
- """
- try:
- import signal
- except ImportError:
- log.msg("Warning: signal module unavailable -- "
- "not installing signal handlers.")
- return
- if signal.getsignal(signal.SIGINT) == signal.default_int_handler:
- # only handle if there isn't already a handler, e.g. for Pdb.
- signal.signal(signal.SIGINT, self.sigInt)
- signal.signal(signal.SIGTERM, self.sigTerm)
- # Catch Ctrl-Break in windows
- if hasattr(signal, "SIGBREAK"):
- signal.signal(signal.SIGBREAK, self.sigBreak)
- def startRunning(self, installSignalHandlers=True):
- """
- Extend the base implementation in order to remember whether signal
- handlers should be installed later.
- @type installSignalHandlers: C{bool}
- @param installSignalHandlers: A flag which, if set, indicates that
- handlers for a number of (implementation-defined) signals should be
- installed during startup.
- """
- self._installSignalHandlers = installSignalHandlers
- ReactorBase.startRunning(self)
- def _reallyStartRunning(self):
- """
- Extend the base implementation by also installing signal handlers, if
- C{self._installSignalHandlers} is true.
- """
- ReactorBase._reallyStartRunning(self)
- if self._installSignalHandlers:
- # Make sure this happens before after-startup events, since the
- # expectation of after-startup is that the reactor is fully
- # initialized. Don't do it right away for historical reasons
- # (perhaps some before-startup triggers don't want there to be a
- # custom SIGCHLD handler so that they can run child processes with
- # some blocking api).
- self._handleSignals()
- def run(self, installSignalHandlers=True):
- self.startRunning(installSignalHandlers=installSignalHandlers)
- self.mainLoop()
- def mainLoop(self):
- while self._started:
- try:
- while self._started:
- # Advance simulation time in delayed event
- # processors.
- self.runUntilCurrent()
- t2 = self.timeout()
- t = self.running and t2
- self.doIteration(t)
- except:
- log.msg("Unexpected error in main loop.")
- log.err()
- else:
- log.msg('Main loop terminated.')
- __all__ = []
|