endpoints.py 79 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391
  1. # -*- test-case-name: twisted.internet.test.test_endpoints -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Implementations of L{IStreamServerEndpoint} and L{IStreamClientEndpoint} that
  6. wrap the L{IReactorTCP}, L{IReactorSSL}, and L{IReactorUNIX} interfaces.
  7. This also implements an extensible mini-language for describing endpoints,
  8. parsed by the L{clientFromString} and L{serverFromString} functions.
  9. @since: 10.1
  10. """
  11. from __future__ import annotations
  12. import os
  13. import re
  14. import socket
  15. import warnings
  16. from typing import Any, Iterable, Optional, Sequence, Type
  17. from unicodedata import normalize
  18. from zope.interface import directlyProvides, implementer
  19. from constantly import NamedConstant, Names
  20. from incremental import Version
  21. from twisted.internet import defer, error, fdesc, interfaces, threads
  22. from twisted.internet.abstract import isIPv6Address
  23. from twisted.internet.address import (
  24. HostnameAddress,
  25. IPv4Address,
  26. IPv6Address,
  27. _ProcessAddress,
  28. )
  29. from twisted.internet.interfaces import (
  30. IAddress,
  31. IHostnameResolver,
  32. IHostResolution,
  33. IOpenSSLClientConnectionCreator,
  34. IProtocol,
  35. IProtocolFactory,
  36. IReactorPluggableNameResolver,
  37. IReactorSocket,
  38. IResolutionReceiver,
  39. IStreamClientEndpoint,
  40. IStreamClientEndpointStringParserWithReactor,
  41. IStreamServerEndpointStringParser,
  42. )
  43. from twisted.internet.protocol import ClientFactory, Factory, ProcessProtocol, Protocol
  44. try:
  45. from twisted.internet.stdio import PipeAddress, StandardIO
  46. except ImportError:
  47. # fallback if pywin32 is not installed
  48. StandardIO = None # type: ignore[assignment,misc]
  49. PipeAddress = None # type: ignore[assignment,misc]
  50. from twisted.internet._resolver import HostResolution
  51. from twisted.internet.defer import Deferred
  52. from twisted.internet.task import LoopingCall
  53. from twisted.logger import Logger
  54. from twisted.plugin import IPlugin, getPlugins
  55. from twisted.python import deprecate, log
  56. from twisted.python.compat import _matchingString, iterbytes, nativeString
  57. from twisted.python.components import proxyForInterface
  58. from twisted.python.failure import Failure
  59. from twisted.python.filepath import FilePath
  60. from twisted.python.systemd import ListenFDs
  61. from ._idna import _idnaBytes, _idnaText
  62. try:
  63. from OpenSSL.SSL import Error as SSLError
  64. from twisted.internet.ssl import (
  65. Certificate,
  66. CertificateOptions,
  67. KeyPair,
  68. PrivateCertificate,
  69. optionsForClientTLS,
  70. trustRootFromCertificates,
  71. )
  72. from twisted.protocols.tls import TLSMemoryBIOFactory as _TLSMemoryBIOFactory
  73. except ImportError:
  74. TLSMemoryBIOFactory = None
  75. else:
  76. TLSMemoryBIOFactory = _TLSMemoryBIOFactory
  77. __all__ = [
  78. "clientFromString",
  79. "serverFromString",
  80. "TCP4ServerEndpoint",
  81. "TCP6ServerEndpoint",
  82. "TCP4ClientEndpoint",
  83. "TCP6ClientEndpoint",
  84. "UNIXServerEndpoint",
  85. "UNIXClientEndpoint",
  86. "SSL4ServerEndpoint",
  87. "SSL4ClientEndpoint",
  88. "AdoptedStreamServerEndpoint",
  89. "StandardIOEndpoint",
  90. "ProcessEndpoint",
  91. "HostnameEndpoint",
  92. "StandardErrorBehavior",
  93. "connectProtocol",
  94. "wrapClientTLS",
  95. ]
  96. class _WrappingProtocol(Protocol):
  97. """
  98. Wrap another protocol in order to notify my user when a connection has
  99. been made.
  100. """
  101. def __init__(self, connectedDeferred, wrappedProtocol):
  102. """
  103. @param connectedDeferred: The L{Deferred} that will callback
  104. with the C{wrappedProtocol} when it is connected.
  105. @param wrappedProtocol: An L{IProtocol} provider that will be
  106. connected.
  107. """
  108. self._connectedDeferred = connectedDeferred
  109. self._wrappedProtocol = wrappedProtocol
  110. for iface in [
  111. interfaces.IHalfCloseableProtocol,
  112. interfaces.IFileDescriptorReceiver,
  113. interfaces.IHandshakeListener,
  114. ]:
  115. if iface.providedBy(self._wrappedProtocol):
  116. directlyProvides(self, iface)
  117. def logPrefix(self):
  118. """
  119. Transparently pass through the wrapped protocol's log prefix.
  120. """
  121. if interfaces.ILoggingContext.providedBy(self._wrappedProtocol):
  122. return self._wrappedProtocol.logPrefix()
  123. return self._wrappedProtocol.__class__.__name__
  124. def connectionMade(self):
  125. """
  126. Connect the C{self._wrappedProtocol} to our C{self.transport} and
  127. callback C{self._connectedDeferred} with the C{self._wrappedProtocol}
  128. """
  129. self._wrappedProtocol.makeConnection(self.transport)
  130. self._connectedDeferred.callback(self._wrappedProtocol)
  131. def dataReceived(self, data):
  132. """
  133. Proxy C{dataReceived} calls to our C{self._wrappedProtocol}
  134. """
  135. return self._wrappedProtocol.dataReceived(data)
  136. def fileDescriptorReceived(self, descriptor):
  137. """
  138. Proxy C{fileDescriptorReceived} calls to our C{self._wrappedProtocol}
  139. """
  140. return self._wrappedProtocol.fileDescriptorReceived(descriptor)
  141. def connectionLost(self, reason):
  142. """
  143. Proxy C{connectionLost} calls to our C{self._wrappedProtocol}
  144. """
  145. return self._wrappedProtocol.connectionLost(reason)
  146. def readConnectionLost(self):
  147. """
  148. Proxy L{IHalfCloseableProtocol.readConnectionLost} to our
  149. C{self._wrappedProtocol}
  150. """
  151. self._wrappedProtocol.readConnectionLost()
  152. def writeConnectionLost(self):
  153. """
  154. Proxy L{IHalfCloseableProtocol.writeConnectionLost} to our
  155. C{self._wrappedProtocol}
  156. """
  157. self._wrappedProtocol.writeConnectionLost()
  158. def handshakeCompleted(self):
  159. """
  160. Proxy L{interfaces.IHandshakeListener} to our
  161. C{self._wrappedProtocol}.
  162. """
  163. self._wrappedProtocol.handshakeCompleted()
  164. class _WrappingFactory(ClientFactory):
  165. """
  166. Wrap a factory in order to wrap the protocols it builds.
  167. @ivar _wrappedFactory: A provider of I{IProtocolFactory} whose buildProtocol
  168. method will be called and whose resulting protocol will be wrapped.
  169. @ivar _onConnection: A L{Deferred} that fires when the protocol is
  170. connected
  171. @ivar _connector: A L{connector <twisted.internet.interfaces.IConnector>}
  172. that is managing the current or previous connection attempt.
  173. """
  174. # Type is wrong. See https://twistedmatrix.com/trac/ticket/10005#ticket
  175. protocol = _WrappingProtocol # type: ignore[assignment]
  176. def __init__(self, wrappedFactory: IProtocolFactory) -> None:
  177. """
  178. @param wrappedFactory: A provider of I{IProtocolFactory} whose
  179. buildProtocol method will be called and whose resulting protocol
  180. will be wrapped.
  181. """
  182. self._wrappedFactory = wrappedFactory
  183. self._onConnection: defer.Deferred[IProtocol] = defer.Deferred(
  184. canceller=self._canceller
  185. )
  186. def startedConnecting(self, connector):
  187. """
  188. A connection attempt was started. Remember the connector which started
  189. said attempt, for use later.
  190. """
  191. self._connector = connector
  192. def _canceller(self, deferred):
  193. """
  194. The outgoing connection attempt was cancelled. Fail that L{Deferred}
  195. with an L{error.ConnectingCancelledError}.
  196. @param deferred: The L{Deferred <defer.Deferred>} that was cancelled;
  197. should be the same as C{self._onConnection}.
  198. @type deferred: L{Deferred <defer.Deferred>}
  199. @note: This relies on startedConnecting having been called, so it may
  200. seem as though there's a race condition where C{_connector} may not
  201. have been set. However, using public APIs, this condition is
  202. impossible to catch, because a connection API
  203. (C{connectTCP}/C{SSL}/C{UNIX}) is always invoked before a
  204. L{_WrappingFactory}'s L{Deferred <defer.Deferred>} is returned to
  205. C{connect()}'s caller.
  206. @return: L{None}
  207. """
  208. deferred.errback(
  209. error.ConnectingCancelledError(self._connector.getDestination())
  210. )
  211. self._connector.stopConnecting()
  212. def doStart(self):
  213. """
  214. Start notifications are passed straight through to the wrapped factory.
  215. """
  216. self._wrappedFactory.doStart()
  217. def doStop(self):
  218. """
  219. Stop notifications are passed straight through to the wrapped factory.
  220. """
  221. self._wrappedFactory.doStop()
  222. def buildProtocol(self, addr):
  223. """
  224. Proxy C{buildProtocol} to our C{self._wrappedFactory} or errback the
  225. C{self._onConnection} L{Deferred} if the wrapped factory raises an
  226. exception or returns L{None}.
  227. @return: An instance of L{_WrappingProtocol} or L{None}
  228. """
  229. try:
  230. proto = self._wrappedFactory.buildProtocol(addr)
  231. if proto is None:
  232. raise error.NoProtocol()
  233. except BaseException:
  234. self._onConnection.errback()
  235. else:
  236. return self.protocol(self._onConnection, proto)
  237. def clientConnectionFailed(self, connector, reason):
  238. """
  239. Errback the C{self._onConnection} L{Deferred} when the
  240. client connection fails.
  241. """
  242. if not self._onConnection.called:
  243. self._onConnection.errback(reason)
  244. @implementer(interfaces.IStreamServerEndpoint)
  245. class StandardIOEndpoint:
  246. """
  247. A Standard Input/Output endpoint
  248. @ivar _stdio: a callable, like L{stdio.StandardIO}, which takes an
  249. L{IProtocol} provider and a C{reactor} keyword argument (interface
  250. dependent upon your platform).
  251. """
  252. _stdio = StandardIO
  253. def __init__(self, reactor):
  254. """
  255. @param reactor: The reactor for the endpoint.
  256. """
  257. self._reactor = reactor
  258. def listen(self, stdioProtocolFactory):
  259. """
  260. Implement L{IStreamServerEndpoint.listen} to listen on stdin/stdout
  261. """
  262. return defer.execute(
  263. self._stdio,
  264. stdioProtocolFactory.buildProtocol(PipeAddress()),
  265. reactor=self._reactor,
  266. )
  267. class _IProcessTransportWithConsumerAndProducer(
  268. interfaces.IProcessTransport, interfaces.IConsumer, interfaces.IPushProducer
  269. ):
  270. """
  271. An L{_IProcessTransportWithConsumerAndProducer} combines various interfaces
  272. to work around the issue that L{interfaces.IProcessTransport} is
  273. incompletely defined and doesn't specify flow-control interfaces, and that
  274. L{proxyForInterface} doesn't allow for multiple interfaces.
  275. """
  276. class _ProcessEndpointTransport(
  277. proxyForInterface( # type: ignore[misc]
  278. _IProcessTransportWithConsumerAndProducer,
  279. "_process",
  280. )
  281. ):
  282. """
  283. An L{ITransport}, L{IProcessTransport}, L{IConsumer}, and L{IPushProducer}
  284. provider for the L{IProtocol} instance passed to the process endpoint.
  285. @ivar _process: An active process transport which will be used by write
  286. methods on this object to write data to a child process.
  287. @type _process: L{interfaces.IProcessTransport} provider
  288. """
  289. class _WrapIProtocol(ProcessProtocol):
  290. """
  291. An L{IProcessProtocol} provider that wraps an L{IProtocol}.
  292. @ivar transport: A L{_ProcessEndpointTransport} provider that is hooked to
  293. the wrapped L{IProtocol} provider.
  294. @see: L{protocol.ProcessProtocol}
  295. """
  296. def __init__(self, proto, executable, errFlag):
  297. """
  298. @param proto: An L{IProtocol} provider.
  299. @param errFlag: A constant belonging to L{StandardErrorBehavior}
  300. that determines if stderr is logged or dropped.
  301. @param executable: The file name (full path) to spawn.
  302. """
  303. self.protocol = proto
  304. self.errFlag = errFlag
  305. self.executable = executable
  306. def makeConnection(self, process):
  307. """
  308. Call L{IProtocol} provider's makeConnection method with an
  309. L{ITransport} provider.
  310. @param process: An L{IProcessTransport} provider.
  311. """
  312. self.transport = _ProcessEndpointTransport(process)
  313. return self.protocol.makeConnection(self.transport)
  314. def childDataReceived(self, childFD, data):
  315. """
  316. This is called with data from the process's stdout or stderr pipes. It
  317. checks the status of the errFlag to setermine if stderr should be
  318. logged (default) or dropped.
  319. """
  320. if childFD == 1:
  321. return self.protocol.dataReceived(data)
  322. elif childFD == 2 and self.errFlag == StandardErrorBehavior.LOG:
  323. log.msg(
  324. format="Process %(executable)r wrote stderr unhandled by "
  325. "%(protocol)s: %(data)s",
  326. executable=self.executable,
  327. protocol=self.protocol,
  328. data=data,
  329. )
  330. def processEnded(self, reason):
  331. """
  332. If the process ends with L{error.ProcessDone}, this method calls the
  333. L{IProtocol} provider's L{connectionLost} with a
  334. L{error.ConnectionDone}
  335. @see: L{ProcessProtocol.processEnded}
  336. """
  337. if (reason.check(error.ProcessDone) == error.ProcessDone) and (
  338. reason.value.status == 0
  339. ):
  340. return self.protocol.connectionLost(Failure(error.ConnectionDone()))
  341. else:
  342. return self.protocol.connectionLost(reason)
  343. class StandardErrorBehavior(Names):
  344. """
  345. Constants used in ProcessEndpoint to decide what to do with stderr.
  346. @cvar LOG: Indicates that stderr is to be logged.
  347. @cvar DROP: Indicates that stderr is to be dropped (and not logged).
  348. @since: 13.1
  349. """
  350. LOG = NamedConstant()
  351. DROP = NamedConstant()
  352. @implementer(interfaces.IStreamClientEndpoint)
  353. class ProcessEndpoint:
  354. """
  355. An endpoint for child processes
  356. @ivar _spawnProcess: A hook used for testing the spawning of child process.
  357. @since: 13.1
  358. """
  359. def __init__(
  360. self,
  361. reactor,
  362. executable,
  363. args=(),
  364. env={},
  365. path=None,
  366. uid=None,
  367. gid=None,
  368. usePTY=0,
  369. childFDs=None,
  370. errFlag=StandardErrorBehavior.LOG,
  371. ):
  372. """
  373. See L{IReactorProcess.spawnProcess}.
  374. @param errFlag: Determines if stderr should be logged.
  375. @type errFlag: L{endpoints.StandardErrorBehavior}
  376. """
  377. self._reactor = reactor
  378. self._executable = executable
  379. self._args = args
  380. self._env = env
  381. self._path = path
  382. self._uid = uid
  383. self._gid = gid
  384. self._usePTY = usePTY
  385. self._childFDs = childFDs
  386. self._errFlag = errFlag
  387. self._spawnProcess = self._reactor.spawnProcess
  388. def connect(self, protocolFactory):
  389. """
  390. Implement L{IStreamClientEndpoint.connect} to launch a child process
  391. and connect it to a protocol created by C{protocolFactory}.
  392. @param protocolFactory: A factory for an L{IProtocol} provider which
  393. will be notified of all events related to the created process.
  394. """
  395. proto = protocolFactory.buildProtocol(_ProcessAddress())
  396. try:
  397. self._spawnProcess(
  398. _WrapIProtocol(proto, self._executable, self._errFlag),
  399. self._executable,
  400. self._args,
  401. self._env,
  402. self._path,
  403. self._uid,
  404. self._gid,
  405. self._usePTY,
  406. self._childFDs,
  407. )
  408. except BaseException:
  409. return defer.fail()
  410. else:
  411. return defer.succeed(proto)
  412. @implementer(interfaces.IStreamServerEndpoint)
  413. class _TCPServerEndpoint:
  414. """
  415. A TCP server endpoint interface
  416. """
  417. def __init__(self, reactor, port, backlog, interface):
  418. """
  419. @param reactor: An L{IReactorTCP} provider.
  420. @param port: The port number used for listening
  421. @type port: int
  422. @param backlog: Size of the listen queue
  423. @type backlog: int
  424. @param interface: The hostname to bind to
  425. @type interface: str
  426. """
  427. self._reactor = reactor
  428. self._port = port
  429. self._backlog = backlog
  430. self._interface = interface
  431. def listen(self, protocolFactory):
  432. """
  433. Implement L{IStreamServerEndpoint.listen} to listen on a TCP
  434. socket
  435. """
  436. return defer.execute(
  437. self._reactor.listenTCP,
  438. self._port,
  439. protocolFactory,
  440. backlog=self._backlog,
  441. interface=self._interface,
  442. )
  443. class TCP4ServerEndpoint(_TCPServerEndpoint):
  444. """
  445. Implements TCP server endpoint with an IPv4 configuration
  446. """
  447. def __init__(self, reactor, port, backlog=50, interface=""):
  448. """
  449. @param reactor: An L{IReactorTCP} provider.
  450. @param port: The port number used for listening
  451. @type port: int
  452. @param backlog: Size of the listen queue
  453. @type backlog: int
  454. @param interface: The hostname to bind to, defaults to '' (all)
  455. @type interface: str
  456. """
  457. _TCPServerEndpoint.__init__(self, reactor, port, backlog, interface)
  458. class TCP6ServerEndpoint(_TCPServerEndpoint):
  459. """
  460. Implements TCP server endpoint with an IPv6 configuration
  461. """
  462. def __init__(self, reactor, port, backlog=50, interface="::"):
  463. """
  464. @param reactor: An L{IReactorTCP} provider.
  465. @param port: The port number used for listening
  466. @type port: int
  467. @param backlog: Size of the listen queue
  468. @type backlog: int
  469. @param interface: The hostname to bind to, defaults to C{::} (all)
  470. @type interface: str
  471. """
  472. _TCPServerEndpoint.__init__(self, reactor, port, backlog, interface)
  473. @implementer(interfaces.IStreamClientEndpoint)
  474. class TCP4ClientEndpoint:
  475. """
  476. TCP client endpoint with an IPv4 configuration.
  477. """
  478. def __init__(
  479. self,
  480. reactor: Any,
  481. host: str,
  482. port: int,
  483. timeout: float = 30,
  484. bindAddress: str | tuple[bytes | str, int] | None = None,
  485. ) -> None:
  486. """
  487. @param reactor: An L{IReactorTCP} provider
  488. @param host: A hostname, used when connecting
  489. @type host: str
  490. @param port: The port number, used when connecting
  491. @type port: int
  492. @param timeout: The number of seconds to wait before assuming the
  493. connection has failed.
  494. @type timeout: L{float} or L{int}
  495. @param bindAddress: A (host, port) tuple of local address to bind to,
  496. or None.
  497. @type bindAddress: tuple
  498. """
  499. self._reactor = reactor
  500. self._host = host
  501. self._port = port
  502. self._timeout = timeout
  503. self._bindAddress = bindAddress
  504. def connect(self, protocolFactory: IProtocolFactory) -> Deferred[IProtocol]:
  505. """
  506. Implement L{IStreamClientEndpoint.connect} to connect via TCP.
  507. """
  508. try:
  509. wf = _WrappingFactory(protocolFactory)
  510. self._reactor.connectTCP(
  511. self._host,
  512. self._port,
  513. wf,
  514. timeout=self._timeout,
  515. bindAddress=self._bindAddress,
  516. )
  517. return wf._onConnection
  518. except BaseException:
  519. return defer.fail()
  520. @implementer(interfaces.IStreamClientEndpoint)
  521. class TCP6ClientEndpoint:
  522. """
  523. TCP client endpoint with an IPv6 configuration.
  524. @ivar _getaddrinfo: A hook used for testing name resolution.
  525. @ivar _deferToThread: A hook used for testing deferToThread.
  526. @ivar _GAI_ADDRESS: Index of the address portion in result of
  527. getaddrinfo to be used.
  528. @ivar _GAI_ADDRESS_HOST: Index of the actual host-address in the
  529. 5-tuple L{_GAI_ADDRESS}.
  530. """
  531. _getaddrinfo = staticmethod(socket.getaddrinfo)
  532. _deferToThread = staticmethod(threads.deferToThread)
  533. _GAI_ADDRESS = 4
  534. _GAI_ADDRESS_HOST = 0
  535. def __init__(self, reactor, host, port, timeout=30, bindAddress=None):
  536. """
  537. @param host: An IPv6 address literal or a hostname with an
  538. IPv6 address
  539. @see: L{twisted.internet.interfaces.IReactorTCP.connectTCP}
  540. """
  541. self._reactor = reactor
  542. self._host = host
  543. self._port = port
  544. self._timeout = timeout
  545. self._bindAddress = bindAddress
  546. def connect(self, protocolFactory):
  547. """
  548. Implement L{IStreamClientEndpoint.connect} to connect via TCP,
  549. once the hostname resolution is done.
  550. """
  551. if isIPv6Address(self._host):
  552. d = self._resolvedHostConnect(self._host, protocolFactory)
  553. else:
  554. d = self._nameResolution(self._host)
  555. d.addCallback(
  556. lambda result: result[0][self._GAI_ADDRESS][self._GAI_ADDRESS_HOST]
  557. )
  558. d.addCallback(self._resolvedHostConnect, protocolFactory)
  559. return d
  560. def _nameResolution(self, host):
  561. """
  562. Resolve the hostname string into a tuple containing the host
  563. IPv6 address.
  564. """
  565. return self._deferToThread(self._getaddrinfo, host, 0, socket.AF_INET6)
  566. def _resolvedHostConnect(
  567. self, resolvedHost: str, protocolFactory: IProtocolFactory
  568. ) -> Deferred[IProtocol]:
  569. """
  570. Connect to the server using the resolved hostname.
  571. """
  572. try:
  573. wf = _WrappingFactory(protocolFactory)
  574. self._reactor.connectTCP(
  575. resolvedHost,
  576. self._port,
  577. wf,
  578. timeout=self._timeout,
  579. bindAddress=self._bindAddress,
  580. )
  581. return wf._onConnection
  582. except BaseException:
  583. return defer.fail()
  584. @implementer(IHostnameResolver)
  585. class _SimpleHostnameResolver:
  586. """
  587. An L{IHostnameResolver} provider that invokes a provided callable
  588. to resolve hostnames.
  589. @ivar _nameResolution: the callable L{resolveHostName} invokes to
  590. resolve hostnames.
  591. @type _nameResolution: A L{callable} that accepts two arguments:
  592. the host to resolve and the port number to include in the
  593. result.
  594. """
  595. _log = Logger()
  596. def __init__(self, nameResolution):
  597. """
  598. Create a L{_SimpleHostnameResolver} instance.
  599. """
  600. self._nameResolution = nameResolution
  601. def resolveHostName(
  602. self,
  603. resolutionReceiver: IResolutionReceiver,
  604. hostName: str,
  605. portNumber: int = 0,
  606. addressTypes: Optional[Sequence[Type[IAddress]]] = None,
  607. transportSemantics: str = "TCP",
  608. ) -> IHostResolution:
  609. """
  610. Initiate a hostname resolution.
  611. @param resolutionReceiver: an object that will receive each resolved
  612. address as it arrives.
  613. @type resolutionReceiver: L{IResolutionReceiver}
  614. @param hostName: see interface
  615. @param portNumber: see interface
  616. @param addressTypes: Ignored in this implementation.
  617. @param transportSemantics: Ignored in this implementation.
  618. @return: The resolution in progress.
  619. @rtype: L{IResolutionReceiver}
  620. """
  621. resolution = HostResolution(hostName)
  622. resolutionReceiver.resolutionBegan(resolution)
  623. d = self._nameResolution(hostName, portNumber)
  624. def cbDeliver(gairesult):
  625. for family, socktype, proto, canonname, sockaddr in gairesult:
  626. if family == socket.AF_INET6:
  627. resolutionReceiver.addressResolved(IPv6Address("TCP", *sockaddr))
  628. elif family == socket.AF_INET:
  629. resolutionReceiver.addressResolved(IPv4Address("TCP", *sockaddr))
  630. def ebLog(error):
  631. self._log.failure(
  632. "while looking up {name} with {callable}",
  633. error,
  634. name=hostName,
  635. callable=self._nameResolution,
  636. )
  637. d.addCallback(cbDeliver)
  638. d.addErrback(ebLog)
  639. d.addBoth(lambda ignored: resolutionReceiver.resolutionComplete())
  640. return resolution
  641. @implementer(interfaces.IStreamClientEndpoint)
  642. class HostnameEndpoint:
  643. """
  644. A name-based endpoint that connects to the fastest amongst the resolved
  645. host addresses.
  646. @cvar _DEFAULT_ATTEMPT_DELAY: The default time to use between attempts, in
  647. seconds, when no C{attemptDelay} is given to
  648. L{HostnameEndpoint.__init__}.
  649. @ivar _hostText: the textual representation of the hostname passed to the
  650. constructor. Used to pass to the reactor's hostname resolver.
  651. @type _hostText: L{unicode}
  652. @ivar _hostBytes: the encoded bytes-representation of the hostname passed
  653. to the constructor. Used to construct the L{HostnameAddress}
  654. associated with this endpoint.
  655. @type _hostBytes: L{bytes}
  656. @ivar _badHostname: a flag - hopefully false! - indicating that an invalid
  657. hostname was passed to the constructor. This might be a textual
  658. hostname that isn't valid IDNA, or non-ASCII bytes.
  659. @type _badHostname: L{bool}
  660. """
  661. _getaddrinfo = staticmethod(socket.getaddrinfo)
  662. _deferToThread = staticmethod(threads.deferToThread)
  663. _DEFAULT_ATTEMPT_DELAY = 0.3
  664. def __init__(
  665. self,
  666. reactor: Any,
  667. host: str | bytes,
  668. port: int,
  669. timeout: float = 30,
  670. bindAddress: bytes | str | tuple[bytes | str, int] | None = None,
  671. attemptDelay: float | None = None,
  672. ) -> None:
  673. """
  674. Create a L{HostnameEndpoint}.
  675. @param reactor: The reactor to use for connections and delayed calls.
  676. @type reactor: provider of L{IReactorTCP}, L{IReactorTime} and either
  677. L{IReactorPluggableNameResolver} or L{IReactorPluggableResolver}.
  678. @param host: A hostname to connect to.
  679. @type host: L{bytes} or L{str}
  680. @param port: The port number to connect to.
  681. @type port: L{int}
  682. @param timeout: For each individual connection attempt, the number of
  683. seconds to wait before assuming the connection has failed.
  684. @type timeout: L{float} or L{int}
  685. @param bindAddress: The client socket normally uses whatever
  686. local interface (eth0, en0, lo, etc) is best suited for the
  687. target address, and a randomly-assigned port. This argument
  688. allows that local address/port to be overridden. Providing
  689. just an address (as a str) will bind the client socket to
  690. whichever interface is assigned that address. Providing a
  691. tuple of (str, int) will bind it to both an interface and a
  692. specific local port. To bind the port, but leave the
  693. interface unbound, use a tuple of ("", port), or ("0.0.0.0",
  694. port) for IPv4, or ("::0", port) for IPv6. To leave both
  695. interface and port unbound, just use None.
  696. @type bindAddress: L{str}, L{tuple}, or None
  697. @param attemptDelay: The number of seconds to delay between connection
  698. attempts.
  699. @type attemptDelay: L{float}
  700. @see: L{twisted.internet.interfaces.IReactorTCP.connectTCP}
  701. """
  702. self._reactor = reactor
  703. self._nameResolver = self._getNameResolverAndMaybeWarn(reactor)
  704. [self._badHostname, self._hostBytes, self._hostText] = self._hostAsBytesAndText(
  705. host
  706. )
  707. self._port = port
  708. self._timeout = timeout
  709. if bindAddress is not None:
  710. if isinstance(bindAddress, (bytes, str)):
  711. bindAddress = (bindAddress, 0)
  712. if isinstance(bindAddress[0], bytes):
  713. bindAddress = (bindAddress[0].decode(), bindAddress[1])
  714. self._bindAddress = bindAddress
  715. if attemptDelay is None:
  716. attemptDelay = self._DEFAULT_ATTEMPT_DELAY
  717. self._attemptDelay = attemptDelay
  718. def __repr__(self) -> str:
  719. """
  720. Produce a string representation of the L{HostnameEndpoint}.
  721. @return: A L{str}
  722. """
  723. host = (
  724. # It the hostname is bad, use the backslash-encoded version of the
  725. # string passed to the constructor, which is already a string.
  726. self._hostText
  727. if self._badHostname
  728. else (
  729. # Add some square brackets if it's an IPv6 address.
  730. f"[{self._hostText}]"
  731. if isIPv6Address(self._hostText)
  732. # Convert the bytes representation to a native string to ensure
  733. # that we display the punycoded version of the hostname, which is
  734. # more useful than any IDN version as it can be easily copy-pasted
  735. # into debugging tools.
  736. else self._hostBytes.decode("ascii")
  737. )
  738. )
  739. return f"<HostnameEndpoint {host}:{self._port}>"
  740. def _getNameResolverAndMaybeWarn(self, reactor: object) -> IHostnameResolver:
  741. """
  742. Retrieve a C{nameResolver} callable and warn the caller's
  743. caller that using a reactor which doesn't provide
  744. L{IReactorPluggableNameResolver} is deprecated.
  745. @param reactor: The reactor to check.
  746. @return: A L{IHostnameResolver} provider.
  747. """
  748. if not IReactorPluggableNameResolver.providedBy(reactor):
  749. warningString = deprecate.getDeprecationWarningString(
  750. reactor.__class__,
  751. Version("Twisted", 17, 5, 0),
  752. format=(
  753. "Passing HostnameEndpoint a reactor that does not"
  754. " provide IReactorPluggableNameResolver (%(fqpn)s)"
  755. " was deprecated in %(version)s"
  756. ),
  757. replacement=(
  758. "a reactor that provides" " IReactorPluggableNameResolver"
  759. ),
  760. )
  761. warnings.warn(warningString, DeprecationWarning, stacklevel=3)
  762. return _SimpleHostnameResolver(self._fallbackNameResolution)
  763. return reactor.nameResolver
  764. @staticmethod
  765. def _hostAsBytesAndText(host: bytes | str) -> tuple[bool, bytes, str]:
  766. """
  767. For various reasons (documented in the C{@ivar}'s in the class
  768. docstring) we need both a textual and a binary representation of the
  769. hostname given to the constructor. For compatibility and convenience,
  770. we accept both textual and binary representations of the hostname, save
  771. the form that was passed, and convert into the other form. This is
  772. mostly just because L{HostnameAddress} chose somewhat poorly to define
  773. its attribute as bytes; hopefully we can find a compatible way to clean
  774. this up in the future and just operate in terms of text internally.
  775. @param host: A hostname to convert.
  776. @return: a 3-tuple of C{(invalid, bytes, text)} where C{invalid} is a
  777. boolean indicating the validity of the hostname, C{bytes} is a
  778. binary representation of C{host}, and C{text} is a textual
  779. representation of C{host}.
  780. """
  781. invalid = False
  782. if isinstance(host, bytes):
  783. hostBytes = host
  784. try:
  785. hostText = _idnaText(hostBytes)
  786. except UnicodeError:
  787. hostText = hostBytes.decode("charmap")
  788. if not isIPv6Address(hostText):
  789. invalid = True
  790. else:
  791. hostText = normalize("NFC", host)
  792. if isIPv6Address(hostText):
  793. hostBytes = hostText.encode("ascii")
  794. else:
  795. try:
  796. hostBytes = _idnaBytes(hostText)
  797. except UnicodeError:
  798. invalid = True
  799. if invalid:
  800. hostBytes = hostText.encode("ascii", "backslashreplace")
  801. hostText = hostBytes.decode("ascii")
  802. return invalid, hostBytes, hostText
  803. def connect(self, protocolFactory: IProtocolFactory) -> Deferred[IProtocol]:
  804. """
  805. Attempts a connection to each resolved address, and returns a
  806. connection which is established first.
  807. @param protocolFactory: The protocol factory whose protocol
  808. will be connected.
  809. @type protocolFactory:
  810. L{IProtocolFactory<twisted.internet.interfaces.IProtocolFactory>}
  811. @return: A L{Deferred} that fires with the connected protocol
  812. or fails a connection-related error.
  813. """
  814. if self._badHostname:
  815. return defer.fail(ValueError(f"invalid hostname: {self._hostText}"))
  816. resolved: Deferred[list[IAddress]] = Deferred()
  817. addresses: list[IAddress] = []
  818. @implementer(IResolutionReceiver)
  819. class EndpointReceiver:
  820. @staticmethod
  821. def resolutionBegan(resolutionInProgress: IHostResolution) -> None:
  822. pass
  823. @staticmethod
  824. def addressResolved(address: IAddress) -> None:
  825. addresses.append(address)
  826. @staticmethod
  827. def resolutionComplete() -> None:
  828. resolved.callback(addresses)
  829. self._nameResolver.resolveHostName(
  830. EndpointReceiver(), self._hostText, portNumber=self._port
  831. )
  832. resolved.addErrback(
  833. lambda ignored: defer.fail(
  834. error.DNSLookupError(f"Couldn't find the hostname '{self._hostText}'")
  835. )
  836. )
  837. def resolvedAddressesToEndpoints(
  838. addresses: Iterable[IAddress],
  839. ) -> Iterable[TCP6ClientEndpoint | TCP4ClientEndpoint]:
  840. # Yield an endpoint for every address resolved from the name.
  841. for eachAddress in addresses:
  842. if isinstance(eachAddress, IPv6Address):
  843. yield TCP6ClientEndpoint(
  844. self._reactor,
  845. eachAddress.host,
  846. eachAddress.port,
  847. self._timeout,
  848. self._bindAddress,
  849. )
  850. if isinstance(eachAddress, IPv4Address):
  851. yield TCP4ClientEndpoint(
  852. self._reactor,
  853. eachAddress.host,
  854. eachAddress.port,
  855. self._timeout,
  856. self._bindAddress,
  857. )
  858. iterd = resolved.addCallback(resolvedAddressesToEndpoints)
  859. listd = iterd.addCallback(list)
  860. def _canceller(cancelled: Deferred[IProtocol]) -> None:
  861. # This canceller must remain defined outside of
  862. # `startConnectionAttempts`, because Deferred should not
  863. # participate in cycles with their cancellers; that would create a
  864. # potentially problematic circular reference and possibly
  865. # gc.garbage.
  866. cancelled.errback(
  867. error.ConnectingCancelledError(
  868. HostnameAddress(self._hostBytes, self._port)
  869. )
  870. )
  871. def startConnectionAttempts(
  872. endpoints: list[TCP6ClientEndpoint | TCP4ClientEndpoint],
  873. ) -> Deferred[IProtocol]:
  874. """
  875. Given a sequence of endpoints obtained via name resolution, start
  876. connecting to a new one every C{self._attemptDelay} seconds until
  877. one of the connections succeeds, all of them fail, or the attempt
  878. is cancelled.
  879. @param endpoints: a list of all the endpoints we might try to
  880. connect to, as determined by name resolution.
  881. @type endpoints: L{list} of L{IStreamServerEndpoint}
  882. @return: a Deferred that fires with the result of the
  883. C{endpoint.connect} method that completes the fastest, or fails
  884. with the first connection error it encountered if none of them
  885. succeed.
  886. @rtype: L{Deferred} failing with L{error.ConnectingCancelledError}
  887. or firing with L{IProtocol}
  888. """
  889. if not endpoints:
  890. raise error.DNSLookupError(
  891. f"no results for hostname lookup: {self._hostText}"
  892. )
  893. iterEndpoints = iter(endpoints)
  894. pending: list[defer.Deferred[IProtocol]] = []
  895. failures: list[Failure] = []
  896. winner: defer.Deferred[IProtocol] = defer.Deferred(canceller=_canceller)
  897. checkDoneCompleted = False
  898. checkDoneEndpointsLeft = True
  899. def checkDone() -> None:
  900. if pending or checkDoneCompleted or checkDoneEndpointsLeft:
  901. return
  902. winner.errback(failures.pop())
  903. @LoopingCall
  904. def iterateEndpoint() -> None:
  905. nonlocal checkDoneEndpointsLeft
  906. endpoint = next(iterEndpoints, None)
  907. if endpoint is None:
  908. # The list of endpoints ends.
  909. checkDoneEndpointsLeft = False
  910. checkDone()
  911. return
  912. eachAttempt = endpoint.connect(protocolFactory)
  913. pending.append(eachAttempt)
  914. def noLongerPending(result: IProtocol | Failure) -> IProtocol | Failure:
  915. pending.remove(eachAttempt)
  916. return result
  917. successState = eachAttempt.addBoth(noLongerPending)
  918. def succeeded(result: IProtocol) -> None:
  919. winner.callback(result)
  920. successState.addCallback(succeeded)
  921. def failed(reason):
  922. failures.append(reason)
  923. checkDone()
  924. successState.addErrback(failed)
  925. iterateEndpoint.clock = self._reactor
  926. iterateEndpoint.start(self._attemptDelay)
  927. def cancelRemainingPending(
  928. result: IProtocol | Failure,
  929. ) -> IProtocol | Failure:
  930. nonlocal checkDoneCompleted
  931. checkDoneCompleted = True
  932. for remaining in pending[:]:
  933. remaining.cancel()
  934. if iterateEndpoint.running:
  935. iterateEndpoint.stop()
  936. return result
  937. return winner.addBoth(cancelRemainingPending)
  938. return listd.addCallback(startConnectionAttempts)
  939. def _fallbackNameResolution(self, host, port):
  940. """
  941. Resolve the hostname string into a tuple containing the host
  942. address. This is method is only used when the reactor does
  943. not provide L{IReactorPluggableNameResolver}.
  944. @param host: A unicode hostname to resolve.
  945. @param port: The port to include in the resolution.
  946. @return: A L{Deferred} that fires with L{_getaddrinfo}'s
  947. return value.
  948. """
  949. return self._deferToThread(self._getaddrinfo, host, port, 0, socket.SOCK_STREAM)
  950. @implementer(interfaces.IStreamServerEndpoint)
  951. class SSL4ServerEndpoint:
  952. """
  953. SSL secured TCP server endpoint with an IPv4 configuration.
  954. """
  955. def __init__(self, reactor, port, sslContextFactory, backlog=50, interface=""):
  956. """
  957. @param reactor: An L{IReactorSSL} provider.
  958. @param port: The port number used for listening
  959. @type port: int
  960. @param sslContextFactory: An instance of
  961. L{interfaces.IOpenSSLContextFactory}.
  962. @param backlog: Size of the listen queue
  963. @type backlog: int
  964. @param interface: The hostname to bind to, defaults to '' (all)
  965. @type interface: str
  966. """
  967. self._reactor = reactor
  968. self._port = port
  969. self._sslContextFactory = sslContextFactory
  970. self._backlog = backlog
  971. self._interface = interface
  972. def listen(self, protocolFactory):
  973. """
  974. Implement L{IStreamServerEndpoint.listen} to listen for SSL on a
  975. TCP socket.
  976. """
  977. return defer.execute(
  978. self._reactor.listenSSL,
  979. self._port,
  980. protocolFactory,
  981. contextFactory=self._sslContextFactory,
  982. backlog=self._backlog,
  983. interface=self._interface,
  984. )
  985. @implementer(interfaces.IStreamClientEndpoint)
  986. class SSL4ClientEndpoint:
  987. """
  988. SSL secured TCP client endpoint with an IPv4 configuration
  989. """
  990. def __init__(
  991. self, reactor, host, port, sslContextFactory, timeout=30, bindAddress=None
  992. ):
  993. """
  994. @param reactor: An L{IReactorSSL} provider.
  995. @param host: A hostname, used when connecting
  996. @type host: str
  997. @param port: The port number, used when connecting
  998. @type port: int
  999. @param sslContextFactory: SSL Configuration information as an instance
  1000. of L{interfaces.IOpenSSLContextFactory}.
  1001. @param timeout: Number of seconds to wait before assuming the
  1002. connection has failed.
  1003. @type timeout: int
  1004. @param bindAddress: A (host, port) tuple of local address to bind to,
  1005. or None.
  1006. @type bindAddress: tuple
  1007. """
  1008. self._reactor = reactor
  1009. self._host = host
  1010. self._port = port
  1011. self._sslContextFactory = sslContextFactory
  1012. self._timeout = timeout
  1013. self._bindAddress = bindAddress
  1014. def connect(self, protocolFactory):
  1015. """
  1016. Implement L{IStreamClientEndpoint.connect} to connect with SSL over
  1017. TCP.
  1018. """
  1019. try:
  1020. wf = _WrappingFactory(protocolFactory)
  1021. self._reactor.connectSSL(
  1022. self._host,
  1023. self._port,
  1024. wf,
  1025. self._sslContextFactory,
  1026. timeout=self._timeout,
  1027. bindAddress=self._bindAddress,
  1028. )
  1029. return wf._onConnection
  1030. except BaseException:
  1031. return defer.fail()
  1032. @implementer(interfaces.IStreamServerEndpoint)
  1033. class UNIXServerEndpoint:
  1034. """
  1035. UnixSocket server endpoint.
  1036. """
  1037. def __init__(self, reactor, address, backlog=50, mode=0o666, wantPID=0):
  1038. """
  1039. @param reactor: An L{IReactorUNIX} provider.
  1040. @param address: The path to the Unix socket file, used when listening
  1041. @param backlog: number of connections to allow in backlog.
  1042. @param mode: mode to set on the unix socket. This parameter is
  1043. deprecated. Permissions should be set on the directory which
  1044. contains the UNIX socket.
  1045. @param wantPID: If True, create a pidfile for the socket.
  1046. """
  1047. self._reactor = reactor
  1048. self._address = address
  1049. self._backlog = backlog
  1050. self._mode = mode
  1051. self._wantPID = wantPID
  1052. def listen(self, protocolFactory):
  1053. """
  1054. Implement L{IStreamServerEndpoint.listen} to listen on a UNIX socket.
  1055. """
  1056. return defer.execute(
  1057. self._reactor.listenUNIX,
  1058. self._address,
  1059. protocolFactory,
  1060. backlog=self._backlog,
  1061. mode=self._mode,
  1062. wantPID=self._wantPID,
  1063. )
  1064. @implementer(interfaces.IStreamClientEndpoint)
  1065. class UNIXClientEndpoint:
  1066. """
  1067. UnixSocket client endpoint.
  1068. """
  1069. def __init__(self, reactor, path, timeout=30, checkPID=0):
  1070. """
  1071. @param reactor: An L{IReactorUNIX} provider.
  1072. @param path: The path to the Unix socket file, used when connecting
  1073. @type path: str
  1074. @param timeout: Number of seconds to wait before assuming the
  1075. connection has failed.
  1076. @type timeout: int
  1077. @param checkPID: If True, check for a pid file to verify that a server
  1078. is listening.
  1079. @type checkPID: bool
  1080. """
  1081. self._reactor = reactor
  1082. self._path = path
  1083. self._timeout = timeout
  1084. self._checkPID = checkPID
  1085. def connect(self, protocolFactory):
  1086. """
  1087. Implement L{IStreamClientEndpoint.connect} to connect via a
  1088. UNIX Socket
  1089. """
  1090. try:
  1091. wf = _WrappingFactory(protocolFactory)
  1092. self._reactor.connectUNIX(
  1093. self._path, wf, timeout=self._timeout, checkPID=self._checkPID
  1094. )
  1095. return wf._onConnection
  1096. except BaseException:
  1097. return defer.fail()
  1098. @implementer(interfaces.IStreamServerEndpoint)
  1099. class AdoptedStreamServerEndpoint:
  1100. """
  1101. An endpoint for listening on a file descriptor initialized outside of
  1102. Twisted.
  1103. @ivar _used: A C{bool} indicating whether this endpoint has been used to
  1104. listen with a factory yet. C{True} if so.
  1105. """
  1106. _close = os.close
  1107. _setNonBlocking = staticmethod(fdesc.setNonBlocking)
  1108. def __init__(self, reactor, fileno, addressFamily):
  1109. """
  1110. @param reactor: An L{IReactorSocket} provider.
  1111. @param fileno: An integer file descriptor corresponding to a listening
  1112. I{SOCK_STREAM} socket.
  1113. @param addressFamily: The address family of the socket given by
  1114. C{fileno}.
  1115. """
  1116. self.reactor = reactor
  1117. self.fileno = fileno
  1118. self.addressFamily = addressFamily
  1119. self._used = False
  1120. def listen(self, factory):
  1121. """
  1122. Implement L{IStreamServerEndpoint.listen} to start listening on, and
  1123. then close, C{self._fileno}.
  1124. """
  1125. if self._used:
  1126. return defer.fail(error.AlreadyListened())
  1127. self._used = True
  1128. try:
  1129. self._setNonBlocking(self.fileno)
  1130. port = self.reactor.adoptStreamPort(
  1131. self.fileno, self.addressFamily, factory
  1132. )
  1133. self._close(self.fileno)
  1134. except BaseException:
  1135. return defer.fail()
  1136. return defer.succeed(port)
  1137. def _parseTCP(factory, port, interface="", backlog=50):
  1138. """
  1139. Internal parser function for L{_parseServer} to convert the string
  1140. arguments for a TCP(IPv4) stream endpoint into the structured arguments.
  1141. @param factory: the protocol factory being parsed, or L{None}. (This was a
  1142. leftover argument from when this code was in C{strports}, and is now
  1143. mostly None and unused.)
  1144. @type factory: L{IProtocolFactory} or L{None}
  1145. @param port: the integer port number to bind
  1146. @type port: C{str}
  1147. @param interface: the interface IP to listen on
  1148. @param backlog: the length of the listen queue
  1149. @type backlog: C{str}
  1150. @return: a 2-tuple of (args, kwargs), describing the parameters to
  1151. L{IReactorTCP.listenTCP} (or, modulo argument 2, the factory, arguments
  1152. to L{TCP4ServerEndpoint}.
  1153. """
  1154. return (int(port), factory), {"interface": interface, "backlog": int(backlog)}
  1155. def _parseUNIX(factory, address, mode="666", backlog=50, lockfile=True):
  1156. """
  1157. Internal parser function for L{_parseServer} to convert the string
  1158. arguments for a UNIX (AF_UNIX/SOCK_STREAM) stream endpoint into the
  1159. structured arguments.
  1160. @param factory: the protocol factory being parsed, or L{None}. (This was a
  1161. leftover argument from when this code was in C{strports}, and is now
  1162. mostly None and unused.)
  1163. @type factory: L{IProtocolFactory} or L{None}
  1164. @param address: the pathname of the unix socket
  1165. @type address: C{str}
  1166. @param backlog: the length of the listen queue
  1167. @type backlog: C{str}
  1168. @param lockfile: A string '0' or '1', mapping to True and False
  1169. respectively. See the C{wantPID} argument to C{listenUNIX}
  1170. @return: a 2-tuple of (args, kwargs), describing the parameters to
  1171. L{twisted.internet.interfaces.IReactorUNIX.listenUNIX} (or,
  1172. modulo argument 2, the factory, arguments to L{UNIXServerEndpoint}.
  1173. """
  1174. return (
  1175. (address, factory),
  1176. {"mode": int(mode, 8), "backlog": int(backlog), "wantPID": bool(int(lockfile))},
  1177. )
  1178. def _parseSSL(
  1179. factory,
  1180. port,
  1181. privateKey="server.pem",
  1182. certKey=None,
  1183. sslmethod=None,
  1184. interface="",
  1185. backlog=50,
  1186. extraCertChain=None,
  1187. dhParameters=None,
  1188. ):
  1189. """
  1190. Internal parser function for L{_parseServer} to convert the string
  1191. arguments for an SSL (over TCP/IPv4) stream endpoint into the structured
  1192. arguments.
  1193. @param factory: the protocol factory being parsed, or L{None}. (This was a
  1194. leftover argument from when this code was in C{strports}, and is now
  1195. mostly None and unused.)
  1196. @type factory: L{IProtocolFactory} or L{None}
  1197. @param port: the integer port number to bind
  1198. @type port: C{str}
  1199. @param interface: the interface IP to listen on
  1200. @param backlog: the length of the listen queue
  1201. @type backlog: C{str}
  1202. @param privateKey: The file name of a PEM format private key file.
  1203. @type privateKey: C{str}
  1204. @param certKey: The file name of a PEM format certificate file.
  1205. @type certKey: C{str}
  1206. @param sslmethod: The string name of an SSL method, based on the name of a
  1207. constant in C{OpenSSL.SSL}.
  1208. @type sslmethod: C{str}
  1209. @param extraCertChain: The path of a file containing one or more
  1210. certificates in PEM format that establish the chain from a root CA to
  1211. the CA that signed your C{certKey}.
  1212. @type extraCertChain: L{str}
  1213. @param dhParameters: The file name of a file containing parameters that are
  1214. required for Diffie-Hellman key exchange. If this is not specified,
  1215. the forward secret C{DHE} ciphers aren't available for servers.
  1216. @type dhParameters: L{str}
  1217. @return: a 2-tuple of (args, kwargs), describing the parameters to
  1218. L{IReactorSSL.listenSSL} (or, modulo argument 2, the factory, arguments
  1219. to L{SSL4ServerEndpoint}.
  1220. """
  1221. from twisted.internet import ssl
  1222. if certKey is None:
  1223. certKey = privateKey
  1224. kw = {}
  1225. if sslmethod is not None:
  1226. kw["method"] = getattr(ssl.SSL, sslmethod)
  1227. certPEM = FilePath(certKey).getContent()
  1228. keyPEM = FilePath(privateKey).getContent()
  1229. privateCertificate = ssl.PrivateCertificate.loadPEM(certPEM + b"\n" + keyPEM)
  1230. if extraCertChain is not None:
  1231. matches = re.findall(
  1232. r"(-----BEGIN CERTIFICATE-----\n.+?\n-----END CERTIFICATE-----)",
  1233. nativeString(FilePath(extraCertChain).getContent()),
  1234. flags=re.DOTALL,
  1235. )
  1236. chainCertificates = [
  1237. ssl.Certificate.loadPEM(chainCertPEM).original for chainCertPEM in matches
  1238. ]
  1239. if not chainCertificates:
  1240. raise ValueError(
  1241. "Specified chain file '%s' doesn't contain any valid "
  1242. "certificates in PEM format." % (extraCertChain,)
  1243. )
  1244. else:
  1245. chainCertificates = None
  1246. if dhParameters is not None:
  1247. dhParameters = ssl.DiffieHellmanParameters.fromFile(
  1248. FilePath(dhParameters),
  1249. )
  1250. cf = ssl.CertificateOptions(
  1251. privateKey=privateCertificate.privateKey.original,
  1252. certificate=privateCertificate.original,
  1253. extraCertChain=chainCertificates,
  1254. dhParameters=dhParameters,
  1255. **kw,
  1256. )
  1257. return ((int(port), factory, cf), {"interface": interface, "backlog": int(backlog)})
  1258. @implementer(IPlugin, IStreamServerEndpointStringParser)
  1259. class _StandardIOParser:
  1260. """
  1261. Stream server endpoint string parser for the Standard I/O type.
  1262. @ivar prefix: See L{IStreamServerEndpointStringParser.prefix}.
  1263. """
  1264. prefix = "stdio"
  1265. def _parseServer(self, reactor):
  1266. """
  1267. Internal parser function for L{_parseServer} to convert the string
  1268. arguments into structured arguments for the L{StandardIOEndpoint}
  1269. @param reactor: Reactor for the endpoint
  1270. """
  1271. return StandardIOEndpoint(reactor)
  1272. def parseStreamServer(self, reactor, *args, **kwargs):
  1273. # Redirects to another function (self._parseServer), tricks zope.interface
  1274. # into believing the interface is correctly implemented.
  1275. return self._parseServer(reactor)
  1276. @implementer(IPlugin, IStreamServerEndpointStringParser)
  1277. class _SystemdParser:
  1278. """
  1279. Stream server endpoint string parser for the I{systemd} endpoint type.
  1280. @ivar prefix: See L{IStreamServerEndpointStringParser.prefix}.
  1281. @ivar _sddaemon: A L{ListenFDs} instance used to translate an index into an
  1282. actual file descriptor.
  1283. """
  1284. _sddaemon = ListenFDs.fromEnvironment()
  1285. prefix = "systemd"
  1286. def _parseServer(
  1287. self,
  1288. reactor: IReactorSocket,
  1289. domain: str,
  1290. index: Optional[str] = None,
  1291. name: Optional[str] = None,
  1292. ) -> AdoptedStreamServerEndpoint:
  1293. """
  1294. Internal parser function for L{_parseServer} to convert the string
  1295. arguments for a systemd server endpoint into structured arguments for
  1296. L{AdoptedStreamServerEndpoint}.
  1297. @param reactor: An L{IReactorSocket} provider.
  1298. @param domain: The domain (or address family) of the socket inherited
  1299. from systemd. This is a string like C{"INET"} or C{"UNIX"}, ie
  1300. the name of an address family from the L{socket} module, without
  1301. the C{"AF_"} prefix.
  1302. @param index: If given, the decimal representation of an integer
  1303. giving the offset into the list of file descriptors inherited from
  1304. systemd. Since the order of descriptors received from systemd is
  1305. hard to predict, this option should only be used if only one
  1306. descriptor is being inherited. Even in that case, C{name} is
  1307. probably a better idea. Either C{index} or C{name} must be given.
  1308. @param name: If given, the name (as defined by C{FileDescriptorName}
  1309. in the C{[Socket]} section of a systemd service definition) of an
  1310. inherited file descriptor. Either C{index} or C{name} must be
  1311. given.
  1312. @return: An L{AdoptedStreamServerEndpoint} which will adopt the
  1313. inherited listening port when it is used to listen.
  1314. """
  1315. if (index is None) == (name is None):
  1316. raise ValueError("Specify exactly one of descriptor index or name")
  1317. if index is not None:
  1318. fileno = self._sddaemon.inheritedDescriptors()[int(index)]
  1319. else:
  1320. assert name is not None
  1321. fileno = self._sddaemon.inheritedNamedDescriptors()[name]
  1322. addressFamily = getattr(socket, "AF_" + domain)
  1323. return AdoptedStreamServerEndpoint(reactor, fileno, addressFamily)
  1324. def parseStreamServer(self, reactor, *args, **kwargs):
  1325. # Delegate to another function with a sane signature. This function has
  1326. # an insane signature to trick zope.interface into believing the
  1327. # interface is correctly implemented.
  1328. return self._parseServer(reactor, *args, **kwargs)
  1329. @implementer(IPlugin, IStreamServerEndpointStringParser)
  1330. class _TCP6ServerParser:
  1331. """
  1332. Stream server endpoint string parser for the TCP6ServerEndpoint type.
  1333. @ivar prefix: See L{IStreamServerEndpointStringParser.prefix}.
  1334. """
  1335. prefix = (
  1336. "tcp6" # Used in _parseServer to identify the plugin with the endpoint type
  1337. )
  1338. def _parseServer(self, reactor, port, backlog=50, interface="::"):
  1339. """
  1340. Internal parser function for L{_parseServer} to convert the string
  1341. arguments into structured arguments for the L{TCP6ServerEndpoint}
  1342. @param reactor: An L{IReactorTCP} provider.
  1343. @param port: The port number used for listening
  1344. @type port: int
  1345. @param backlog: Size of the listen queue
  1346. @type backlog: int
  1347. @param interface: The hostname to bind to
  1348. @type interface: str
  1349. """
  1350. port = int(port)
  1351. backlog = int(backlog)
  1352. return TCP6ServerEndpoint(reactor, port, backlog, interface)
  1353. def parseStreamServer(self, reactor, *args, **kwargs):
  1354. # Redirects to another function (self._parseServer), tricks zope.interface
  1355. # into believing the interface is correctly implemented.
  1356. return self._parseServer(reactor, *args, **kwargs)
  1357. _serverParsers = {
  1358. "tcp": _parseTCP,
  1359. "unix": _parseUNIX,
  1360. "ssl": _parseSSL,
  1361. }
  1362. _OP, _STRING = range(2)
  1363. def _tokenize(description):
  1364. """
  1365. Tokenize a strports string and yield each token.
  1366. @param description: a string as described by L{serverFromString} or
  1367. L{clientFromString}.
  1368. @type description: L{str} or L{bytes}
  1369. @return: an iterable of 2-tuples of (C{_OP} or C{_STRING}, string). Tuples
  1370. starting with C{_OP} will contain a second element of either ':' (i.e.
  1371. 'next parameter') or '=' (i.e. 'assign parameter value'). For example,
  1372. the string 'hello:greeting=world' would result in a generator yielding
  1373. these values::
  1374. _STRING, 'hello'
  1375. _OP, ':'
  1376. _STRING, 'greet=ing'
  1377. _OP, '='
  1378. _STRING, 'world'
  1379. """
  1380. empty = _matchingString("", description)
  1381. colon = _matchingString(":", description)
  1382. equals = _matchingString("=", description)
  1383. backslash = _matchingString("\x5c", description)
  1384. current = empty
  1385. ops = colon + equals
  1386. nextOps = {colon: colon + equals, equals: colon}
  1387. iterdesc = iter(iterbytes(description))
  1388. for n in iterdesc:
  1389. if n in iterbytes(ops):
  1390. yield _STRING, current
  1391. yield _OP, n
  1392. current = empty
  1393. ops = nextOps[n]
  1394. elif n == backslash:
  1395. current += next(iterdesc)
  1396. else:
  1397. current += n
  1398. yield _STRING, current
  1399. def _parse(description):
  1400. """
  1401. Convert a description string into a list of positional and keyword
  1402. parameters, using logic vaguely like what Python does.
  1403. @param description: a string as described by L{serverFromString} or
  1404. L{clientFromString}.
  1405. @return: a 2-tuple of C{(args, kwargs)}, where 'args' is a list of all
  1406. ':'-separated C{str}s not containing an '=' and 'kwargs' is a map of
  1407. all C{str}s which do contain an '='. For example, the result of
  1408. C{_parse('a:b:d=1:c')} would be C{(['a', 'b', 'c'], {'d': '1'})}.
  1409. """
  1410. args, kw = [], {}
  1411. colon = _matchingString(":", description)
  1412. def add(sofar):
  1413. if len(sofar) == 1:
  1414. args.append(sofar[0])
  1415. else:
  1416. kw[nativeString(sofar[0])] = sofar[1]
  1417. sofar = ()
  1418. for type, value in _tokenize(description):
  1419. if type is _STRING:
  1420. sofar += (value,)
  1421. elif value == colon:
  1422. add(sofar)
  1423. sofar = ()
  1424. add(sofar)
  1425. return args, kw
  1426. # Mappings from description "names" to endpoint constructors.
  1427. _endpointServerFactories = {
  1428. "TCP": TCP4ServerEndpoint,
  1429. "SSL": SSL4ServerEndpoint,
  1430. "UNIX": UNIXServerEndpoint,
  1431. }
  1432. _endpointClientFactories = {
  1433. "TCP": TCP4ClientEndpoint,
  1434. "SSL": SSL4ClientEndpoint,
  1435. "UNIX": UNIXClientEndpoint,
  1436. }
  1437. def _parseServer(description, factory):
  1438. """
  1439. Parse a strports description into a 2-tuple of arguments and keyword
  1440. values.
  1441. @param description: A description in the format explained by
  1442. L{serverFromString}.
  1443. @type description: C{str}
  1444. @param factory: A 'factory' argument; this is left-over from
  1445. twisted.application.strports, it's not really used.
  1446. @type factory: L{IProtocolFactory} or L{None}
  1447. @return: a 3-tuple of (plugin or name, arguments, keyword arguments)
  1448. """
  1449. args, kw = _parse(description)
  1450. endpointType = args[0]
  1451. parser = _serverParsers.get(endpointType)
  1452. if parser is None:
  1453. # If the required parser is not found in _server, check if
  1454. # a plugin exists for the endpointType
  1455. plugin = _matchPluginToPrefix(
  1456. getPlugins(IStreamServerEndpointStringParser), endpointType
  1457. )
  1458. return (plugin, args[1:], kw)
  1459. return (endpointType.upper(),) + parser(factory, *args[1:], **kw)
  1460. def _matchPluginToPrefix(plugins, endpointType):
  1461. """
  1462. Match plugin to prefix.
  1463. """
  1464. endpointType = endpointType.lower()
  1465. for plugin in plugins:
  1466. if _matchingString(plugin.prefix.lower(), endpointType) == endpointType:
  1467. return plugin
  1468. raise ValueError(f"Unknown endpoint type: '{endpointType}'")
  1469. def serverFromString(reactor, description):
  1470. """
  1471. Construct a stream server endpoint from an endpoint description string.
  1472. The format for server endpoint descriptions is a simple byte string. It is
  1473. a prefix naming the type of endpoint, then a colon, then the arguments for
  1474. that endpoint.
  1475. For example, you can call it like this to create an endpoint that will
  1476. listen on TCP port 80::
  1477. serverFromString(reactor, "tcp:80")
  1478. Additional arguments may be specified as keywords, separated with colons.
  1479. For example, you can specify the interface for a TCP server endpoint to
  1480. bind to like this::
  1481. serverFromString(reactor, "tcp:80:interface=127.0.0.1")
  1482. SSL server endpoints may be specified with the 'ssl' prefix, and the
  1483. private key and certificate files may be specified by the C{privateKey} and
  1484. C{certKey} arguments::
  1485. serverFromString(
  1486. reactor, "ssl:443:privateKey=key.pem:certKey=crt.pem")
  1487. If a private key file name (C{privateKey}) isn't provided, a "server.pem"
  1488. file is assumed to exist which contains the private key. If the certificate
  1489. file name (C{certKey}) isn't provided, the private key file is assumed to
  1490. contain the certificate as well.
  1491. You may escape colons in arguments with a backslash, which you will need to
  1492. use if you want to specify a full pathname argument on Windows::
  1493. serverFromString(reactor,
  1494. "ssl:443:privateKey=C\\:/key.pem:certKey=C\\:/cert.pem")
  1495. finally, the 'unix' prefix may be used to specify a filesystem UNIX socket,
  1496. optionally with a 'mode' argument to specify the mode of the socket file
  1497. created by C{listen}::
  1498. serverFromString(reactor, "unix:/var/run/finger")
  1499. serverFromString(reactor, "unix:/var/run/finger:mode=660")
  1500. This function is also extensible; new endpoint types may be registered as
  1501. L{IStreamServerEndpointStringParser} plugins. See that interface for more
  1502. information.
  1503. @param reactor: The server endpoint will be constructed with this reactor.
  1504. @param description: The strports description to parse.
  1505. @type description: L{str}
  1506. @return: A new endpoint which can be used to listen with the parameters
  1507. given by C{description}.
  1508. @rtype: L{IStreamServerEndpoint<twisted.internet.interfaces.IStreamServerEndpoint>}
  1509. @raise ValueError: when the 'description' string cannot be parsed.
  1510. @since: 10.2
  1511. """
  1512. nameOrPlugin, args, kw = _parseServer(description, None)
  1513. if type(nameOrPlugin) is not str:
  1514. plugin = nameOrPlugin
  1515. return plugin.parseStreamServer(reactor, *args, **kw)
  1516. else:
  1517. name = nameOrPlugin
  1518. # Chop out the factory.
  1519. args = args[:1] + args[2:]
  1520. return _endpointServerFactories[name](reactor, *args, **kw)
  1521. def quoteStringArgument(argument):
  1522. """
  1523. Quote an argument to L{serverFromString} and L{clientFromString}. Since
  1524. arguments are separated with colons and colons are escaped with
  1525. backslashes, some care is necessary if, for example, you have a pathname,
  1526. you may be tempted to interpolate into a string like this::
  1527. serverFromString(reactor, "ssl:443:privateKey=%s" % (myPathName,))
  1528. This may appear to work, but will have portability issues (Windows
  1529. pathnames, for example). Usually you should just construct the appropriate
  1530. endpoint type rather than interpolating strings, which in this case would
  1531. be L{SSL4ServerEndpoint}. There are some use-cases where you may need to
  1532. generate such a string, though; for example, a tool to manipulate a
  1533. configuration file which has strports descriptions in it. To be correct in
  1534. those cases, do this instead::
  1535. serverFromString(reactor, "ssl:443:privateKey=%s" %
  1536. (quoteStringArgument(myPathName),))
  1537. @param argument: The part of the endpoint description string you want to
  1538. pass through.
  1539. @type argument: C{str}
  1540. @return: The quoted argument.
  1541. @rtype: C{str}
  1542. """
  1543. backslash, colon = "\\:"
  1544. for c in backslash, colon:
  1545. argument = argument.replace(c, backslash + c)
  1546. return argument
  1547. def _parseClientTCP(*args, **kwargs):
  1548. """
  1549. Perform any argument value coercion necessary for TCP client parameters.
  1550. Valid positional arguments to this function are host and port.
  1551. Valid keyword arguments to this function are all L{IReactorTCP.connectTCP}
  1552. arguments.
  1553. @return: The coerced values as a C{dict}.
  1554. """
  1555. if len(args) == 2:
  1556. kwargs["port"] = int(args[1])
  1557. kwargs["host"] = args[0]
  1558. elif len(args) == 1:
  1559. if "host" in kwargs:
  1560. kwargs["port"] = int(args[0])
  1561. else:
  1562. kwargs["host"] = args[0]
  1563. try:
  1564. kwargs["port"] = int(kwargs["port"])
  1565. except KeyError:
  1566. pass
  1567. try:
  1568. kwargs["timeout"] = int(kwargs["timeout"])
  1569. except KeyError:
  1570. pass
  1571. try:
  1572. kwargs["bindAddress"] = (kwargs["bindAddress"], 0)
  1573. except KeyError:
  1574. pass
  1575. return kwargs
  1576. def _loadCAsFromDir(directoryPath):
  1577. """
  1578. Load certificate-authority certificate objects in a given directory.
  1579. @param directoryPath: a L{unicode} or L{bytes} pointing at a directory to
  1580. load .pem files from, or L{None}.
  1581. @return: an L{IOpenSSLTrustRoot} provider.
  1582. """
  1583. caCerts = {}
  1584. for child in directoryPath.children():
  1585. if not child.asTextMode().basename().split(".")[-1].lower() == "pem":
  1586. continue
  1587. try:
  1588. data = child.getContent()
  1589. except OSError:
  1590. # Permission denied, corrupt disk, we don't care.
  1591. continue
  1592. try:
  1593. theCert = Certificate.loadPEM(data)
  1594. except SSLError:
  1595. # Duplicate certificate, invalid certificate, etc. We don't care.
  1596. pass
  1597. else:
  1598. caCerts[theCert.digest()] = theCert
  1599. return trustRootFromCertificates(caCerts.values())
  1600. def _parseTrustRootPath(pathName):
  1601. """
  1602. Parse a string referring to a directory full of certificate authorities
  1603. into a trust root.
  1604. @param pathName: path name
  1605. @type pathName: L{unicode} or L{bytes} or L{None}
  1606. @return: L{None} or L{IOpenSSLTrustRoot}
  1607. """
  1608. if pathName is None:
  1609. return None
  1610. return _loadCAsFromDir(FilePath(pathName))
  1611. def _privateCertFromPaths(certificatePath, keyPath):
  1612. """
  1613. Parse a certificate path and key path, either or both of which might be
  1614. L{None}, into a certificate object.
  1615. @param certificatePath: the certificate path
  1616. @type certificatePath: L{bytes} or L{unicode} or L{None}
  1617. @param keyPath: the private key path
  1618. @type keyPath: L{bytes} or L{unicode} or L{None}
  1619. @return: a L{PrivateCertificate} or L{None}
  1620. """
  1621. if certificatePath is None:
  1622. return None
  1623. certBytes = FilePath(certificatePath).getContent()
  1624. if keyPath is None:
  1625. return PrivateCertificate.loadPEM(certBytes)
  1626. else:
  1627. return PrivateCertificate.fromCertificateAndKeyPair(
  1628. Certificate.loadPEM(certBytes),
  1629. KeyPair.load(FilePath(keyPath).getContent(), 1),
  1630. )
  1631. def _parseClientSSLOptions(kwargs):
  1632. """
  1633. Parse common arguments for SSL endpoints, creating an L{CertificateOptions}
  1634. instance.
  1635. @param kwargs: A dict of keyword arguments to be parsed, potentially
  1636. containing keys C{certKey}, C{privateKey}, C{caCertsDir}, and
  1637. C{hostname}. See L{_parseClientSSL}.
  1638. @type kwargs: L{dict}
  1639. @return: The remaining arguments, including a new key C{sslContextFactory}.
  1640. """
  1641. hostname = kwargs.pop("hostname", None)
  1642. clientCertificate = _privateCertFromPaths(
  1643. kwargs.pop("certKey", None), kwargs.pop("privateKey", None)
  1644. )
  1645. trustRoot = _parseTrustRootPath(kwargs.pop("caCertsDir", None))
  1646. if hostname is not None:
  1647. configuration = optionsForClientTLS(
  1648. _idnaText(hostname),
  1649. trustRoot=trustRoot,
  1650. clientCertificate=clientCertificate,
  1651. )
  1652. else:
  1653. # _really_ though, you should specify a hostname.
  1654. if clientCertificate is not None:
  1655. privateKeyOpenSSL = clientCertificate.privateKey.original
  1656. certificateOpenSSL = clientCertificate.original
  1657. else:
  1658. privateKeyOpenSSL = None
  1659. certificateOpenSSL = None
  1660. configuration = CertificateOptions(
  1661. trustRoot=trustRoot,
  1662. privateKey=privateKeyOpenSSL,
  1663. certificate=certificateOpenSSL,
  1664. )
  1665. kwargs["sslContextFactory"] = configuration
  1666. return kwargs
  1667. def _parseClientSSL(*args, **kwargs):
  1668. """
  1669. Perform any argument value coercion necessary for SSL client parameters.
  1670. Valid keyword arguments to this function are all L{IReactorSSL.connectSSL}
  1671. arguments except for C{contextFactory}. Instead, C{certKey} (the path name
  1672. of the certificate file) C{privateKey} (the path name of the private key
  1673. associated with the certificate) are accepted and used to construct a
  1674. context factory.
  1675. Valid positional arguments to this function are host and port.
  1676. @keyword caCertsDir: The one parameter which is not part of
  1677. L{IReactorSSL.connectSSL}'s signature, this is a path name used to
  1678. construct a list of certificate authority certificates. The directory
  1679. will be scanned for files ending in C{.pem}, all of which will be
  1680. considered valid certificate authorities for this connection.
  1681. @type caCertsDir: L{str}
  1682. @keyword hostname: The hostname to use for validating the server's
  1683. certificate.
  1684. @type hostname: L{unicode}
  1685. @return: The coerced values as a L{dict}.
  1686. """
  1687. kwargs = _parseClientTCP(*args, **kwargs)
  1688. return _parseClientSSLOptions(kwargs)
  1689. def _parseClientUNIX(*args, **kwargs):
  1690. """
  1691. Perform any argument value coercion necessary for UNIX client parameters.
  1692. Valid keyword arguments to this function are all L{IReactorUNIX.connectUNIX}
  1693. keyword arguments except for C{checkPID}. Instead, C{lockfile} is accepted
  1694. and has the same meaning. Also C{path} is used instead of C{address}.
  1695. Valid positional arguments to this function are C{path}.
  1696. @return: The coerced values as a C{dict}.
  1697. """
  1698. if len(args) == 1:
  1699. kwargs["path"] = args[0]
  1700. try:
  1701. kwargs["checkPID"] = bool(int(kwargs.pop("lockfile")))
  1702. except KeyError:
  1703. pass
  1704. try:
  1705. kwargs["timeout"] = int(kwargs["timeout"])
  1706. except KeyError:
  1707. pass
  1708. return kwargs
  1709. _clientParsers = {
  1710. "TCP": _parseClientTCP,
  1711. "SSL": _parseClientSSL,
  1712. "UNIX": _parseClientUNIX,
  1713. }
  1714. def clientFromString(reactor, description):
  1715. """
  1716. Construct a client endpoint from a description string.
  1717. Client description strings are much like server description strings,
  1718. although they take all of their arguments as keywords, aside from host and
  1719. port.
  1720. You can create a TCP client endpoint with the 'host' and 'port' arguments,
  1721. like so::
  1722. clientFromString(reactor, "tcp:host=www.example.com:port=80")
  1723. or, without specifying host and port keywords::
  1724. clientFromString(reactor, "tcp:www.example.com:80")
  1725. Or you can specify only one or the other, as in the following 2 examples::
  1726. clientFromString(reactor, "tcp:host=www.example.com:80")
  1727. clientFromString(reactor, "tcp:www.example.com:port=80")
  1728. or an SSL client endpoint with those arguments, plus the arguments used by
  1729. the server SSL, for a client certificate::
  1730. clientFromString(reactor, "ssl:web.example.com:443:"
  1731. "privateKey=foo.pem:certKey=foo.pem")
  1732. to specify your certificate trust roots, you can identify a directory with
  1733. PEM files in it with the C{caCertsDir} argument::
  1734. clientFromString(reactor, "ssl:host=web.example.com:port=443:"
  1735. "caCertsDir=/etc/ssl/certs")
  1736. Both TCP and SSL client endpoint description strings can include a
  1737. 'bindAddress' keyword argument, whose value should be a local IPv4
  1738. address. This fixes the client socket to that IP address::
  1739. clientFromString(reactor, "tcp:www.example.com:80:"
  1740. "bindAddress=192.0.2.100")
  1741. NB: Fixed client ports are not currently supported in TCP or SSL
  1742. client endpoints. The client socket will always use an ephemeral
  1743. port assigned by the operating system
  1744. You can create a UNIX client endpoint with the 'path' argument and optional
  1745. 'lockfile' and 'timeout' arguments::
  1746. clientFromString(
  1747. reactor, b"unix:path=/var/foo/bar:lockfile=1:timeout=9")
  1748. or, with the path as a positional argument with or without optional
  1749. arguments as in the following 2 examples::
  1750. clientFromString(reactor, "unix:/var/foo/bar")
  1751. clientFromString(reactor, "unix:/var/foo/bar:lockfile=1:timeout=9")
  1752. This function is also extensible; new endpoint types may be registered as
  1753. L{IStreamClientEndpointStringParserWithReactor} plugins. See that
  1754. interface for more information.
  1755. @param reactor: The client endpoint will be constructed with this reactor.
  1756. @param description: The strports description to parse.
  1757. @type description: L{str}
  1758. @return: A new endpoint which can be used to connect with the parameters
  1759. given by C{description}.
  1760. @rtype: L{IStreamClientEndpoint<twisted.internet.interfaces.IStreamClientEndpoint>}
  1761. @since: 10.2
  1762. """
  1763. args, kwargs = _parse(description)
  1764. aname = args.pop(0)
  1765. name = aname.upper()
  1766. if name not in _clientParsers:
  1767. plugin = _matchPluginToPrefix(
  1768. getPlugins(IStreamClientEndpointStringParserWithReactor), name
  1769. )
  1770. return plugin.parseStreamClient(reactor, *args, **kwargs)
  1771. kwargs = _clientParsers[name](*args, **kwargs)
  1772. return _endpointClientFactories[name](reactor, **kwargs)
  1773. def connectProtocol(endpoint, protocol):
  1774. """
  1775. Connect a protocol instance to an endpoint.
  1776. This allows using a client endpoint without having to create a factory.
  1777. @param endpoint: A client endpoint to connect to.
  1778. @param protocol: A protocol instance.
  1779. @return: The result of calling C{connect} on the endpoint, i.e. a
  1780. L{Deferred} that will fire with the protocol when connected, or an
  1781. appropriate error.
  1782. @since: 13.1
  1783. """
  1784. class OneShotFactory(Factory):
  1785. def buildProtocol(self, addr):
  1786. return protocol
  1787. return endpoint.connect(OneShotFactory())
  1788. @implementer(interfaces.IStreamClientEndpoint)
  1789. class _WrapperEndpoint:
  1790. """
  1791. An endpoint that wraps another endpoint.
  1792. """
  1793. def __init__(self, wrappedEndpoint, wrapperFactory):
  1794. """
  1795. Construct a L{_WrapperEndpoint}.
  1796. """
  1797. self._wrappedEndpoint = wrappedEndpoint
  1798. self._wrapperFactory = wrapperFactory
  1799. def connect(self, protocolFactory):
  1800. """
  1801. Connect the given protocol factory and unwrap its result.
  1802. """
  1803. return self._wrappedEndpoint.connect(
  1804. self._wrapperFactory(protocolFactory)
  1805. ).addCallback(lambda protocol: protocol.wrappedProtocol)
  1806. @implementer(interfaces.IStreamServerEndpoint)
  1807. class _WrapperServerEndpoint:
  1808. """
  1809. A server endpoint that wraps another server endpoint.
  1810. """
  1811. def __init__(self, wrappedEndpoint, wrapperFactory):
  1812. """
  1813. Construct a L{_WrapperServerEndpoint}.
  1814. """
  1815. self._wrappedEndpoint = wrappedEndpoint
  1816. self._wrapperFactory = wrapperFactory
  1817. def listen(self, protocolFactory):
  1818. """
  1819. Connect the given protocol factory and unwrap its result.
  1820. """
  1821. return self._wrappedEndpoint.listen(self._wrapperFactory(protocolFactory))
  1822. def wrapClientTLS(
  1823. connectionCreator: IOpenSSLClientConnectionCreator,
  1824. wrappedEndpoint: IStreamClientEndpoint,
  1825. ) -> _WrapperEndpoint:
  1826. """
  1827. Wrap an endpoint which upgrades to TLS as soon as the connection is
  1828. established.
  1829. @since: 16.0
  1830. @param connectionCreator: The TLS options to use when connecting; see
  1831. L{twisted.internet.ssl.optionsForClientTLS} for how to construct this.
  1832. @type connectionCreator:
  1833. L{twisted.internet.interfaces.IOpenSSLClientConnectionCreator}
  1834. @param wrappedEndpoint: The endpoint to wrap.
  1835. @type wrappedEndpoint: An L{IStreamClientEndpoint} provider.
  1836. @return: an endpoint that provides transport level encryption layered on
  1837. top of C{wrappedEndpoint}
  1838. @rtype: L{twisted.internet.interfaces.IStreamClientEndpoint}
  1839. """
  1840. if TLSMemoryBIOFactory is None:
  1841. raise NotImplementedError(
  1842. "OpenSSL not available. Try `pip install twisted[tls]`."
  1843. )
  1844. return _WrapperEndpoint(
  1845. wrappedEndpoint,
  1846. lambda protocolFactory: TLSMemoryBIOFactory(
  1847. connectionCreator, True, protocolFactory
  1848. ),
  1849. )
  1850. def _parseClientTLS(
  1851. reactor: Any,
  1852. host: bytes | str,
  1853. port: bytes | str,
  1854. timeout: bytes | str = b"30",
  1855. bindAddress: bytes | str | None = None,
  1856. certificate: bytes | str | None = None,
  1857. privateKey: bytes | str | None = None,
  1858. trustRoots: bytes | str | None = None,
  1859. endpoint: bytes | str | None = None,
  1860. **kwargs: object,
  1861. ) -> IStreamClientEndpoint:
  1862. """
  1863. Internal method to construct an endpoint from string parameters.
  1864. @param reactor: The reactor passed to L{clientFromString}.
  1865. @param host: The hostname to connect to.
  1866. @type host: L{bytes} or L{unicode}
  1867. @param port: The port to connect to.
  1868. @type port: L{bytes} or L{unicode}
  1869. @param timeout: For each individual connection attempt, the number of
  1870. seconds to wait before assuming the connection has failed.
  1871. @type timeout: L{bytes} or L{unicode}
  1872. @param bindAddress: The address to which to bind outgoing connections.
  1873. @type bindAddress: L{bytes} or L{unicode}
  1874. @param certificate: a string representing a filesystem path to a
  1875. PEM-encoded certificate.
  1876. @type certificate: L{bytes} or L{unicode}
  1877. @param privateKey: a string representing a filesystem path to a PEM-encoded
  1878. certificate.
  1879. @type privateKey: L{bytes} or L{unicode}
  1880. @param endpoint: an optional string endpoint description of an endpoint to
  1881. wrap; if this is passed then C{host} is used only for certificate
  1882. verification.
  1883. @type endpoint: L{bytes} or L{unicode}
  1884. @return: a client TLS endpoint
  1885. @rtype: L{IStreamClientEndpoint}
  1886. """
  1887. if kwargs:
  1888. raise TypeError("unrecognized keyword arguments present", list(kwargs.keys()))
  1889. host = host if isinstance(host, str) else host.decode("utf-8")
  1890. bindAddress = (
  1891. bindAddress
  1892. if isinstance(bindAddress, str) or bindAddress is None
  1893. else bindAddress.decode("utf-8")
  1894. )
  1895. portint = int(port)
  1896. timeoutint = int(timeout)
  1897. return wrapClientTLS(
  1898. optionsForClientTLS(
  1899. host,
  1900. trustRoot=_parseTrustRootPath(trustRoots),
  1901. clientCertificate=_privateCertFromPaths(certificate, privateKey),
  1902. ),
  1903. (
  1904. clientFromString(reactor, endpoint)
  1905. if endpoint is not None
  1906. else HostnameEndpoint(
  1907. reactor,
  1908. _idnaBytes(host),
  1909. portint,
  1910. timeoutint,
  1911. None if bindAddress is None else (bindAddress, 0),
  1912. )
  1913. ),
  1914. )
  1915. @implementer(IPlugin, IStreamClientEndpointStringParserWithReactor)
  1916. class _TLSClientEndpointParser:
  1917. """
  1918. Stream client endpoint string parser for L{wrapClientTLS} with
  1919. L{HostnameEndpoint}.
  1920. @ivar prefix: See
  1921. L{IStreamClientEndpointStringParserWithReactor.prefix}.
  1922. """
  1923. prefix = "tls"
  1924. @staticmethod
  1925. def parseStreamClient(reactor, *args, **kwargs):
  1926. """
  1927. Redirects to another function L{_parseClientTLS}; tricks zope.interface
  1928. into believing the interface is correctly implemented, since the
  1929. signature is (C{reactor}, C{*args}, C{**kwargs}). See
  1930. L{_parseClientTLS} for the specific signature description for this
  1931. endpoint parser.
  1932. @param reactor: The reactor passed to L{clientFromString}.
  1933. @param args: The positional arguments in the endpoint description.
  1934. @type args: L{tuple}
  1935. @param kwargs: The named arguments in the endpoint description.
  1936. @type kwargs: L{dict}
  1937. @return: a client TLS endpoint
  1938. @rtype: L{IStreamClientEndpoint}
  1939. """
  1940. return _parseClientTLS(reactor, *args, **kwargs)