testing.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010
  1. # -*- test-case-name: twisted.internet.test.test_testing -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Assorted functionality which is commonly useful when writing unit tests.
  6. """
  7. from __future__ import division, absolute_import
  8. from socket import AF_INET, AF_INET6
  9. from io import BytesIO
  10. from zope.interface import implementer, implementedBy
  11. from zope.interface.verify import verifyClass
  12. from twisted.python import failure
  13. from twisted.python.compat import unicode, intToBytes, Sequence
  14. from twisted.internet.defer import Deferred
  15. from twisted.internet.interfaces import (
  16. ITransport, IConsumer, IPushProducer, IConnector,
  17. IReactorCore, IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket,
  18. IListeningPort, IReactorFDSet,
  19. )
  20. from twisted.internet.abstract import isIPv6Address
  21. from twisted.internet.error import UnsupportedAddressFamily
  22. from twisted.protocols import basic
  23. from twisted.internet import protocol, error, address, task
  24. from twisted.internet.task import Clock
  25. from twisted.internet.address import IPv4Address, UNIXAddress, IPv6Address
  26. from twisted.logger import ILogObserver
  27. __all__ = [
  28. 'AccumulatingProtocol',
  29. 'LineSendingProtocol',
  30. 'FakeDatagramTransport',
  31. 'StringTransport',
  32. 'StringTransportWithDisconnection',
  33. 'StringIOWithoutClosing',
  34. '_FakeConnector',
  35. '_FakePort',
  36. 'MemoryReactor',
  37. 'MemoryReactorClock',
  38. 'RaisingMemoryReactor',
  39. 'NonStreamingProducer',
  40. 'waitUntilAllDisconnected',
  41. 'EventLoggingObserver'
  42. ]
  43. class AccumulatingProtocol(protocol.Protocol):
  44. """
  45. L{AccumulatingProtocol} is an L{IProtocol} implementation which collects
  46. the data delivered to it and can fire a Deferred when it is connected or
  47. disconnected.
  48. @ivar made: A flag indicating whether C{connectionMade} has been called.
  49. @ivar data: Bytes giving all the data passed to C{dataReceived}.
  50. @ivar closed: A flag indicated whether C{connectionLost} has been called.
  51. @ivar closedReason: The value of the I{reason} parameter passed to
  52. C{connectionLost}.
  53. @ivar closedDeferred: If set to a L{Deferred}, this will be fired when
  54. C{connectionLost} is called.
  55. """
  56. made = closed = 0
  57. closedReason = None
  58. closedDeferred = None
  59. data = b""
  60. factory = None
  61. def connectionMade(self):
  62. self.made = 1
  63. if (self.factory is not None and self.factory.protocolConnectionMade
  64. is not None):
  65. d = self.factory.protocolConnectionMade
  66. self.factory.protocolConnectionMade = None
  67. d.callback(self)
  68. def dataReceived(self, data):
  69. self.data += data
  70. def connectionLost(self, reason):
  71. self.closed = 1
  72. self.closedReason = reason
  73. if self.closedDeferred is not None:
  74. d, self.closedDeferred = self.closedDeferred, None
  75. d.callback(None)
  76. class LineSendingProtocol(basic.LineReceiver):
  77. lostConn = False
  78. def __init__(self, lines, start=True):
  79. self.lines = lines[:]
  80. self.response = []
  81. self.start = start
  82. def connectionMade(self):
  83. if self.start:
  84. for line in self.lines:
  85. self.sendLine(line)
  86. def lineReceived(self, line):
  87. if not self.start:
  88. for line in self.lines:
  89. self.sendLine(line)
  90. self.lines = []
  91. self.response.append(line)
  92. def connectionLost(self, reason):
  93. self.lostConn = True
  94. class FakeDatagramTransport:
  95. noAddr = object()
  96. def __init__(self):
  97. self.written = []
  98. def write(self, packet, addr=noAddr):
  99. self.written.append((packet, addr))
  100. @implementer(ITransport, IConsumer, IPushProducer)
  101. class StringTransport:
  102. """
  103. A transport implementation which buffers data in memory and keeps track of
  104. its other state without providing any behavior.
  105. L{StringTransport} has a number of attributes which are not part of any of
  106. the interfaces it claims to implement. These attributes are provided for
  107. testing purposes. Implementation code should not use any of these
  108. attributes; they are not provided by other transports.
  109. @ivar disconnecting: A C{bool} which is C{False} until L{loseConnection} is
  110. called, then C{True}.
  111. @ivar disconnected: A C{bool} which is C{False} until L{abortConnection} is
  112. called, then C{True}.
  113. @ivar producer: If a producer is currently registered, C{producer} is a
  114. reference to it. Otherwise, L{None}.
  115. @ivar streaming: If a producer is currently registered, C{streaming} refers
  116. to the value of the second parameter passed to C{registerProducer}.
  117. @ivar hostAddr: L{None} or an object which will be returned as the host
  118. address of this transport. If L{None}, a nasty tuple will be returned
  119. instead.
  120. @ivar peerAddr: L{None} or an object which will be returned as the peer
  121. address of this transport. If L{None}, a nasty tuple will be returned
  122. instead.
  123. @ivar producerState: The state of this L{StringTransport} in its capacity
  124. as an L{IPushProducer}. One of C{'producing'}, C{'paused'}, or
  125. C{'stopped'}.
  126. @ivar io: A L{io.BytesIO} which holds the data which has been written to
  127. this transport since the last call to L{clear}. Use L{value} instead
  128. of accessing this directly.
  129. @ivar _lenient: By default L{StringTransport} enforces that
  130. L{resumeProducing} is not called after the connection is lost. This is
  131. to ensure that any code that does call L{resumeProducing} after the
  132. connection is lost is not blindly expecting L{resumeProducing} to have
  133. any impact.
  134. However, if your test case is calling L{resumeProducing} after
  135. connection close on purpose, and you know it won't block expecting
  136. further data to show up, this flag may safely be set to L{True}.
  137. Defaults to L{False}.
  138. @type lenient: L{bool}
  139. """
  140. disconnecting = False
  141. disconnected = False
  142. producer = None
  143. streaming = None
  144. hostAddr = None
  145. peerAddr = None
  146. producerState = 'producing'
  147. def __init__(self, hostAddress=None, peerAddress=None, lenient=False):
  148. self.clear()
  149. if hostAddress is not None:
  150. self.hostAddr = hostAddress
  151. if peerAddress is not None:
  152. self.peerAddr = peerAddress
  153. self.connected = True
  154. self._lenient = lenient
  155. def clear(self):
  156. """
  157. Discard all data written to this transport so far.
  158. This is not a transport method. It is intended for tests. Do not use
  159. it in implementation code.
  160. """
  161. self.io = BytesIO()
  162. def value(self):
  163. """
  164. Retrieve all data which has been buffered by this transport.
  165. This is not a transport method. It is intended for tests. Do not use
  166. it in implementation code.
  167. @return: A C{bytes} giving all data written to this transport since the
  168. last call to L{clear}.
  169. @rtype: C{bytes}
  170. """
  171. return self.io.getvalue()
  172. # ITransport
  173. def write(self, data):
  174. if isinstance(data, unicode): # no, really, I mean it
  175. raise TypeError("Data must not be unicode")
  176. self.io.write(data)
  177. def writeSequence(self, data):
  178. self.io.write(b''.join(data))
  179. def loseConnection(self):
  180. """
  181. Close the connection. Does nothing besides toggle the C{disconnecting}
  182. instance variable to C{True}.
  183. """
  184. self.disconnecting = True
  185. def abortConnection(self):
  186. """
  187. Abort the connection. Same as C{loseConnection}, but also toggles the
  188. C{aborted} instance variable to C{True}.
  189. """
  190. self.disconnected = True
  191. self.loseConnection()
  192. def getPeer(self):
  193. if self.peerAddr is None:
  194. return address.IPv4Address('TCP', '192.168.1.1', 54321)
  195. return self.peerAddr
  196. def getHost(self):
  197. if self.hostAddr is None:
  198. return address.IPv4Address('TCP', '10.0.0.1', 12345)
  199. return self.hostAddr
  200. # IConsumer
  201. def registerProducer(self, producer, streaming):
  202. if self.producer is not None:
  203. raise RuntimeError("Cannot register two producers")
  204. self.producer = producer
  205. self.streaming = streaming
  206. def unregisterProducer(self):
  207. if self.producer is None:
  208. raise RuntimeError(
  209. "Cannot unregister a producer unless one is registered")
  210. self.producer = None
  211. self.streaming = None
  212. # IPushProducer
  213. def _checkState(self):
  214. if self.disconnecting and not self._lenient:
  215. raise RuntimeError(
  216. "Cannot resume producing after loseConnection")
  217. if self.producerState == 'stopped':
  218. raise RuntimeError("Cannot resume a stopped producer")
  219. def pauseProducing(self):
  220. self._checkState()
  221. self.producerState = 'paused'
  222. def stopProducing(self):
  223. self.producerState = 'stopped'
  224. def resumeProducing(self):
  225. self._checkState()
  226. self.producerState = 'producing'
  227. class StringTransportWithDisconnection(StringTransport):
  228. """
  229. A L{StringTransport} which on disconnection will trigger the connection
  230. lost on the attached protocol.
  231. """
  232. def loseConnection(self):
  233. if self.connected:
  234. self.connected = False
  235. self.protocol.connectionLost(
  236. failure.Failure(error.ConnectionDone("Bye.")))
  237. class StringIOWithoutClosing(BytesIO):
  238. """
  239. A BytesIO that can't be closed.
  240. """
  241. def close(self):
  242. """
  243. Do nothing.
  244. """
  245. @implementer(IListeningPort)
  246. class _FakePort(object):
  247. """
  248. A fake L{IListeningPort} to be used in tests.
  249. @ivar _hostAddress: The L{IAddress} this L{IListeningPort} is pretending
  250. to be listening on.
  251. """
  252. def __init__(self, hostAddress):
  253. """
  254. @param hostAddress: An L{IAddress} this L{IListeningPort} should
  255. pretend to be listening on.
  256. """
  257. self._hostAddress = hostAddress
  258. def startListening(self):
  259. """
  260. Fake L{IListeningPort.startListening} that doesn't do anything.
  261. """
  262. def stopListening(self):
  263. """
  264. Fake L{IListeningPort.stopListening} that doesn't do anything.
  265. """
  266. def getHost(self):
  267. """
  268. Fake L{IListeningPort.getHost} that returns our L{IAddress}.
  269. """
  270. return self._hostAddress
  271. @implementer(IConnector)
  272. class _FakeConnector(object):
  273. """
  274. A fake L{IConnector} that allows us to inspect if it has been told to stop
  275. connecting.
  276. @ivar stoppedConnecting: has this connector's
  277. L{_FakeConnector.stopConnecting} method been invoked yet?
  278. @ivar _address: An L{IAddress} provider that represents our destination.
  279. """
  280. _disconnected = False
  281. stoppedConnecting = False
  282. def __init__(self, address):
  283. """
  284. @param address: An L{IAddress} provider that represents this
  285. connector's destination.
  286. """
  287. self._address = address
  288. def stopConnecting(self):
  289. """
  290. Implement L{IConnector.stopConnecting} and set
  291. L{_FakeConnector.stoppedConnecting} to C{True}
  292. """
  293. self.stoppedConnecting = True
  294. def disconnect(self):
  295. """
  296. Implement L{IConnector.disconnect} as a no-op.
  297. """
  298. self._disconnected = True
  299. def connect(self):
  300. """
  301. Implement L{IConnector.connect} as a no-op.
  302. """
  303. def getDestination(self):
  304. """
  305. Implement L{IConnector.getDestination} to return the C{address} passed
  306. to C{__init__}.
  307. """
  308. return self._address
  309. @implementer(
  310. IReactorCore,
  311. IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket, IReactorFDSet
  312. )
  313. class MemoryReactor(object):
  314. """
  315. A fake reactor to be used in tests. This reactor doesn't actually do
  316. much that's useful yet. It accepts TCP connection setup attempts, but
  317. they will never succeed.
  318. @ivar hasInstalled: Keeps track of whether this reactor has been installed.
  319. @type hasInstalled: L{bool}
  320. @ivar running: Keeps track of whether this reactor is running.
  321. @type running: L{bool}
  322. @ivar hasStopped: Keeps track of whether this reactor has been stopped.
  323. @type hasStopped: L{bool}
  324. @ivar hasCrashed: Keeps track of whether this reactor has crashed.
  325. @type hasCrashed: L{bool}
  326. @ivar whenRunningHooks: Keeps track of hooks registered with
  327. C{callWhenRunning}.
  328. @type whenRunningHooks: L{list}
  329. @ivar triggers: Keeps track of hooks registered with
  330. C{addSystemEventTrigger}.
  331. @type triggers: L{dict}
  332. @ivar tcpClients: Keeps track of connection attempts (ie, calls to
  333. C{connectTCP}).
  334. @type tcpClients: L{list}
  335. @ivar tcpServers: Keeps track of server listen attempts (ie, calls to
  336. C{listenTCP}).
  337. @type tcpServers: L{list}
  338. @ivar sslClients: Keeps track of connection attempts (ie, calls to
  339. C{connectSSL}).
  340. @type sslClients: L{list}
  341. @ivar sslServers: Keeps track of server listen attempts (ie, calls to
  342. C{listenSSL}).
  343. @type sslServers: L{list}
  344. @ivar unixClients: Keeps track of connection attempts (ie, calls to
  345. C{connectUNIX}).
  346. @type unixClients: L{list}
  347. @ivar unixServers: Keeps track of server listen attempts (ie, calls to
  348. C{listenUNIX}).
  349. @type unixServers: L{list}
  350. @ivar adoptedPorts: Keeps track of server listen attempts (ie, calls to
  351. C{adoptStreamPort}).
  352. @ivar adoptedStreamConnections: Keeps track of stream-oriented
  353. connections added using C{adoptStreamConnection}.
  354. """
  355. def __init__(self):
  356. """
  357. Initialize the tracking lists.
  358. """
  359. self.hasInstalled = False
  360. self.running = False
  361. self.hasRun = True
  362. self.hasStopped = True
  363. self.hasCrashed = True
  364. self.whenRunningHooks = []
  365. self.triggers = {}
  366. self.tcpClients = []
  367. self.tcpServers = []
  368. self.sslClients = []
  369. self.sslServers = []
  370. self.unixClients = []
  371. self.unixServers = []
  372. self.adoptedPorts = []
  373. self.adoptedStreamConnections = []
  374. self.connectors = []
  375. self.readers = set()
  376. self.writers = set()
  377. def install(self):
  378. """
  379. Fake install callable to emulate reactor module installation.
  380. """
  381. self.hasInstalled = True
  382. def resolve(self, name, timeout=10):
  383. """
  384. Not implemented; raises L{NotImplementedError}.
  385. """
  386. raise NotImplementedError()
  387. def run(self):
  388. """
  389. Fake L{IReactorCore.run}.
  390. Sets C{self.running} to L{True}, runs all of the hooks passed to
  391. C{self.callWhenRunning}, then calls C{self.stop} to simulate a request
  392. to stop the reactor.
  393. Sets C{self.hasRun} to L{True}.
  394. """
  395. assert self.running is False
  396. self.running = True
  397. self.hasRun = True
  398. for f, args, kwargs in self.whenRunningHooks:
  399. f(*args, **kwargs)
  400. self.stop()
  401. # That we stopped means we can return, phew.
  402. def stop(self):
  403. """
  404. Fake L{IReactorCore.run}.
  405. Sets C{self.running} to L{False}.
  406. Sets C{self.hasStopped} to L{True}.
  407. """
  408. self.running = False
  409. self.hasStopped = True
  410. def crash(self):
  411. """
  412. Fake L{IReactorCore.crash}.
  413. Sets C{self.running} to L{None}, because that feels crashy.
  414. Sets C{self.hasCrashed} to L{True}.
  415. """
  416. self.running = None
  417. self.hasCrashed = True
  418. def iterate(self, delay=0):
  419. """
  420. Not implemented; raises L{NotImplementedError}.
  421. """
  422. raise NotImplementedError()
  423. def fireSystemEvent(self, eventType):
  424. """
  425. Not implemented; raises L{NotImplementedError}.
  426. """
  427. raise NotImplementedError()
  428. def addSystemEventTrigger(self, phase, eventType, callable, *args, **kw):
  429. """
  430. Fake L{IReactorCore.run}.
  431. Keep track of trigger by appending it to
  432. self.triggers[phase][eventType].
  433. """
  434. phaseTriggers = self.triggers.setdefault(phase, {})
  435. eventTypeTriggers = phaseTriggers.setdefault(eventType, [])
  436. eventTypeTriggers.append((callable, args, kw))
  437. def removeSystemEventTrigger(self, triggerID):
  438. """
  439. Not implemented; raises L{NotImplementedError}.
  440. """
  441. raise NotImplementedError()
  442. def callWhenRunning(self, callable, *args, **kw):
  443. """
  444. Fake L{IReactorCore.callWhenRunning}.
  445. Keeps a list of invocations to make in C{self.whenRunningHooks}.
  446. """
  447. self.whenRunningHooks.append((callable, args, kw))
  448. def adoptStreamPort(self, fileno, addressFamily, factory):
  449. """
  450. Fake L{IReactorSocket.adoptStreamPort}, that logs the call and returns
  451. an L{IListeningPort}.
  452. """
  453. if addressFamily == AF_INET:
  454. addr = IPv4Address('TCP', '0.0.0.0', 1234)
  455. elif addressFamily == AF_INET6:
  456. addr = IPv6Address('TCP', '::', 1234)
  457. else:
  458. raise UnsupportedAddressFamily()
  459. self.adoptedPorts.append((fileno, addressFamily, factory))
  460. return _FakePort(addr)
  461. def adoptStreamConnection(self, fileDescriptor, addressFamily, factory):
  462. """
  463. Record the given stream connection in C{adoptedStreamConnections}.
  464. @see:
  465. L{twisted.internet.interfaces.IReactorSocket.adoptStreamConnection}
  466. """
  467. self.adoptedStreamConnections.append((
  468. fileDescriptor, addressFamily, factory))
  469. def adoptDatagramPort(self, fileno, addressFamily, protocol,
  470. maxPacketSize=8192):
  471. """
  472. Fake L{IReactorSocket.adoptDatagramPort}, that logs the call and
  473. returns a fake L{IListeningPort}.
  474. @see: L{twisted.internet.interfaces.IReactorSocket.adoptDatagramPort}
  475. """
  476. if addressFamily == AF_INET:
  477. addr = IPv4Address('UDP', '0.0.0.0', 1234)
  478. elif addressFamily == AF_INET6:
  479. addr = IPv6Address('UDP', '::', 1234)
  480. else:
  481. raise UnsupportedAddressFamily()
  482. self.adoptedPorts.append(
  483. (fileno, addressFamily, protocol, maxPacketSize))
  484. return _FakePort(addr)
  485. def listenTCP(self, port, factory, backlog=50, interface=''):
  486. """
  487. Fake L{IReactorTCP.listenTCP}, that logs the call and
  488. returns an L{IListeningPort}.
  489. """
  490. self.tcpServers.append((port, factory, backlog, interface))
  491. if isIPv6Address(interface):
  492. address = IPv6Address('TCP', interface, port)
  493. else:
  494. address = IPv4Address('TCP', '0.0.0.0', port)
  495. return _FakePort(address)
  496. def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
  497. """
  498. Fake L{IReactorTCP.connectTCP}, that logs the call and
  499. returns an L{IConnector}.
  500. """
  501. self.tcpClients.append((host, port, factory, timeout, bindAddress))
  502. if isIPv6Address(host):
  503. conn = _FakeConnector(IPv6Address('TCP', host, port))
  504. else:
  505. conn = _FakeConnector(IPv4Address('TCP', host, port))
  506. factory.startedConnecting(conn)
  507. self.connectors.append(conn)
  508. return conn
  509. def listenSSL(self, port, factory, contextFactory,
  510. backlog=50, interface=''):
  511. """
  512. Fake L{IReactorSSL.listenSSL}, that logs the call and
  513. returns an L{IListeningPort}.
  514. """
  515. self.sslServers.append((port, factory, contextFactory,
  516. backlog, interface))
  517. return _FakePort(IPv4Address('TCP', '0.0.0.0', port))
  518. def connectSSL(self, host, port, factory, contextFactory,
  519. timeout=30, bindAddress=None):
  520. """
  521. Fake L{IReactorSSL.connectSSL}, that logs the call and returns an
  522. L{IConnector}.
  523. """
  524. self.sslClients.append((host, port, factory, contextFactory,
  525. timeout, bindAddress))
  526. conn = _FakeConnector(IPv4Address('TCP', host, port))
  527. factory.startedConnecting(conn)
  528. self.connectors.append(conn)
  529. return conn
  530. def listenUNIX(self, address, factory,
  531. backlog=50, mode=0o666, wantPID=0):
  532. """
  533. Fake L{IReactorUNIX.listenUNIX}, that logs the call and returns an
  534. L{IListeningPort}.
  535. """
  536. self.unixServers.append((address, factory, backlog, mode, wantPID))
  537. return _FakePort(UNIXAddress(address))
  538. def connectUNIX(self, address, factory, timeout=30, checkPID=0):
  539. """
  540. Fake L{IReactorUNIX.connectUNIX}, that logs the call and returns an
  541. L{IConnector}.
  542. """
  543. self.unixClients.append((address, factory, timeout, checkPID))
  544. conn = _FakeConnector(UNIXAddress(address))
  545. factory.startedConnecting(conn)
  546. self.connectors.append(conn)
  547. return conn
  548. def addReader(self, reader):
  549. """
  550. Fake L{IReactorFDSet.addReader} which adds the reader to a local set.
  551. """
  552. self.readers.add(reader)
  553. def removeReader(self, reader):
  554. """
  555. Fake L{IReactorFDSet.removeReader} which removes the reader from a
  556. local set.
  557. """
  558. self.readers.discard(reader)
  559. def addWriter(self, writer):
  560. """
  561. Fake L{IReactorFDSet.addWriter} which adds the writer to a local set.
  562. """
  563. self.writers.add(writer)
  564. def removeWriter(self, writer):
  565. """
  566. Fake L{IReactorFDSet.removeWriter} which removes the writer from a
  567. local set.
  568. """
  569. self.writers.discard(writer)
  570. def getReaders(self):
  571. """
  572. Fake L{IReactorFDSet.getReaders} which returns a list of readers from
  573. the local set.
  574. """
  575. return list(self.readers)
  576. def getWriters(self):
  577. """
  578. Fake L{IReactorFDSet.getWriters} which returns a list of writers from
  579. the local set.
  580. """
  581. return list(self.writers)
  582. def removeAll(self):
  583. """
  584. Fake L{IReactorFDSet.removeAll} which removed all readers and writers
  585. from the local sets.
  586. """
  587. self.readers.clear()
  588. self.writers.clear()
  589. for iface in implementedBy(MemoryReactor):
  590. verifyClass(iface, MemoryReactor)
  591. class MemoryReactorClock(MemoryReactor, Clock):
  592. def __init__(self):
  593. MemoryReactor.__init__(self)
  594. Clock.__init__(self)
  595. @implementer(IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket)
  596. class RaisingMemoryReactor(object):
  597. """
  598. A fake reactor to be used in tests. It accepts TCP connection setup
  599. attempts, but they will fail.
  600. @ivar _listenException: An instance of an L{Exception}
  601. @ivar _connectException: An instance of an L{Exception}
  602. """
  603. def __init__(self, listenException=None, connectException=None):
  604. """
  605. @param listenException: An instance of an L{Exception} to raise
  606. when any C{listen} method is called.
  607. @param connectException: An instance of an L{Exception} to raise
  608. when any C{connect} method is called.
  609. """
  610. self._listenException = listenException
  611. self._connectException = connectException
  612. def adoptStreamPort(self, fileno, addressFamily, factory):
  613. """
  614. Fake L{IReactorSocket.adoptStreamPort}, that raises
  615. L{_listenException}.
  616. """
  617. raise self._listenException
  618. def listenTCP(self, port, factory, backlog=50, interface=''):
  619. """
  620. Fake L{IReactorTCP.listenTCP}, that raises L{_listenException}.
  621. """
  622. raise self._listenException
  623. def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
  624. """
  625. Fake L{IReactorTCP.connectTCP}, that raises L{_connectException}.
  626. """
  627. raise self._connectException
  628. def listenSSL(self, port, factory, contextFactory,
  629. backlog=50, interface=''):
  630. """
  631. Fake L{IReactorSSL.listenSSL}, that raises L{_listenException}.
  632. """
  633. raise self._listenException
  634. def connectSSL(self, host, port, factory, contextFactory,
  635. timeout=30, bindAddress=None):
  636. """
  637. Fake L{IReactorSSL.connectSSL}, that raises L{_connectException}.
  638. """
  639. raise self._connectException
  640. def listenUNIX(self, address, factory,
  641. backlog=50, mode=0o666, wantPID=0):
  642. """
  643. Fake L{IReactorUNIX.listenUNIX}, that raises L{_listenException}.
  644. """
  645. raise self._listenException
  646. def connectUNIX(self, address, factory, timeout=30, checkPID=0):
  647. """
  648. Fake L{IReactorUNIX.connectUNIX}, that raises L{_connectException}.
  649. """
  650. raise self._connectException
  651. class NonStreamingProducer(object):
  652. """
  653. A pull producer which writes 10 times only.
  654. """
  655. counter = 0
  656. stopped = False
  657. def __init__(self, consumer):
  658. self.consumer = consumer
  659. self.result = Deferred()
  660. def resumeProducing(self):
  661. """
  662. Write the counter value once.
  663. """
  664. if self.consumer is None or self.counter >= 10:
  665. raise RuntimeError("BUG: resume after unregister/stop.")
  666. else:
  667. self.consumer.write(intToBytes(self.counter))
  668. self.counter += 1
  669. if self.counter == 10:
  670. self.consumer.unregisterProducer()
  671. self._done()
  672. def pauseProducing(self):
  673. """
  674. An implementation of C{IPushProducer.pauseProducing}. This should never
  675. be called on a pull producer, so this just raises an error.
  676. """
  677. raise RuntimeError("BUG: pause should never be called.")
  678. def _done(self):
  679. """
  680. Fire a L{Deferred} so that users can wait for this to complete.
  681. """
  682. self.consumer = None
  683. d = self.result
  684. del self.result
  685. d.callback(None)
  686. def stopProducing(self):
  687. """
  688. Stop all production.
  689. """
  690. self.stopped = True
  691. self._done()
  692. def waitUntilAllDisconnected(reactor, protocols):
  693. """
  694. Take a list of disconnecting protocols, callback a L{Deferred} when they're
  695. all done.
  696. This is a hack to make some older tests less flaky, as
  697. L{ITransport.loseConnection} is not atomic on all reactors (for example,
  698. the CoreFoundation, which sometimes takes a reactor turn for CFSocket to
  699. realise). New tests should either not use real sockets in testing, or take
  700. the advice in
  701. I{https://jml.io/pages/how-to-disconnect-in-twisted-really.html} to heart.
  702. @param reactor: The reactor to schedule the checks on.
  703. @type reactor: L{IReactorTime}
  704. @param protocols: The protocols to wait for disconnecting.
  705. @type protocols: A L{list} of L{IProtocol}s.
  706. """
  707. lc = None
  708. def _check():
  709. if True not in [x.transport.connected for x in protocols]:
  710. lc.stop()
  711. lc = task.LoopingCall(_check)
  712. lc.clock = reactor
  713. return lc.start(0.01, now=True)
  714. @implementer(ILogObserver)
  715. class EventLoggingObserver(Sequence):
  716. """
  717. L{ILogObserver} That stores its events in a list for later inspection.
  718. This class is similar to L{LimitedHistoryLogObserver} save that the
  719. internal buffer is public and intended for external inspection. The
  720. observer implements the sequence protocol to ease iteration of the events.
  721. @ivar _events: The events captured by this observer
  722. @type _events: L{list}
  723. """
  724. def __init__(self):
  725. self._events = []
  726. def __len__(self):
  727. return len(self._events)
  728. def __getitem__(self, index):
  729. return self._events[index]
  730. def __iter__(self):
  731. return iter(self._events)
  732. def __call__(self, event):
  733. """
  734. @see: L{ILogObserver}
  735. """
  736. self._events.append(event)
  737. @classmethod
  738. def createWithCleanup(cls, testInstance, publisher):
  739. """
  740. Create an L{EventLoggingObserver} instance that observes the provided
  741. publisher and will be cleaned up with addCleanup().
  742. @param testInstance: Test instance in which this logger is used.
  743. @type testInstance: L{twisted.trial.unittest.TestCase}
  744. @param publisher: Log publisher to observe.
  745. @type publisher: twisted.logger.LogPublisher
  746. @return: An EventLoggingObserver configured to observe the provided
  747. publisher.
  748. @rtype: L{twisted.test.proto_helpers.EventLoggingObserver}
  749. """
  750. obs = cls()
  751. publisher.addObserver(obs)
  752. testInstance.addCleanup(lambda: publisher.removeObserver(obs))
  753. return obs