testing.py 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969
  1. # -*- test-case-name: twisted.internet.test.test_testing -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Assorted functionality which is commonly useful when writing unit tests.
  6. """
  7. from __future__ import annotations
  8. from io import BytesIO
  9. from socket import AF_INET, AF_INET6
  10. from typing import Callable, Iterator, Sequence, overload
  11. from zope.interface import implementedBy, implementer
  12. from zope.interface.verify import verifyClass
  13. from typing_extensions import ParamSpec, Self
  14. from twisted.internet import address, error, protocol, task
  15. from twisted.internet.abstract import _dataMustBeBytes, isIPv6Address
  16. from twisted.internet.address import IPv4Address, IPv6Address, UNIXAddress
  17. from twisted.internet.defer import Deferred
  18. from twisted.internet.error import UnsupportedAddressFamily
  19. from twisted.internet.interfaces import (
  20. IConnector,
  21. IConsumer,
  22. IListeningPort,
  23. IProtocol,
  24. IPushProducer,
  25. IReactorCore,
  26. IReactorFDSet,
  27. IReactorSocket,
  28. IReactorSSL,
  29. IReactorTCP,
  30. IReactorUNIX,
  31. ITransport,
  32. )
  33. from twisted.internet.task import Clock
  34. from twisted.logger import ILogObserver, LogEvent, LogPublisher
  35. from twisted.protocols import basic
  36. from twisted.python import failure
  37. from twisted.trial.unittest import TestCase
# Names exported by a star-import of this module.
__all__ = [
    "AccumulatingProtocol",
    "LineSendingProtocol",
    "FakeDatagramTransport",
    "StringTransport",
    "StringTransportWithDisconnection",
    "StringIOWithoutClosing",
    "_FakeConnector",
    "_FakePort",
    "MemoryReactor",
    "MemoryReactorClock",
    "RaisingMemoryReactor",
    "NonStreamingProducer",
    "waitUntilAllDisconnected",
    "EventLoggingObserver",
]

# Parameter specification used to tie the ``*args`` / ``**kw`` accepted by
# MemoryReactor.addSystemEventTrigger and MemoryReactor.callWhenRunning to the
# signature of the callable they are given.
_P = ParamSpec("_P")
  55. class AccumulatingProtocol(protocol.Protocol):
  56. """
  57. L{AccumulatingProtocol} is an L{IProtocol} implementation which collects
  58. the data delivered to it and can fire a Deferred when it is connected or
  59. disconnected.
  60. @ivar made: A flag indicating whether C{connectionMade} has been called.
  61. @ivar data: Bytes giving all the data passed to C{dataReceived}.
  62. @ivar closed: A flag indicated whether C{connectionLost} has been called.
  63. @ivar closedReason: The value of the I{reason} parameter passed to
  64. C{connectionLost}.
  65. @ivar closedDeferred: If set to a L{Deferred}, this will be fired when
  66. C{connectionLost} is called.
  67. """
  68. made = closed = 0
  69. closedReason = None
  70. closedDeferred = None
  71. data = b""
  72. factory = None
  73. def connectionMade(self):
  74. self.made = 1
  75. if self.factory is not None and self.factory.protocolConnectionMade is not None:
  76. d = self.factory.protocolConnectionMade
  77. self.factory.protocolConnectionMade = None
  78. d.callback(self)
  79. def dataReceived(self, data):
  80. self.data += data
  81. def connectionLost(self, reason):
  82. self.closed = 1
  83. self.closedReason = reason
  84. if self.closedDeferred is not None:
  85. d, self.closedDeferred = self.closedDeferred, None
  86. d.callback(None)
  87. class LineSendingProtocol(basic.LineReceiver):
  88. lostConn = False
  89. def __init__(self, lines, start=True):
  90. self.lines = lines[:]
  91. self.response = []
  92. self.start = start
  93. def connectionMade(self):
  94. if self.start:
  95. for line in self.lines:
  96. self.sendLine(line)
  97. def lineReceived(self, line):
  98. if not self.start:
  99. for line in self.lines:
  100. self.sendLine(line)
  101. self.lines = []
  102. self.response.append(line)
  103. def connectionLost(self, reason):
  104. self.lostConn = True
  105. class FakeDatagramTransport:
  106. noAddr = object()
  107. def __init__(self):
  108. self.written = []
  109. def write(self, packet, addr=noAddr):
  110. self.written.append((packet, addr))
  111. @implementer(ITransport, IConsumer, IPushProducer)
  112. class StringTransport:
  113. """
  114. A transport implementation which buffers data in memory and keeps track of
  115. its other state without providing any behavior.
  116. L{StringTransport} has a number of attributes which are not part of any of
  117. the interfaces it claims to implement. These attributes are provided for
  118. testing purposes. Implementation code should not use any of these
  119. attributes; they are not provided by other transports.
  120. @ivar disconnecting: A C{bool} which is C{False} until L{loseConnection} is
  121. called, then C{True}.
  122. @ivar disconnected: A C{bool} which is C{False} until L{abortConnection} is
  123. called, then C{True}.
  124. @ivar producer: If a producer is currently registered, C{producer} is a
  125. reference to it. Otherwise, L{None}.
  126. @ivar streaming: If a producer is currently registered, C{streaming} refers
  127. to the value of the second parameter passed to C{registerProducer}.
  128. @ivar hostAddr: L{None} or an object which will be returned as the host
  129. address of this transport. If L{None}, a nasty tuple will be returned
  130. instead.
  131. @ivar peerAddr: L{None} or an object which will be returned as the peer
  132. address of this transport. If L{None}, a nasty tuple will be returned
  133. instead.
  134. @ivar producerState: The state of this L{StringTransport} in its capacity
  135. as an L{IPushProducer}. One of C{'producing'}, C{'paused'}, or
  136. C{'stopped'}.
  137. @ivar io: A L{io.BytesIO} which holds the data which has been written to
  138. this transport since the last call to L{clear}. Use L{value} instead
  139. of accessing this directly.
  140. @ivar _lenient: By default L{StringTransport} enforces that
  141. L{resumeProducing} is not called after the connection is lost. This is
  142. to ensure that any code that does call L{resumeProducing} after the
  143. connection is lost is not blindly expecting L{resumeProducing} to have
  144. any impact.
  145. However, if your test case is calling L{resumeProducing} after
  146. connection close on purpose, and you know it won't block expecting
  147. further data to show up, this flag may safely be set to L{True}.
  148. Defaults to L{False}.
  149. @type lenient: L{bool}
  150. """
  151. disconnecting = False
  152. disconnected = False
  153. producer = None
  154. streaming = None
  155. hostAddr = None
  156. peerAddr = None
  157. producerState = "producing"
  158. def __init__(self, hostAddress=None, peerAddress=None, lenient=False):
  159. self.clear()
  160. if hostAddress is not None:
  161. self.hostAddr = hostAddress
  162. if peerAddress is not None:
  163. self.peerAddr = peerAddress
  164. self.connected = True
  165. self._lenient = lenient
  166. def clear(self):
  167. """
  168. Discard all data written to this transport so far.
  169. This is not a transport method. It is intended for tests. Do not use
  170. it in implementation code.
  171. """
  172. self.io = BytesIO()
  173. def value(self):
  174. """
  175. Retrieve all data which has been buffered by this transport.
  176. This is not a transport method. It is intended for tests. Do not use
  177. it in implementation code.
  178. @return: A C{bytes} giving all data written to this transport since the
  179. last call to L{clear}.
  180. @rtype: C{bytes}
  181. """
  182. return self.io.getvalue()
  183. # ITransport
  184. def write(self, data):
  185. _dataMustBeBytes(data)
  186. self.io.write(data)
  187. def writeSequence(self, data):
  188. self.io.write(b"".join(data))
  189. def loseConnection(self):
  190. """
  191. Close the connection. Does nothing besides toggle the C{disconnecting}
  192. instance variable to C{True}.
  193. """
  194. self.disconnecting = True
  195. def abortConnection(self):
  196. """
  197. Abort the connection. Same as C{loseConnection}, but also toggles the
  198. C{aborted} instance variable to C{True}.
  199. """
  200. self.disconnected = True
  201. self.loseConnection()
  202. def getPeer(self):
  203. if self.peerAddr is None:
  204. return address.IPv4Address("TCP", "192.168.1.1", 54321)
  205. return self.peerAddr
  206. def getHost(self):
  207. if self.hostAddr is None:
  208. return address.IPv4Address("TCP", "10.0.0.1", 12345)
  209. return self.hostAddr
  210. # IConsumer
  211. def registerProducer(self, producer, streaming):
  212. if self.producer is not None:
  213. raise RuntimeError("Cannot register two producers")
  214. self.producer = producer
  215. self.streaming = streaming
  216. def unregisterProducer(self):
  217. if self.producer is None:
  218. raise RuntimeError("Cannot unregister a producer unless one is registered")
  219. self.producer = None
  220. self.streaming = None
  221. # IPushProducer
  222. def _checkState(self):
  223. if self.disconnecting and not self._lenient:
  224. raise RuntimeError("Cannot resume producing after loseConnection")
  225. if self.producerState == "stopped":
  226. raise RuntimeError("Cannot resume a stopped producer")
  227. def pauseProducing(self):
  228. self._checkState()
  229. self.producerState = "paused"
  230. def stopProducing(self):
  231. self.producerState = "stopped"
  232. def resumeProducing(self):
  233. self._checkState()
  234. self.producerState = "producing"
  235. class StringTransportWithDisconnection(StringTransport):
  236. """
  237. A L{StringTransport} which on disconnection will trigger the connection
  238. lost on the attached protocol.
  239. """
  240. protocol: IProtocol
  241. def loseConnection(self):
  242. if self.connected:
  243. self.connected = False
  244. self.protocol.connectionLost(failure.Failure(error.ConnectionDone("Bye.")))
  245. class StringIOWithoutClosing(BytesIO):
  246. """
  247. A BytesIO that can't be closed.
  248. """
  249. def close(self):
  250. """
  251. Do nothing.
  252. """
  253. @implementer(IListeningPort)
  254. class _FakePort:
  255. """
  256. A fake L{IListeningPort} to be used in tests.
  257. @ivar _hostAddress: The L{IAddress} this L{IListeningPort} is pretending
  258. to be listening on.
  259. """
  260. def __init__(self, hostAddress):
  261. """
  262. @param hostAddress: An L{IAddress} this L{IListeningPort} should
  263. pretend to be listening on.
  264. """
  265. self._hostAddress = hostAddress
  266. def startListening(self):
  267. """
  268. Fake L{IListeningPort.startListening} that doesn't do anything.
  269. """
  270. def stopListening(self):
  271. """
  272. Fake L{IListeningPort.stopListening} that doesn't do anything.
  273. """
  274. def getHost(self):
  275. """
  276. Fake L{IListeningPort.getHost} that returns our L{IAddress}.
  277. """
  278. return self._hostAddress
  279. @implementer(IConnector)
  280. class _FakeConnector:
  281. """
  282. A fake L{IConnector} that allows us to inspect if it has been told to stop
  283. connecting.
  284. @ivar stoppedConnecting: has this connector's
  285. L{_FakeConnector.stopConnecting} method been invoked yet?
  286. @ivar _address: An L{IAddress} provider that represents our destination.
  287. """
  288. _disconnected = False
  289. stoppedConnecting = False
  290. def __init__(self, address):
  291. """
  292. @param address: An L{IAddress} provider that represents this
  293. connector's destination.
  294. """
  295. self._address = address
  296. def stopConnecting(self):
  297. """
  298. Implement L{IConnector.stopConnecting} and set
  299. L{_FakeConnector.stoppedConnecting} to C{True}
  300. """
  301. self.stoppedConnecting = True
  302. def disconnect(self):
  303. """
  304. Implement L{IConnector.disconnect} as a no-op.
  305. """
  306. self._disconnected = True
  307. def connect(self):
  308. """
  309. Implement L{IConnector.connect} as a no-op.
  310. """
  311. def getDestination(self):
  312. """
  313. Implement L{IConnector.getDestination} to return the C{address} passed
  314. to C{__init__}.
  315. """
  316. return self._address
  317. @implementer(
  318. IReactorCore, IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket, IReactorFDSet
  319. )
  320. class MemoryReactor:
  321. """
  322. A fake reactor to be used in tests. This reactor doesn't actually do
  323. much that's useful yet. It accepts TCP connection setup attempts, but
  324. they will never succeed.
  325. @ivar hasInstalled: Keeps track of whether this reactor has been installed.
  326. @type hasInstalled: L{bool}
  327. @ivar running: Keeps track of whether this reactor is running.
  328. @type running: L{bool}
  329. @ivar hasStopped: Keeps track of whether this reactor has been stopped.
  330. @type hasStopped: L{bool}
  331. @ivar hasCrashed: Keeps track of whether this reactor has crashed.
  332. @type hasCrashed: L{bool}
  333. @ivar whenRunningHooks: Keeps track of hooks registered with
  334. C{callWhenRunning}.
  335. @type whenRunningHooks: L{list}
  336. @ivar triggers: Keeps track of hooks registered with
  337. C{addSystemEventTrigger}.
  338. @type triggers: L{dict}
  339. @ivar tcpClients: Keeps track of connection attempts (ie, calls to
  340. C{connectTCP}).
  341. @type tcpClients: L{list}
  342. @ivar tcpServers: Keeps track of server listen attempts (ie, calls to
  343. C{listenTCP}).
  344. @type tcpServers: L{list}
  345. @ivar sslClients: Keeps track of connection attempts (ie, calls to
  346. C{connectSSL}).
  347. @type sslClients: L{list}
  348. @ivar sslServers: Keeps track of server listen attempts (ie, calls to
  349. C{listenSSL}).
  350. @type sslServers: L{list}
  351. @ivar unixClients: Keeps track of connection attempts (ie, calls to
  352. C{connectUNIX}).
  353. @type unixClients: L{list}
  354. @ivar unixServers: Keeps track of server listen attempts (ie, calls to
  355. C{listenUNIX}).
  356. @type unixServers: L{list}
  357. @ivar adoptedPorts: Keeps track of server listen attempts (ie, calls to
  358. C{adoptStreamPort}).
  359. @ivar adoptedStreamConnections: Keeps track of stream-oriented
  360. connections added using C{adoptStreamConnection}.
  361. """
  362. def __init__(self):
  363. """
  364. Initialize the tracking lists.
  365. """
  366. self.hasInstalled = False
  367. self.running = False
  368. self.hasRun = True
  369. self.hasStopped = True
  370. self.hasCrashed = True
  371. self.whenRunningHooks = []
  372. self.triggers = {}
  373. self.tcpClients = []
  374. self.tcpServers = []
  375. self.sslClients = []
  376. self.sslServers = []
  377. self.unixClients = []
  378. self.unixServers = []
  379. self.adoptedPorts = []
  380. self.adoptedStreamConnections = []
  381. self.connectors = []
  382. self.readers = set()
  383. self.writers = set()
  384. def install(self):
  385. """
  386. Fake install callable to emulate reactor module installation.
  387. """
  388. self.hasInstalled = True
  389. def resolve(self, name, timeout=10):
  390. """
  391. Not implemented; raises L{NotImplementedError}.
  392. """
  393. raise NotImplementedError()
  394. def run(self):
  395. """
  396. Fake L{IReactorCore.run}.
  397. Sets C{self.running} to L{True}, runs all of the hooks passed to
  398. C{self.callWhenRunning}, then calls C{self.stop} to simulate a request
  399. to stop the reactor.
  400. Sets C{self.hasRun} to L{True}.
  401. """
  402. assert self.running is False
  403. self.running = True
  404. self.hasRun = True
  405. for f, args, kwargs in self.whenRunningHooks:
  406. f(*args, **kwargs)
  407. self.stop()
  408. # That we stopped means we can return, phew.
  409. def stop(self):
  410. """
  411. Fake L{IReactorCore.run}.
  412. Sets C{self.running} to L{False}.
  413. Sets C{self.hasStopped} to L{True}.
  414. """
  415. self.running = False
  416. self.hasStopped = True
  417. def crash(self):
  418. """
  419. Fake L{IReactorCore.crash}.
  420. Sets C{self.running} to L{None}, because that feels crashy.
  421. Sets C{self.hasCrashed} to L{True}.
  422. """
  423. self.running = None
  424. self.hasCrashed = True
  425. def iterate(self, delay=0):
  426. """
  427. Not implemented; raises L{NotImplementedError}.
  428. """
  429. raise NotImplementedError()
  430. def fireSystemEvent(self, eventType):
  431. """
  432. Not implemented; raises L{NotImplementedError}.
  433. """
  434. raise NotImplementedError()
  435. def addSystemEventTrigger(
  436. self,
  437. phase: str,
  438. eventType: str,
  439. callable: Callable[_P, object],
  440. *args: _P.args,
  441. **kw: _P.kwargs,
  442. ) -> None:
  443. """
  444. Fake L{IReactorCore.run}.
  445. Keep track of trigger by appending it to
  446. self.triggers[phase][eventType].
  447. """
  448. phaseTriggers = self.triggers.setdefault(phase, {})
  449. eventTypeTriggers = phaseTriggers.setdefault(eventType, [])
  450. eventTypeTriggers.append((callable, args, kw))
  451. def removeSystemEventTrigger(self, triggerID):
  452. """
  453. Not implemented; raises L{NotImplementedError}.
  454. """
  455. raise NotImplementedError()
  456. def callWhenRunning(
  457. self, callable: Callable[_P, object], *args: _P.args, **kw: _P.kwargs
  458. ) -> None:
  459. """
  460. Fake L{IReactorCore.callWhenRunning}.
  461. Keeps a list of invocations to make in C{self.whenRunningHooks}.
  462. """
  463. self.whenRunningHooks.append((callable, args, kw))
  464. def adoptStreamPort(self, fileno, addressFamily, factory):
  465. """
  466. Fake L{IReactorSocket.adoptStreamPort}, that logs the call and returns
  467. an L{IListeningPort}.
  468. """
  469. if addressFamily == AF_INET:
  470. addr = IPv4Address("TCP", "0.0.0.0", 1234)
  471. elif addressFamily == AF_INET6:
  472. addr = IPv6Address("TCP", "::", 1234)
  473. else:
  474. raise UnsupportedAddressFamily()
  475. self.adoptedPorts.append((fileno, addressFamily, factory))
  476. return _FakePort(addr)
  477. def adoptStreamConnection(self, fileDescriptor, addressFamily, factory):
  478. """
  479. Record the given stream connection in C{adoptedStreamConnections}.
  480. @see:
  481. L{twisted.internet.interfaces.IReactorSocket.adoptStreamConnection}
  482. """
  483. self.adoptedStreamConnections.append((fileDescriptor, addressFamily, factory))
  484. def adoptDatagramPort(self, fileno, addressFamily, protocol, maxPacketSize=8192):
  485. """
  486. Fake L{IReactorSocket.adoptDatagramPort}, that logs the call and
  487. returns a fake L{IListeningPort}.
  488. @see: L{twisted.internet.interfaces.IReactorSocket.adoptDatagramPort}
  489. """
  490. if addressFamily == AF_INET:
  491. addr = IPv4Address("UDP", "0.0.0.0", 1234)
  492. elif addressFamily == AF_INET6:
  493. addr = IPv6Address("UDP", "::", 1234)
  494. else:
  495. raise UnsupportedAddressFamily()
  496. self.adoptedPorts.append((fileno, addressFamily, protocol, maxPacketSize))
  497. return _FakePort(addr)
  498. def listenTCP(self, port, factory, backlog=50, interface=""):
  499. """
  500. Fake L{IReactorTCP.listenTCP}, that logs the call and
  501. returns an L{IListeningPort}.
  502. """
  503. self.tcpServers.append((port, factory, backlog, interface))
  504. if isIPv6Address(interface):
  505. address = IPv6Address("TCP", interface, port)
  506. else:
  507. address = IPv4Address("TCP", "0.0.0.0", port)
  508. return _FakePort(address)
  509. def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
  510. """
  511. Fake L{IReactorTCP.connectTCP}, that logs the call and
  512. returns an L{IConnector}.
  513. """
  514. self.tcpClients.append((host, port, factory, timeout, bindAddress))
  515. if isIPv6Address(host):
  516. conn = _FakeConnector(IPv6Address("TCP", host, port))
  517. else:
  518. conn = _FakeConnector(IPv4Address("TCP", host, port))
  519. factory.startedConnecting(conn)
  520. self.connectors.append(conn)
  521. return conn
  522. def listenSSL(self, port, factory, contextFactory, backlog=50, interface=""):
  523. """
  524. Fake L{IReactorSSL.listenSSL}, that logs the call and
  525. returns an L{IListeningPort}.
  526. """
  527. self.sslServers.append((port, factory, contextFactory, backlog, interface))
  528. return _FakePort(IPv4Address("TCP", "0.0.0.0", port))
  529. def connectSSL(
  530. self, host, port, factory, contextFactory, timeout=30, bindAddress=None
  531. ):
  532. """
  533. Fake L{IReactorSSL.connectSSL}, that logs the call and returns an
  534. L{IConnector}.
  535. """
  536. self.sslClients.append(
  537. (host, port, factory, contextFactory, timeout, bindAddress)
  538. )
  539. conn = _FakeConnector(IPv4Address("TCP", host, port))
  540. factory.startedConnecting(conn)
  541. self.connectors.append(conn)
  542. return conn
  543. def listenUNIX(self, address, factory, backlog=50, mode=0o666, wantPID=0):
  544. """
  545. Fake L{IReactorUNIX.listenUNIX}, that logs the call and returns an
  546. L{IListeningPort}.
  547. """
  548. self.unixServers.append((address, factory, backlog, mode, wantPID))
  549. return _FakePort(UNIXAddress(address))
  550. def connectUNIX(self, address, factory, timeout=30, checkPID=0):
  551. """
  552. Fake L{IReactorUNIX.connectUNIX}, that logs the call and returns an
  553. L{IConnector}.
  554. """
  555. self.unixClients.append((address, factory, timeout, checkPID))
  556. conn = _FakeConnector(UNIXAddress(address))
  557. factory.startedConnecting(conn)
  558. self.connectors.append(conn)
  559. return conn
  560. def addReader(self, reader):
  561. """
  562. Fake L{IReactorFDSet.addReader} which adds the reader to a local set.
  563. """
  564. self.readers.add(reader)
  565. def removeReader(self, reader):
  566. """
  567. Fake L{IReactorFDSet.removeReader} which removes the reader from a
  568. local set.
  569. """
  570. self.readers.discard(reader)
  571. def addWriter(self, writer):
  572. """
  573. Fake L{IReactorFDSet.addWriter} which adds the writer to a local set.
  574. """
  575. self.writers.add(writer)
  576. def removeWriter(self, writer):
  577. """
  578. Fake L{IReactorFDSet.removeWriter} which removes the writer from a
  579. local set.
  580. """
  581. self.writers.discard(writer)
  582. def getReaders(self):
  583. """
  584. Fake L{IReactorFDSet.getReaders} which returns a list of readers from
  585. the local set.
  586. """
  587. return list(self.readers)
  588. def getWriters(self):
  589. """
  590. Fake L{IReactorFDSet.getWriters} which returns a list of writers from
  591. the local set.
  592. """
  593. return list(self.writers)
  594. def removeAll(self):
  595. """
  596. Fake L{IReactorFDSet.removeAll} which removed all readers and writers
  597. from the local sets.
  598. """
  599. self.readers.clear()
  600. self.writers.clear()
# At import time, check MemoryReactor against every interface it declares via
# @implementer.  verifyClass is expected to raise if a declared interface is
# not satisfied, so a drifting fake fails loudly here rather than in a test.
for iface in implementedBy(MemoryReactor):
    verifyClass(iface, MemoryReactor)
  603. class MemoryReactorClock(MemoryReactor, Clock):
  604. def __init__(self):
  605. MemoryReactor.__init__(self)
  606. Clock.__init__(self)
  607. @implementer(IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket)
  608. class RaisingMemoryReactor:
  609. """
  610. A fake reactor to be used in tests. It accepts TCP connection setup
  611. attempts, but they will fail.
  612. @ivar _listenException: An instance of an L{Exception}
  613. @ivar _connectException: An instance of an L{Exception}
  614. """
  615. def __init__(self, listenException=None, connectException=None):
  616. """
  617. @param listenException: An instance of an L{Exception} to raise
  618. when any C{listen} method is called.
  619. @param connectException: An instance of an L{Exception} to raise
  620. when any C{connect} method is called.
  621. """
  622. self._listenException = listenException
  623. self._connectException = connectException
  624. def adoptStreamPort(self, fileno, addressFamily, factory):
  625. """
  626. Fake L{IReactorSocket.adoptStreamPort}, that raises
  627. L{_listenException}.
  628. """
  629. raise self._listenException
  630. def listenTCP(self, port, factory, backlog=50, interface=""):
  631. """
  632. Fake L{IReactorTCP.listenTCP}, that raises L{_listenException}.
  633. """
  634. raise self._listenException
  635. def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
  636. """
  637. Fake L{IReactorTCP.connectTCP}, that raises L{_connectException}.
  638. """
  639. raise self._connectException
  640. def listenSSL(self, port, factory, contextFactory, backlog=50, interface=""):
  641. """
  642. Fake L{IReactorSSL.listenSSL}, that raises L{_listenException}.
  643. """
  644. raise self._listenException
  645. def connectSSL(
  646. self, host, port, factory, contextFactory, timeout=30, bindAddress=None
  647. ):
  648. """
  649. Fake L{IReactorSSL.connectSSL}, that raises L{_connectException}.
  650. """
  651. raise self._connectException
  652. def listenUNIX(self, address, factory, backlog=50, mode=0o666, wantPID=0):
  653. """
  654. Fake L{IReactorUNIX.listenUNIX}, that raises L{_listenException}.
  655. """
  656. raise self._listenException
  657. def connectUNIX(self, address, factory, timeout=30, checkPID=0):
  658. """
  659. Fake L{IReactorUNIX.connectUNIX}, that raises L{_connectException}.
  660. """
  661. raise self._connectException
  662. def adoptDatagramPort(self, fileDescriptor, addressFamily, protocol, maxPacketSize):
  663. """
  664. Fake L{IReactorSocket.adoptDatagramPort}, that raises
  665. L{_connectException}.
  666. """
  667. raise self._connectException
  668. def adoptStreamConnection(self, fileDescriptor, addressFamily, factory):
  669. """
  670. Fake L{IReactorSocket.adoptStreamConnection}, that raises
  671. L{_connectException}.
  672. """
  673. raise self._connectException
  674. class NonStreamingProducer:
  675. """
  676. A pull producer which writes 10 times only.
  677. """
  678. counter = 0
  679. stopped = False
  680. def __init__(self, consumer):
  681. self.consumer = consumer
  682. self.result = Deferred()
  683. def resumeProducing(self):
  684. """
  685. Write the counter value once.
  686. """
  687. if self.consumer is None or self.counter >= 10:
  688. raise RuntimeError("BUG: resume after unregister/stop.")
  689. else:
  690. self.consumer.write(b"%d" % (self.counter,))
  691. self.counter += 1
  692. if self.counter == 10:
  693. self.consumer.unregisterProducer()
  694. self._done()
  695. def pauseProducing(self):
  696. """
  697. An implementation of C{IPushProducer.pauseProducing}. This should never
  698. be called on a pull producer, so this just raises an error.
  699. """
  700. raise RuntimeError("BUG: pause should never be called.")
  701. def _done(self):
  702. """
  703. Fire a L{Deferred} so that users can wait for this to complete.
  704. """
  705. self.consumer = None
  706. d = self.result
  707. del self.result
  708. d.callback(None)
  709. def stopProducing(self):
  710. """
  711. Stop all production.
  712. """
  713. self.stopped = True
  714. self._done()
  715. def waitUntilAllDisconnected(reactor, protocols):
  716. """
  717. Take a list of disconnecting protocols, callback a L{Deferred} when they're
  718. all done.
  719. This is a hack to make some older tests less flaky, as
  720. L{ITransport.loseConnection} is not atomic on all reactors (for example,
  721. the CoreFoundation, which sometimes takes a reactor turn for CFSocket to
  722. realise). New tests should either not use real sockets in testing, or take
  723. the advice in
  724. I{https://jml.io/pages/how-to-disconnect-in-twisted-really.html} to heart.
  725. @param reactor: The reactor to schedule the checks on.
  726. @type reactor: L{IReactorTime}
  727. @param protocols: The protocols to wait for disconnecting.
  728. @type protocols: A L{list} of L{IProtocol}s.
  729. """
  730. lc = None
  731. def _check():
  732. if True not in [x.transport.connected for x in protocols]:
  733. lc.stop()
  734. lc = task.LoopingCall(_check)
  735. lc.clock = reactor
  736. return lc.start(0.01, now=True)
  737. @implementer(ILogObserver)
  738. class EventLoggingObserver(Sequence[LogEvent]):
  739. """
  740. L{ILogObserver} That stores its events in a list for later inspection.
  741. This class is similar to L{LimitedHistoryLogObserver} save that the
  742. internal buffer is public and intended for external inspection. The
  743. observer implements the sequence protocol to ease iteration of the events.
  744. @ivar _events: The events captured by this observer
  745. @type _events: L{list}
  746. """
  747. def __init__(self) -> None:
  748. self._events: list[LogEvent] = []
  749. def __len__(self) -> int:
  750. return len(self._events)
  751. @overload
  752. def __getitem__(self, index: int) -> LogEvent:
  753. ...
  754. @overload
  755. def __getitem__(self, index: slice) -> Sequence[LogEvent]:
  756. ...
  757. def __getitem__(self, index: int | slice) -> LogEvent | Sequence[LogEvent]:
  758. return self._events[index]
  759. def __iter__(self) -> Iterator[LogEvent]:
  760. return iter(self._events)
  761. def __call__(self, event: LogEvent) -> None:
  762. """
  763. @see: L{ILogObserver}
  764. """
  765. self._events.append(event)
  766. @classmethod
  767. def createWithCleanup(cls, testInstance: TestCase, publisher: LogPublisher) -> Self:
  768. """
  769. Create an L{EventLoggingObserver} instance that observes the provided
  770. publisher and will be cleaned up with addCleanup().
  771. @param testInstance: Test instance in which this logger is used.
  772. @type testInstance: L{twisted.trial.unittest.TestCase}
  773. @param publisher: Log publisher to observe.
  774. @type publisher: twisted.logger.LogPublisher
  775. @return: An EventLoggingObserver configured to observe the provided
  776. publisher.
  777. @rtype: L{twisted.test.proto_helpers.EventLoggingObserver}
  778. """
  779. obs = cls()
  780. publisher.addObserver(obs)
  781. testInstance.addCleanup(lambda: publisher.removeObserver(obs))
  782. return obs