123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969 |
- # -*- test-case-name: twisted.internet.test.test_testing -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Assorted functionality which is commonly useful when writing unit tests.
- """
- from __future__ import annotations
- from io import BytesIO
- from socket import AF_INET, AF_INET6
- from typing import Callable, Iterator, Sequence, overload
- from zope.interface import implementedBy, implementer
- from zope.interface.verify import verifyClass
- from typing_extensions import ParamSpec, Self
- from twisted.internet import address, error, protocol, task
- from twisted.internet.abstract import _dataMustBeBytes, isIPv6Address
- from twisted.internet.address import IPv4Address, IPv6Address, UNIXAddress
- from twisted.internet.defer import Deferred
- from twisted.internet.error import UnsupportedAddressFamily
- from twisted.internet.interfaces import (
- IConnector,
- IConsumer,
- IListeningPort,
- IProtocol,
- IPushProducer,
- IReactorCore,
- IReactorFDSet,
- IReactorSocket,
- IReactorSSL,
- IReactorTCP,
- IReactorUNIX,
- ITransport,
- )
- from twisted.internet.task import Clock
- from twisted.logger import ILogObserver, LogEvent, LogPublisher
- from twisted.protocols import basic
- from twisted.python import failure
- from twisted.trial.unittest import TestCase
- __all__ = [
- "AccumulatingProtocol",
- "LineSendingProtocol",
- "FakeDatagramTransport",
- "StringTransport",
- "StringTransportWithDisconnection",
- "StringIOWithoutClosing",
- "_FakeConnector",
- "_FakePort",
- "MemoryReactor",
- "MemoryReactorClock",
- "RaisingMemoryReactor",
- "NonStreamingProducer",
- "waitUntilAllDisconnected",
- "EventLoggingObserver",
- ]
- _P = ParamSpec("_P")
- class AccumulatingProtocol(protocol.Protocol):
- """
- L{AccumulatingProtocol} is an L{IProtocol} implementation which collects
- the data delivered to it and can fire a Deferred when it is connected or
- disconnected.
- @ivar made: A flag indicating whether C{connectionMade} has been called.
- @ivar data: Bytes giving all the data passed to C{dataReceived}.
- @ivar closed: A flag indicated whether C{connectionLost} has been called.
- @ivar closedReason: The value of the I{reason} parameter passed to
- C{connectionLost}.
- @ivar closedDeferred: If set to a L{Deferred}, this will be fired when
- C{connectionLost} is called.
- """
- made = closed = 0
- closedReason = None
- closedDeferred = None
- data = b""
- factory = None
- def connectionMade(self):
- self.made = 1
- if self.factory is not None and self.factory.protocolConnectionMade is not None:
- d = self.factory.protocolConnectionMade
- self.factory.protocolConnectionMade = None
- d.callback(self)
- def dataReceived(self, data):
- self.data += data
- def connectionLost(self, reason):
- self.closed = 1
- self.closedReason = reason
- if self.closedDeferred is not None:
- d, self.closedDeferred = self.closedDeferred, None
- d.callback(None)
- class LineSendingProtocol(basic.LineReceiver):
- lostConn = False
- def __init__(self, lines, start=True):
- self.lines = lines[:]
- self.response = []
- self.start = start
- def connectionMade(self):
- if self.start:
- for line in self.lines:
- self.sendLine(line)
- def lineReceived(self, line):
- if not self.start:
- for line in self.lines:
- self.sendLine(line)
- self.lines = []
- self.response.append(line)
- def connectionLost(self, reason):
- self.lostConn = True
- class FakeDatagramTransport:
- noAddr = object()
- def __init__(self):
- self.written = []
- def write(self, packet, addr=noAddr):
- self.written.append((packet, addr))
- @implementer(ITransport, IConsumer, IPushProducer)
- class StringTransport:
- """
- A transport implementation which buffers data in memory and keeps track of
- its other state without providing any behavior.
- L{StringTransport} has a number of attributes which are not part of any of
- the interfaces it claims to implement. These attributes are provided for
- testing purposes. Implementation code should not use any of these
- attributes; they are not provided by other transports.
- @ivar disconnecting: A C{bool} which is C{False} until L{loseConnection} is
- called, then C{True}.
- @ivar disconnected: A C{bool} which is C{False} until L{abortConnection} is
- called, then C{True}.
- @ivar producer: If a producer is currently registered, C{producer} is a
- reference to it. Otherwise, L{None}.
- @ivar streaming: If a producer is currently registered, C{streaming} refers
- to the value of the second parameter passed to C{registerProducer}.
- @ivar hostAddr: L{None} or an object which will be returned as the host
- address of this transport. If L{None}, a nasty tuple will be returned
- instead.
- @ivar peerAddr: L{None} or an object which will be returned as the peer
- address of this transport. If L{None}, a nasty tuple will be returned
- instead.
- @ivar producerState: The state of this L{StringTransport} in its capacity
- as an L{IPushProducer}. One of C{'producing'}, C{'paused'}, or
- C{'stopped'}.
- @ivar io: A L{io.BytesIO} which holds the data which has been written to
- this transport since the last call to L{clear}. Use L{value} instead
- of accessing this directly.
- @ivar _lenient: By default L{StringTransport} enforces that
- L{resumeProducing} is not called after the connection is lost. This is
- to ensure that any code that does call L{resumeProducing} after the
- connection is lost is not blindly expecting L{resumeProducing} to have
- any impact.
- However, if your test case is calling L{resumeProducing} after
- connection close on purpose, and you know it won't block expecting
- further data to show up, this flag may safely be set to L{True}.
- Defaults to L{False}.
- @type lenient: L{bool}
- """
- disconnecting = False
- disconnected = False
- producer = None
- streaming = None
- hostAddr = None
- peerAddr = None
- producerState = "producing"
- def __init__(self, hostAddress=None, peerAddress=None, lenient=False):
- self.clear()
- if hostAddress is not None:
- self.hostAddr = hostAddress
- if peerAddress is not None:
- self.peerAddr = peerAddress
- self.connected = True
- self._lenient = lenient
- def clear(self):
- """
- Discard all data written to this transport so far.
- This is not a transport method. It is intended for tests. Do not use
- it in implementation code.
- """
- self.io = BytesIO()
- def value(self):
- """
- Retrieve all data which has been buffered by this transport.
- This is not a transport method. It is intended for tests. Do not use
- it in implementation code.
- @return: A C{bytes} giving all data written to this transport since the
- last call to L{clear}.
- @rtype: C{bytes}
- """
- return self.io.getvalue()
- # ITransport
- def write(self, data):
- _dataMustBeBytes(data)
- self.io.write(data)
- def writeSequence(self, data):
- self.io.write(b"".join(data))
- def loseConnection(self):
- """
- Close the connection. Does nothing besides toggle the C{disconnecting}
- instance variable to C{True}.
- """
- self.disconnecting = True
- def abortConnection(self):
- """
- Abort the connection. Same as C{loseConnection}, but also toggles the
- C{aborted} instance variable to C{True}.
- """
- self.disconnected = True
- self.loseConnection()
- def getPeer(self):
- if self.peerAddr is None:
- return address.IPv4Address("TCP", "192.168.1.1", 54321)
- return self.peerAddr
- def getHost(self):
- if self.hostAddr is None:
- return address.IPv4Address("TCP", "10.0.0.1", 12345)
- return self.hostAddr
- # IConsumer
- def registerProducer(self, producer, streaming):
- if self.producer is not None:
- raise RuntimeError("Cannot register two producers")
- self.producer = producer
- self.streaming = streaming
- def unregisterProducer(self):
- if self.producer is None:
- raise RuntimeError("Cannot unregister a producer unless one is registered")
- self.producer = None
- self.streaming = None
- # IPushProducer
- def _checkState(self):
- if self.disconnecting and not self._lenient:
- raise RuntimeError("Cannot resume producing after loseConnection")
- if self.producerState == "stopped":
- raise RuntimeError("Cannot resume a stopped producer")
- def pauseProducing(self):
- self._checkState()
- self.producerState = "paused"
- def stopProducing(self):
- self.producerState = "stopped"
- def resumeProducing(self):
- self._checkState()
- self.producerState = "producing"
- class StringTransportWithDisconnection(StringTransport):
- """
- A L{StringTransport} which on disconnection will trigger the connection
- lost on the attached protocol.
- """
- protocol: IProtocol
- def loseConnection(self):
- if self.connected:
- self.connected = False
- self.protocol.connectionLost(failure.Failure(error.ConnectionDone("Bye.")))
- class StringIOWithoutClosing(BytesIO):
- """
- A BytesIO that can't be closed.
- """
- def close(self):
- """
- Do nothing.
- """
- @implementer(IListeningPort)
- class _FakePort:
- """
- A fake L{IListeningPort} to be used in tests.
- @ivar _hostAddress: The L{IAddress} this L{IListeningPort} is pretending
- to be listening on.
- """
- def __init__(self, hostAddress):
- """
- @param hostAddress: An L{IAddress} this L{IListeningPort} should
- pretend to be listening on.
- """
- self._hostAddress = hostAddress
- def startListening(self):
- """
- Fake L{IListeningPort.startListening} that doesn't do anything.
- """
- def stopListening(self):
- """
- Fake L{IListeningPort.stopListening} that doesn't do anything.
- """
- def getHost(self):
- """
- Fake L{IListeningPort.getHost} that returns our L{IAddress}.
- """
- return self._hostAddress
- @implementer(IConnector)
- class _FakeConnector:
- """
- A fake L{IConnector} that allows us to inspect if it has been told to stop
- connecting.
- @ivar stoppedConnecting: has this connector's
- L{_FakeConnector.stopConnecting} method been invoked yet?
- @ivar _address: An L{IAddress} provider that represents our destination.
- """
- _disconnected = False
- stoppedConnecting = False
- def __init__(self, address):
- """
- @param address: An L{IAddress} provider that represents this
- connector's destination.
- """
- self._address = address
- def stopConnecting(self):
- """
- Implement L{IConnector.stopConnecting} and set
- L{_FakeConnector.stoppedConnecting} to C{True}
- """
- self.stoppedConnecting = True
- def disconnect(self):
- """
- Implement L{IConnector.disconnect} as a no-op.
- """
- self._disconnected = True
- def connect(self):
- """
- Implement L{IConnector.connect} as a no-op.
- """
- def getDestination(self):
- """
- Implement L{IConnector.getDestination} to return the C{address} passed
- to C{__init__}.
- """
- return self._address
- @implementer(
- IReactorCore, IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket, IReactorFDSet
- )
- class MemoryReactor:
- """
- A fake reactor to be used in tests. This reactor doesn't actually do
- much that's useful yet. It accepts TCP connection setup attempts, but
- they will never succeed.
- @ivar hasInstalled: Keeps track of whether this reactor has been installed.
- @type hasInstalled: L{bool}
- @ivar running: Keeps track of whether this reactor is running.
- @type running: L{bool}
- @ivar hasStopped: Keeps track of whether this reactor has been stopped.
- @type hasStopped: L{bool}
- @ivar hasCrashed: Keeps track of whether this reactor has crashed.
- @type hasCrashed: L{bool}
- @ivar whenRunningHooks: Keeps track of hooks registered with
- C{callWhenRunning}.
- @type whenRunningHooks: L{list}
- @ivar triggers: Keeps track of hooks registered with
- C{addSystemEventTrigger}.
- @type triggers: L{dict}
- @ivar tcpClients: Keeps track of connection attempts (ie, calls to
- C{connectTCP}).
- @type tcpClients: L{list}
- @ivar tcpServers: Keeps track of server listen attempts (ie, calls to
- C{listenTCP}).
- @type tcpServers: L{list}
- @ivar sslClients: Keeps track of connection attempts (ie, calls to
- C{connectSSL}).
- @type sslClients: L{list}
- @ivar sslServers: Keeps track of server listen attempts (ie, calls to
- C{listenSSL}).
- @type sslServers: L{list}
- @ivar unixClients: Keeps track of connection attempts (ie, calls to
- C{connectUNIX}).
- @type unixClients: L{list}
- @ivar unixServers: Keeps track of server listen attempts (ie, calls to
- C{listenUNIX}).
- @type unixServers: L{list}
- @ivar adoptedPorts: Keeps track of server listen attempts (ie, calls to
- C{adoptStreamPort}).
- @ivar adoptedStreamConnections: Keeps track of stream-oriented
- connections added using C{adoptStreamConnection}.
- """
- def __init__(self):
- """
- Initialize the tracking lists.
- """
- self.hasInstalled = False
- self.running = False
- self.hasRun = True
- self.hasStopped = True
- self.hasCrashed = True
- self.whenRunningHooks = []
- self.triggers = {}
- self.tcpClients = []
- self.tcpServers = []
- self.sslClients = []
- self.sslServers = []
- self.unixClients = []
- self.unixServers = []
- self.adoptedPorts = []
- self.adoptedStreamConnections = []
- self.connectors = []
- self.readers = set()
- self.writers = set()
- def install(self):
- """
- Fake install callable to emulate reactor module installation.
- """
- self.hasInstalled = True
- def resolve(self, name, timeout=10):
- """
- Not implemented; raises L{NotImplementedError}.
- """
- raise NotImplementedError()
- def run(self):
- """
- Fake L{IReactorCore.run}.
- Sets C{self.running} to L{True}, runs all of the hooks passed to
- C{self.callWhenRunning}, then calls C{self.stop} to simulate a request
- to stop the reactor.
- Sets C{self.hasRun} to L{True}.
- """
- assert self.running is False
- self.running = True
- self.hasRun = True
- for f, args, kwargs in self.whenRunningHooks:
- f(*args, **kwargs)
- self.stop()
- # That we stopped means we can return, phew.
- def stop(self):
- """
- Fake L{IReactorCore.run}.
- Sets C{self.running} to L{False}.
- Sets C{self.hasStopped} to L{True}.
- """
- self.running = False
- self.hasStopped = True
- def crash(self):
- """
- Fake L{IReactorCore.crash}.
- Sets C{self.running} to L{None}, because that feels crashy.
- Sets C{self.hasCrashed} to L{True}.
- """
- self.running = None
- self.hasCrashed = True
- def iterate(self, delay=0):
- """
- Not implemented; raises L{NotImplementedError}.
- """
- raise NotImplementedError()
- def fireSystemEvent(self, eventType):
- """
- Not implemented; raises L{NotImplementedError}.
- """
- raise NotImplementedError()
- def addSystemEventTrigger(
- self,
- phase: str,
- eventType: str,
- callable: Callable[_P, object],
- *args: _P.args,
- **kw: _P.kwargs,
- ) -> None:
- """
- Fake L{IReactorCore.run}.
- Keep track of trigger by appending it to
- self.triggers[phase][eventType].
- """
- phaseTriggers = self.triggers.setdefault(phase, {})
- eventTypeTriggers = phaseTriggers.setdefault(eventType, [])
- eventTypeTriggers.append((callable, args, kw))
- def removeSystemEventTrigger(self, triggerID):
- """
- Not implemented; raises L{NotImplementedError}.
- """
- raise NotImplementedError()
- def callWhenRunning(
- self, callable: Callable[_P, object], *args: _P.args, **kw: _P.kwargs
- ) -> None:
- """
- Fake L{IReactorCore.callWhenRunning}.
- Keeps a list of invocations to make in C{self.whenRunningHooks}.
- """
- self.whenRunningHooks.append((callable, args, kw))
- def adoptStreamPort(self, fileno, addressFamily, factory):
- """
- Fake L{IReactorSocket.adoptStreamPort}, that logs the call and returns
- an L{IListeningPort}.
- """
- if addressFamily == AF_INET:
- addr = IPv4Address("TCP", "0.0.0.0", 1234)
- elif addressFamily == AF_INET6:
- addr = IPv6Address("TCP", "::", 1234)
- else:
- raise UnsupportedAddressFamily()
- self.adoptedPorts.append((fileno, addressFamily, factory))
- return _FakePort(addr)
- def adoptStreamConnection(self, fileDescriptor, addressFamily, factory):
- """
- Record the given stream connection in C{adoptedStreamConnections}.
- @see:
- L{twisted.internet.interfaces.IReactorSocket.adoptStreamConnection}
- """
- self.adoptedStreamConnections.append((fileDescriptor, addressFamily, factory))
- def adoptDatagramPort(self, fileno, addressFamily, protocol, maxPacketSize=8192):
- """
- Fake L{IReactorSocket.adoptDatagramPort}, that logs the call and
- returns a fake L{IListeningPort}.
- @see: L{twisted.internet.interfaces.IReactorSocket.adoptDatagramPort}
- """
- if addressFamily == AF_INET:
- addr = IPv4Address("UDP", "0.0.0.0", 1234)
- elif addressFamily == AF_INET6:
- addr = IPv6Address("UDP", "::", 1234)
- else:
- raise UnsupportedAddressFamily()
- self.adoptedPorts.append((fileno, addressFamily, protocol, maxPacketSize))
- return _FakePort(addr)
- def listenTCP(self, port, factory, backlog=50, interface=""):
- """
- Fake L{IReactorTCP.listenTCP}, that logs the call and
- returns an L{IListeningPort}.
- """
- self.tcpServers.append((port, factory, backlog, interface))
- if isIPv6Address(interface):
- address = IPv6Address("TCP", interface, port)
- else:
- address = IPv4Address("TCP", "0.0.0.0", port)
- return _FakePort(address)
- def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
- """
- Fake L{IReactorTCP.connectTCP}, that logs the call and
- returns an L{IConnector}.
- """
- self.tcpClients.append((host, port, factory, timeout, bindAddress))
- if isIPv6Address(host):
- conn = _FakeConnector(IPv6Address("TCP", host, port))
- else:
- conn = _FakeConnector(IPv4Address("TCP", host, port))
- factory.startedConnecting(conn)
- self.connectors.append(conn)
- return conn
- def listenSSL(self, port, factory, contextFactory, backlog=50, interface=""):
- """
- Fake L{IReactorSSL.listenSSL}, that logs the call and
- returns an L{IListeningPort}.
- """
- self.sslServers.append((port, factory, contextFactory, backlog, interface))
- return _FakePort(IPv4Address("TCP", "0.0.0.0", port))
- def connectSSL(
- self, host, port, factory, contextFactory, timeout=30, bindAddress=None
- ):
- """
- Fake L{IReactorSSL.connectSSL}, that logs the call and returns an
- L{IConnector}.
- """
- self.sslClients.append(
- (host, port, factory, contextFactory, timeout, bindAddress)
- )
- conn = _FakeConnector(IPv4Address("TCP", host, port))
- factory.startedConnecting(conn)
- self.connectors.append(conn)
- return conn
- def listenUNIX(self, address, factory, backlog=50, mode=0o666, wantPID=0):
- """
- Fake L{IReactorUNIX.listenUNIX}, that logs the call and returns an
- L{IListeningPort}.
- """
- self.unixServers.append((address, factory, backlog, mode, wantPID))
- return _FakePort(UNIXAddress(address))
- def connectUNIX(self, address, factory, timeout=30, checkPID=0):
- """
- Fake L{IReactorUNIX.connectUNIX}, that logs the call and returns an
- L{IConnector}.
- """
- self.unixClients.append((address, factory, timeout, checkPID))
- conn = _FakeConnector(UNIXAddress(address))
- factory.startedConnecting(conn)
- self.connectors.append(conn)
- return conn
- def addReader(self, reader):
- """
- Fake L{IReactorFDSet.addReader} which adds the reader to a local set.
- """
- self.readers.add(reader)
- def removeReader(self, reader):
- """
- Fake L{IReactorFDSet.removeReader} which removes the reader from a
- local set.
- """
- self.readers.discard(reader)
- def addWriter(self, writer):
- """
- Fake L{IReactorFDSet.addWriter} which adds the writer to a local set.
- """
- self.writers.add(writer)
- def removeWriter(self, writer):
- """
- Fake L{IReactorFDSet.removeWriter} which removes the writer from a
- local set.
- """
- self.writers.discard(writer)
- def getReaders(self):
- """
- Fake L{IReactorFDSet.getReaders} which returns a list of readers from
- the local set.
- """
- return list(self.readers)
- def getWriters(self):
- """
- Fake L{IReactorFDSet.getWriters} which returns a list of writers from
- the local set.
- """
- return list(self.writers)
- def removeAll(self):
- """
- Fake L{IReactorFDSet.removeAll} which removed all readers and writers
- from the local sets.
- """
- self.readers.clear()
- self.writers.clear()
- for iface in implementedBy(MemoryReactor):
- verifyClass(iface, MemoryReactor)
- class MemoryReactorClock(MemoryReactor, Clock):
- def __init__(self):
- MemoryReactor.__init__(self)
- Clock.__init__(self)
- @implementer(IReactorTCP, IReactorSSL, IReactorUNIX, IReactorSocket)
- class RaisingMemoryReactor:
- """
- A fake reactor to be used in tests. It accepts TCP connection setup
- attempts, but they will fail.
- @ivar _listenException: An instance of an L{Exception}
- @ivar _connectException: An instance of an L{Exception}
- """
- def __init__(self, listenException=None, connectException=None):
- """
- @param listenException: An instance of an L{Exception} to raise
- when any C{listen} method is called.
- @param connectException: An instance of an L{Exception} to raise
- when any C{connect} method is called.
- """
- self._listenException = listenException
- self._connectException = connectException
- def adoptStreamPort(self, fileno, addressFamily, factory):
- """
- Fake L{IReactorSocket.adoptStreamPort}, that raises
- L{_listenException}.
- """
- raise self._listenException
- def listenTCP(self, port, factory, backlog=50, interface=""):
- """
- Fake L{IReactorTCP.listenTCP}, that raises L{_listenException}.
- """
- raise self._listenException
- def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
- """
- Fake L{IReactorTCP.connectTCP}, that raises L{_connectException}.
- """
- raise self._connectException
- def listenSSL(self, port, factory, contextFactory, backlog=50, interface=""):
- """
- Fake L{IReactorSSL.listenSSL}, that raises L{_listenException}.
- """
- raise self._listenException
- def connectSSL(
- self, host, port, factory, contextFactory, timeout=30, bindAddress=None
- ):
- """
- Fake L{IReactorSSL.connectSSL}, that raises L{_connectException}.
- """
- raise self._connectException
- def listenUNIX(self, address, factory, backlog=50, mode=0o666, wantPID=0):
- """
- Fake L{IReactorUNIX.listenUNIX}, that raises L{_listenException}.
- """
- raise self._listenException
- def connectUNIX(self, address, factory, timeout=30, checkPID=0):
- """
- Fake L{IReactorUNIX.connectUNIX}, that raises L{_connectException}.
- """
- raise self._connectException
- def adoptDatagramPort(self, fileDescriptor, addressFamily, protocol, maxPacketSize):
- """
- Fake L{IReactorSocket.adoptDatagramPort}, that raises
- L{_connectException}.
- """
- raise self._connectException
- def adoptStreamConnection(self, fileDescriptor, addressFamily, factory):
- """
- Fake L{IReactorSocket.adoptStreamConnection}, that raises
- L{_connectException}.
- """
- raise self._connectException
- class NonStreamingProducer:
- """
- A pull producer which writes 10 times only.
- """
- counter = 0
- stopped = False
- def __init__(self, consumer):
- self.consumer = consumer
- self.result = Deferred()
- def resumeProducing(self):
- """
- Write the counter value once.
- """
- if self.consumer is None or self.counter >= 10:
- raise RuntimeError("BUG: resume after unregister/stop.")
- else:
- self.consumer.write(b"%d" % (self.counter,))
- self.counter += 1
- if self.counter == 10:
- self.consumer.unregisterProducer()
- self._done()
- def pauseProducing(self):
- """
- An implementation of C{IPushProducer.pauseProducing}. This should never
- be called on a pull producer, so this just raises an error.
- """
- raise RuntimeError("BUG: pause should never be called.")
- def _done(self):
- """
- Fire a L{Deferred} so that users can wait for this to complete.
- """
- self.consumer = None
- d = self.result
- del self.result
- d.callback(None)
- def stopProducing(self):
- """
- Stop all production.
- """
- self.stopped = True
- self._done()
- def waitUntilAllDisconnected(reactor, protocols):
- """
- Take a list of disconnecting protocols, callback a L{Deferred} when they're
- all done.
- This is a hack to make some older tests less flaky, as
- L{ITransport.loseConnection} is not atomic on all reactors (for example,
- the CoreFoundation, which sometimes takes a reactor turn for CFSocket to
- realise). New tests should either not use real sockets in testing, or take
- the advice in
- I{https://jml.io/pages/how-to-disconnect-in-twisted-really.html} to heart.
- @param reactor: The reactor to schedule the checks on.
- @type reactor: L{IReactorTime}
- @param protocols: The protocols to wait for disconnecting.
- @type protocols: A L{list} of L{IProtocol}s.
- """
- lc = None
- def _check():
- if True not in [x.transport.connected for x in protocols]:
- lc.stop()
- lc = task.LoopingCall(_check)
- lc.clock = reactor
- return lc.start(0.01, now=True)
- @implementer(ILogObserver)
- class EventLoggingObserver(Sequence[LogEvent]):
- """
- L{ILogObserver} That stores its events in a list for later inspection.
- This class is similar to L{LimitedHistoryLogObserver} save that the
- internal buffer is public and intended for external inspection. The
- observer implements the sequence protocol to ease iteration of the events.
- @ivar _events: The events captured by this observer
- @type _events: L{list}
- """
- def __init__(self) -> None:
- self._events: list[LogEvent] = []
- def __len__(self) -> int:
- return len(self._events)
- @overload
- def __getitem__(self, index: int) -> LogEvent:
- ...
- @overload
- def __getitem__(self, index: slice) -> Sequence[LogEvent]:
- ...
- def __getitem__(self, index: int | slice) -> LogEvent | Sequence[LogEvent]:
- return self._events[index]
- def __iter__(self) -> Iterator[LogEvent]:
- return iter(self._events)
- def __call__(self, event: LogEvent) -> None:
- """
- @see: L{ILogObserver}
- """
- self._events.append(event)
- @classmethod
- def createWithCleanup(cls, testInstance: TestCase, publisher: LogPublisher) -> Self:
- """
- Create an L{EventLoggingObserver} instance that observes the provided
- publisher and will be cleaned up with addCleanup().
- @param testInstance: Test instance in which this logger is used.
- @type testInstance: L{twisted.trial.unittest.TestCase}
- @param publisher: Log publisher to observe.
- @type publisher: twisted.logger.LogPublisher
- @return: An EventLoggingObserver configured to observe the provided
- publisher.
- @rtype: L{twisted.test.proto_helpers.EventLoggingObserver}
- """
- obs = cls()
- publisher.addObserver(obs)
- testInstance.addCleanup(lambda: publisher.removeObserver(obs))
- return obs
|