# base.py (47 KB)
  1. # -*- test-case-name: twisted.test.test_internet,twisted.internet.test.test_core -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Very basic functionality for a Reactor implementation.
  6. """
  7. import builtins
  8. import socket # needed only for sync-dns
  9. import warnings
  10. from abc import ABC, abstractmethod
  11. from heapq import heapify, heappop, heappush
  12. from traceback import format_stack
  13. from types import FrameType
  14. from typing import (
  15. TYPE_CHECKING,
  16. Any,
  17. Callable,
  18. Dict,
  19. List,
  20. NewType,
  21. Optional,
  22. Sequence,
  23. Set,
  24. Tuple,
  25. Union,
  26. cast,
  27. )
  28. from zope.interface import classImplements, implementer
  29. from twisted.internet import abstract, defer, error, fdesc, main, threads
  30. from twisted.internet._resolver import (
  31. ComplexResolverSimplifier as _ComplexResolverSimplifier,
  32. GAIResolver as _GAIResolver,
  33. SimpleResolverComplexifier as _SimpleResolverComplexifier,
  34. )
  35. from twisted.internet.defer import Deferred, DeferredList
  36. from twisted.internet.interfaces import (
  37. IAddress,
  38. IConnector,
  39. IDelayedCall,
  40. IHostnameResolver,
  41. IProtocol,
  42. IReactorCore,
  43. IReactorFromThreads,
  44. IReactorPluggableNameResolver,
  45. IReactorPluggableResolver,
  46. IReactorThreads,
  47. IReactorTime,
  48. IReadDescriptor,
  49. IResolverSimple,
  50. IWriteDescriptor,
  51. _ISupportsExitSignalCapturing,
  52. )
  53. from twisted.internet.protocol import ClientFactory
  54. from twisted.python import log, reflect
  55. from twisted.python.failure import Failure
  56. from twisted.python.runtime import platform, seconds as runtimeSeconds
  57. from ._signals import SignalHandling, _WithoutSignalHandling, _WithSignalHandling
  58. if TYPE_CHECKING:
  59. from twisted.internet.tcp import Client
  60. # This import is for side-effects! Even if you don't see any code using it
  61. # in this module, don't delete it.
  62. from twisted.python import threadable
  63. if platform.supportsThreads():
  64. from twisted.python.threadpool import ThreadPool
  65. else:
  66. ThreadPool = None # type: ignore[misc, assignment]
  67. @implementer(IDelayedCall)
  68. class DelayedCall:
  69. # enable .debug to record creator call stack, and it will be logged if
  70. # an exception occurs while the function is being run
  71. debug = False
  72. _repr: Optional[str] = None
  73. # In debug mode, the call stack at the time of instantiation.
  74. creator: Optional[Sequence[str]] = None
  75. def __init__(
  76. self,
  77. time: float,
  78. func: Callable[..., Any],
  79. args: Sequence[object],
  80. kw: Dict[str, object],
  81. cancel: Callable[["DelayedCall"], None],
  82. reset: Callable[["DelayedCall"], None],
  83. seconds: Callable[[], float] = runtimeSeconds,
  84. ) -> None:
  85. """
  86. @param time: Seconds from the epoch at which to call C{func}.
  87. @param func: The callable to call.
  88. @param args: The positional arguments to pass to the callable.
  89. @param kw: The keyword arguments to pass to the callable.
  90. @param cancel: A callable which will be called with this
  91. DelayedCall before cancellation.
  92. @param reset: A callable which will be called with this
  93. DelayedCall after changing this DelayedCall's scheduled
  94. execution time. The callable should adjust any necessary
  95. scheduling details to ensure this DelayedCall is invoked
  96. at the new appropriate time.
  97. @param seconds: If provided, a no-argument callable which will be
  98. used to determine the current time any time that information is
  99. needed.
  100. """
  101. self.time, self.func, self.args, self.kw = time, func, args, kw
  102. self.resetter = reset
  103. self.canceller = cancel
  104. self.seconds = seconds
  105. self.cancelled = self.called = 0
  106. self.delayed_time = 0.0
  107. if self.debug:
  108. self.creator = format_stack()[:-2]
  109. def getTime(self) -> float:
  110. """
  111. Return the time at which this call will fire
  112. @return: The number of seconds after the epoch at which this call is
  113. scheduled to be made.
  114. """
  115. return self.time + self.delayed_time
  116. def cancel(self) -> None:
  117. """
  118. Unschedule this call
  119. @raise AlreadyCancelled: Raised if this call has already been
  120. unscheduled.
  121. @raise AlreadyCalled: Raised if this call has already been made.
  122. """
  123. if self.cancelled:
  124. raise error.AlreadyCancelled
  125. elif self.called:
  126. raise error.AlreadyCalled
  127. else:
  128. self.canceller(self)
  129. self.cancelled = 1
  130. if self.debug:
  131. self._repr = repr(self)
  132. del self.func, self.args, self.kw
  133. def reset(self, secondsFromNow: float) -> None:
  134. """
  135. Reschedule this call for a different time
  136. @param secondsFromNow: The number of seconds from the time of the
  137. C{reset} call at which this call will be scheduled.
  138. @raise AlreadyCancelled: Raised if this call has been cancelled.
  139. @raise AlreadyCalled: Raised if this call has already been made.
  140. """
  141. if self.cancelled:
  142. raise error.AlreadyCancelled
  143. elif self.called:
  144. raise error.AlreadyCalled
  145. else:
  146. newTime = self.seconds() + secondsFromNow
  147. if newTime < self.time:
  148. self.delayed_time = 0.0
  149. self.time = newTime
  150. self.resetter(self)
  151. else:
  152. self.delayed_time = newTime - self.time
  153. def delay(self, secondsLater: float) -> None:
  154. """
  155. Reschedule this call for a later time
  156. @param secondsLater: The number of seconds after the originally
  157. scheduled time for which to reschedule this call.
  158. @raise AlreadyCancelled: Raised if this call has been cancelled.
  159. @raise AlreadyCalled: Raised if this call has already been made.
  160. """
  161. if self.cancelled:
  162. raise error.AlreadyCancelled
  163. elif self.called:
  164. raise error.AlreadyCalled
  165. else:
  166. self.delayed_time += secondsLater
  167. if self.delayed_time < 0.0:
  168. self.activate_delay()
  169. self.resetter(self)
  170. def activate_delay(self) -> None:
  171. self.time += self.delayed_time
  172. self.delayed_time = 0.0
  173. def active(self) -> bool:
  174. """Determine whether this call is still pending
  175. @return: True if this call has not yet been made or cancelled,
  176. False otherwise.
  177. """
  178. return not (self.cancelled or self.called)
  179. def __le__(self, other: object) -> bool:
  180. """
  181. Implement C{<=} operator between two L{DelayedCall} instances.
  182. Comparison is based on the C{time} attribute (unadjusted by the
  183. delayed time).
  184. """
  185. if isinstance(other, DelayedCall):
  186. return self.time <= other.time
  187. else:
  188. return NotImplemented
  189. def __lt__(self, other: object) -> bool:
  190. """
  191. Implement C{<} operator between two L{DelayedCall} instances.
  192. Comparison is based on the C{time} attribute (unadjusted by the
  193. delayed time).
  194. """
  195. if isinstance(other, DelayedCall):
  196. return self.time < other.time
  197. else:
  198. return NotImplemented
  199. def __repr__(self) -> str:
  200. """
  201. Implement C{repr()} for L{DelayedCall} instances.
  202. @returns: String containing details of the L{DelayedCall}.
  203. """
  204. if self._repr is not None:
  205. return self._repr
  206. if hasattr(self, "func"):
  207. # This code should be replaced by a utility function in reflect;
  208. # see ticket #6066:
  209. func = getattr(self.func, "__qualname__", None)
  210. if func is None:
  211. func = getattr(self.func, "__name__", None)
  212. if func is not None:
  213. imClass = getattr(self.func, "im_class", None)
  214. if imClass is not None:
  215. func = f"{imClass}.{func}"
  216. if func is None:
  217. func = reflect.safe_repr(self.func)
  218. else:
  219. func = None
  220. now = self.seconds()
  221. L = [
  222. "<DelayedCall 0x%x [%ss] called=%s cancelled=%s"
  223. % (id(self), self.time - now, self.called, self.cancelled)
  224. ]
  225. if func is not None:
  226. L.extend((" ", func, "("))
  227. if self.args:
  228. L.append(", ".join([reflect.safe_repr(e) for e in self.args]))
  229. if self.kw:
  230. L.append(", ")
  231. if self.kw:
  232. L.append(
  233. ", ".join(
  234. [f"{k}={reflect.safe_repr(v)}" for (k, v) in self.kw.items()]
  235. )
  236. )
  237. L.append(")")
  238. if self.creator is not None:
  239. L.append("\n\ntraceback at creation: \n\n%s" % (" ".join(self.creator)))
  240. L.append(">")
  241. return "".join(L)
  242. @implementer(IResolverSimple)
  243. class ThreadedResolver:
  244. """
  245. L{ThreadedResolver} uses a reactor, a threadpool, and
  246. L{socket.gethostbyname} to perform name lookups without blocking the
  247. reactor thread. It also supports timeouts indepedently from whatever
  248. timeout logic L{socket.gethostbyname} might have.
  249. @ivar reactor: The reactor the threadpool of which will be used to call
  250. L{socket.gethostbyname} and the I/O thread of which the result will be
  251. delivered.
  252. """
  253. def __init__(self, reactor: "ReactorBase") -> None:
  254. self.reactor = reactor
  255. self._runningQueries: Dict[
  256. Deferred[str], Tuple[Deferred[str], IDelayedCall]
  257. ] = {}
  258. def _fail(self, name: str, err: str) -> Failure:
  259. lookupError = error.DNSLookupError(f"address {name!r} not found: {err}")
  260. return Failure(lookupError)
  261. def _cleanup(self, name: str, lookupDeferred: Deferred[str]) -> None:
  262. userDeferred, cancelCall = self._runningQueries[lookupDeferred]
  263. del self._runningQueries[lookupDeferred]
  264. userDeferred.errback(self._fail(name, "timeout error"))
  265. def _checkTimeout(
  266. self, result: Union[str, Failure], name: str, lookupDeferred: Deferred[str]
  267. ) -> None:
  268. try:
  269. userDeferred, cancelCall = self._runningQueries[lookupDeferred]
  270. except KeyError:
  271. pass
  272. else:
  273. del self._runningQueries[lookupDeferred]
  274. cancelCall.cancel()
  275. if isinstance(result, Failure):
  276. userDeferred.errback(self._fail(name, result.getErrorMessage()))
  277. else:
  278. userDeferred.callback(result)
  279. def getHostByName(
  280. self, name: str, timeout: Sequence[int] = (1, 3, 11, 45)
  281. ) -> Deferred[str]:
  282. """
  283. See L{twisted.internet.interfaces.IResolverSimple.getHostByName}.
  284. Note that the elements of C{timeout} are summed and the result is used
  285. as a timeout for the lookup. Any intermediate timeout or retry logic
  286. is left up to the platform via L{socket.gethostbyname}.
  287. """
  288. if timeout:
  289. timeoutDelay = sum(timeout)
  290. else:
  291. timeoutDelay = 60
  292. userDeferred: Deferred[str] = Deferred()
  293. lookupDeferred = threads.deferToThreadPool(
  294. cast(IReactorFromThreads, self.reactor),
  295. cast(IReactorThreads, self.reactor).getThreadPool(),
  296. socket.gethostbyname,
  297. name,
  298. )
  299. cancelCall = cast(IReactorTime, self.reactor).callLater(
  300. timeoutDelay, self._cleanup, name, lookupDeferred
  301. )
  302. self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
  303. _: Deferred[None] = lookupDeferred.addBoth(
  304. self._checkTimeout, name, lookupDeferred
  305. )
  306. return userDeferred
  307. @implementer(IResolverSimple)
  308. class BlockingResolver:
  309. def getHostByName(
  310. self, name: str, timeout: Sequence[int] = (1, 3, 11, 45)
  311. ) -> Deferred[str]:
  312. try:
  313. address = socket.gethostbyname(name)
  314. except OSError:
  315. msg = f"address {name!r} not found"
  316. err = error.DNSLookupError(msg)
  317. return defer.fail(err)
  318. else:
  319. return defer.succeed(address)
  320. _ThreePhaseEventTriggerCallable = Callable[..., Any]
  321. _ThreePhaseEventTrigger = Tuple[
  322. _ThreePhaseEventTriggerCallable, Tuple[object, ...], Dict[str, object]
  323. ]
  324. _ThreePhaseEventTriggerHandle = NewType(
  325. "_ThreePhaseEventTriggerHandle",
  326. Tuple[str, _ThreePhaseEventTriggerCallable, Tuple[object, ...], Dict[str, object]],
  327. )
  328. class _ThreePhaseEvent:
  329. """
  330. Collection of callables (with arguments) which can be invoked as a group in
  331. a particular order.
  332. This provides the underlying implementation for the reactor's system event
  333. triggers. An instance of this class tracks triggers for all phases of a
  334. single type of event.
  335. @ivar before: A list of the before-phase triggers containing three-tuples
  336. of a callable, a tuple of positional arguments, and a dict of keyword
  337. arguments
  338. @ivar finishedBefore: A list of the before-phase triggers which have
  339. already been executed. This is only populated in the C{'BEFORE'} state.
  340. @ivar during: A list of the during-phase triggers containing three-tuples
  341. of a callable, a tuple of positional arguments, and a dict of keyword
  342. arguments
  343. @ivar after: A list of the after-phase triggers containing three-tuples
  344. of a callable, a tuple of positional arguments, and a dict of keyword
  345. arguments
  346. @ivar state: A string indicating what is currently going on with this
  347. object. One of C{'BASE'} (for when nothing in particular is happening;
  348. this is the initial value), C{'BEFORE'} (when the before-phase triggers
  349. are in the process of being executed).
  350. """
  351. def __init__(self) -> None:
  352. self.before: List[_ThreePhaseEventTrigger] = []
  353. self.during: List[_ThreePhaseEventTrigger] = []
  354. self.after: List[_ThreePhaseEventTrigger] = []
  355. self.state = "BASE"
  356. def addTrigger(
  357. self,
  358. phase: str,
  359. callable: _ThreePhaseEventTriggerCallable,
  360. *args: object,
  361. **kwargs: object,
  362. ) -> _ThreePhaseEventTriggerHandle:
  363. """
  364. Add a trigger to the indicate phase.
  365. @param phase: One of C{'before'}, C{'during'}, or C{'after'}.
  366. @param callable: An object to be called when this event is triggered.
  367. @param args: Positional arguments to pass to C{callable}.
  368. @param kwargs: Keyword arguments to pass to C{callable}.
  369. @return: An opaque handle which may be passed to L{removeTrigger} to
  370. reverse the effects of calling this method.
  371. """
  372. if phase not in ("before", "during", "after"):
  373. raise KeyError("invalid phase")
  374. getattr(self, phase).append((callable, args, kwargs))
  375. return _ThreePhaseEventTriggerHandle((phase, callable, args, kwargs))
  376. def removeTrigger(self, handle: _ThreePhaseEventTriggerHandle) -> None:
  377. """
  378. Remove a previously added trigger callable.
  379. @param handle: An object previously returned by L{addTrigger}. The
  380. trigger added by that call will be removed.
  381. @raise ValueError: If the trigger associated with C{handle} has already
  382. been removed or if C{handle} is not a valid handle.
  383. """
  384. getattr(self, "removeTrigger_" + self.state)(handle)
  385. def removeTrigger_BASE(self, handle: _ThreePhaseEventTriggerHandle) -> None:
  386. """
  387. Just try to remove the trigger.
  388. @see: removeTrigger
  389. """
  390. try:
  391. phase, callable, args, kwargs = handle
  392. except (TypeError, ValueError):
  393. raise ValueError("invalid trigger handle")
  394. else:
  395. if phase not in ("before", "during", "after"):
  396. raise KeyError("invalid phase")
  397. getattr(self, phase).remove((callable, args, kwargs))
  398. def removeTrigger_BEFORE(self, handle: _ThreePhaseEventTriggerHandle) -> None:
  399. """
  400. Remove the trigger if it has yet to be executed, otherwise emit a
  401. warning that in the future an exception will be raised when removing an
  402. already-executed trigger.
  403. @see: removeTrigger
  404. """
  405. phase, callable, args, kwargs = handle
  406. if phase != "before":
  407. return self.removeTrigger_BASE(handle)
  408. if (callable, args, kwargs) in self.finishedBefore:
  409. warnings.warn(
  410. "Removing already-fired system event triggers will raise an "
  411. "exception in a future version of Twisted.",
  412. category=DeprecationWarning,
  413. stacklevel=3,
  414. )
  415. else:
  416. self.removeTrigger_BASE(handle)
  417. def fireEvent(self) -> None:
  418. """
  419. Call the triggers added to this event.
  420. """
  421. self.state = "BEFORE"
  422. self.finishedBefore = []
  423. beforeResults: List[Deferred[object]] = []
  424. while self.before:
  425. callable, args, kwargs = self.before.pop(0)
  426. self.finishedBefore.append((callable, args, kwargs))
  427. try:
  428. result = callable(*args, **kwargs)
  429. except BaseException:
  430. log.err()
  431. else:
  432. if isinstance(result, Deferred):
  433. beforeResults.append(result)
  434. DeferredList(beforeResults).addCallback(self._continueFiring)
  435. def _continueFiring(self, ignored: object) -> None:
  436. """
  437. Call the during and after phase triggers for this event.
  438. """
  439. self.state = "BASE"
  440. self.finishedBefore = []
  441. for phase in self.during, self.after:
  442. while phase:
  443. callable, args, kwargs = phase.pop(0)
  444. try:
  445. callable(*args, **kwargs)
  446. except BaseException:
  447. log.err()
  448. @implementer(IReactorPluggableNameResolver, IReactorPluggableResolver)
  449. class PluggableResolverMixin:
  450. """
  451. A mixin which implements the pluggable resolver reactor interfaces.
  452. @ivar resolver: The installed L{IResolverSimple}.
  453. @ivar _nameResolver: The installed L{IHostnameResolver}.
  454. """
  455. resolver: IResolverSimple = BlockingResolver()
  456. _nameResolver: IHostnameResolver = _SimpleResolverComplexifier(resolver)
  457. # IReactorPluggableResolver
  458. def installResolver(self, resolver: IResolverSimple) -> IResolverSimple:
  459. """
  460. See L{IReactorPluggableResolver}.
  461. @param resolver: see L{IReactorPluggableResolver}.
  462. @return: see L{IReactorPluggableResolver}.
  463. """
  464. assert IResolverSimple.providedBy(resolver)
  465. oldResolver = self.resolver
  466. self.resolver = resolver
  467. self._nameResolver = _SimpleResolverComplexifier(resolver)
  468. return oldResolver
  469. # IReactorPluggableNameResolver
  470. def installNameResolver(self, resolver: IHostnameResolver) -> IHostnameResolver:
  471. """
  472. See L{IReactorPluggableNameResolver}.
  473. @param resolver: See L{IReactorPluggableNameResolver}.
  474. @return: see L{IReactorPluggableNameResolver}.
  475. """
  476. previousNameResolver = self._nameResolver
  477. self._nameResolver = resolver
  478. self.resolver = _ComplexResolverSimplifier(resolver)
  479. return previousNameResolver
  480. @property
  481. def nameResolver(self) -> IHostnameResolver:
  482. """
  483. Implementation of read-only
  484. L{IReactorPluggableNameResolver.nameResolver}.
  485. """
  486. return self._nameResolver
  487. _SystemEventID = NewType("_SystemEventID", Tuple[str, _ThreePhaseEventTriggerHandle])
  488. _ThreadCall = Tuple[Callable[..., Any], Tuple[object, ...], Dict[str, object]]
  489. @implementer(IReactorCore, IReactorTime, _ISupportsExitSignalCapturing)
  490. class ReactorBase(PluggableResolverMixin):
  491. """
  492. Default base class for Reactors.
  493. @ivar _stopped: A flag which is true between paired calls to C{reactor.run}
  494. and C{reactor.stop}. This should be replaced with an explicit state
  495. machine.
  496. @ivar _justStopped: A flag which is true between the time C{reactor.stop}
  497. is called and the time the shutdown system event is fired. This is
  498. used to determine whether that event should be fired after each
  499. iteration through the mainloop. This should be replaced with an
  500. explicit state machine.
  501. @ivar _started: A flag which is true from the time C{reactor.run} is called
  502. until the time C{reactor.run} returns. This is used to prevent calls
  503. to C{reactor.run} on a running reactor. This should be replaced with
  504. an explicit state machine.
  505. @ivar running: See L{IReactorCore.running}
  506. @ivar _registerAsIOThread: A flag controlling whether the reactor will
  507. register the thread it is running in as the I/O thread when it starts.
  508. If C{True}, registration will be done, otherwise it will not be.
  509. @ivar _exitSignal: See L{_ISupportsExitSignalCapturing._exitSignal}
  510. @ivar _installSignalHandlers: A flag which indicates whether any signal
  511. handlers will be installed during startup. This includes handlers for
  512. SIGCHLD to monitor child processes, and SIGINT, SIGTERM, and SIGBREAK
  513. @ivar _signals: An object which knows how to install and uninstall the
  514. reactor's signal-handling behavior.
  515. """
  516. _registerAsIOThread = True
  517. _stopped = True
  518. installed = False
  519. usingThreads = False
  520. _exitSignal = None
  521. # Set to something meaningful between startRunning and shortly before run
  522. # returns. We don't know the value to be used by `run` until that method
  523. # itself is called and we learn the value of installSignalHandlers.
  524. # However, we can use a no-op implementation until then.
  525. _signals: SignalHandling = _WithoutSignalHandling()
  526. __name__ = "twisted.internet.reactor"
  527. def __init__(self) -> None:
  528. super().__init__()
  529. self.threadCallQueue: List[_ThreadCall] = []
  530. self._eventTriggers: Dict[str, _ThreePhaseEvent] = {}
  531. self._pendingTimedCalls: List[DelayedCall] = []
  532. self._newTimedCalls: List[DelayedCall] = []
  533. self._cancellations = 0
  534. self.running = False
  535. self._started = False
  536. self._justStopped = False
  537. self._startedBefore = False
  538. # reactor internal readers, e.g. the waker.
  539. # Using Any as the type here… unable to find a suitable defined interface
  540. self._internalReaders: Set[Any] = set()
  541. self.waker: Any = None
  542. # Arrange for the running attribute to change to True at the right time
  543. # and let a subclass possibly do other things at that time (eg install
  544. # signal handlers).
  545. self.addSystemEventTrigger("during", "startup", self._reallyStartRunning)
  546. self.addSystemEventTrigger("during", "shutdown", self.crash)
  547. self.addSystemEventTrigger("during", "shutdown", self.disconnectAll)
  548. if platform.supportsThreads():
  549. self._initThreads()
  550. self.installWaker()
  551. # Signal handling pieces
  552. _installSignalHandlers: bool = False
  553. def _makeSignalHandling(self, installSignalHandlers: bool) -> SignalHandling:
  554. """
  555. Get an appropriate signal handling object.
  556. @param installSignalHandlers: Indicate whether to even try to do any
  557. signal handling. If C{False} then the result will be a no-op
  558. implementation.
  559. """
  560. if installSignalHandlers:
  561. return self._signalsFactory()
  562. return _WithoutSignalHandling()
  563. def _signalsFactory(self) -> SignalHandling:
  564. """
  565. Get a signal handling object that implements the basic behavior of
  566. stopping the reactor on SIGINT, SIGBREAK, and SIGTERM.
  567. """
  568. return _WithSignalHandling(
  569. self.sigInt,
  570. self.sigBreak,
  571. self.sigTerm,
  572. )
  573. def _addInternalReader(self, reader: IReadDescriptor) -> None:
  574. """
  575. Add a read descriptor which is part of the implementation of the
  576. reactor itself.
  577. The read descriptor will not be removed by L{IReactorFDSet.removeAll}.
  578. """
  579. self._internalReaders.add(reader)
  580. self.addReader(reader)
  581. def _removeInternalReader(self, reader: IReadDescriptor) -> None:
  582. """
  583. Remove a read descriptor which is part of the implementation of the
  584. reactor itself.
  585. """
  586. self._internalReaders.remove(reader)
  587. self.removeReader(reader)
  588. def run(self, installSignalHandlers: bool = True) -> None:
  589. self.startRunning(installSignalHandlers=installSignalHandlers)
  590. try:
  591. self.mainLoop()
  592. finally:
  593. self._signals.uninstall()
  594. def mainLoop(self) -> None:
  595. while self._started:
  596. try:
  597. while self._started:
  598. # Advance simulation time in delayed event
  599. # processors.
  600. self.runUntilCurrent()
  601. t2 = self.timeout()
  602. t = self.running and t2
  603. self.doIteration(t)
  604. except BaseException:
  605. log.msg("Unexpected error in main loop.")
  606. log.err()
  607. else:
  608. log.msg("Main loop terminated.") # type:ignore[unreachable]
  609. # override in subclasses
  610. _lock = None
  611. def installWaker(self) -> None:
  612. raise NotImplementedError(
  613. reflect.qual(self.__class__) + " did not implement installWaker"
  614. )
  615. def wakeUp(self) -> None:
  616. """
  617. Wake up the event loop.
  618. """
  619. if self.waker:
  620. self.waker.wakeUp()
  621. # if the waker isn't installed, the reactor isn't running, and
  622. # therefore doesn't need to be woken up
  623. def doIteration(self, delay: Optional[float]) -> None:
  624. """
  625. Do one iteration over the readers and writers which have been added.
  626. """
  627. raise NotImplementedError(
  628. reflect.qual(self.__class__) + " did not implement doIteration"
  629. )
  630. def addReader(self, reader: IReadDescriptor) -> None:
  631. raise NotImplementedError(
  632. reflect.qual(self.__class__) + " did not implement addReader"
  633. )
  634. def addWriter(self, writer: IWriteDescriptor) -> None:
  635. raise NotImplementedError(
  636. reflect.qual(self.__class__) + " did not implement addWriter"
  637. )
  638. def removeReader(self, reader: IReadDescriptor) -> None:
  639. raise NotImplementedError(
  640. reflect.qual(self.__class__) + " did not implement removeReader"
  641. )
  642. def removeWriter(self, writer: IWriteDescriptor) -> None:
  643. raise NotImplementedError(
  644. reflect.qual(self.__class__) + " did not implement removeWriter"
  645. )
  646. def removeAll(self) -> List[Union[IReadDescriptor, IWriteDescriptor]]:
  647. raise NotImplementedError(
  648. reflect.qual(self.__class__) + " did not implement removeAll"
  649. )
  650. def getReaders(self) -> List[IReadDescriptor]:
  651. raise NotImplementedError(
  652. reflect.qual(self.__class__) + " did not implement getReaders"
  653. )
  654. def getWriters(self) -> List[IWriteDescriptor]:
  655. raise NotImplementedError(
  656. reflect.qual(self.__class__) + " did not implement getWriters"
  657. )
  658. # IReactorCore
  659. def resolve(
  660. self, name: str, timeout: Sequence[int] = (1, 3, 11, 45)
  661. ) -> Deferred[str]:
  662. """
  663. Return a Deferred that will resolve a hostname."""
  664. if not name:
  665. # XXX - This is *less than* '::', and will screw up IPv6 servers
  666. return defer.succeed("0.0.0.0")
  667. if abstract.isIPAddress(name):
  668. return defer.succeed(name)
  669. return self.resolver.getHostByName(name, timeout)
  670. def stop(self) -> None:
  671. """
  672. See twisted.internet.interfaces.IReactorCore.stop.
  673. """
  674. if self._stopped:
  675. raise error.ReactorNotRunning("Can't stop reactor that isn't running.")
  676. self._stopped = True
  677. self._justStopped = True
  678. self._startedBefore = True
  679. def crash(self) -> None:
  680. """
  681. See twisted.internet.interfaces.IReactorCore.crash.
  682. Reset reactor state tracking attributes and re-initialize certain
  683. state-transition helpers which were set up in C{__init__} but later
  684. destroyed (through use).
  685. """
  686. self._started = False
  687. self.running = False
  688. self.addSystemEventTrigger("during", "startup", self._reallyStartRunning)
  689. def sigInt(self, number: int, frame: Optional[FrameType] = None) -> None:
  690. """
  691. Handle a SIGINT interrupt.
  692. @param number: See handler specification in L{signal.signal}
  693. @param frame: See handler specification in L{signal.signal}
  694. """
  695. log.msg("Received SIGINT, shutting down.")
  696. self.callFromThread(self.stop)
  697. self._exitSignal = number
  698. def sigBreak(self, number: int, frame: Optional[FrameType] = None) -> None:
  699. """
  700. Handle a SIGBREAK interrupt.
  701. @param number: See handler specification in L{signal.signal}
  702. @param frame: See handler specification in L{signal.signal}
  703. """
  704. log.msg("Received SIGBREAK, shutting down.")
  705. self.callFromThread(self.stop)
  706. self._exitSignal = number
  707. def sigTerm(self, number: int, frame: Optional[FrameType] = None) -> None:
  708. """
  709. Handle a SIGTERM interrupt.
  710. @param number: See handler specification in L{signal.signal}
  711. @param frame: See handler specification in L{signal.signal}
  712. """
  713. log.msg("Received SIGTERM, shutting down.")
  714. self.callFromThread(self.stop)
  715. self._exitSignal = number
  716. def disconnectAll(self) -> None:
  717. """Disconnect every reader, and writer in the system."""
  718. selectables = self.removeAll()
  719. for reader in selectables:
  720. log.callWithLogger(
  721. reader, reader.connectionLost, Failure(main.CONNECTION_LOST)
  722. )
  723. def iterate(self, delay: float = 0.0) -> None:
  724. """
  725. See twisted.internet.interfaces.IReactorCore.iterate.
  726. """
  727. self.runUntilCurrent()
  728. self.doIteration(delay)
  729. def fireSystemEvent(self, eventType: str) -> None:
  730. """
  731. See twisted.internet.interfaces.IReactorCore.fireSystemEvent.
  732. """
  733. event = self._eventTriggers.get(eventType)
  734. if event is not None:
  735. event.fireEvent()
  736. def addSystemEventTrigger(
  737. self,
  738. phase: str,
  739. eventType: str,
  740. callable: Callable[..., Any],
  741. *args: object,
  742. **kwargs: object,
  743. ) -> _SystemEventID:
  744. """
  745. See twisted.internet.interfaces.IReactorCore.addSystemEventTrigger.
  746. """
  747. assert builtins.callable(callable), f"{callable} is not callable"
  748. if eventType not in self._eventTriggers:
  749. self._eventTriggers[eventType] = _ThreePhaseEvent()
  750. return _SystemEventID(
  751. (
  752. eventType,
  753. self._eventTriggers[eventType].addTrigger(
  754. phase, callable, *args, **kwargs
  755. ),
  756. )
  757. )
  758. def removeSystemEventTrigger(self, triggerID: _SystemEventID) -> None:
  759. """
  760. See twisted.internet.interfaces.IReactorCore.removeSystemEventTrigger.
  761. """
  762. eventType, handle = triggerID
  763. self._eventTriggers[eventType].removeTrigger(handle)
  764. def callWhenRunning(
  765. self, callable: Callable[..., Any], *args: object, **kwargs: object
  766. ) -> Optional[_SystemEventID]:
  767. """
  768. See twisted.internet.interfaces.IReactorCore.callWhenRunning.
  769. """
  770. if self.running:
  771. callable(*args, **kwargs)
  772. return None
  773. else:
  774. return self.addSystemEventTrigger(
  775. "after", "startup", callable, *args, **kwargs
  776. )
  777. def startRunning(self, installSignalHandlers: bool = True) -> None:
  778. """
  779. Method called when reactor starts: do some initialization and fire
  780. startup events.
  781. Don't call this directly, call reactor.run() instead: it should take
  782. care of calling this.
  783. This method is somewhat misnamed. The reactor will not necessarily be
  784. in the running state by the time this method returns. The only
  785. guarantee is that it will be on its way to the running state.
  786. @param installSignalHandlers: A flag which, if set, indicates that
  787. handlers for a number of (implementation-defined) signals should be
  788. installed during startup.
  789. """
  790. if self._started:
  791. raise error.ReactorAlreadyRunning()
  792. if self._startedBefore:
  793. raise error.ReactorNotRestartable()
  794. self._signals.uninstall()
  795. self._installSignalHandlers = installSignalHandlers
  796. self._signals = self._makeSignalHandling(installSignalHandlers)
  797. self._started = True
  798. self._stopped = False
  799. if self._registerAsIOThread:
  800. threadable.registerAsIOThread()
  801. self.fireSystemEvent("startup")
  802. def _reallyStartRunning(self) -> None:
  803. """
  804. Method called to transition to the running state. This should happen
  805. in the I{during startup} event trigger phase.
  806. """
  807. self.running = True
  808. if self._installSignalHandlers:
  809. # Make sure this happens before after-startup events, since the
  810. # expectation of after-startup is that the reactor is fully
  811. # initialized. Don't do it right away for historical reasons
  812. # (perhaps some before-startup triggers don't want there to be a
  813. # custom SIGCHLD handler so that they can run child processes with
  814. # some blocking api).
  815. self._signals.install()
# IReactorTime

# Clock used throughout this class: callLater, timeout and runUntilCurrent
# all read the current time through self.seconds(). Wrapped in staticmethod
# so that runtimeSeconds is called without the instance being passed in.
seconds = staticmethod(runtimeSeconds)
  818. def callLater(
  819. self, delay: float, callable: Callable[..., Any], *args: object, **kw: object
  820. ) -> DelayedCall:
  821. """
  822. See twisted.internet.interfaces.IReactorTime.callLater.
  823. """
  824. assert builtins.callable(callable), f"{callable} is not callable"
  825. assert delay >= 0, f"{delay} is not greater than or equal to 0 seconds"
  826. delayedCall = DelayedCall(
  827. self.seconds() + delay,
  828. callable,
  829. args,
  830. kw,
  831. self._cancelCallLater,
  832. self._moveCallLaterSooner,
  833. seconds=self.seconds,
  834. )
  835. self._newTimedCalls.append(delayedCall)
  836. return delayedCall
  837. def _moveCallLaterSooner(self, delayedCall: DelayedCall) -> None:
  838. # Linear time find: slow.
  839. heap = self._pendingTimedCalls
  840. try:
  841. pos = heap.index(delayedCall)
  842. # Move elt up the heap until it rests at the right place.
  843. elt = heap[pos]
  844. while pos != 0:
  845. parent = (pos - 1) // 2
  846. if heap[parent] <= elt:
  847. break
  848. # move parent down
  849. heap[pos] = heap[parent]
  850. pos = parent
  851. heap[pos] = elt
  852. except ValueError:
  853. # element was not found in heap - oh well...
  854. pass
  855. def _cancelCallLater(self, delayedCall: DelayedCall) -> None:
  856. self._cancellations += 1
  857. def getDelayedCalls(self) -> Sequence[IDelayedCall]:
  858. """
  859. See L{twisted.internet.interfaces.IReactorTime.getDelayedCalls}
  860. """
  861. return [
  862. x
  863. for x in (self._pendingTimedCalls + self._newTimedCalls)
  864. if not x.cancelled
  865. ]
  866. def _insertNewDelayedCalls(self) -> None:
  867. for call in self._newTimedCalls:
  868. if call.cancelled:
  869. self._cancellations -= 1
  870. else:
  871. call.activate_delay()
  872. heappush(self._pendingTimedCalls, call)
  873. self._newTimedCalls = []
  874. def timeout(self) -> Optional[float]:
  875. """
  876. Determine the longest time the reactor may sleep (waiting on I/O
  877. notification, perhaps) before it must wake up to service a time-related
  878. event.
  879. @return: The maximum number of seconds the reactor may sleep.
  880. """
  881. # insert new delayed calls to make sure to include them in timeout value
  882. self._insertNewDelayedCalls()
  883. if not self._pendingTimedCalls:
  884. return None
  885. delay = self._pendingTimedCalls[0].time - self.seconds()
  886. # Pick a somewhat arbitrary maximum possible value for the timeout.
  887. # This value is 2 ** 31 / 1000, which is the number of seconds which can
  888. # be represented as an integer number of milliseconds in a signed 32 bit
  889. # integer. This particular limit is imposed by the epoll_wait(3)
  890. # interface which accepts a timeout as a C "int" type and treats it as
  891. # representing a number of milliseconds.
  892. longest = 2147483
  893. # Don't let the delay be in the past (negative) or exceed a plausible
  894. # maximum (platform-imposed) interval.
  895. return max(0, min(longest, delay))
def runUntilCurrent(self) -> None:
    """
    Run all pending timed calls.

    In order, this method:

      1. runs the calls that were waiting in C{threadCallQueue} when it
         was entered;
      2. merges C{_newTimedCalls} into the pending heap;
      3. runs every pending timed call whose time is not later than the
         current value of C{self.seconds()};
      4. rebuilds the pending heap without cancelled entries once enough
         cancellations have piled up;
      5. fires the C{"shutdown"} system event if C{_justStopped} is set.

    Exceptions raised by the calls themselves are logged with C{log.err}
    and do not propagate.
    """
    if self.threadCallQueue:
        # Keep track of how many calls we actually make, as we're
        # making them, in case another call is added to the queue
        # while we're in this loop.
        count = 0
        total = len(self.threadCallQueue)
        for f, a, kw in self.threadCallQueue:
            try:
                f(*a, **kw)
            except BaseException:
                log.err()
            count += 1
            # Stop after the entries that were present on entry; anything
            # appended meanwhile is left for a later pass.
            if count == total:
                break
        del self.threadCallQueue[:count]
        # Entries added during the loop are still queued: ask to be woken
        # up so that they get serviced.
        if self.threadCallQueue:
            self.wakeUp()

    # insert new delayed calls now
    self._insertNewDelayedCalls()

    # The clock is sampled once; calls are compared against this value
    # for the whole loop.
    now = self.seconds()
    while self._pendingTimedCalls and (self._pendingTimedCalls[0].time <= now):
        call = heappop(self._pendingTimedCalls)
        if call.cancelled:
            # This cancelled entry has now left the heap, so it no longer
            # counts towards the compaction threshold below.
            self._cancellations -= 1
            continue

        if call.delayed_time > 0.0:
            # The call has an outstanding delay: apply it and put the
            # call back on the heap instead of running it.
            call.activate_delay()
            heappush(self._pendingTimedCalls, call)
            continue

        try:
            # Marked as called before the function is invoked.
            call.called = 1
            call.func(*call.args, **call.kw)
        except BaseException:
            log.err()
            if call.creator is not None:
                # Follow the logged error with the recorded creation
                # information of the DelayedCall, each line prefixed
                # with " C:".
                e = "\n"
                e += (
                    " C: previous exception occurred in "
                    + "a DelayedCall created here:\n"
                )
                e += " C:"
                e += "".join(call.creator).rstrip().replace("\n", "\n C:")
                e += "\n"
                log.msg(e)

    # Compact the heap only when there are more than 50 outstanding
    # cancellations and they outnumber half of the heap's length.
    if (
        self._cancellations > 50
        and self._cancellations > len(self._pendingTimedCalls) >> 1
    ):
        self._cancellations = 0
        self._pendingTimedCalls = [
            x for x in self._pendingTimedCalls if not x.cancelled
        ]
        # Filtering does not preserve the heap invariant; restore it.
        heapify(self._pendingTimedCalls)

    # The flag is cleared before firing, so the shutdown event is fired
    # once per stop() call.
    if self._justStopped:
        self._justStopped = False
        self.fireSystemEvent("shutdown")
  956. # IReactorThreads
  957. if platform.supportsThreads():
  958. assert ThreadPool is not None
# The reactor's thread pool. None until _initThreadPool creates one, and
# reset to None again by _stopThreadPool.
threadpool = None
# ID of the trigger starting the threadpool
_threadpoolStartupID = None
# ID of the trigger stopping the threadpool
threadpoolShutdownID = None
  964. def _initThreads(self) -> None:
  965. self.installNameResolver(
  966. _GAIResolver(cast(IReactorThreads, self), self.getThreadPool)
  967. )
  968. self.usingThreads = True
  969. # `IReactorFromThreads` defines the first named argument as
  970. # `callable: Callable[..., Any]` but this defines it as `f`
  971. # really both should be defined using py3.8 positional only
  972. def callFromThread( # type: ignore[override]
  973. self, f: Callable[..., Any], *args: object, **kwargs: object
  974. ) -> None:
  975. """
  976. See
  977. L{twisted.internet.interfaces.IReactorFromThreads.callFromThread}.
  978. """
  979. assert callable(f), f"{f} is not callable"
  980. # lists are thread-safe in CPython, but not in Jython
  981. # this is probably a bug in Jython, but until fixed this code
  982. # won't work in Jython.
  983. self.threadCallQueue.append((f, args, kwargs))
  984. self.wakeUp()
  985. def _initThreadPool(self) -> None:
  986. """
  987. Create the threadpool accessible with callFromThread.
  988. """
  989. self.threadpool = ThreadPool(0, 10, "twisted.internet.reactor")
  990. self._threadpoolStartupID = self.callWhenRunning(self.threadpool.start)
  991. self.threadpoolShutdownID = self.addSystemEventTrigger(
  992. "during", "shutdown", self._stopThreadPool
  993. )
  994. def _uninstallHandler(self) -> None:
  995. self._signals.uninstall()
  996. def _stopThreadPool(self) -> None:
  997. """
  998. Stop the reactor threadpool. This method is only valid if there
  999. is currently a threadpool (created by L{_initThreadPool}). It
  1000. is not intended to be called directly; instead, it will be
  1001. called by a shutdown trigger created in L{_initThreadPool}.
  1002. """
  1003. triggers = [self._threadpoolStartupID, self.threadpoolShutdownID]
  1004. for trigger in filter(None, triggers):
  1005. try:
  1006. self.removeSystemEventTrigger(trigger)
  1007. except ValueError:
  1008. pass
  1009. self._threadpoolStartupID = None
  1010. self.threadpoolShutdownID = None
  1011. assert self.threadpool is not None
  1012. self.threadpool.stop()
  1013. self.threadpool = None
  1014. def getThreadPool(self) -> ThreadPool:
  1015. """
  1016. See L{twisted.internet.interfaces.IReactorThreads.getThreadPool}.
  1017. """
  1018. if self.threadpool is None:
  1019. self._initThreadPool()
  1020. assert self.threadpool is not None
  1021. return self.threadpool
  1022. # `IReactorInThreads` defines the first named argument as
  1023. # `callable: Callable[..., Any]` but this defines it as `_callable`
  1024. # really both should be defined using py3.8 positional only
  1025. def callInThread( # type: ignore[override]
  1026. self, _callable: Callable[..., Any], *args: object, **kwargs: object
  1027. ) -> None:
  1028. """
  1029. See L{twisted.internet.interfaces.IReactorInThreads.callInThread}.
  1030. """
  1031. self.getThreadPool().callInThread(_callable, *args, **kwargs)
  1032. def suggestThreadPoolSize(self, size: int) -> None:
  1033. """
  1034. See L{twisted.internet.interfaces.IReactorThreads.suggestThreadPoolSize}.
  1035. """
  1036. self.getThreadPool().adjustPoolsize(maxthreads=size)
  1037. else:
  1038. # This is for signal handlers.
  1039. def callFromThread(
  1040. self, f: Callable[..., Any], *args: object, **kwargs: object
  1041. ) -> None:
  1042. assert callable(f), f"{f} is not callable"
  1043. # See comment in the other callFromThread implementation.
  1044. self.threadCallQueue.append((f, args, kwargs))
# Declare IReactorThreads on ReactorBase only when the platform reports
# thread support, matching the condition under which the thread-pool
# methods are defined in the class body.
if platform.supportsThreads():
    classImplements(ReactorBase, IReactorThreads)
  1047. @implementer(IConnector)
  1048. class BaseConnector(ABC):
  1049. """
  1050. Basic implementation of L{IConnector}.
  1051. State can be: "connecting", "connected", "disconnected"
  1052. """
  1053. timeoutID = None
  1054. factoryStarted = 0
  1055. def __init__(
  1056. self, factory: ClientFactory, timeout: float, reactor: ReactorBase
  1057. ) -> None:
  1058. self.state = "disconnected"
  1059. self.reactor = reactor
  1060. self.factory = factory
  1061. self.timeout = timeout
  1062. def disconnect(self) -> None:
  1063. """Disconnect whatever our state is."""
  1064. if self.state == "connecting":
  1065. self.stopConnecting()
  1066. elif self.state == "connected":
  1067. assert self.transport is not None
  1068. self.transport.loseConnection()
  1069. @abstractmethod
  1070. def _makeTransport(self) -> "Client":
  1071. pass
  1072. def connect(self) -> None:
  1073. """Start connection to remote server."""
  1074. if self.state != "disconnected":
  1075. raise RuntimeError("can't connect in this state")
  1076. self.state = "connecting"
  1077. if not self.factoryStarted:
  1078. self.factory.doStart()
  1079. self.factoryStarted = 1
  1080. self.transport: Optional[Client] = self._makeTransport()
  1081. if self.timeout is not None:
  1082. self.timeoutID = self.reactor.callLater(
  1083. self.timeout, self.transport.failIfNotConnected, error.TimeoutError()
  1084. )
  1085. self.factory.startedConnecting(self)
  1086. def stopConnecting(self) -> None:
  1087. """Stop attempting to connect."""
  1088. if self.state != "connecting":
  1089. raise error.NotConnectingError("we're not trying to connect")
  1090. assert self.transport is not None
  1091. self.state = "disconnected"
  1092. self.transport.failIfNotConnected(error.UserError())
  1093. del self.transport
  1094. def cancelTimeout(self) -> None:
  1095. if self.timeoutID is not None:
  1096. try:
  1097. self.timeoutID.cancel()
  1098. except ValueError:
  1099. pass
  1100. del self.timeoutID
  1101. def buildProtocol(self, addr: IAddress) -> Optional[IProtocol]:
  1102. self.state = "connected"
  1103. self.cancelTimeout()
  1104. return self.factory.buildProtocol(addr)
  1105. def connectionFailed(self, reason: Failure) -> None:
  1106. self.cancelTimeout()
  1107. self.transport = None
  1108. self.state = "disconnected"
  1109. self.factory.clientConnectionFailed(self, reason)
  1110. if self.state == "disconnected":
  1111. # factory hasn't called our connect() method
  1112. self.factory.doStop()
  1113. self.factoryStarted = 0
  1114. def connectionLost(self, reason: Failure) -> None:
  1115. self.state = "disconnected"
  1116. self.factory.clientConnectionLost(self, reason)
  1117. if self.state == "disconnected":
  1118. # factory hasn't called our connect() method
  1119. self.factory.doStop()
  1120. self.factoryStarted = 0
  1121. def getDestination(self) -> IAddress:
  1122. raise NotImplementedError(
  1123. reflect.qual(self.__class__) + " did not implement " "getDestination"
  1124. )
  1125. def __repr__(self) -> str:
  1126. return "<{} instance at 0x{:x} {} {}>".format(
  1127. reflect.qual(self.__class__),
  1128. id(self),
  1129. self.state,
  1130. self.getDestination(),
  1131. )
  1132. class BasePort(abstract.FileDescriptor):
  1133. """Basic implementation of a ListeningPort.
  1134. Note: This does not actually implement IListeningPort.
  1135. """
  1136. addressFamily: socket.AddressFamily = None # type: ignore[assignment]
  1137. socketType: socket.SocketKind = None # type: ignore[assignment]
  1138. def createInternetSocket(self) -> socket.socket:
  1139. s = socket.socket(self.addressFamily, self.socketType)
  1140. s.setblocking(False)
  1141. fdesc._setCloseOnExec(s.fileno())
  1142. return s
  1143. def doWrite(self) -> Optional[Failure]:
  1144. """Raises a RuntimeError"""
  1145. raise RuntimeError("doWrite called on a %s" % reflect.qual(self.__class__))
# Deliberately empty: a star-import of this module binds no names.
__all__: List[str] = []