# -*- test-case-name: twisted.application.test.test_internet,twisted.test.test_application,twisted.test.test_cooperator -*-

"""
Implementation of L{twisted.application.internet.ClientService}, particularly
its U{automat <https://automat.readthedocs.org/>} state machine.
"""
  6. from __future__ import annotations
  7. from dataclasses import dataclass, field
  8. from random import random as _goodEnoughRandom
  9. from typing import Callable, Optional, Protocol as TypingProtocol, TypeVar, Union
  10. from zope.interface import implementer
  11. from automat import TypeMachineBuilder, pep614
  12. from twisted.application.service import Service
  13. from twisted.internet.defer import (
  14. CancelledError,
  15. Deferred,
  16. fail,
  17. maybeDeferred,
  18. succeed,
  19. )
  20. from twisted.internet.interfaces import (
  21. IAddress,
  22. IDelayedCall,
  23. IProtocol,
  24. IProtocolFactory,
  25. IReactorTime,
  26. IStreamClientEndpoint,
  27. ITransport,
  28. )
  29. from twisted.logger import Logger
  30. from twisted.python.failure import Failure
  31. T = TypeVar("T")
  32. def _maybeGlobalReactor(maybeReactor: Optional[T]) -> T:
  33. """
  34. @return: the argument, or the global reactor if the argument is L{None}.
  35. """
  36. if maybeReactor is None:
  37. from twisted.internet import reactor
  38. return reactor # type:ignore[return-value]
  39. else:
  40. return maybeReactor
  41. class _Client(TypingProtocol):
  42. def start(self) -> None:
  43. """
  44. Start this L{ClientService}, initiating the connection retry loop.
  45. """
  46. def stop(self) -> Deferred[None]:
  47. """
  48. Stop trying to connect and disconnect any current connection.
  49. @return: a L{Deferred} that fires when all outstanding connections are
  50. closed and all in-progress connection attempts halted.
  51. """
  52. def _connectionMade(self, protocol: _ReconnectingProtocolProxy) -> None:
  53. """
  54. A connection has been made.
  55. @param protocol: The protocol of the connection.
  56. """
  57. def _connectionFailed(self, failure: Failure) -> None:
  58. """
  59. Deliver connection failures to any L{ClientService.whenConnected}
  60. L{Deferred}s that have met their failAfterFailures threshold.
  61. @param failure: the Failure to fire the L{Deferred}s with.
  62. """
  63. def _reconnect(self, failure: Optional[Failure] = None) -> None:
  64. """
  65. The wait between connection attempts is done.
  66. """
  67. def _clientDisconnected(self, failure: Optional[Failure] = None) -> None:
  68. """
  69. The current connection has been disconnected.
  70. """
  71. def whenConnected(
  72. self, /, failAfterFailures: Optional[int] = None
  73. ) -> Deferred[IProtocol]:
  74. """
  75. Retrieve the currently-connected L{Protocol}, or the next one to
  76. connect.
  77. @param failAfterFailures: number of connection failures after which the
  78. Deferred will deliver a Failure (None means the Deferred will only
  79. fail if/when the service is stopped). Set this to 1 to make the
  80. very first connection failure signal an error. Use 2 to allow one
  81. failure but signal an error if the subsequent retry then fails.
  82. @return: a Deferred that fires with a protocol produced by the factory
  83. passed to C{__init__}. It may:
  84. - fire with L{IProtocol}
  85. - fail with L{CancelledError} when the service is stopped
  86. - fail with e.g.
  87. L{DNSLookupError<twisted.internet.error.DNSLookupError>} or
  88. L{ConnectionRefusedError<twisted.internet.error.ConnectionRefusedError>}
  89. when the number of consecutive failed connection attempts
  90. equals the value of "failAfterFailures"
  91. """
  92. @implementer(IProtocol)
  93. class _ReconnectingProtocolProxy:
  94. """
  95. A proxy for a Protocol to provide connectionLost notification to a client
  96. connection service, in support of reconnecting when connections are lost.
  97. """
  98. def __init__(
  99. self, protocol: IProtocol, lostNotification: Callable[[Failure], None]
  100. ) -> None:
  101. """
  102. Create a L{_ReconnectingProtocolProxy}.
  103. @param protocol: the application-provided L{interfaces.IProtocol}
  104. provider.
  105. @type protocol: provider of L{interfaces.IProtocol} which may
  106. additionally provide L{interfaces.IHalfCloseableProtocol} and
  107. L{interfaces.IFileDescriptorReceiver}.
  108. @param lostNotification: a 1-argument callable to invoke with the
  109. C{reason} when the connection is lost.
  110. """
  111. self._protocol = protocol
  112. self._lostNotification = lostNotification
  113. def makeConnection(self, transport: ITransport) -> None:
  114. self._transport = transport
  115. self._protocol.makeConnection(transport)
  116. def connectionLost(self, reason: Failure) -> None:
  117. """
  118. The connection was lost. Relay this information.
  119. @param reason: The reason the connection was lost.
  120. @return: the underlying protocol's result
  121. """
  122. try:
  123. return self._protocol.connectionLost(reason)
  124. finally:
  125. self._lostNotification(reason)
  126. def __getattr__(self, item: str) -> object:
  127. return getattr(self._protocol, item)
  128. def __repr__(self) -> str:
  129. return f"<{self.__class__.__name__} wrapping {self._protocol!r}>"
  130. @implementer(IProtocolFactory)
  131. class _DisconnectFactory:
  132. """
  133. A L{_DisconnectFactory} is a proxy for L{IProtocolFactory} that catches
  134. C{connectionLost} notifications and relays them.
  135. """
  136. def __init__(
  137. self,
  138. protocolFactory: IProtocolFactory,
  139. protocolDisconnected: Callable[[Failure], None],
  140. ) -> None:
  141. self._protocolFactory = protocolFactory
  142. self._protocolDisconnected = protocolDisconnected
  143. def buildProtocol(self, addr: IAddress) -> Optional[IProtocol]:
  144. """
  145. Create a L{_ReconnectingProtocolProxy} with the disconnect-notification
  146. callback we were called with.
  147. @param addr: The address the connection is coming from.
  148. @return: a L{_ReconnectingProtocolProxy} for a protocol produced by
  149. C{self._protocolFactory}
  150. """
  151. built = self._protocolFactory.buildProtocol(addr)
  152. if built is None:
  153. return None
  154. return _ReconnectingProtocolProxy(built, self._protocolDisconnected)
  155. def __getattr__(self, item: str) -> object:
  156. return getattr(self._protocolFactory, item)
  157. def __repr__(self) -> str:
  158. return "<{} wrapping {!r}>".format(
  159. self.__class__.__name__, self._protocolFactory
  160. )
  161. def _deinterface(o: object) -> None:
  162. """
  163. Remove the special runtime attributes set by L{implementer} so that a class
  164. can proxy through those attributes with C{__getattr__} and thereby forward
  165. optionally-provided interfaces by the delegated class.
  166. """
  167. for zopeSpecial in ["__providedBy__", "__provides__", "__implemented__"]:
  168. delattr(o, zopeSpecial)
  169. _deinterface(_DisconnectFactory)
  170. _deinterface(_ReconnectingProtocolProxy)
  171. @dataclass
  172. class _Core:
  173. """
  174. Shared core for ClientService state machine.
  175. """
  176. # required parameters
  177. endpoint: IStreamClientEndpoint
  178. factory: IProtocolFactory
  179. timeoutForAttempt: Callable[[int], float]
  180. clock: IReactorTime
  181. prepareConnection: Optional[Callable[[IProtocol], object]]
  182. # internal state
  183. stopWaiters: list[Deferred[None]] = field(default_factory=list)
  184. awaitingConnected: list[tuple[Deferred[IProtocol], Optional[int]]] = field(
  185. default_factory=list
  186. )
  187. failedAttempts: int = 0
  188. log: Logger = Logger()
  189. def waitForStop(self) -> Deferred[None]:
  190. self.stopWaiters.append(Deferred())
  191. return self.stopWaiters[-1]
  192. def unawait(self, value: Union[IProtocol, Failure]) -> None:
  193. self.awaitingConnected, waiting = [], self.awaitingConnected
  194. for w, remaining in waiting:
  195. w.callback(value)
  196. def cancelConnectWaiters(self) -> None:
  197. self.unawait(Failure(CancelledError()))
  198. def finishStopping(self) -> None:
  199. self.stopWaiters, waiting = [], self.stopWaiters
  200. for w in waiting:
  201. w.callback(None)
  202. def makeMachine() -> Callable[[_Core], _Client]:
  203. machine = TypeMachineBuilder(_Client, _Core)
  204. def waitForRetry(
  205. c: _Client, s: _Core, failure: Optional[Failure] = None
  206. ) -> IDelayedCall:
  207. s.failedAttempts += 1
  208. delay = s.timeoutForAttempt(s.failedAttempts)
  209. s.log.info(
  210. "Scheduling retry {attempt} to connect {endpoint} in {delay} seconds.",
  211. attempt=s.failedAttempts,
  212. endpoint=s.endpoint,
  213. delay=delay,
  214. )
  215. return s.clock.callLater(delay, c._reconnect)
  216. def rememberConnection(
  217. c: _Client, s: _Core, protocol: _ReconnectingProtocolProxy
  218. ) -> _ReconnectingProtocolProxy:
  219. s.failedAttempts = 0
  220. s.unawait(protocol._protocol)
  221. return protocol
  222. def attemptConnection(
  223. c: _Client, s: _Core, failure: Optional[Failure] = None
  224. ) -> Deferred[_ReconnectingProtocolProxy]:
  225. factoryProxy = _DisconnectFactory(s.factory, c._clientDisconnected)
  226. connecting: Deferred[IProtocol] = s.endpoint.connect(factoryProxy)
  227. def prepare(
  228. protocol: _ReconnectingProtocolProxy,
  229. ) -> Deferred[_ReconnectingProtocolProxy]:
  230. if s.prepareConnection is not None:
  231. return maybeDeferred(s.prepareConnection, protocol).addCallback(
  232. lambda _: protocol
  233. )
  234. return succeed(protocol)
  235. # endpoint.connect() is actually generic on the type of the protocol,
  236. # but this is not expressible via zope.interface, so we have to cast
  237. # https://github.com/Shoobx/mypy-zope/issues/95
  238. connectingProxy: Deferred[_ReconnectingProtocolProxy]
  239. connectingProxy = connecting # type:ignore[assignment]
  240. (
  241. connectingProxy.addCallback(prepare)
  242. .addCallback(c._connectionMade)
  243. .addErrback(c._connectionFailed)
  244. )
  245. return connectingProxy
  246. # States:
  247. Init = machine.state("Init")
  248. Connecting = machine.state("Connecting", attemptConnection)
  249. Stopped = machine.state("Stopped")
  250. Waiting = machine.state("Waiting", waitForRetry)
  251. Connected = machine.state("Connected", rememberConnection)
  252. Disconnecting = machine.state("Disconnecting")
  253. Restarting = machine.state("Restarting")
  254. Stopped = machine.state("Stopped")
  255. # Behavior-less state transitions:
  256. Init.upon(_Client.start).to(Connecting).returns(None)
  257. Connecting.upon(_Client.start).loop().returns(None)
  258. Connecting.upon(_Client._connectionMade).to(Connected).returns(None)
  259. Waiting.upon(_Client.start).loop().returns(None)
  260. Waiting.upon(_Client._reconnect).to(Connecting).returns(None)
  261. Connected.upon(_Client._connectionFailed).to(Waiting).returns(None)
  262. Connected.upon(_Client.start).loop().returns(None)
  263. Connected.upon(_Client._clientDisconnected).to(Waiting).returns(None)
  264. Disconnecting.upon(_Client.start).to(Restarting).returns(None)
  265. Restarting.upon(_Client.start).to(Restarting).returns(None)
  266. Stopped.upon(_Client.start).to(Connecting).returns(None)
  267. # Behavior-full state transitions:
  268. @pep614(Init.upon(_Client.stop).to(Stopped))
  269. @pep614(Stopped.upon(_Client.stop).to(Stopped))
  270. def immediateStop(c: _Client, s: _Core) -> Deferred[None]:
  271. return succeed(None)
  272. @pep614(Connecting.upon(_Client.stop).to(Disconnecting))
  273. def connectingStop(
  274. c: _Client, s: _Core, attempt: Deferred[_ReconnectingProtocolProxy]
  275. ) -> Deferred[None]:
  276. waited = s.waitForStop()
  277. attempt.cancel()
  278. return waited
  279. @pep614(Connecting.upon(_Client._connectionFailed, nodata=True).to(Waiting))
  280. def failedWhenConnecting(c: _Client, s: _Core, failure: Failure) -> None:
  281. ready = []
  282. notReady: list[tuple[Deferred[IProtocol], Optional[int]]] = []
  283. for w, remaining in s.awaitingConnected:
  284. if remaining is None:
  285. notReady.append((w, remaining))
  286. elif remaining <= 1:
  287. ready.append(w)
  288. else:
  289. notReady.append((w, remaining - 1))
  290. s.awaitingConnected = notReady
  291. for w in ready:
  292. w.callback(failure)
  293. @pep614(Waiting.upon(_Client.stop).to(Stopped))
  294. def stop(c: _Client, s: _Core, futureRetry: IDelayedCall) -> Deferred[None]:
  295. waited = s.waitForStop()
  296. s.cancelConnectWaiters()
  297. futureRetry.cancel()
  298. s.finishStopping()
  299. return waited
  300. @pep614(Connected.upon(_Client.stop).to(Disconnecting))
  301. def stopWhileConnected(
  302. c: _Client, s: _Core, protocol: _ReconnectingProtocolProxy
  303. ) -> Deferred[None]:
  304. waited = s.waitForStop()
  305. protocol._transport.loseConnection()
  306. return waited
  307. @pep614(Connected.upon(_Client.whenConnected).loop())
  308. def whenConnectedWhenConnected(
  309. c: _Client,
  310. s: _Core,
  311. protocol: _ReconnectingProtocolProxy,
  312. failAfterFailures: Optional[int] = None,
  313. ) -> Deferred[IProtocol]:
  314. return succeed(protocol._protocol)
  315. @pep614(Disconnecting.upon(_Client.stop).loop())
  316. @pep614(Restarting.upon(_Client.stop).to(Disconnecting))
  317. def discoStop(c: _Client, s: _Core) -> Deferred[None]:
  318. return s.waitForStop()
  319. @pep614(Disconnecting.upon(_Client._connectionFailed).to(Stopped))
  320. @pep614(Disconnecting.upon(_Client._clientDisconnected).to(Stopped))
  321. def disconnectingFinished(
  322. c: _Client, s: _Core, failure: Optional[Failure] = None
  323. ) -> None:
  324. s.cancelConnectWaiters()
  325. s.finishStopping()
  326. @pep614(Connecting.upon(_Client.whenConnected, nodata=True).loop())
  327. @pep614(Waiting.upon(_Client.whenConnected, nodata=True).loop())
  328. @pep614(Init.upon(_Client.whenConnected).to(Init))
  329. @pep614(Restarting.upon(_Client.whenConnected).to(Restarting))
  330. @pep614(Disconnecting.upon(_Client.whenConnected).to(Disconnecting))
  331. def awaitingConnection(
  332. c: _Client, s: _Core, failAfterFailures: Optional[int] = None
  333. ) -> Deferred[IProtocol]:
  334. result: Deferred[IProtocol] = Deferred()
  335. s.awaitingConnected.append((result, failAfterFailures))
  336. return result
  337. @pep614(Restarting.upon(_Client._clientDisconnected).to(Connecting))
  338. def restartDone(c: _Client, s: _Core, failure: Optional[Failure] = None) -> None:
  339. s.finishStopping()
  340. @pep614(Stopped.upon(_Client.whenConnected).to(Stopped))
  341. def notGoingToConnect(
  342. c: _Client, s: _Core, failAfterFailures: Optional[int] = None
  343. ) -> Deferred[IProtocol]:
  344. return fail(CancelledError())
  345. return machine.build()
  346. def backoffPolicy(
  347. initialDelay: float = 1.0,
  348. maxDelay: float = 60.0,
  349. factor: float = 1.5,
  350. jitter: Callable[[], float] = _goodEnoughRandom,
  351. ) -> Callable[[int], float]:
  352. """
  353. A timeout policy for L{ClientService} which computes an exponential backoff
  354. interval with configurable parameters.
  355. @since: 16.1.0
  356. @param initialDelay: Delay for the first reconnection attempt (default
  357. 1.0s).
  358. @type initialDelay: L{float}
  359. @param maxDelay: Maximum number of seconds between connection attempts
  360. (default 60 seconds, or one minute). Note that this value is before
  361. jitter is applied, so the actual maximum possible delay is this value
  362. plus the maximum possible result of C{jitter()}.
  363. @type maxDelay: L{float}
  364. @param factor: A multiplicative factor by which the delay grows on each
  365. failed reattempt. Default: 1.5.
  366. @type factor: L{float}
  367. @param jitter: A 0-argument callable that introduces noise into the delay.
  368. By default, C{random.random}, i.e. a pseudorandom floating-point value
  369. between zero and one.
  370. @type jitter: 0-argument callable returning L{float}
  371. @return: a 1-argument callable that, given an attempt count, returns a
  372. floating point number; the number of seconds to delay.
  373. @rtype: see L{ClientService.__init__}'s C{retryPolicy} argument.
  374. """
  375. def policy(attempt: int) -> float:
  376. try:
  377. delay = min(initialDelay * (factor ** min(100, attempt)), maxDelay)
  378. except OverflowError:
  379. delay = maxDelay
  380. return delay + jitter()
  381. return policy
  382. _defaultPolicy = backoffPolicy()
  383. ClientMachine = makeMachine()
  384. class ClientService(Service):
  385. """
  386. A L{ClientService} maintains a single outgoing connection to a client
  387. endpoint, reconnecting after a configurable timeout when a connection
  388. fails, either before or after connecting.
  389. @since: 16.1.0
  390. """
  391. _log = Logger()
  392. def __init__(
  393. self,
  394. endpoint: IStreamClientEndpoint,
  395. factory: IProtocolFactory,
  396. retryPolicy: Optional[Callable[[int], float]] = None,
  397. clock: Optional[IReactorTime] = None,
  398. prepareConnection: Optional[Callable[[IProtocol], object]] = None,
  399. ):
  400. """
  401. @param endpoint: A L{stream client endpoint
  402. <interfaces.IStreamClientEndpoint>} provider which will be used to
  403. connect when the service starts.
  404. @param factory: A L{protocol factory <interfaces.IProtocolFactory>}
  405. which will be used to create clients for the endpoint.
  406. @param retryPolicy: A policy configuring how long L{ClientService} will
  407. wait between attempts to connect to C{endpoint}; a callable taking
  408. (the number of failed connection attempts made in a row (L{int}))
  409. and returning the number of seconds to wait before making another
  410. attempt.
  411. @param clock: The clock used to schedule reconnection. It's mainly
  412. useful to be parametrized in tests. If the factory is serialized,
  413. this attribute will not be serialized, and the default value (the
  414. reactor) will be restored when deserialized.
  415. @param prepareConnection: A single argument L{callable} that may return
  416. a L{Deferred}. It will be called once with the L{protocol
  417. <interfaces.IProtocol>} each time a new connection is made. It may
  418. call methods on the protocol to prepare it for use (e.g.
  419. authenticate) or validate it (check its health).
  420. The C{prepareConnection} callable may raise an exception or return
  421. a L{Deferred} which fails to reject the connection. A rejected
  422. connection is not used to fire an L{Deferred} returned by
  423. L{whenConnected}. Instead, L{ClientService} handles the failure
  424. and continues as if the connection attempt were a failure
  425. (incrementing the counter passed to C{retryPolicy}).
  426. L{Deferred}s returned by L{whenConnected} will not fire until any
  427. L{Deferred} returned by the C{prepareConnection} callable fire.
  428. Otherwise its successful return value is consumed, but ignored.
  429. Present Since Twisted 18.7.0
  430. """
  431. clock = _maybeGlobalReactor(clock)
  432. retryPolicy = _defaultPolicy if retryPolicy is None else retryPolicy
  433. self._machine: _Client = ClientMachine(
  434. _Core(
  435. endpoint,
  436. factory,
  437. retryPolicy,
  438. clock,
  439. prepareConnection=prepareConnection,
  440. log=self._log,
  441. )
  442. )
  443. def whenConnected(
  444. self, failAfterFailures: Optional[int] = None
  445. ) -> Deferred[IProtocol]:
  446. """
  447. Retrieve the currently-connected L{Protocol}, or the next one to
  448. connect.
  449. @param failAfterFailures: number of connection failures after which
  450. the Deferred will deliver a Failure (None means the Deferred will
  451. only fail if/when the service is stopped). Set this to 1 to make
  452. the very first connection failure signal an error. Use 2 to
  453. allow one failure but signal an error if the subsequent retry
  454. then fails.
  455. @type failAfterFailures: L{int} or None
  456. @return: a Deferred that fires with a protocol produced by the
  457. factory passed to C{__init__}
  458. @rtype: L{Deferred} that may:
  459. - fire with L{IProtocol}
  460. - fail with L{CancelledError} when the service is stopped
  461. - fail with e.g.
  462. L{DNSLookupError<twisted.internet.error.DNSLookupError>} or
  463. L{ConnectionRefusedError<twisted.internet.error.ConnectionRefusedError>}
  464. when the number of consecutive failed connection attempts
  465. equals the value of "failAfterFailures"
  466. """
  467. return self._machine.whenConnected(failAfterFailures)
  468. def startService(self) -> None:
  469. """
  470. Start this L{ClientService}, initiating the connection retry loop.
  471. """
  472. if self.running:
  473. self._log.warn("Duplicate ClientService.startService {log_source}")
  474. return
  475. super().startService()
  476. self._machine.start()
  477. def stopService(self) -> Deferred[None]:
  478. """
  479. Stop attempting to reconnect and close any existing connections.
  480. @return: a L{Deferred} that fires when all outstanding connections are
  481. closed and all in-progress connection attempts halted.
  482. """
  483. super().stopService()
  484. return self._machine.stop()