base.py 47 KB


  1. # -*- test-case-name: twisted.test.test_internet,twisted.internet.test.test_core -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Very basic functionality for a Reactor implementation.
  6. """
  7. import builtins
  8. import socket # needed only for sync-dns
  9. import warnings
  10. from abc import ABC, abstractmethod
  11. from heapq import heapify, heappop, heappush
  12. from traceback import format_stack
  13. from types import FrameType
  14. from typing import (
  15. TYPE_CHECKING,
  16. Any,
  17. Callable,
  18. Dict,
  19. List,
  20. NewType,
  21. Optional,
  22. Sequence,
  23. Set,
  24. Tuple,
  25. Union,
  26. cast,
  27. )
  28. from zope.interface import classImplements, implementer
  29. from twisted.internet import abstract, defer, error, fdesc, main, threads
  30. from twisted.internet._resolver import (
  31. ComplexResolverSimplifier as _ComplexResolverSimplifier,
  32. GAIResolver as _GAIResolver,
  33. SimpleResolverComplexifier as _SimpleResolverComplexifier,
  34. )
  35. from twisted.internet.defer import Deferred, DeferredList
  36. from twisted.internet.interfaces import (
  37. IAddress,
  38. IConnector,
  39. IDelayedCall,
  40. IHostnameResolver,
  41. IProtocol,
  42. IReactorCore,
  43. IReactorFromThreads,
  44. IReactorPluggableNameResolver,
  45. IReactorPluggableResolver,
  46. IReactorThreads,
  47. IReactorTime,
  48. IReadDescriptor,
  49. IResolverSimple,
  50. IWriteDescriptor,
  51. _ISupportsExitSignalCapturing,
  52. )
  53. from twisted.internet.protocol import ClientFactory
  54. from twisted.logger import Logger
  55. from twisted.python import reflect
  56. from twisted.python.failure import Failure
  57. from twisted.python.log import callWithLogger as _callWithLogger
  58. from twisted.python.runtime import platform, seconds as runtimeSeconds
  59. from ._signals import SignalHandling, _WithoutSignalHandling, _WithSignalHandling
  60. if TYPE_CHECKING:
  61. from twisted.internet.tcp import Client
  62. # This import is for side-effects! Even if you don't see any code using it
  63. # in this module, don't delete it.
  64. from twisted.python import threadable
  65. if platform.supportsThreads():
  66. from twisted.python.threadpool import ThreadPool
  67. else:
  68. ThreadPool = None # type: ignore[misc, assignment]
# Module-wide logger; the failure handlers below are all derived from it.
_log = Logger()

# Pre-allocate some static application-code failure logging handlers so that we
# do not need to allocate them in performance-sensitive bits of code below.

# Wraps each pass of ReactorBase.mainLoop.
_topHandler = _log.failureHandler("Unexpected error in main loop")
# NOTE(review): not used in the visible part of this file; presumably wraps
# calls queued from other threads - confirm against the rest of the module.
_threadCallHandler = _log.failureHandler("while calling from thread")
# Wraps every individual trigger invocation made by _ThreePhaseEvent.
_systemEventHandler = _log.failureHandler("While calling system event trigger handler")
  75. @implementer(IDelayedCall)
  76. class DelayedCall:
  77. # enable .debug to record creator call stack, and it will be logged if
  78. # an exception occurs while the function is being run
  79. debug = False
  80. _repr: Optional[str] = None
  81. # In debug mode, the call stack at the time of instantiation.
  82. creator: Optional[Sequence[str]] = None
  83. def __init__(
  84. self,
  85. time: float,
  86. func: Callable[..., Any],
  87. args: Sequence[object],
  88. kw: Dict[str, object],
  89. cancel: Callable[["DelayedCall"], None],
  90. reset: Callable[["DelayedCall"], None],
  91. seconds: Callable[[], float] = runtimeSeconds,
  92. ) -> None:
  93. """
  94. @param time: Seconds from the epoch at which to call C{func}.
  95. @param func: The callable to call.
  96. @param args: The positional arguments to pass to the callable.
  97. @param kw: The keyword arguments to pass to the callable.
  98. @param cancel: A callable which will be called with this
  99. DelayedCall before cancellation.
  100. @param reset: A callable which will be called with this
  101. DelayedCall after changing this DelayedCall's scheduled
  102. execution time. The callable should adjust any necessary
  103. scheduling details to ensure this DelayedCall is invoked
  104. at the new appropriate time.
  105. @param seconds: If provided, a no-argument callable which will be
  106. used to determine the current time any time that information is
  107. needed.
  108. """
  109. self.time, self.func, self.args, self.kw = time, func, args, kw
  110. self.resetter = reset
  111. self.canceller = cancel
  112. self.seconds = seconds
  113. self.cancelled = self.called = 0
  114. self.delayed_time = 0.0
  115. if self.debug:
  116. self.creator = format_stack()[:-2]
  117. def getTime(self) -> float:
  118. """
  119. Return the time at which this call will fire
  120. @return: The number of seconds after the epoch at which this call is
  121. scheduled to be made.
  122. """
  123. return self.time + self.delayed_time
  124. def cancel(self) -> None:
  125. """
  126. Unschedule this call
  127. @raise AlreadyCancelled: Raised if this call has already been
  128. unscheduled.
  129. @raise AlreadyCalled: Raised if this call has already been made.
  130. """
  131. if self.cancelled:
  132. raise error.AlreadyCancelled
  133. elif self.called:
  134. raise error.AlreadyCalled
  135. else:
  136. self.canceller(self)
  137. self.cancelled = 1
  138. if self.debug:
  139. self._repr = repr(self)
  140. del self.func, self.args, self.kw
  141. def reset(self, secondsFromNow: float) -> None:
  142. """
  143. Reschedule this call for a different time
  144. @param secondsFromNow: The number of seconds from the time of the
  145. C{reset} call at which this call will be scheduled.
  146. @raise AlreadyCancelled: Raised if this call has been cancelled.
  147. @raise AlreadyCalled: Raised if this call has already been made.
  148. """
  149. if self.cancelled:
  150. raise error.AlreadyCancelled
  151. elif self.called:
  152. raise error.AlreadyCalled
  153. else:
  154. newTime = self.seconds() + secondsFromNow
  155. if newTime < self.time:
  156. self.delayed_time = 0.0
  157. self.time = newTime
  158. self.resetter(self)
  159. else:
  160. self.delayed_time = newTime - self.time
  161. def delay(self, secondsLater: float) -> None:
  162. """
  163. Reschedule this call for a later time
  164. @param secondsLater: The number of seconds after the originally
  165. scheduled time for which to reschedule this call.
  166. @raise AlreadyCancelled: Raised if this call has been cancelled.
  167. @raise AlreadyCalled: Raised if this call has already been made.
  168. """
  169. if self.cancelled:
  170. raise error.AlreadyCancelled
  171. elif self.called:
  172. raise error.AlreadyCalled
  173. else:
  174. self.delayed_time += secondsLater
  175. if self.delayed_time < 0.0:
  176. self.activate_delay()
  177. self.resetter(self)
  178. def activate_delay(self) -> None:
  179. self.time += self.delayed_time
  180. self.delayed_time = 0.0
  181. def active(self) -> bool:
  182. """Determine whether this call is still pending
  183. @return: True if this call has not yet been made or cancelled,
  184. False otherwise.
  185. """
  186. return not (self.cancelled or self.called)
  187. def __le__(self, other: "DelayedCall") -> bool:
  188. """
  189. Implement C{<=} operator between two L{DelayedCall} instances.
  190. Comparison is based on the C{time} attribute (unadjusted by the
  191. delayed time).
  192. """
  193. return self.time <= other.time
  194. def __lt__(self, other: "DelayedCall") -> bool:
  195. """
  196. Implement C{<} operator between two L{DelayedCall} instances.
  197. Comparison is based on the C{time} attribute (unadjusted by the
  198. delayed time).
  199. """
  200. return self.time < other.time
  201. def __repr__(self) -> str:
  202. """
  203. Implement C{repr()} for L{DelayedCall} instances.
  204. @returns: String containing details of the L{DelayedCall}.
  205. """
  206. if self._repr is not None:
  207. return self._repr
  208. if hasattr(self, "func"):
  209. # This code should be replaced by a utility function in reflect;
  210. # see ticket #6066:
  211. func = getattr(self.func, "__qualname__", None)
  212. if func is None:
  213. func = getattr(self.func, "__name__", None)
  214. if func is not None:
  215. imClass = getattr(self.func, "im_class", None)
  216. if imClass is not None:
  217. func = f"{imClass}.{func}"
  218. if func is None:
  219. func = reflect.safe_repr(self.func)
  220. else:
  221. func = None
  222. now = self.seconds()
  223. L = [
  224. "<DelayedCall 0x%x [%ss] called=%s cancelled=%s"
  225. % (id(self), self.time - now, self.called, self.cancelled)
  226. ]
  227. if func is not None:
  228. L.extend((" ", func, "("))
  229. if self.args:
  230. L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
  231. if self.kw:
  232. L.append(", ")
  233. if self.kw:
  234. L.append(
  235. ", ".join(
  236. [f"{k}={reflect.safe_repr(v)}" for (k, v) in self.kw.items()]
  237. )
  238. )
  239. L.append(")")
  240. if self.creator is not None:
  241. L.append("\n\ntraceback at creation: \n\n%s" % (" ".join(self.creator)))
  242. L.append(">")
  243. return "".join(L)
  244. @implementer(IResolverSimple)
  245. class ThreadedResolver:
  246. """
  247. L{ThreadedResolver} uses a reactor, a threadpool, and
  248. L{socket.gethostbyname} to perform name lookups without blocking the
  249. reactor thread. It also supports timeouts indepedently from whatever
  250. timeout logic L{socket.gethostbyname} might have.
  251. @ivar reactor: The reactor the threadpool of which will be used to call
  252. L{socket.gethostbyname} and the I/O thread of which the result will be
  253. delivered.
  254. """
  255. def __init__(self, reactor: "ReactorBase") -> None:
  256. self.reactor = reactor
  257. self._runningQueries: Dict[
  258. Deferred[str], Tuple[Deferred[str], IDelayedCall]
  259. ] = {}
  260. def _fail(self, name: str, err: str) -> Failure:
  261. lookupError = error.DNSLookupError(f"address {name!r} not found: {err}")
  262. return Failure(lookupError)
  263. def _cleanup(self, name: str, lookupDeferred: Deferred[str]) -> None:
  264. userDeferred, cancelCall = self._runningQueries[lookupDeferred]
  265. del self._runningQueries[lookupDeferred]
  266. userDeferred.errback(self._fail(name, "timeout error"))
  267. def _checkTimeout(
  268. self, result: Union[str, Failure], name: str, lookupDeferred: Deferred[str]
  269. ) -> None:
  270. try:
  271. userDeferred, cancelCall = self._runningQueries[lookupDeferred]
  272. except KeyError:
  273. pass
  274. else:
  275. del self._runningQueries[lookupDeferred]
  276. cancelCall.cancel()
  277. if isinstance(result, Failure):
  278. userDeferred.errback(self._fail(name, result.getErrorMessage()))
  279. else:
  280. userDeferred.callback(result)
  281. def getHostByName(
  282. self, name: str, timeout: Sequence[int] = (1, 3, 11, 45)
  283. ) -> Deferred[str]:
  284. """
  285. See L{twisted.internet.interfaces.IResolverSimple.getHostByName}.
  286. Note that the elements of C{timeout} are summed and the result is used
  287. as a timeout for the lookup. Any intermediate timeout or retry logic
  288. is left up to the platform via L{socket.gethostbyname}.
  289. """
  290. if timeout:
  291. timeoutDelay = sum(timeout)
  292. else:
  293. timeoutDelay = 60
  294. userDeferred: Deferred[str] = Deferred()
  295. lookupDeferred = threads.deferToThreadPool(
  296. cast(IReactorFromThreads, self.reactor),
  297. cast(IReactorThreads, self.reactor).getThreadPool(),
  298. socket.gethostbyname,
  299. name,
  300. )
  301. cancelCall = cast(IReactorTime, self.reactor).callLater(
  302. timeoutDelay, self._cleanup, name, lookupDeferred
  303. )
  304. self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
  305. _: Deferred[None] = lookupDeferred.addBoth(
  306. self._checkTimeout, name, lookupDeferred
  307. )
  308. return userDeferred
  309. @implementer(IResolverSimple)
  310. class BlockingResolver:
  311. def getHostByName(
  312. self, name: str, timeout: Sequence[int] = (1, 3, 11, 45)
  313. ) -> Deferred[str]:
  314. try:
  315. address = socket.gethostbyname(name)
  316. except OSError:
  317. msg = f"address {name!r} not found"
  318. err = error.DNSLookupError(msg)
  319. return defer.fail(err)
  320. else:
  321. return defer.succeed(address)
  322. _ThreePhaseEventTriggerCallable = Callable[..., Any]
  323. _ThreePhaseEventTrigger = Tuple[
  324. _ThreePhaseEventTriggerCallable, Tuple[object, ...], Dict[str, object]
  325. ]
  326. _ThreePhaseEventTriggerHandle = NewType(
  327. "_ThreePhaseEventTriggerHandle",
  328. Tuple[str, _ThreePhaseEventTriggerCallable, Tuple[object, ...], Dict[str, object]],
  329. )
  330. class _ThreePhaseEvent:
  331. """
  332. Collection of callables (with arguments) which can be invoked as a group in
  333. a particular order.
  334. This provides the underlying implementation for the reactor's system event
  335. triggers. An instance of this class tracks triggers for all phases of a
  336. single type of event.
  337. @ivar before: A list of the before-phase triggers containing three-tuples
  338. of a callable, a tuple of positional arguments, and a dict of keyword
  339. arguments
  340. @ivar finishedBefore: A list of the before-phase triggers which have
  341. already been executed. This is only populated in the C{'BEFORE'} state.
  342. @ivar during: A list of the during-phase triggers containing three-tuples
  343. of a callable, a tuple of positional arguments, and a dict of keyword
  344. arguments
  345. @ivar after: A list of the after-phase triggers containing three-tuples
  346. of a callable, a tuple of positional arguments, and a dict of keyword
  347. arguments
  348. @ivar state: A string indicating what is currently going on with this
  349. object. One of C{'BASE'} (for when nothing in particular is happening;
  350. this is the initial value), C{'BEFORE'} (when the before-phase triggers
  351. are in the process of being executed).
  352. """
  353. def __init__(self) -> None:
  354. self.before: List[_ThreePhaseEventTrigger] = []
  355. self.during: List[_ThreePhaseEventTrigger] = []
  356. self.after: List[_ThreePhaseEventTrigger] = []
  357. self.state = "BASE"
  358. def addTrigger(
  359. self,
  360. phase: str,
  361. callable: _ThreePhaseEventTriggerCallable,
  362. *args: object,
  363. **kwargs: object,
  364. ) -> _ThreePhaseEventTriggerHandle:
  365. """
  366. Add a trigger to the indicate phase.
  367. @param phase: One of C{'before'}, C{'during'}, or C{'after'}.
  368. @param callable: An object to be called when this event is triggered.
  369. @param args: Positional arguments to pass to C{callable}.
  370. @param kwargs: Keyword arguments to pass to C{callable}.
  371. @return: An opaque handle which may be passed to L{removeTrigger} to
  372. reverse the effects of calling this method.
  373. """
  374. if phase not in ("before", "during", "after"):
  375. raise KeyError("invalid phase")
  376. getattr(self, phase).append((callable, args, kwargs))
  377. return _ThreePhaseEventTriggerHandle((phase, callable, args, kwargs))
  378. def removeTrigger(self, handle: _ThreePhaseEventTriggerHandle) -> None:
  379. """
  380. Remove a previously added trigger callable.
  381. @param handle: An object previously returned by L{addTrigger}. The
  382. trigger added by that call will be removed.
  383. @raise ValueError: If the trigger associated with C{handle} has already
  384. been removed or if C{handle} is not a valid handle.
  385. """
  386. getattr(self, "removeTrigger_" + self.state)(handle)
  387. def removeTrigger_BASE(self, handle: _ThreePhaseEventTriggerHandle) -> None:
  388. """
  389. Just try to remove the trigger.
  390. @see: removeTrigger
  391. """
  392. try:
  393. phase, callable, args, kwargs = handle
  394. except (TypeError, ValueError):
  395. raise ValueError("invalid trigger handle")
  396. else:
  397. if phase not in ("before", "during", "after"):
  398. raise KeyError("invalid phase")
  399. getattr(self, phase).remove((callable, args, kwargs))
  400. def removeTrigger_BEFORE(self, handle: _ThreePhaseEventTriggerHandle) -> None:
  401. """
  402. Remove the trigger if it has yet to be executed, otherwise emit a
  403. warning that in the future an exception will be raised when removing an
  404. already-executed trigger.
  405. @see: removeTrigger
  406. """
  407. phase, callable, args, kwargs = handle
  408. if phase != "before":
  409. return self.removeTrigger_BASE(handle)
  410. if (callable, args, kwargs) in self.finishedBefore:
  411. warnings.warn(
  412. "Removing already-fired system event triggers will raise an "
  413. "exception in a future version of Twisted.",
  414. category=DeprecationWarning,
  415. stacklevel=3,
  416. )
  417. else:
  418. self.removeTrigger_BASE(handle)
  419. def fireEvent(self) -> None:
  420. """
  421. Call the triggers added to this event.
  422. """
  423. self.state = "BEFORE"
  424. self.finishedBefore = []
  425. beforeResults: List[Deferred[object]] = []
  426. while self.before:
  427. callable, args, kwargs = self.before.pop(0)
  428. self.finishedBefore.append((callable, args, kwargs))
  429. result = None
  430. with _systemEventHandler:
  431. result = callable(*args, **kwargs)
  432. if isinstance(result, Deferred):
  433. beforeResults.append(result)
  434. DeferredList(beforeResults).addCallback(self._continueFiring)
  435. def _continueFiring(self, ignored: object) -> None:
  436. """
  437. Call the during and after phase triggers for this event.
  438. """
  439. self.state = "BASE"
  440. self.finishedBefore = []
  441. for phase in self.during, self.after:
  442. while phase:
  443. callable, args, kwargs = phase.pop(0)
  444. with _systemEventHandler:
  445. callable(*args, **kwargs)
  446. @implementer(IReactorPluggableNameResolver, IReactorPluggableResolver)
  447. class PluggableResolverMixin:
  448. """
  449. A mixin which implements the pluggable resolver reactor interfaces.
  450. @ivar resolver: The installed L{IResolverSimple}.
  451. @ivar _nameResolver: The installed L{IHostnameResolver}.
  452. """
  453. resolver: IResolverSimple = BlockingResolver()
  454. _nameResolver: IHostnameResolver = _SimpleResolverComplexifier(resolver)
  455. # IReactorPluggableResolver
  456. def installResolver(self, resolver: IResolverSimple) -> IResolverSimple:
  457. """
  458. See L{IReactorPluggableResolver}.
  459. @param resolver: see L{IReactorPluggableResolver}.
  460. @return: see L{IReactorPluggableResolver}.
  461. """
  462. assert IResolverSimple.providedBy(resolver)
  463. oldResolver = self.resolver
  464. self.resolver = resolver
  465. self._nameResolver = _SimpleResolverComplexifier(resolver)
  466. return oldResolver
  467. # IReactorPluggableNameResolver
  468. def installNameResolver(self, resolver: IHostnameResolver) -> IHostnameResolver:
  469. """
  470. See L{IReactorPluggableNameResolver}.
  471. @param resolver: See L{IReactorPluggableNameResolver}.
  472. @return: see L{IReactorPluggableNameResolver}.
  473. """
  474. previousNameResolver = self._nameResolver
  475. self._nameResolver = resolver
  476. self.resolver = _ComplexResolverSimplifier(resolver)
  477. return previousNameResolver
  478. @property
  479. def nameResolver(self) -> IHostnameResolver:
  480. """
  481. Implementation of read-only
  482. L{IReactorPluggableNameResolver.nameResolver}.
  483. """
  484. return self._nameResolver
# Identifier returned by ReactorBase.addSystemEventTrigger: the event type
# name paired with the handle produced by that event's _ThreePhaseEvent.
_SystemEventID = NewType("_SystemEventID", Tuple[str, _ThreePhaseEventTriggerHandle])
# Element type of ReactorBase.threadCallQueue: (callable, args, kwargs).
_ThreadCall = Tuple[Callable[..., Any], Tuple[object, ...], Dict[str, object]]
# Pre-allocated failure handler.
# NOTE(review): not used in the visible part of this file; presumably wraps
# the execution of delayed calls - confirm against the rest of the module.
_DEFAULT_DELAYED_CALL_LOGGING_HANDLER = _log.failureHandler("while handling timed call")
@implementer(IReactorCore, IReactorTime, _ISupportsExitSignalCapturing)
class ReactorBase(PluggableResolverMixin):
    """
    Default base class for Reactors.

    @ivar _stopped: A flag which is false between paired calls to
        C{reactor.run} and C{reactor.stop}, and true otherwise (it starts out
        true, L{startRunning} clears it and L{stop} sets it again).  This
        should be replaced with an explicit state machine.

    @ivar _justStopped: A flag which is true between the time C{reactor.stop}
        is called and the time the shutdown system event is fired.  This is
        used to determine whether that event should be fired after each
        iteration through the mainloop.  This should be replaced with an
        explicit state machine.

    @ivar _started: A flag which is true from the time C{reactor.run} is called
        until the time C{reactor.run} returns.  This is used to prevent calls
        to C{reactor.run} on a running reactor.  This should be replaced with
        an explicit state machine.

    @ivar running: See L{IReactorCore.running}

    @ivar _registerAsIOThread: A flag controlling whether the reactor will
        register the thread it is running in as the I/O thread when it starts.
        If C{True}, registration will be done, otherwise it will not be.

    @ivar _exitSignal: See L{_ISupportsExitSignalCapturing._exitSignal}

    @ivar _installSignalHandlers: A flag which indicates whether any signal
        handlers will be installed during startup.  This includes handlers for
        SIGCHLD to monitor child processes, and SIGINT, SIGTERM, and SIGBREAK.

    @ivar _signals: An object which knows how to install and uninstall the
        reactor's signal-handling behavior.
    """

    _registerAsIOThread = True

    # True until startRunning() clears it; stop() sets it again.
    _stopped = True
    installed = False
    usingThreads = False
    # Set by sigInt/sigBreak/sigTerm to the number of the received signal.
    _exitSignal = None

    # Set to something meaningful between startRunning and shortly before run
    # returns.  We don't know the value to be used by `run` until that method
    # itself is called and we learn the value of installSignalHandlers.
    # However, we can use a no-op implementation until then.
    _signals: SignalHandling = _WithoutSignalHandling()

    __name__ = "twisted.internet.reactor"
  526. def __init__(self) -> None:
  527. super().__init__()
  528. self.threadCallQueue: List[_ThreadCall] = []
  529. self._eventTriggers: Dict[str, _ThreePhaseEvent] = {}
  530. self._pendingTimedCalls: List[DelayedCall] = []
  531. self._newTimedCalls: List[DelayedCall] = []
  532. self._cancellations = 0
  533. self.running = False
  534. self._started = False
  535. self._justStopped = False
  536. self._startedBefore = False
  537. # reactor internal readers, e.g. the waker.
  538. # Using Any as the type here… unable to find a suitable defined interface
  539. self._internalReaders: Set[Any] = set()
  540. self.waker: Any = None
  541. # Arrange for the running attribute to change to True at the right time
  542. # and let a subclass possibly do other things at that time (eg install
  543. # signal handlers).
  544. self.addSystemEventTrigger("during", "startup", self._reallyStartRunning)
  545. self.addSystemEventTrigger("during", "shutdown", self.crash)
  546. self.addSystemEventTrigger("during", "shutdown", self.disconnectAll)
  547. if platform.supportsThreads():
  548. self._initThreads()
  549. self.installWaker()
  550. # Signal handling pieces
  551. _installSignalHandlers: bool = False
  552. def _makeSignalHandling(self, installSignalHandlers: bool) -> SignalHandling:
  553. """
  554. Get an appropriate signal handling object.
  555. @param installSignalHandlers: Indicate whether to even try to do any
  556. signal handling. If C{False} then the result will be a no-op
  557. implementation.
  558. """
  559. if installSignalHandlers:
  560. return self._signalsFactory()
  561. return _WithoutSignalHandling()
  562. def _signalsFactory(self) -> SignalHandling:
  563. """
  564. Get a signal handling object that implements the basic behavior of
  565. stopping the reactor on SIGINT, SIGBREAK, and SIGTERM.
  566. """
  567. return _WithSignalHandling(
  568. self.sigInt,
  569. self.sigBreak,
  570. self.sigTerm,
  571. )
  572. def _addInternalReader(self, reader: IReadDescriptor) -> None:
  573. """
  574. Add a read descriptor which is part of the implementation of the
  575. reactor itself.
  576. The read descriptor will not be removed by L{IReactorFDSet.removeAll}.
  577. """
  578. self._internalReaders.add(reader)
  579. self.addReader(reader)
  580. def _removeInternalReader(self, reader: IReadDescriptor) -> None:
  581. """
  582. Remove a read descriptor which is part of the implementation of the
  583. reactor itself.
  584. """
  585. self._internalReaders.remove(reader)
  586. self.removeReader(reader)
  587. def run(self, installSignalHandlers: bool = True) -> None:
  588. self.startRunning(installSignalHandlers=installSignalHandlers)
  589. try:
  590. self.mainLoop()
  591. finally:
  592. self._signals.uninstall()
    def mainLoop(self) -> None:
        """
        Repeatedly run due work and one I/O iteration for as long as
        C{self._started} is true, then log that the loop has ended.

        Each pass runs inside C{_topHandler}, the failure handler created for
        "Unexpected error in main loop".
        """
        while self._started:
            with _topHandler:
                # Advance simulation time in delayed event processors.
                self.runUntilCurrent()
                t2 = self.timeout()
                # While the reactor is not (or no longer) running, the falsy
                # value of self.running is passed instead of the timeout.
                t = self.running and t2
                self.doIteration(t)

        _log.info("Main loop terminated.")
  602. # override in subclasses
  603. _lock = None
  604. def installWaker(self) -> None:
  605. raise NotImplementedError(
  606. reflect.qual(self.__class__) + " did not implement installWaker"
  607. )
  608. def wakeUp(self) -> None:
  609. """
  610. Wake up the event loop.
  611. """
  612. if self.waker:
  613. self.waker.wakeUp()
  614. # if the waker isn't installed, the reactor isn't running, and
  615. # therefore doesn't need to be woken up
  616. def doIteration(self, delay: Optional[float]) -> None:
  617. """
  618. Do one iteration over the readers and writers which have been added.
  619. """
  620. raise NotImplementedError(
  621. reflect.qual(self.__class__) + " did not implement doIteration"
  622. )
  623. def addReader(self, reader: IReadDescriptor) -> None:
  624. raise NotImplementedError(
  625. reflect.qual(self.__class__) + " did not implement addReader"
  626. )
  627. def addWriter(self, writer: IWriteDescriptor) -> None:
  628. raise NotImplementedError(
  629. reflect.qual(self.__class__) + " did not implement addWriter"
  630. )
  631. def removeReader(self, reader: IReadDescriptor) -> None:
  632. raise NotImplementedError(
  633. reflect.qual(self.__class__) + " did not implement removeReader"
  634. )
  635. def removeWriter(self, writer: IWriteDescriptor) -> None:
  636. raise NotImplementedError(
  637. reflect.qual(self.__class__) + " did not implement removeWriter"
  638. )
  639. def removeAll(self) -> List[Union[IReadDescriptor, IWriteDescriptor]]:
  640. raise NotImplementedError(
  641. reflect.qual(self.__class__) + " did not implement removeAll"
  642. )
  643. def getReaders(self) -> List[IReadDescriptor]:
  644. raise NotImplementedError(
  645. reflect.qual(self.__class__) + " did not implement getReaders"
  646. )
  647. def getWriters(self) -> List[IWriteDescriptor]:
  648. raise NotImplementedError(
  649. reflect.qual(self.__class__) + " did not implement getWriters"
  650. )
  651. # IReactorCore
  652. def resolve(
  653. self, name: str, timeout: Sequence[int] = (1, 3, 11, 45)
  654. ) -> Deferred[str]:
  655. """
  656. Return a Deferred that will resolve a hostname."""
  657. if not name:
  658. # XXX - This is *less than* '::', and will screw up IPv6 servers
  659. return defer.succeed("0.0.0.0")
  660. if abstract.isIPAddress(name):
  661. return defer.succeed(name)
  662. return self.resolver.getHostByName(name, timeout)
  663. def stop(self) -> None:
  664. """
  665. See twisted.internet.interfaces.IReactorCore.stop.
  666. """
  667. if self._stopped:
  668. raise error.ReactorNotRunning("Can't stop reactor that isn't running.")
  669. self._stopped = True
  670. self._justStopped = True
  671. self._startedBefore = True
  672. def crash(self) -> None:
  673. """
  674. See twisted.internet.interfaces.IReactorCore.crash.
  675. Reset reactor state tracking attributes and re-initialize certain
  676. state-transition helpers which were set up in C{__init__} but later
  677. destroyed (through use).
  678. """
  679. self._started = False
  680. self.running = False
  681. self.addSystemEventTrigger("during", "startup", self._reallyStartRunning)
  682. def sigInt(self, number: int, frame: Optional[FrameType] = None) -> None:
  683. """
  684. Handle a SIGINT interrupt.
  685. @param number: See handler specification in L{signal.signal}
  686. @param frame: See handler specification in L{signal.signal}
  687. """
  688. _log.info("Received SIGINT, shutting down.")
  689. self.callFromThread(self.stop)
  690. self._exitSignal = number
  691. def sigBreak(self, number: int, frame: Optional[FrameType] = None) -> None:
  692. """
  693. Handle a SIGBREAK interrupt.
  694. @param number: See handler specification in L{signal.signal}
  695. @param frame: See handler specification in L{signal.signal}
  696. """
  697. _log.info("Received SIGBREAK, shutting down.")
  698. self.callFromThread(self.stop)
  699. self._exitSignal = number
  700. def sigTerm(self, number: int, frame: Optional[FrameType] = None) -> None:
  701. """
  702. Handle a SIGTERM interrupt.
  703. @param number: See handler specification in L{signal.signal}
  704. @param frame: See handler specification in L{signal.signal}
  705. """
  706. _log.info("Received SIGTERM, shutting down.")
  707. self.callFromThread(self.stop)
  708. self._exitSignal = number
  709. def disconnectAll(self) -> None:
  710. """Disconnect every reader, and writer in the system."""
  711. selectables = self.removeAll()
  712. for reader in selectables:
  713. _callWithLogger(
  714. reader, reader.connectionLost, Failure(main.CONNECTION_LOST)
  715. )
  716. def iterate(self, delay: float = 0.0) -> None:
  717. """
  718. See twisted.internet.interfaces.IReactorCore.iterate.
  719. """
  720. self.runUntilCurrent()
  721. self.doIteration(delay)
  722. def fireSystemEvent(self, eventType: str) -> None:
  723. """
  724. See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
  725. """
  726. event = self._eventTriggers.get(eventType)
  727. if event is not None:
  728. event.fireEvent()
  729. def addSystemEventTrigger(
  730. self,
  731. phase: str,
  732. eventType: str,
  733. callable: Callable[..., Any],
  734. *args: object,
  735. **kwargs: object,
  736. ) -> _SystemEventID:
  737. """
  738. See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.
  739. """
  740. assert builtins.callable(callable), f"{callable} is not callable"
  741. if eventType not in self._eventTriggers:
  742. self._eventTriggers[eventType] = _ThreePhaseEvent()
  743. return _SystemEventID(
  744. (
  745. eventType,
  746. self._eventTriggers[eventType].addTrigger(
  747. phase, callable, *args, **kwargs
  748. ),
  749. )
  750. )
  751. def removeSystemEventTrigger(self, triggerID: _SystemEventID) -> None:
  752. """
  753. See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger.
  754. """
  755. eventType, handle = triggerID
  756. self._eventTriggers[eventType].removeTrigger(handle)
  757. def callWhenRunning(
  758. self, callable: Callable[..., Any], *args: object, **kwargs: object
  759. ) -> Optional[_SystemEventID]:
  760. """
  761. See twisted.internet.interfaces.IReactorCore.callWhenRunning.
  762. """
  763. if self.running:
  764. callable(*args, **kwargs)
  765. return None
  766. else:
  767. return self.addSystemEventTrigger(
  768. "after", "startup", callable, *args, **kwargs
  769. )
  770. def startRunning(self, installSignalHandlers: bool = True) -> None:
  771. """
  772. Method called when reactor starts: do some initialization and fire
  773. startup events.
  774. Don't call this directly, call reactor.run() instead: it should take
  775. care of calling this.
  776. This method is somewhat misnamed. The reactor will not necessarily be
  777. in the running state by the time this method returns. The only
  778. guarantee is that it will be on its way to the running state.
  779. @param installSignalHandlers: A flag which, if set, indicates that
  780. handlers for a number of (implementation-defined) signals should be
  781. installed during startup.
  782. """
  783. if self._started:
  784. raise error.ReactorAlreadyRunning()
  785. if self._startedBefore:
  786. raise error.ReactorNotRestartable()
  787. self._signals.uninstall()
  788. self._installSignalHandlers = installSignalHandlers
  789. self._signals = self._makeSignalHandling(installSignalHandlers)
  790. self._started = True
  791. self._stopped = False
  792. if self._registerAsIOThread:
  793. threadable.registerAsIOThread()
  794. self.fireSystemEvent("startup")
  795. def _reallyStartRunning(self) -> None:
  796. """
  797. Method called to transition to the running state. This should happen
  798. in the I{during startup} event trigger phase.
  799. """
  800. self.running = True
  801. if self._installSignalHandlers:
  802. # Make sure this happens before after-startup events, since the
  803. # expectation of after-startup is that the reactor is fully
  804. # initialized. Don't do it right away for historical reasons
  805. # (perhaps some before-startup triggers don't want there to be a
  806. # custom SIGCHLD handler so that they can run child processes with
  807. # some blocking api).
  808. self._signals.install()
    # IReactorTime
    # The reactor's clock.  This is twisted.python.runtime.seconds (imported
    # above as runtimeSeconds) wrapped in staticmethod, so that looking it up
    # on an instance does not bind `self` as its first argument.
    seconds = staticmethod(runtimeSeconds)
  811. def callLater(
  812. self, delay: float, callable: Callable[..., Any], *args: object, **kw: object
  813. ) -> DelayedCall:
  814. """
  815. See twisted.internet.interfaces.IReactorTime.callLater.
  816. """
  817. assert delay >= 0, f"{delay} is not greater than or equal to 0 seconds"
  818. delayedCall = DelayedCall(
  819. self.seconds() + delay,
  820. callable,
  821. args,
  822. kw,
  823. self._cancelCallLater,
  824. self._moveCallLaterSooner,
  825. seconds=self.seconds,
  826. )
  827. self._newTimedCalls.append(delayedCall)
  828. return delayedCall
  829. def _moveCallLaterSooner(self, delayedCall: DelayedCall) -> None:
  830. # Linear time find: slow.
  831. heap = self._pendingTimedCalls
  832. try:
  833. pos = heap.index(delayedCall)
  834. # Move elt up the heap until it rests at the right place.
  835. elt = heap[pos]
  836. while pos != 0:
  837. parent = (pos - 1) // 2
  838. if heap[parent] <= elt:
  839. break
  840. # move parent down
  841. heap[pos] = heap[parent]
  842. pos = parent
  843. heap[pos] = elt
  844. except ValueError:
  845. # element was not found in heap - oh well...
  846. pass
  847. def _cancelCallLater(self, delayedCall: DelayedCall) -> None:
  848. self._cancellations += 1
  849. def getDelayedCalls(self) -> Sequence[IDelayedCall]:
  850. """
  851. See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls}
  852. """
  853. return [
  854. x
  855. for x in (self._pendingTimedCalls + self._newTimedCalls)
  856. if not x.cancelled
  857. ]
  858. def _insertNewDelayedCalls(self) -> None:
  859. # This function is called twice per reactor iteration, once in
  860. # timeout() and once in runUntilCurrent(), and in most cases there
  861. # won't be any new timeouts. So have a fast path for the empty case.
  862. if not self._newTimedCalls:
  863. return
  864. for call in self._newTimedCalls:
  865. if call.cancelled:
  866. self._cancellations -= 1
  867. else:
  868. call.activate_delay()
  869. heappush(self._pendingTimedCalls, call)
  870. self._newTimedCalls = []
  871. def timeout(self) -> Optional[float]:
  872. """
  873. Determine the longest time the reactor may sleep (waiting on I/O
  874. notification, perhaps) before it must wake up to service a time-related
  875. event.
  876. @return: The maximum number of seconds the reactor may sleep.
  877. """
  878. # insert new delayed calls to make sure to include them in timeout value
  879. self._insertNewDelayedCalls()
  880. if not self._pendingTimedCalls:
  881. return None
  882. delay = self._pendingTimedCalls[0].time - self.seconds()
  883. # Pick a somewhat arbitrary maximum possible value for the timeout.
  884. # This value is 2 ** 31 / 1000, which is the number of seconds which can
  885. # be represented as an integer number of milliseconds in a signed 32 bit
  886. # integer. This particular limit is imposed by the epoll_wait(3)
  887. # interface which accepts a timeout as a C "int" type and treats it as
  888. # representing a number of milliseconds.
  889. longest = 2147483
  890. # Don't let the delay be in the past (negative) or exceed a plausible
  891. # maximum (platform-imposed) interval.
  892. return max(0, min(longest, delay))
    def runUntilCurrent(self) -> None:
        """
        Run everything that is due right now.

        In order, this method:

          1. runs the calls that were in C{threadCallQueue} when it started
             (calls queued while it is draining are left for a later pass,
             and C{wakeUp} is called so that pass happens);
          2. merges newly scheduled delayed calls into the pending heap;
          3. runs every pending timed call whose time has been reached;
          4. rebuilds the heap without cancelled calls once enough of them
             have accumulated;
          5. fires the C{"shutdown"} system event if C{_justStopped} is set.
        """
        if self.threadCallQueue:
            # Keep track of how many calls we actually make, as we're
            # making them, in case another call is added to the queue
            # while we're in this loop.
            count = 0
            total = len(self.threadCallQueue)
            for f, a, kw in self.threadCallQueue:
                # NOTE(review): _threadCallHandler is defined outside this
                # block; presumably it logs, rather than propagates, failures
                # raised by the call - confirm.
                with _threadCallHandler:
                    f(*a, **kw)
                count += 1
                if count == total:
                    break
            # Remove only the entries that were just processed.
            del self.threadCallQueue[:count]
            # Anything left was queued during the loop above; make sure the
            # reactor comes back around to handle it.
            if self.threadCallQueue:
                self.wakeUp()

        # insert new delayed calls now
        self._insertNewDelayedCalls()

        now = self.seconds()
        while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
            call = heappop(self._pendingTimedCalls)
            if call.cancelled:
                # This cancelled call has now left the heap, so it no longer
                # counts towards the cleanup threshold below.
                self._cancellations -= 1
                continue

            if call.delayed_time > 0.0:
                # NOTE(review): presumably the call was postponed after being
                # scheduled and activate_delay() applies that postponement to
                # call.time - confirm against DelayedCall.  Either way the
                # call goes back on the heap instead of running now.
                call.activate_delay()
                heappush(self._pendingTimedCalls, call)
                continue

            logHandler = (
                _log.failuresHandled(
                    "while handling timed call {previous()}",
                    # `creator` is bound as a default argument so the lambda
                    # keeps this call's value rather than a later one.
                    previous=lambda creator=call.creator: (
                        "\n"
                        + (" C: from a DelayedCall created here:\n")
                        + " C:"
                        + "".join(creator).rstrip().replace("\n", "\n C:")
                        + "\n"
                    ),
                )
                if call.creator
                # A much faster logging handler for the common case where extra
                # debug info is not being output:
                else _DEFAULT_DELAYED_CALL_LOGGING_HANDLER
            )
            with logHandler:
                # Mark the call as called before invoking it.
                call.called = 1
                call.func(*call.args, **call.kw)

        # Compact the heap only when there are more than 50 cancelled calls
        # and they outnumber half of the heap (`>>` binds tighter than `>`).
        if (
            self._cancellations > 50
            and self._cancellations > len(self._pendingTimedCalls) >> 1
        ):
            self._cancellations = 0
            self._pendingTimedCalls = [
                x for x in self._pendingTimedCalls if not x.cancelled
            ]
            heapify(self._pendingTimedCalls)

        if self._justStopped:
            # Clear the flag first so the shutdown event fires only once.
            self._justStopped = False
            self.fireSystemEvent("shutdown")
  955. # IReactorThreads
  956. if platform.supportsThreads():
        assert ThreadPool is not None

        # The reactor's thread pool.  None until _initThreadPool creates it,
        # and reset to None again by _stopThreadPool.
        threadpool = None
        # ID of the trigger starting the threadpool
        _threadpoolStartupID = None
        # ID of the trigger stopping the threadpool
        threadpoolShutdownID = None
  963. def _initThreads(self) -> None:
  964. self.installNameResolver(
  965. _GAIResolver(cast(IReactorThreads, self), self.getThreadPool)
  966. )
  967. self.usingThreads = True
  968. # `IReactorFromThreads` defines the first named argument as
  969. # `callable: Callable[..., Any]` but this defines it as `f`
  970. # really both should be defined using py3.8 positional only
  971. def callFromThread( # type: ignore[override]
  972. self, f: Callable[..., Any], *args: object, **kwargs: object
  973. ) -> None:
  974. """
  975. See
  976. L{twisted.internet.interfaces.IReactorFromThreads.callFromThread}.
  977. """
  978. assert callable(f), f"{f} is not callable"
  979. # lists are thread-safe in CPython, but not in Jython
  980. # this is probably a bug in Jython, but until fixed this code
  981. # won't work in Jython.
  982. self.threadCallQueue.append((f, args, kwargs))
  983. self.wakeUp()
  984. def _initThreadPool(self) -> None:
  985. """
  986. Create the threadpool accessible with callFromThread.
  987. """
  988. self.threadpool = ThreadPool(0, 10, "twisted.internet.reactor")
  989. self._threadpoolStartupID = self.callWhenRunning(self.threadpool.start)
  990. self.threadpoolShutdownID = self.addSystemEventTrigger(
  991. "during", "shutdown", self._stopThreadPool
  992. )
  993. def _uninstallHandler(self) -> None:
  994. self._signals.uninstall()
  995. def _stopThreadPool(self) -> None:
  996. """
  997. Stop the reactor threadpool. This method is only valid if there
  998. is currently a threadpool (created by L{_initThreadPool}). It
  999. is not intended to be called directly; instead, it will be
  1000. called by a shutdown trigger created in L{_initThreadPool}.
  1001. """
  1002. triggers = [self._threadpoolStartupID, self.threadpoolShutdownID]
  1003. for trigger in filter(None, triggers):
  1004. try:
  1005. self.removeSystemEventTrigger(trigger)
  1006. except ValueError:
  1007. pass
  1008. self._threadpoolStartupID = None
  1009. self.threadpoolShutdownID = None
  1010. assert self.threadpool is not None
  1011. self.threadpool.stop()
  1012. self.threadpool = None
  1013. def getThreadPool(self) -> ThreadPool:
  1014. """
  1015. See L{twisted.internet.interfaces.IReactorThreads.getThreadPool}.
  1016. """
  1017. if self.threadpool is None:
  1018. self._initThreadPool()
  1019. assert self.threadpool is not None
  1020. return self.threadpool
  1021. # `IReactorInThreads` defines the first named argument as
  1022. # `callable: Callable[..., Any]` but this defines it as `_callable`
  1023. # really both should be defined using py3.8 positional only
  1024. def callInThread( # type: ignore[override]
  1025. self, _callable: Callable[..., Any], *args: object, **kwargs: object
  1026. ) -> None:
  1027. """
  1028. See L{twisted.internet.interfaces.IReactorInThreads.callInThread}.
  1029. """
  1030. self.getThreadPool().callInThread(_callable, *args, **kwargs)
  1031. def suggestThreadPoolSize(self, size: int) -> None:
  1032. """
  1033. See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize}.
  1034. """
  1035. self.getThreadPool().adjustPoolsize(maxthreads=size)
  1036. else:
  1037. # This is for signal handlers.
  1038. def callFromThread(
  1039. self, f: Callable[..., Any], *args: object, **kwargs: object
  1040. ) -> None:
  1041. assert callable(f), f"{f} is not callable"
  1042. # See comment in the other callFromThread implementation.
  1043. self.threadCallQueue.append((f, args, kwargs))
# Declare that ReactorBase provides IReactorThreads only on platforms with
# thread support - the same condition under which the class body defines the
# thread-pool methods.
if platform.supportsThreads():
    classImplements(ReactorBase, IReactorThreads)
  1046. @implementer(IConnector)
  1047. class BaseConnector(ABC):
  1048. """
  1049. Basic implementation of L{IConnector}.
  1050. State can be: "connecting", "connected", "disconnected"
  1051. """
  1052. timeoutID = None
  1053. factoryStarted = 0
  1054. def __init__(
  1055. self, factory: ClientFactory, timeout: float, reactor: ReactorBase
  1056. ) -> None:
  1057. self.state = "disconnected"
  1058. self.reactor = reactor
  1059. self.factory = factory
  1060. self.timeout = timeout
  1061. def disconnect(self) -> None:
  1062. """Disconnect whatever our state is."""
  1063. if self.state == "connecting":
  1064. self.stopConnecting()
  1065. elif self.state == "connected":
  1066. assert self.transport is not None
  1067. self.transport.loseConnection()
  1068. @abstractmethod
  1069. def _makeTransport(self) -> "Client":
  1070. pass
  1071. def connect(self) -> None:
  1072. """Start connection to remote server."""
  1073. if self.state != "disconnected":
  1074. raise RuntimeError("can't connect in this state")
  1075. self.state = "connecting"
  1076. if not self.factoryStarted:
  1077. self.factory.doStart()
  1078. self.factoryStarted = 1
  1079. self.transport: Optional[Client] = self._makeTransport()
  1080. if self.timeout is not None:
  1081. self.timeoutID = self.reactor.callLater(
  1082. self.timeout, self.transport.failIfNotConnected, error.TimeoutError()
  1083. )
  1084. self.factory.startedConnecting(self)
  1085. def stopConnecting(self) -> None:
  1086. """Stop attempting to connect."""
  1087. if self.state != "connecting":
  1088. raise error.NotConnectingError("we're not trying to connect")
  1089. assert self.transport is not None
  1090. self.state = "disconnected"
  1091. self.transport.failIfNotConnected(error.UserError())
  1092. del self.transport
  1093. def cancelTimeout(self) -> None:
  1094. if self.timeoutID is not None:
  1095. try:
  1096. self.timeoutID.cancel()
  1097. except ValueError:
  1098. pass
  1099. del self.timeoutID
  1100. def buildProtocol(self, addr: IAddress) -> Optional[IProtocol]:
  1101. self.state = "connected"
  1102. self.cancelTimeout()
  1103. return self.factory.buildProtocol(addr)
  1104. def connectionFailed(self, reason: Failure) -> None:
  1105. self.cancelTimeout()
  1106. self.transport = None
  1107. self.state = "disconnected"
  1108. self.factory.clientConnectionFailed(self, reason)
  1109. if self.state == "disconnected":
  1110. # factory hasn't called our connect() method
  1111. self.factory.doStop()
  1112. self.factoryStarted = 0
  1113. def connectionLost(self, reason: Failure) -> None:
  1114. self.state = "disconnected"
  1115. self.factory.clientConnectionLost(self, reason)
  1116. if self.state == "disconnected":
  1117. # factory hasn't called our connect() method
  1118. self.factory.doStop()
  1119. self.factoryStarted = 0
  1120. def getDestination(self) -> IAddress:
  1121. raise NotImplementedError(
  1122. reflect.qual(self.__class__) + " did not implement " "getDestination"
  1123. )
  1124. def __repr__(self) -> str:
  1125. return "<{} instance at 0x{:x} {} {}>".format(
  1126. reflect.qual(self.__class__),
  1127. id(self),
  1128. self.state,
  1129. self.getDestination(),
  1130. )
  1131. class BasePort(abstract.FileDescriptor):
  1132. """Basic implementation of a ListeningPort.
  1133. Note: This does not actually implement IListeningPort.
  1134. """
  1135. addressFamily: socket.AddressFamily = None # type: ignore[assignment]
  1136. socketType: socket.SocketKind = None # type: ignore[assignment]
  1137. def createInternetSocket(self) -> socket.socket:
  1138. s = socket.socket(self.addressFamily, self.socketType)
  1139. s.setblocking(False)
  1140. fdesc._setCloseOnExec(s.fileno())
  1141. return s
  1142. def doWrite(self) -> Optional[Failure]:
  1143. """Raises a RuntimeError"""
  1144. raise RuntimeError("doWrite called on a %s" % reflect.qual(self.__class__))
# Deliberately empty: `from <this module> import *` imports nothing.
__all__: List[str] = []