123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475 |
- # -*- test-case-name: twisted.words.test.test_jabbercomponent -*-
- #
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- External server-side components.
- Most Jabber server implementations allow for add-on components that act as a
- separate entity on the Jabber network, but use the server-to-server
- functionality of a regular Jabber IM server. These so-called 'external
- components' are connected to the Jabber server using the Jabber Component
- Protocol as defined in U{JEP-0114<http://www.jabber.org/jeps/jep-0114.html>}.
- This module allows for writing external server-side components by assigning one
- or more services implementing L{ijabber.IService} up to L{ServiceManager}. The
- ServiceManager connects to the Jabber server and is responsible for the
- corresponding XML stream.
- """
- from zope.interface import implementer
- from twisted.application import service
- from twisted.internet import defer
- from twisted.python import log
- from twisted.python.compat import _coercedUnicode, unicode
- from twisted.words.xish import domish
- from twisted.words.protocols.jabber import error, ijabber, jstrports, xmlstream
- from twisted.words.protocols.jabber.jid import internJID as JID
- # Namespace URI assigned to the C{namespace} attribute of both component
- # authenticators defined in this module.
- NS_COMPONENT_ACCEPT = 'jabber:component:accept'
def componentFactory(componentid, password):
    """
    Build the XML stream factory used by an external server-side component.

    @param componentid: JID of the component.
    @type componentid: L{unicode}

    @param password: Password used to authenticate to the server.
    @type password: C{str}

    @return: A stream factory whose authenticator is a
        L{ConnectComponentAuthenticator} created from the given JID and
        password.
    @rtype: L{xmlstream.XmlStreamFactory}
    """
    return xmlstream.XmlStreamFactory(
        ConnectComponentAuthenticator(componentid, password))
class ComponentInitiatingInitializer(object):
    """
    External server-side component authentication initializer for the
    initiating entity.

    @ivar xmlstream: XML stream between server and component.
    @type xmlstream: L{xmlstream.XmlStream}

    @ivar _deferred: Deferred handed out by L{initialize} and fired by
        L{_cbHandshake}. L{None} until L{initialize} has been called.
    """

    def __init__(self, xs):
        """
        @param xs: XML stream this initializer operates on.
        """
        self.xmlstream = xs
        self._deferred = None


    def initialize(self):
        """
        Send the handshake element and wait for the server's answer.

        The handshake carries the digest computed by
        L{xmlstream.hashPassword} from the stream identifier and the
        authenticator's password.

        @return: A deferred that fires with L{None} once a C{handshake}
            element has been received in reply.
        @rtype: L{defer.Deferred}
        """
        xs = self.xmlstream

        hs = domish.Element((xs.namespace, "handshake"))
        digest = xmlstream.hashPassword(
            xs.sid,
            _coercedUnicode(xs.authenticator.password))
        hs.addContent(unicode(digest))

        # Create the result Deferred *before* the observer is registered and
        # the handshake is sent, so that _cbHandshake always finds a Deferred
        # to fire, even if the reply is delivered synchronously.
        self._deferred = defer.Deferred()

        # Setup observer to watch for handshake result
        xs.addOnetimeObserver("/handshake", self._cbHandshake)
        xs.send(hs)
        return self._deferred


    def _cbHandshake(self, _):
        """
        Handle the server's handshake reply.

        @param _: The received handshake element (unused).
        """
        # we have successfully shaken hands and can now consider this
        # entity to represent the component JID.
        self.xmlstream.thisEntity = self.xmlstream.otherEntity
        self._deferred.callback(None)
class ConnectComponentAuthenticator(xmlstream.ConnectAuthenticator):
    """
    Authenticator that lets an XmlStream log in to a Jabber server as an
    external component, with this side initiating the stream.

    @ivar password: Password/secret this component uses to authenticate.
    """

    namespace = NS_COMPONENT_ACCEPT

    def __init__(self, componentjid, password):
        """
        @param componentjid: Jabber ID that this component wishes to bind to.
        @type componentjid: C{str}

        @param password: Password/secret this component uses to authenticate.
        @type password: C{str}
        """
        # The desired component JID is what the stream gets addressed 'to'.
        xmlstream.ConnectAuthenticator.__init__(self, componentjid)
        self.password = password


    def associateWithStream(self, xs):
        """
        Associate the authenticator with a stream.

        The stream's version is set to 0.0 and its initializers are replaced
        by a single L{ComponentInitiatingInitializer} for that stream.

        @param xs: The XML stream to associate with.
        """
        xs.version = (0, 0)
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)

        handshakeInitializer = ComponentInitiatingInitializer(xs)
        xs.initializers = [handshakeInitializer]
class ListenComponentAuthenticator(xmlstream.ListenAuthenticator):
    """
    Authenticator for accepting components.

    @since: 8.2
    @ivar secret: The shared secret used to authorize incoming component
        connections.
    @type secret: C{unicode}.
    """

    namespace = NS_COMPONENT_ACCEPT

    def __init__(self, secret):
        """
        @param secret: The shared secret incoming components must prove
            knowledge of.
        """
        self.secret = secret
        xmlstream.ListenAuthenticator.__init__(self)


    def associateWithStream(self, xs):
        """
        Associate the authenticator with a stream.

        This sets the stream's version to 0.0, because the XEP-0114 component
        protocol was not designed for XMPP 1.0.
        """
        xs.version = (0, 0)
        xmlstream.ListenAuthenticator.associateWithStream(self, xs)


    def streamStarted(self, rootElement):
        """
        Called by the stream when it has started.

        After delegating to the base class, this examines the default
        namespace of the incoming stream and whether there is a requested
        hostname for the component. If either check fails a stream error is
        sent. Otherwise a response header is sent and an observer is added
        for the first incoming element, triggering L{onElement}.

        @param rootElement: The root element of the incoming stream.
        """
        xmlstream.ListenAuthenticator.streamStarted(self, rootElement)

        if rootElement.defaultUri != self.namespace:
            exc = error.StreamError('invalid-namespace')
            self.xmlstream.sendStreamError(exc)
            return

        # self.xmlstream.thisEntity is set to the address the component
        # wants to assume.
        if not self.xmlstream.thisEntity:
            exc = error.StreamError('improper-addressing')
            self.xmlstream.sendStreamError(exc)
            return

        self.xmlstream.sendHeader()
        self.xmlstream.addOnetimeObserver('/*', self.onElement)


    def onElement(self, element):
        """
        Called on incoming XML Stanzas.

        The very first element received should be a request for handshake.
        Otherwise, the stream is dropped with a 'not-authorized' error. If a
        handshake request was received, the hash is extracted and passed to
        L{onHandshake}.

        @param element: The first element received on the stream.
        """
        if (element.uri, element.name) == (self.namespace, 'handshake'):
            self.onHandshake(unicode(element))
        else:
            exc = error.StreamError('not-authorized')
            self.xmlstream.sendStreamError(exc)


    @staticmethod
    def _asBytes(value):
        """
        Return C{value} as a byte string, encoding text as UTF-8.
        """
        if isinstance(value, bytes):
            return value
        return value.encode('utf-8')


    def onHandshake(self, handshake):
        """
        Called upon receiving the handshake request.

        This checks that the given hash in C{handshake} is equal to a
        calculated hash, responding with a handshake reply or a stream error.
        If the handshake was ok, the stream is authorized, and XML Stanzas may
        be exchanged.

        @param handshake: The hash offered by the connecting component.
        """
        # Function-scope import keeps this block self-contained.
        import hmac

        calculatedHash = xmlstream.hashPassword(self.xmlstream.sid,
                                                unicode(self.secret))

        # The offered hash comes from the remote peer, so compare it in
        # constant time. Both sides are converted to bytes first because
        # compare_digest rejects text containing non-ASCII characters.
        hashesMatch = hmac.compare_digest(self._asBytes(handshake),
                                          self._asBytes(calculatedHash))

        if not hashesMatch:
            exc = error.StreamError('not-authorized', text='Invalid hash')
            self.xmlstream.sendStreamError(exc)
        else:
            self.xmlstream.send('<handshake/>')
            self.xmlstream.dispatch(self.xmlstream,
                                    xmlstream.STREAM_AUTHD_EVENT)
@implementer(ijabber.IService)
class Service(service.Service):
    """
    External server-side component service.

    The three connection hooks below do nothing by default; L{ServiceManager}
    calls them on its child services that provide L{ijabber.IService}.
    """

    def componentConnected(self, xs):
        """
        Hook called by L{ServiceManager} when the stream has been
        authenticated. Does nothing by default.

        @param xs: The XML stream to the server.
        """


    def componentDisconnected(self):
        """
        Hook called by L{ServiceManager} when the stream has ended. Does
        nothing by default.
        """


    def transportConnected(self, xs):
        """
        Hook called by L{ServiceManager} when the stream has been connected.
        Does nothing by default.

        @param xs: The XML stream to the server.
        """


    def send(self, obj):
        """
        Send data over service parent's XML stream.

        @note: L{ServiceManager} maintains a queue for data sent using this
        method when there is no current established XML stream. This data is
        then sent as soon as a new stream has been established and initialized.
        Subsequently, L{componentConnected} will be called again. If this
        queueing is not desired, use C{send} on the XmlStream object (passed to
        L{componentConnected}) directly.

        @param obj: data to be sent over the XML stream. This is usually an
        object providing L{domish.IElement}, or serialized XML. See
        L{xmlstream.XmlStream} for details.
        """
        parentService = self.parent
        parentService.send(obj)
class ServiceManager(service.MultiService):
    """
    Business logic for a managed component connection to a Jabber router.

    This service maintains a single connection to a Jabber router and provides
    facilities for packet routing and transmission. Business logic modules are
    services implementing L{ijabber.IService} (like subclasses of L{Service}),
    and added as sub-service.

    @ivar jabberId: JID of the component, as passed to the constructor.
    @ivar xmlstream: The current XML stream, or L{None} when not connected.
    @ivar _packetQueue: Objects passed to L{send} while there was no stream.
    @ivar _xsFactory: The XML stream factory built by L{componentFactory}.
    """

    def __init__(self, jid, password):
        """
        @param jid: JID of the component.
        @param password: Password used to authenticate to the server.
        """
        service.MultiService.__init__(self)

        # Setup defaults
        self.jabberId = jid
        self.xmlstream = None

        # Internal buffer of packets
        self._packetQueue = []

        # Setup the xmlstream factory
        self._xsFactory = componentFactory(self.jabberId, password)

        # Register handlers that keep self.xmlstream up to date and notify
        # the child services about connection state changes.
        self._xsFactory.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT,
                                     self._connected)
        self._xsFactory.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self._authd)
        self._xsFactory.addBootstrap(xmlstream.STREAM_END_EVENT,
                                     self._disconnected)

        # Map addBootstrap and removeBootstrap to the underlying factory, so
        # callers can register their own bootstrap observers on the manager.
        self.addBootstrap = self._xsFactory.addBootstrap
        self.removeBootstrap = self._xsFactory.removeBootstrap


    def getFactory(self):
        """
        Return the XML stream factory used by this manager.
        """
        return self._xsFactory


    def _jabberServices(self):
        """
        Iterate over the child services that provide L{ijabber.IService}.
        """
        for child in self:
            if ijabber.IService.providedBy(child):
                yield child


    def _connected(self, xs):
        """
        Record the newly connected stream and notify the child services.

        @param xs: The XML stream that was connected.
        """
        self.xmlstream = xs
        for child in self._jabberServices():
            child.transportConnected(xs)


    def _authd(self, xs):
        """
        Flush queued data and notify the child services of authentication.

        @param xs: The XML stream that was authenticated.
        """
        # Flush all pending packets
        for p in self._packetQueue:
            self.xmlstream.send(p)
        self._packetQueue = []

        # Notify all child services which implement the IService interface
        for child in self._jabberServices():
            child.componentConnected(xs)


    def _disconnected(self, _):
        """
        Forget the stream and notify the child services of the disconnect.

        @param _: Event argument passed by the stream (unused).
        """
        self.xmlstream = None

        # Notify all child services which implement
        # the IService interface
        for child in self._jabberServices():
            child.componentDisconnected()


    def send(self, obj):
        """
        Send data over the XML stream.

        When there is no established XML stream, the data is queued and sent
        out when a new XML stream has been established and initialized.

        @note: C{self.xmlstream} is set as soon as the stream is connected,
        so data passed here between connection and authentication is handed
        straight to the stream rather than queued.

        @param obj: data to be sent over the XML stream. This is usually an
        object providing L{domish.IElement}, or serialized XML. See
        L{xmlstream.XmlStream} for details.
        """
        # Identity test: a stream object is never "equal" to None by value.
        if self.xmlstream is not None:
            self.xmlstream.send(obj)
        else:
            self._packetQueue.append(obj)
def buildServiceManager(jid, password, strport):
    """
    Construct a ready-to-use L{ServiceManager} that connects using the given
    strport string.

    @param jid: JID of the component.
    @param password: Password used to authenticate to the server.
    @param strport: strport description passed to L{jstrports.client}.

    @return: The service manager, with the client service created from
        C{strport} attached as a child service.
    @rtype: L{ServiceManager}
    """
    manager = ServiceManager(jid, password)
    connector = jstrports.client(strport, manager.getFactory())
    connector.setServiceParent(manager)
    return manager
class Router(object):
    """
    XMPP Server's Router.

    A router connects the different components of the XMPP service and routes
    messages between them based on the given routing table.

    Connected components are trusted to have correct addressing in the
    stanzas they offer for routing.

    A route destination of L{None} adds a default route. Traffic for which no
    specific route exists, will be routed to this default route.

    @since: 8.2
    @ivar routes: Routes based on the host part of JIDs. Maps host names to the
                  L{EventDispatcher<utility.EventDispatcher>}s that should
                  receive the traffic. A key of L{None} means the default
                  route.
    @type routes: C{dict}
    """

    def __init__(self):
        self.routes = {}


    def addRoute(self, destination, xs):
        """
        Add a new route.

        The passed XML Stream C{xs} will have an observer for all stanzas
        added to route its outgoing traffic. In turn, traffic for
        C{destination} will be passed to this stream.

        @param destination: Destination of the route to be added as a host name
                            or L{None} for the default route.
        @type destination: C{str} or L{None}.
        @param xs: XML Stream to register the route for.
        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
        """
        self.routes[destination] = xs
        xs.addObserver('/*', self.route)


    def removeRoute(self, destination, xs):
        """
        Remove a route.

        The routing observer is always removed from C{xs}. The routing table
        entry is only deleted when it still refers to C{xs}; if the entry has
        been replaced by another stream or is already gone, the table is left
        untouched.

        @param destination: Destination of the route that should be removed.
        @type destination: C{str}.
        @param xs: XML Stream to remove the route for.
        @type xs: L{EventDispatcher<utility.EventDispatcher>}.
        """
        xs.removeObserver('/*', self.route)

        # The entry may already be gone, e.g. when two streams registered
        # for the same destination and the later one disconnected first.
        if destination in self.routes and self.routes[destination] == xs:
            del self.routes[destination]


    def route(self, stanza):
        """
        Route a stanza.

        The stanza is sent to the route registered for the host part of its
        C{to} address, or to the default route when no such route exists.

        @param stanza: The stanza to be routed.
        @type stanza: L{domish.Element}.
        """
        destination = JID(stanza['to'])

        log.msg("Routing to %s: %r" % (destination.full(), stanza.toXml()))

        if destination.host in self.routes:
            self.routes[destination.host].send(stanza)
        else:
            self.routes[None].send(stanza)
class XMPPComponentServerFactory(xmlstream.XmlStreamServerFactory):
    """
    XMPP Component Server factory.

    This factory accepts XMPP external component connections and makes
    the router service route traffic for a component's bound domain
    to that component.

    @since: 8.2
    @cvar logTraffic: When true, raw incoming and outgoing data of every
        stream is logged.
    @ivar router: The router that routes are added to and removed from.
    @ivar secret: Shared secret handed to each new
        L{ListenComponentAuthenticator}.
    @ivar serial: Counter used to number the streams for logging.
    """

    logTraffic = False

    def __init__(self, router, secret='secret'):
        """
        @param router: Router to register authenticated components with.
        @param secret: Shared secret components must authenticate with.
        """
        self.router = router
        self.secret = secret

        # The secret is looked up at call time, so every new stream gets an
        # authenticator carrying the factory's current secret.
        makeAuthenticator = lambda: ListenComponentAuthenticator(self.secret)
        xmlstream.XmlStreamServerFactory.__init__(self, makeAuthenticator)

        bootstraps = (
            (xmlstream.STREAM_CONNECTED_EVENT, self.onConnectionMade),
            (xmlstream.STREAM_AUTHD_EVENT, self.onAuthenticated),
        )
        for event, handler in bootstraps:
            self.addBootstrap(event, handler)

        self.serial = 0


    def onConnectionMade(self, xs):
        """
        Called when a component connection was made.

        This numbers the stream, enables traffic debugging on it when
        L{logTraffic} is set, and observes stream errors.

        @param xs: The XML stream of the new connection.
        """
        xs.serial = self.serial
        self.serial += 1

        if self.logTraffic:
            def logDataIn(buf):
                log.msg("RECV (%d): %r" % (xs.serial, buf))

            def logDataOut(buf):
                log.msg("SEND (%d): %r" % (xs.serial, buf))

            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        xs.addObserver(xmlstream.STREAM_ERROR_EVENT, self.onError)


    def onAuthenticated(self, xs):
        """
        Called when a component has successfully authenticated.

        Add the component to the routing table and establish a handler
        for a closed connection.

        @param xs: The XML stream of the authenticated component.
        """
        host = xs.thisEntity.host

        self.router.addRoute(host, xs)
        # The extra arguments are handed to onConnectionLost along with the
        # event's own argument.
        xs.addObserver(xmlstream.STREAM_END_EVENT, self.onConnectionLost, 0,
                       host, xs)


    def onError(self, reason):
        """
        Log a stream error.

        @param reason: The error that occurred.
        """
        log.err(reason, "Stream Error")


    def onConnectionLost(self, destination, xs, reason):
        """
        Remove the route of a component whose connection has ended.

        @param destination: Host name the route was registered for.
        @param xs: The XML stream that ended.
        @param reason: The reason the stream ended (unused).
        """
        self.router.removeRoute(destination, xs)
|