123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427 |
- # -*- test-case-name: twisted.application.test.test_internet,twisted.test.test_application,twisted.test.test_cooperator -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Reactor-based Services
- Here are services to run clients, servers and periodic services using
- the reactor.
- If you want to run a server service, L{StreamServerEndpointService} defines a
- service that can wrap an arbitrary L{IStreamServerEndpoint
- <twisted.internet.interfaces.IStreamServerEndpoint>}
- as an L{IService}. See also L{twisted.application.strports.service} for
- constructing one of these directly from a descriptive string.
- Additionally, this module (dynamically) defines various Service subclasses that
- let you represent clients and servers in a Service hierarchy. Endpoints APIs
- should be preferred for stream server services, but since those APIs do not yet
- exist for clients or datagram services, many of these are still useful.
- They are as follows::
- TCPServer, TCPClient,
- UNIXServer, UNIXClient,
- SSLServer, SSLClient,
- UDPServer,
- UNIXDatagramServer, UNIXDatagramClient,
- MulticastServer
- These classes take arbitrary arguments in their constructors and pass
- them straight on to their respective reactor.listenXXX or
- reactor.connectXXX calls.
- For example, the following service starts a web server on port 8080:
- C{TCPServer(8080, server.Site(r))}. See the documentation for the
- reactor.listen/connect* methods for more information.
- """
- from typing import List
- from twisted.application import service
- from twisted.internet import task
- from twisted.internet.defer import CancelledError
- from twisted.python import log
- from ._client_service import ClientService, _maybeGlobalReactor, backoffPolicy
class _VolatileDataService(service.Service):
    """
    Service whose pickled state leaves out the attributes named in
    C{volatile}.

    @cvar volatile: names of the attributes to drop from the state returned
        by L{__getstate__}.
    @type volatile: C{list} of C{str}
    """

    volatile: List[str] = []

    def __getstate__(self):
        """
        Take the state produced by L{service.Service.__getstate__} and remove
        every entry listed in C{volatile}.

        @return: the state without the volatile attributes.
        """
        state = service.Service.__getstate__(self)
        for name in self.volatile:
            # Names that are not present in the state are simply skipped.
            state.pop(name, None)
        return state
class _AbstractServer(_VolatileDataService):
    """
    Base class for services which listen on a port through the reactor.

    @cvar volatile: names of the attributes left out when pickling.
    @type volatile: C{list}

    @ivar method: the kind of reactor listen method to call, one of B{TCP},
        B{UDP}, B{SSL} or B{UNIX}.
    @type method: C{str}

    @ivar reactor: the reactor to listen with; it is passed through
        L{_maybeGlobalReactor} before use.
    @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP},
        C{IReactorSSL} or C{IReactorUnix}.

    @ivar _port: the port created when the service is started.
    @type _port: a provider of L{twisted.internet.interfaces.IListeningPort}.
    """

    volatile = ["_port"]
    method: str = ""
    reactor = None

    _port = None

    def __init__(self, *args, **kwargs):
        """
        Remember the arguments to hand to the reactor's listen method.

        @param args: positional arguments for the listen call.
        @param kwargs: keyword arguments for the listen call; a C{reactor}
            entry, when present, is taken out and stored as C{self.reactor}.
        """
        try:
            self.reactor = kwargs.pop("reactor")
        except KeyError:
            # No explicit reactor: keep the class-level default.
            pass
        self.args = args
        self.kwargs = kwargs

    def privilegedStartService(self):
        """
        Start listening right away, during the privileged start-up phase.
        """
        service.Service.privilegedStartService(self)
        self._port = self._getPort()

    def startService(self):
        """
        Start listening, unless L{privilegedStartService} already created the
        port.
        """
        service.Service.startService(self)
        if self._port is not None:
            return
        self._port = self._getPort()

    def stopService(self):
        """
        Stop listening on the port, if there is one.

        @return: whatever the port's C{stopListening} returns, or L{None}
            when no port exists.
        """
        service.Service.stopService(self)
        # TODO: if startup failed, should shutdown skip stopListening?
        # _port won't exist
        port = self._port
        if port is None:
            return None
        stopped = port.stopListening()
        # Drop the instance attribute so the class-level None shows through.
        del self._port
        return stopped

    def _getPort(self):
        """
        Call the reactor listen method selected by C{self.method} with the
        arguments given to the constructor.

        @return: the port object returned by the listen method.
        @rtype: an object providing
            L{twisted.internet.interfaces.IListeningPort}.
        """
        reactor = _maybeGlobalReactor(self.reactor)
        listen = getattr(reactor, f"listen{self.method}")
        return listen(*self.args, **self.kwargs)
class _AbstractClient(_VolatileDataService):
    """
    Base class for services which open a connection through the reactor.

    @cvar volatile: names of the attributes left out when pickling.
    @type volatile: C{list}

    @ivar method: the kind of reactor connect method to call, one of B{TCP},
        B{UDP}, B{SSL} or B{UNIX}.
    @type method: C{str}

    @ivar reactor: the reactor to connect with; it is passed through
        L{_maybeGlobalReactor} before use.
    @type reactor: a provider of C{IReactorTCP}, C{IReactorUDP},
        C{IReactorSSL} or C{IReactorUnix}.

    @ivar _connection: the connection created when the service is started.
    @type _connection: a provider of L{twisted.internet.interfaces.IConnector}.
    """

    volatile = ["_connection"]
    method: str = ""
    reactor = None

    _connection = None

    def __init__(self, *args, **kwargs):
        """
        Remember the arguments to hand to the reactor's connect method.

        @param args: positional arguments for the connect call.
        @param kwargs: keyword arguments for the connect call; a C{reactor}
            entry, when present, is taken out and stored as C{self.reactor}.
        """
        try:
            self.reactor = kwargs.pop("reactor")
        except KeyError:
            # No explicit reactor: keep the class-level default.
            pass
        self.args = args
        self.kwargs = kwargs

    def startService(self):
        """
        Mark the service as started and open the connection.
        """
        service.Service.startService(self)
        self._connection = self._getConnection()

    def stopService(self):
        """
        Mark the service as stopped and disconnect, if a connection exists.
        """
        service.Service.stopService(self)
        connection = self._connection
        if connection is None:
            return
        connection.disconnect()
        # Drop the instance attribute so the class-level None shows through.
        del self._connection

    def _getConnection(self):
        """
        Call the reactor connect method selected by C{self.method} with the
        arguments given to the constructor.

        @return: the connector returned by the connect method.
        @rtype: an object providing L{twisted.internet.interfaces.IConnector}.
        """
        reactor = _maybeGlobalReactor(self.reactor)
        connect = getattr(reactor, f"connect{self.method}")
        return connect(*self.args, **self.kwargs)
# Docstring template for the client service classes; the {tran} placeholder
# is replaced with the transport name via str.format.
_clientDoc = """Connect to {tran}
Call reactor.connect{tran} when the service starts, with the
arguments given to the constructor.
"""
# Docstring template for the server service classes; the {tran} placeholder
# is replaced with the transport name via str.format.
_serverDoc = """Serve {tran} clients
Call reactor.listen{tran} when the service starts, with the
arguments given to the constructor. When the service stops,
stop listening. See twisted.internet.interfaces for documentation
on arguments to the reactor method.
"""
class TCPServer(_AbstractServer):
    # The transport name is declared once and reused to render the docstring
    # from the shared server template.
    method = "TCP"
    __doc__ = _serverDoc.format(tran=method)
class TCPClient(_AbstractClient):
    # The transport name is declared once and reused to render the docstring
    # from the shared client template.
    method = "TCP"
    __doc__ = _clientDoc.format(tran=method)
class UNIXServer(_AbstractServer):
    # The transport name is declared once and reused to render the docstring
    # from the shared server template.
    method = "UNIX"
    __doc__ = _serverDoc.format(tran=method)
class UNIXClient(_AbstractClient):
    # The transport name is declared once and reused to render the docstring
    # from the shared client template.
    method = "UNIX"
    __doc__ = _clientDoc.format(tran=method)
class SSLServer(_AbstractServer):
    # The transport name is declared once and reused to render the docstring
    # from the shared server template.
    method = "SSL"
    __doc__ = _serverDoc.format(tran=method)
class SSLClient(_AbstractClient):
    # The transport name is declared once and reused to render the docstring
    # from the shared client template.
    method = "SSL"
    __doc__ = _clientDoc.format(tran=method)
class UDPServer(_AbstractServer):
    # The transport name is declared once and reused to render the docstring
    # from the shared server template.
    method = "UDP"
    __doc__ = _serverDoc.format(tran=method)
class UNIXDatagramServer(_AbstractServer):
    # The transport name is declared once and reused to render the docstring
    # from the shared server template.
    method = "UNIXDatagram"
    __doc__ = _serverDoc.format(tran=method)
class UNIXDatagramClient(_AbstractClient):
    # The transport name is declared once and reused to render the docstring
    # from the shared client template.
    method = "UNIXDatagram"
    __doc__ = _clientDoc.format(tran=method)
class MulticastServer(_AbstractServer):
    # The transport name is declared once and reused to render the docstring
    # from the shared server template.
    method = "Multicast"
    __doc__ = _serverDoc.format(tran=method)
class TimerService(_VolatileDataService):
    """
    Service to periodically call a function

    Every C{step} seconds call the given function with the given arguments.
    The service starts the calls when it starts, and cancels them
    when it stops.

    @ivar clock: Source of time. This defaults to L{None} which
        causes L{twisted.internet.reactor} to be used.
        Feel free to set this to something else, but it probably ought to be
        set *before* calling L{startService}.
    @type clock: L{IReactorTime<twisted.internet.interfaces.IReactorTime>}

    @ivar call: Function and arguments to call periodically.
    @type call: L{tuple} of C{(callable, args, kwargs)}
    """

    # The LoopingCall and its completion Deferred only exist between
    # startService and pickling; they are never part of the pickled state.
    volatile = ["_loop", "_loopFinished"]

    def __init__(self, step, callable, *args, **kwargs):
        """
        @param step: The number of seconds between calls.
        @type step: L{float}

        @param callable: Function to call
        @type callable: L{callable}

        @param args: Positional arguments to pass to function
        @param kwargs: Keyword arguments to pass to function
        """
        self.step = step
        self.call = (callable, args, kwargs)
        self.clock = None

    def startService(self):
        """
        Mark the service as started and begin calling the function every
        C{step} seconds, starting immediately.
        """
        service.Service.startService(self)
        callable, args, kwargs = self.call
        # we have to make a new LoopingCall each time we're started, because
        # an active LoopingCall remains active when serialized. If
        # LoopingCall were a _VolatileDataService, we wouldn't need to do
        # this.
        self._loop = task.LoopingCall(callable, *args, **kwargs)
        self._loop.clock = _maybeGlobalReactor(self.clock)
        self._loopFinished = self._loop.start(self.step, now=True)
        self._loopFinished.addErrback(self._failed)

    def _failed(self, why):
        """
        Errback for the looping call: record that it is no longer running and
        log the failure.

        @param why: the failure that ended the looping call.
        """
        # make a note that the LoopingCall is no longer looping, so we don't
        # try to shut it down a second time in stopService. I think this
        # should be in LoopingCall. -warner
        self._loop.running = False
        log.err(why)

    def stopService(self):
        """
        Stop the service.

        @rtype: L{Deferred<defer.Deferred>}
        @return: a L{Deferred<defer.Deferred>} which is fired when the
            currently running call (if any) is finished. If the service was
            never started there is no call to wait for, and the result of
            L{service.Service.stopService} is returned directly.
        """
        # _loop is only created by startService and is stripped when
        # pickling, so it may legitimately be missing here.
        loop = getattr(self, "_loop", None)
        if loop is None:
            return service.Service.stopService(self)
        if loop.running:
            loop.stop()
        self._loopFinished.addCallback(lambda _: service.Service.stopService(self))
        return self._loopFinished
class CooperatorService(service.Service):
    """
    Simple L{service.IService} which starts and stops a
    L{twisted.internet.task.Cooperator}.

    @ivar coop: the cooperator driven by this service; it is created in the
        not-started state.
    @type coop: L{twisted.internet.task.Cooperator}
    """

    def __init__(self):
        self.coop = task.Cooperator(started=False)

    def coiterate(self, iterator):
        """
        Hand C{iterator} to the underlying cooperator.

        @param iterator: the iterator to run cooperatively.
        @return: whatever the cooperator's C{coiterate} returns.
        """
        return self.coop.coiterate(iterator)

    def startService(self):
        """
        Mark the service as running and start the cooperator.
        """
        # Chain to the base class so that C{running} reflects the real state,
        # as the other services in this module do.
        service.Service.startService(self)
        self.coop.start()

    def stopService(self):
        """
        Mark the service as stopped and stop the cooperator.
        """
        service.Service.stopService(self)
        self.coop.stop()
class StreamServerEndpointService(service.Service):
    """
    A L{StreamServerEndpointService} is an L{IService} which runs a server on a
    listening port described by an L{IStreamServerEndpoint
    <twisted.internet.interfaces.IStreamServerEndpoint>}.

    @ivar factory: A server factory which will be used to listen on the
        endpoint.

    @ivar endpoint: An L{IStreamServerEndpoint
        <twisted.internet.interfaces.IStreamServerEndpoint>} provider
        which will be used to listen when the service starts.

    @ivar _waitingForPort: a Deferred, if C{listen} has already been invoked
        on the endpoint, otherwise None.

    @ivar _raiseSynchronously: Defines error-handling behavior for the case
        where C{listen(...)} raises an exception before C{startService} or
        C{privilegedStartService} have completed.

    @type _raiseSynchronously: C{bool}

    @since: 10.2
    """

    _raiseSynchronously = False

    def __init__(self, endpoint, factory):
        """
        @param endpoint: the endpoint to listen on when the service starts.
        @param factory: the server factory passed to C{endpoint.listen}.
        """
        self.endpoint = endpoint
        self.factory = factory
        self._waitingForPort = None

    def privilegedStartService(self):
        """
        Start listening on the endpoint.
        """
        service.Service.privilegedStartService(self)
        self._waitingForPort = self.endpoint.listen(self.factory)
        raisedNow = []

        def handleIt(err):
            # With _raiseSynchronously set, keep the failure so it can be
            # re-raised below; otherwise log it, unless it is only the
            # cancellation triggered by stopService.
            if self._raiseSynchronously:
                raisedNow.append(err)
            elif not err.check(CancelledError):
                log.err(err)

        self._waitingForPort.addErrback(handleIt)
        if raisedNow:
            raisedNow[0].raiseException()
        # Only failures that were already available when the errback was
        # added are raised; later ones go through the logging branch.
        self._raiseSynchronously = False

    def startService(self):
        """
        Start listening on the endpoint, unless L{privilegedStartService} got
        around to it already.
        """
        service.Service.startService(self)
        if self._waitingForPort is None:
            self.privilegedStartService()

    def stopService(self):
        """
        Stop listening on the port if it is already listening, otherwise,
        cancel the attempt to listen.

        @return: a L{Deferred<twisted.internet.defer.Deferred>} which fires
            with L{None} when the port has stopped listening. If the service
            was never started, the Deferred has already fired.
        """
        if self._waitingForPort is None:
            # listen() was never invoked, so there is nothing to cancel and
            # no port to close.
            from twisted.internet.defer import succeed

            self.running = False
            return succeed(None)

        self._waitingForPort.cancel()

        def stopIt(port):
            # port is None when the listen attempt failed or was cancelled
            # and the errback added at start-up consumed the failure.
            if port is not None:
                return port.stopListening()

        d = self._waitingForPort.addCallback(stopIt)

        def stop(passthrough):
            self.running = False
            return passthrough

        d.addBoth(stop)
        return d
# Public names of this module; each name is listed exactly once.
__all__ = [
    "TimerService",
    "CooperatorService",
    "MulticastServer",
    "StreamServerEndpointService",
    "UDPServer",
    "ClientService",
    "TCPServer",
    "TCPClient",
    "UNIXServer",
    "UNIXClient",
    "SSLServer",
    "SSLClient",
    "UNIXDatagramServer",
    "UNIXDatagramClient",
    "backoffPolicy",
]
|