_http2.py 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356
  1. # -*- test-case-name: twisted.web.test.test_http2 -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. HTTP2 Implementation
  6. This is the basic server-side protocol implementation used by the Twisted
  7. Web server for HTTP2. This functionality is intended to be combined with the
  8. HTTP/1.1 and HTTP/1.0 functionality in twisted.web.http to provide complete
  9. protocol support for HTTP-type protocols.
  10. This API is currently considered private because it's in early draft form. When
  11. it has stabilised, it'll be made public.
  12. """
  13. from __future__ import absolute_import, division
  14. import io
  15. import warnings
  16. import sys
  17. from collections import deque
  18. from zope.interface import implementer
  19. import priority
  20. import h2.config
  21. import h2.connection
  22. import h2.errors
  23. import h2.events
  24. import h2.exceptions
  25. from twisted.internet.defer import Deferred
  26. from twisted.internet.error import ConnectionLost
  27. from twisted.internet.interfaces import (
  28. IProtocol, ITransport, IConsumer, IPushProducer, ISSLTransport
  29. )
  30. from twisted.internet._producer_helpers import _PullToPush
  31. from twisted.internet.protocol import Protocol
  32. from twisted.logger import Logger
  33. from twisted.protocols.policies import TimeoutMixin
  34. from twisted.python.failure import Failure
  35. from twisted.web.error import ExcessiveBufferingError
  # Nothing in this module is exported: the API is still private.
  __all__ = []

  # Unique marker placed on a stream's outbound queue to signal that the
  # response is complete.
  _END_STREAM_SENTINEL = object()

  # h2 needs a memoryview object that cooperates with the struct module.
  # Python 2.7.3 and older do not provide one, so warn and refuse to import
  # on those interpreters.
  if not sys.version_info >= (2, 7, 4):
      warnings.warn(
          "HTTP/2 cannot be enabled because this version of Python is too "
          "old, and does not fully support memoryview objects.",
          UserWarning,
          stacklevel=2,
      )
      raise ImportError("HTTP/2 not supported on this Python version.")
  50. @implementer(IProtocol, IPushProducer)
  51. class H2Connection(Protocol, TimeoutMixin):
  52. """
  53. A class representing a single HTTP/2 connection.
  54. This implementation of L{IProtocol} works hand in hand with L{H2Stream}.
  55. This is because we have the requirement to register multiple producers for
  56. a single HTTP/2 connection, one for each stream. The standard Twisted
  57. interfaces don't really allow for this, so instead there's a custom
  58. interface between the two objects that allows them to work hand-in-hand here.
  59. @ivar conn: The HTTP/2 connection state machine.
  60. @type conn: L{h2.connection.H2Connection}
  61. @ivar streams: A mapping of stream IDs to L{H2Stream} objects, used to call
  62. specific methods on streams when events occur.
  63. @type streams: L{dict}, mapping L{int} stream IDs to L{H2Stream} objects.
  64. @ivar priority: A HTTP/2 priority tree used to ensure that responses are
  65. prioritised appropriately.
  66. @type priority: L{priority.PriorityTree}
  67. @ivar _consumerBlocked: A L{Deferred} that is present while the L{IConsumer}
  68. that is consuming this data has asked us to stop producing, else L{None}.
  69. @type _consumerBlocked: L{twisted.internet.defer.Deferred}, or L{None}
  70. @ivar _sendingDeferred: A L{Deferred} used to restart the data-sending loop
  71. when more response data has been produced. Will not be present if there
  72. is outstanding data still to send.
  73. @type _sendingDeferred: A L{twisted.internet.defer.Deferred}, or L{None}
  74. @ivar _outboundStreamQueues: A map of stream IDs to queues, used to store
  75. data blocks that are yet to be sent on the connection. These are used
  76. both to handle producers that do not respect L{IConsumer} but also to
  77. allow priority to multiplex data appropriately.
  78. @type _outboundStreamQueues: A L{dict} mapping L{int} stream IDs to
  79. L{collections.deque} queues, which contain either L{bytes} objects or
  80. C{_END_STREAM_SENTINEL}.
  81. @ivar _sender: A handle to the data-sending loop, allowing it to be
  82. terminated if needed.
  83. @type _sender: L{twisted.internet.task.LoopingCall}
  84. @ivar abortTimeout: The number of seconds to wait after we attempt to shut
  85. the transport down cleanly to give up and forcibly terminate it. This
  86. is only used when we time a connection out, to prevent errors causing
  87. the FD to get leaked. If this is L{None}, we will wait forever.
  88. @type abortTimeout: L{int}
  89. @ivar _abortingCall: The L{twisted.internet.base.DelayedCall} that will be
  90. used to forcibly close the transport if it doesn't close cleanly.
  91. @type _abortingCall: L{twisted.internet.base.DelayedCall}
  92. """
  93. factory = None
  94. site = None
  95. abortTimeout = 15
  96. _log = Logger()
  97. _abortingCall = None
  def __init__(self, reactor=None):
      """
      Set up the HTTP/2 state machine, the per-stream bookkeeping and the
      data-sending loop.

      @param reactor: The reactor used to schedule the sending loop. When
          L{None}, the global reactor is imported and used.
      """
      # Server-side state machine; headers are left as bytes.
      self.conn = h2.connection.H2Connection(
          config=h2.config.H2Configuration(
              client_side=False, header_encoding=None
          )
      )
      self.priority = priority.PriorityTree()

      # Per-stream state, all keyed by stream ID.
      self.streams = {}
      self._outboundStreamQueues = {}
      self._streamCleanupCallbacks = {}

      # Producer/consumer state for the sending loop.
      self._stillProducing = True
      self._consumerBlocked = None
      self._sendingDeferred = None

      # Cap on the number of buffered control frame (e.g. PING and
      # SETTINGS) bytes.
      self._maxBufferedControlFrameBytes = 1024 * 17
      self._bufferedControlFrameBytes = 0
      self._bufferedControlFrames = deque()

      if reactor is None:
          from twisted.internet import reactor
      self._reactor = reactor

      # Kick off the data sending function on the next reactor iteration.
      self._reactor.callLater(0, self._sendPrioritisedData)
  120. # Implementation of IProtocol
  def connectionMade(self):
      """
      Start the HTTP/2 connection once a transport is attached.

      Arms the idle timeout, asks the state machine to begin the
      connection, and writes the bytes it produces to the transport. May
      also be invoked by
      L{twisted.web.http._GenericHTTPChannelProtocol} when upgrading to
      HTTP/2.
      """
      self.setTimeout(self.timeOut)
      self.conn.initiate_connection()
      openingBytes = self.conn.data_to_send()
      self.transport.write(openingBytes)
  def dataReceived(self, data):
      """
      Feed bytes from the transport into the HTTP/2 state machine and
      dispatch every resulting event to its handler.

      @param data: The data received from the transport.
      @type data: L{bytes}
      """
      try:
          events = self.conn.receive_data(data)
      except h2.exceptions.ProtocolError:
          # Flush whatever the state machine queued in response. Only tear
          # the connection down ourselves if that flush reports that the
          # connection is still active.
          if self._tryToWriteControlData():
              self.transport.loseConnection()
              self.connectionLost(Failure(), _cancelTimeouts=False)
          return

      # The timeout is only reset once the bytes have been accepted as an
      # actual H2 protocol message.
      self.resetTimeout()

      # Ordered (event type, handler) pairs; the first match wins.
      dispatch = (
          (h2.events.RequestReceived, self._requestReceived),
          (h2.events.DataReceived, self._requestDataReceived),
          (h2.events.StreamEnded, self._requestEnded),
          (h2.events.StreamReset, self._requestAborted),
          (h2.events.WindowUpdated, self._handleWindowUpdate),
          (h2.events.PriorityUpdated, self._handlePriorityUpdate),
      )

      for event in events:
          for eventType, handler in dispatch:
              if isinstance(event, eventType):
                  handler(event)
                  break
          else:
              if isinstance(event, h2.events.ConnectionTerminated):
                  self.transport.loseConnection()
                  self.connectionLost(
                      ConnectionLost("Remote peer sent GOAWAY"),
                      _cancelTimeouts=False,
                  )

      self._tryToWriteControlData()
  def timeoutConnection(self):
      """
      Tear the connection down after it has been inactive for
      L{self.timeOut<twisted.protocols.policies.TimeoutMixin.timeOut>}
      seconds.

      Beyond closing the transport this logs the timeout and, before the
      transport is closed, sends a GOAWAY frame telling the peer whether
      the shutdown was clean.
      """
      peer = self.transport.getPeer()
      self._log.info("Timing out client {client}", client=peer)

      # Streams still open means the shutdown is not clean, so report
      # PROTOCOL_ERROR; otherwise report NO_ERROR.
      streamsOpen = (
          self.conn.open_outbound_streams > 0 or
          self.conn.open_inbound_streams > 0
      )
      codes = h2.errors.ErrorCodes
      goawayCode = codes.PROTOCOL_ERROR if streamsOpen else codes.NO_ERROR

      self.conn.close_connection(error_code=goawayCode)
      self.transport.write(self.conn.data_to_send())

      # Don't let the client hold this connection open too long.
      if self.abortTimeout is not None:
          # self.callLater is used (rather than the reactor we hold)
          # because that's what TimeoutMixin does. See
          # https://twistedmatrix.com/trac/ticket/8488.
          self._abortingCall = self.callLater(
              self.abortTimeout, self.forceAbortClient
          )

      # We're done, throw the connection away.
      self.transport.loseConnection()
  def forceAbortClient(self):
      """
      Abort the transport outright.

      Scheduled by L{timeoutConnection} to run C{abortTimeout} seconds
      after the timeout fired, for the case where the connection still
      has not gone away by then.
      """
      peer = self.transport.getPeer()
      self._log.info("Forcibly timing out client: {client}", client=peer)

      # Forget the delayed call first so that nobody tries to cancel it.
      self._abortingCall = None
      self.transport.abortConnection()
  def connectionLost(self, reason, _cancelTimeouts=True):
      """
      Called when the transport connection is lost.

      Informs all outstanding response handlers that the connection
      has been lost, and cleans up all internal state.

      @param reason: See L{IProtocol.connectionLost}

      @param _cancelTimeouts: When true, cancel the idle timeout and any
          pending forced abort. When false, propagate the C{reason} to
          this connection's streams but leave the timers running, so that
          peers who never read the data we've written are eventually
          timed out.
      """
      self._stillProducing = False
      if _cancelTimeouts:
          self.setTimeout(None)

      # Iterate over a snapshot: a stream's connectionLost handler may
      # re-enter this object (e.g. via abortRequest) and remove streams,
      # which would otherwise raise "dictionary changed size during
      # iteration" and abandon the rest of the teardown. Streams that were
      # removed that way have already been cleaned up, so skip them.
      for streamID, stream in list(self.streams.items()):
          if streamID in self.streams:
              stream.connectionLost(reason)

      for streamID in list(self.streams):
          self._requestDone(streamID)

      # If we were going to force-close the transport, we don't have to now.
      if _cancelTimeouts and self._abortingCall is not None:
          self._abortingCall.cancel()
          self._abortingCall = None
  239. # Implementation of IPushProducer
  240. #
  241. # Here's how we handle IPushProducer. We have multiple outstanding
  242. # H2Streams. Each of these exposes an IConsumer interface to the response
  243. # handler that allows it to push data into the H2Stream. The H2Stream then
  244. # writes the data into the H2Connection object.
  245. #
  246. # The H2Connection needs to manage these writes to account for:
  247. #
  248. # - flow control
  249. # - priority
  250. #
  251. # We manage each of these in different ways.
  252. #
  253. # For flow control, we simply use the equivalent of the IPushProducer
  254. # interface. We simply tell the H2Stream: "Hey, you can't send any data
  255. # right now, sorry!". When that stream becomes unblocked, we free it up
  256. # again. This allows the H2Stream to propagate this backpressure up the
  257. # chain.
  258. #
  259. # For priority, we need to keep a backlog of data frames that we can send,
  260. # and interleave them appropriately. This backlog is most sensibly kept in
  261. # the H2Connection object itself. We keep one queue per stream, which is
  262. # where the writes go, and then we have a loop that manages popping these
  263. # streams off in priority order.
  264. #
  265. # Logically then, we go as follows:
  266. #
  267. # 1. Stream calls writeDataToStream(). This causes a DataFrame to be placed
  268. # on the queue for that stream. It also informs the priority
  269. # implementation that this stream is unblocked.
  270. # 2. The _sendPrioritisedData() function spins in a tight loop. Each
  271. # iteration it asks the priority implementation which stream should send
  272. # next, and pops a data frame off that stream's queue. If, after sending
  273. # that frame, there is no data left on that stream's queue, the function
  274. # informs the priority implementation that the stream is blocked.
  275. #
  276. # If all streams are blocked, or if there are no outstanding streams, the
  277. # _sendPrioritisedData function waits to be awoken when more data is ready
  278. # to send.
  279. #
  280. # Note that all of this only applies to *data*. Headers and other control
  281. # frames deliberately skip this processing as they are not subject to flow
  282. # control or priority constraints. Instead, they are stored in their own buffer
  283. # which is used primarily to detect excessive buffering.
  def stopProducing(self):
      """
      Stop producing data for good.

      The consumer of this L{H2Connection} has died, so this is handled
      exactly like a lost connection.
      """
      reason = ConnectionLost("Producing stopped")
      self.connectionLost(reason)
  def pauseProducing(self):
      """
      Pause producing data.

      The L{H2Connection} has produced more than its consumer can process
      for now. A fresh L{Deferred} is stored to mark the blocked state; it
      is fired by L{resumeProducing}.
      """
      blocked = Deferred()
      # Ensure pending control data (if any) are sent first on resume.
      blocked.addCallback(self._flushBufferedControlData)
      self._consumerBlocked = blocked
  def resumeProducing(self):
      """
      Resume producing data.

      Clears the blocked marker left by L{pauseProducing} and fires it, so
      that everything waiting behind the consumer runs again. Does nothing
      if production was not paused.
      """
      blocked = self._consumerBlocked
      if blocked is None:
          return

      # Clear the marker before firing so callbacks see an unblocked state.
      self._consumerBlocked = None
      blocked.callback(None)
  def _sendPrioritisedData(self, *args):
      """
      The data sending loop. Each call sends at most one frame and then
      reschedules itself, either through a L{Deferred} or through
      L{reactor.callLater<twisted.internet.interfaces.IReactorTime.callLater>}.

      Data is sent according to the rules of HTTP/2 priority: the data from
      each stream is interleaved according to the priority signalled by the
      client.

      When no stream can make progress, a L{Deferred} is parked on the
      L{H2Connection} and the loop waits until it is fired. When the
      consumer is blocked, the loop waits behind C{_consumerBlocked}.

      @param args: Ignored; accepted so this can be used as a L{Deferred}
          callback.
      """
      # If producing has stopped, we're done. Don't reschedule ourselves.
      if not self._stillProducing:
          return

      streamID = None
      while streamID is None:
          try:
              streamID = next(self.priority)
          except priority.DeadlockError:
              # All streams are currently blocked or not progressing. Park
              # until a new one becomes available.
              assert self._sendingDeferred is None
              parked = Deferred()
              self._sendingDeferred = parked
              parked.addCallback(self._sendPrioritisedData)
              return

      # Wait behind the transport.
      if self._consumerBlocked is not None:
          self._consumerBlocked.addCallback(self._sendPrioritisedData)
          return

      self.resetTimeout()

      window = self.conn.local_flow_control_window(streamID)
      queue = self._outboundStreamQueues[streamID]
      chunk = queue.popleft()
      frameLimit = min(self.conn.max_outbound_frame_size, window)

      if chunk is _END_STREAM_SENTINEL:
          # No error handling here even though this can throw
          # ProtocolError: we really shouldn't encounter that, and if we
          # do it's a nasty bug.
          self.conn.end_stream(streamID)
          self.transport.write(self.conn.data_to_send())

          # Clean up the stream
          self._requestDone(streamID)
      else:
          # Respect the max frame size: requeue whatever does not fit.
          if len(chunk) > frameLimit:
              queue.appendleft(chunk[frameLimit:])
              chunk = chunk[:frameLimit]

          # Deliberately no error handling around the send. If the frame
          # limit is zero there is nothing to send, so skip the write.
          if chunk:
              self.conn.send_data(streamID, chunk)
              self.transport.write(self.conn.data_to_send())

          # If there's no data left, this stream is now blocked.
          if not self._outboundStreamQueues[streamID]:
              self.priority.block(streamID)

          # Also, if the stream's flow control window is exhausted, tell
          # it to stop.
          if self.remainingOutboundWindow(streamID) <= 0:
              self.streams[streamID].flowControlBlocked()

      self._reactor.callLater(0, self._sendPrioritisedData)
  374. # Internal functions.
  def _requestReceived(self, event):
      """
      Internal handler for a newly received request: creates the
      L{H2Stream} and all per-stream state for it.

      @param event: The Hyper-h2 event that encodes information about the
          received request.
      @type event: L{h2.events.RequestReceived}
      """
      streamID = event.stream_id

      self.streams[streamID] = H2Stream(
          streamID,
          self,
          event.headers,
          self.requestFactory,
          self.site,
          self.factory,
      )
      self._streamCleanupCallbacks[streamID] = Deferred()
      self._outboundStreamQueues[streamID] = deque()

      try:
          self.priority.insert_stream(streamID)
      except priority.DuplicateStreamError:
          # Stream already in the tree. This can happen if we received a
          # PRIORITY frame before a HEADERS frame; _handlePriorityUpdate
          # already set the stream up, so there is nothing more to do.
          return

      # A freshly inserted stream has nothing to send yet: block it.
      self.priority.block(streamID)
  402. def _requestDataReceived(self, event):
  403. """
  404. Internal handler for when a chunk of data is received for a given
  405. request.
  406. @param event: The Hyper-h2 event that encodes information about the
  407. received data.
  408. @type event: L{h2.events.DataReceived}
  409. """
  410. stream = self.streams[event.stream_id]
  411. stream.receiveDataChunk(event.data, event.flow_controlled_length)
  412. def _requestEnded(self, event):
  413. """
  414. Internal handler for when a request is complete, and we expect no
  415. further data for that request.
  416. @param event: The Hyper-h2 event that encodes information about the
  417. completed stream.
  418. @type event: L{h2.events.StreamEnded}
  419. """
  420. stream = self.streams[event.stream_id]
  421. stream.requestComplete()
  def _requestAborted(self, event):
      """
      Internal handler for a request aborted by the remote peer: notifies
      the stream and then drops its state.

      @param event: The Hyper-h2 event that encodes information about the
          reset stream.
      @type event: L{h2.events.StreamReset}
      """
      streamID = event.stream_id
      stream = self.streams[streamID]
      reason = ConnectionLost("Stream reset with code %s" % event.error_code)
      stream.connectionLost(reason)
      self._requestDone(streamID)
  434. def _handlePriorityUpdate(self, event):
  435. """
  436. Internal handler for when a stream priority is updated.
  437. @param event: The Hyper-h2 event that encodes information about the
  438. stream reprioritization.
  439. @type event: L{h2.events.PriorityUpdated}
  440. """
  441. try:
  442. self.priority.reprioritize(
  443. stream_id=event.stream_id,
  444. depends_on=event.depends_on or None,
  445. weight=event.weight,
  446. exclusive=event.exclusive,
  447. )
  448. except priority.MissingStreamError:
  449. # A PRIORITY frame arrived before the HEADERS frame that would
  450. # trigger us to insert the stream into the tree. That's fine: we
  451. # can create the stream here and mark it as blocked.
  452. self.priority.insert_stream(
  453. stream_id=event.stream_id,
  454. depends_on=event.depends_on or None,
  455. weight=event.weight,
  456. exclusive=event.exclusive,
  457. )
  458. self.priority.block(event.stream_id)
  def writeHeaders(self, version, code, reason, headers, streamID):
      """
      Called by L{twisted.web.http.Request} objects to write a complete set
      of HTTP headers to a stream.

      @param version: The HTTP version in use. Unused in HTTP/2.
      @type version: L{bytes}

      @param code: The HTTP status code to write.
      @type code: L{bytes}

      @param reason: The HTTP reason phrase to write. Unused in HTTP/2.
      @type reason: L{bytes}

      @param headers: The headers to write to the stream, as a sequence of
          header entries. The sequence is not modified.
          NOTE(review): entries are presumably C{(name, value)} tuples of
          L{bytes}, matching the C{:status} entry added here - confirm
          against the caller.

      @param streamID: The ID of the stream to write the headers to.
      @type streamID: L{int}
      """
      # Build a new list instead of inserting into the caller's one, so a
      # caller that keeps or reuses its list never ends up with a stray
      # (or duplicated) ':status' entry.
      responseHeaders = [(b':status', code)]
      responseHeaders.extend(headers)

      try:
          self.conn.send_headers(streamID, responseHeaders)
      except h2.exceptions.StreamClosedError:
          # Stream was closed by the client at some point. We need to not
          # explode here: just swallow the error. That's what write() does
          # when a connection is lost, so that's what we do too.
          return
      else:
          self._tryToWriteControlData()
  def writeDataToStream(self, streamID, data):
      """
      May be called by L{H2Stream} objects to write response data to a
      given stream. The chunk is queued for the sending loop.

      @param streamID: The ID of the stream to write the data to.
      @type streamID: L{int}

      @param data: The data chunk to write to the stream.
      @type data: L{bytes}
      """
      self._outboundStreamQueues[streamID].append(data)

      # Unblocking the stream and waking the sending loop is pointless if
      # the data can't actually be sent, so only do it when the flow
      # control window has room.
      windowOpen = self.conn.local_flow_control_window(streamID) > 0
      if windowOpen:
          self.priority.unblock(streamID)
          parked, self._sendingDeferred = self._sendingDeferred, None
          if parked is not None:
              parked.callback(streamID)

      # Queued data has used up the window: apply backpressure.
      if self.remainingOutboundWindow(streamID) <= 0:
          self.streams[streamID].flowControlBlocked()
  def endRequest(self, streamID):
      """
      Called by L{H2Stream} objects to signal completion of a response.

      Queues the end-of-stream marker behind any pending data, unblocks
      the stream and wakes the sending loop if it is parked.

      @param streamID: The ID of the stream whose response is complete.
      @type streamID: L{int}
      """
      self._outboundStreamQueues[streamID].append(_END_STREAM_SENTINEL)
      self.priority.unblock(streamID)

      parked, self._sendingDeferred = self._sendingDeferred, None
      if parked is not None:
          parked.callback(streamID)
  def abortRequest(self, streamID):
      """
      Called by L{H2Stream} objects to request early termination of a
      stream. This emits a RstStream frame and then removes all stream
      state.

      @param streamID: The ID of the stream to terminate.
      @type streamID: L{int}
      """
      self.conn.reset_stream(streamID)

      # Stream state is only removed here when the control-data write
      # reports that the connection is still active.
      if self._tryToWriteControlData():
          self._requestDone(streamID)
  528. def _requestDone(self, streamID):
  529. """
  530. Called internally by the data sending loop to clean up state that was
  531. being used for the stream. Called when the stream is complete.
  532. @param streamID: The ID of the stream to clean up state for.
  533. @type streamID: L{int}
  534. """
  535. del self._outboundStreamQueues[streamID]
  536. self.priority.remove_stream(streamID)
  537. del self.streams[streamID]
  538. cleanupCallback = self._streamCleanupCallbacks.pop(streamID)
  539. cleanupCallback.callback(streamID)
  def remainingOutboundWindow(self, streamID):
      """
      Work out how much room is left in the send window for a stream once
      the data already queued for it is taken into account. Allows us to
      handle blocking and unblocking producers.

      @param streamID: The ID of the stream whose flow control window we'll
          check.
      @type streamID: L{int}

      @return: The stream's flow control window minus the number of bytes
          queued to be sent on it.
      @rtype: L{int}
      """
      # TODO: This involves a fair bit of looping and computation for
      # something that is called a lot. Consider caching values somewhere.
      windowSize = self.conn.local_flow_control_window(streamID)

      queuedBytes = 0
      for chunk in self._outboundStreamQueues[streamID]:
          # The end-of-stream marker carries no payload.
          if chunk is _END_STREAM_SENTINEL:
              continue
          queuedBytes += len(chunk)

      return windowSize - queuedBytes
  560. def _handleWindowUpdate(self, event):
  561. """
  562. Manage flow control windows.
  563. Streams that are blocked on flow control will register themselves with
  564. the connection. This will fire deferreds that wake those streams up and
  565. allow them to continue processing.
  566. @param event: The Hyper-h2 event that encodes information about the
  567. flow control window change.
  568. @type event: L{h2.events.WindowUpdated}
  569. """
  570. streamID = event.stream_id
  571. if streamID:
  572. if not self._streamIsActive(streamID):
  573. # We may have already cleaned up our stream state, making this
  574. # a late WINDOW_UPDATE frame. That's fine: the update is
  575. # unnecessary but benign. We'll ignore it.
  576. return
  577. # If we haven't got any data to send, don't unblock the stream. If
  578. # we do, we'll eventually get an exception inside the
  579. # _sendPrioritisedData loop some time later.
  580. if self._outboundStreamQueues.get(streamID):
  581. self.priority.unblock(streamID)
  582. self.streams[streamID].windowUpdated()
  583. else:
  584. # Update strictly applies to all streams.
  585. for stream in self.streams.values():
  586. stream.windowUpdated()
  587. # If we still have data to send for this stream, unblock it.
  588. if self._outboundStreamQueues.get(stream.streamID):
  589. self.priority.unblock(stream.streamID)
  def getPeer(self):
      """
      Get the remote address of this connection, as reported by the
      transport.

      Treat this method with caution. It is the unfortunate result of the
      CGI and Jabber standards, but should not be considered reliable for
      the usual host of reasons; port forwarding, proxying, firewalls, IP
      masquerading, etc.

      @return: An L{IAddress} provider.
      """
      remoteAddress = self.transport.getPeer()
      return remoteAddress
  def getHost(self):
      """
      Similar to getPeer, but returns an address describing this side of
      the connection, as reported by the transport.

      @return: An L{IAddress} provider.
      """
      localAddress = self.transport.getHost()
      return localAddress
  def openStreamWindow(self, streamID, increment):
      """
      Open the stream window by a given increment.

      Acknowledges C{increment} bytes of received data on the stream and
      then attempts to write the resulting control data.

      @param streamID: The ID of the stream whose window needs to be opened.
      @type streamID: L{int}

      @param increment: The amount by which the stream window must be
          incremented.
      @type increment: L{int}
      """
      connection = self.conn
      connection.acknowledge_received_data(increment, streamID)
      self._tryToWriteControlData()
  def _isSecure(self):
      """
      Report whether this channel is running over a secure transport.

      @returns: L{True} if this channel is secure.
      @rtype: L{bool}
      """
      # The channel counts as secure when adapting its transport to
      # ISSLTransport (with None as the fallback) yields something other
      # than None.
      adapted = ISSLTransport(self.transport, None)
      return adapted is not None
  626. def _send100Continue(self, streamID):
  627. """
  628. Sends a 100 Continue response, used to signal to clients that further
  629. processing will be performed.
  630. @param streamID: The ID of the stream that needs the 100 Continue
  631. response
  632. @type streamID: L{int}
  633. """
  634. headers = [(b':status', b'100')]
  635. self.conn.send_headers(headers=headers, stream_id=streamID)
  636. self._tryToWriteControlData()
  637. def _respondToBadRequestAndDisconnect(self, streamID):
  638. """
  639. This is a quick and dirty way of responding to bad requests.
  640. As described by HTTP standard we should be patient and accept the
  641. whole request from the client before sending a polite bad request
  642. response, even in the case when clients send tons of data.
  643. Unlike in the HTTP/1.1 case, this does not actually disconnect the
  644. underlying transport: there's no need. This instead just sends a 400
  645. response and terminates the stream.
  646. @param streamID: The ID of the stream that needs the 100 Continue
  647. response
  648. @type streamID: L{int}
  649. """
  650. headers = [(b':status', b'400')]
  651. self.conn.send_headers(
  652. headers=headers,
  653. stream_id=streamID,
  654. end_stream=True
  655. )
  656. stillActive = self._tryToWriteControlData()
  657. if stillActive:
  658. stream = self.streams[streamID]
  659. stream.connectionLost(ConnectionLost("Invalid request"))
  660. self._requestDone(streamID)
  661. def _streamIsActive(self, streamID):
  662. """
  663. Checks whether Twisted has still got state for a given stream and so
  664. can process events for that stream.
  665. @param streamID: The ID of the stream that needs processing.
  666. @type streamID: L{int}
  667. @return: Whether the stream still has state allocated.
  668. @rtype: L{bool}
  669. """
  670. return streamID in self.streams
  671. def _tryToWriteControlData(self):
  672. """
  673. Checks whether the connection is blocked on flow control and,
  674. if it isn't, writes any buffered control data.
  675. @return: L{True} if the connection is still active and
  676. L{False} if it was aborted because too many bytes have
  677. been written but not consumed by the other end.
  678. """
  679. bufferedBytes = self.conn.data_to_send()
  680. if not bufferedBytes:
  681. return True
  682. if self._consumerBlocked is None and not self._bufferedControlFrames:
  683. # The consumer isn't blocked, and we don't have any buffered frames:
  684. # write this directly.
  685. self.transport.write(bufferedBytes)
  686. return True
  687. else:
  688. # Either the consumer is blocked or we have buffered frames. If the
  689. # consumer is blocked, we'll write this when we unblock. If we have
  690. # buffered frames, we have presumably been re-entered from
  691. # transport.write, and so to avoid reordering issues we'll buffer anyway.
  692. self._bufferedControlFrames.append(bufferedBytes)
  693. self._bufferedControlFrameBytes += len(bufferedBytes)
  694. if self._bufferedControlFrameBytes >= self._maxBufferedControlFrameBytes:
  695. self._log.error(
  696. "Maximum number of control frame bytes buffered: "
  697. "{bufferedControlFrameBytes} > = {maxBufferedControlFrameBytes}. "
  698. "Aborting connection to client: {client} ",
  699. bufferedControlFrameBytes=self._bufferedControlFrameBytes,
  700. maxBufferedControlFrameBytes=self._maxBufferedControlFrameBytes,
  701. client=self.transport.getPeer(),
  702. )
  703. # We've exceeded a reasonable buffer size for max buffered control frames.
  704. # This is a denial of service risk, so we're going to drop this connection.
  705. self.transport.abortConnection()
  706. self.connectionLost(ExcessiveBufferingError())
  707. return False
  708. return True
  709. def _flushBufferedControlData(self, *args):
  710. """
  711. Called when the connection is marked writable again after being marked unwritable.
  712. Attempts to flush buffered control data if there is any.
  713. """
  714. # To respect backpressure here we send each write in order, paying attention to whether
  715. # we got blocked
  716. while self._consumerBlocked is None and self._bufferedControlFrames:
  717. nextWrite = self._bufferedControlFrames.popleft()
  718. self._bufferedControlFrameBytes -= len(nextWrite)
  719. self.transport.write(nextWrite)
  @implementer(ITransport, IConsumer, IPushProducer)
  class H2Stream(object):
      """
      A class representing a single HTTP/2 stream.

      This class works hand-in-hand with L{H2Connection}. It acts to provide an
      implementation of L{ITransport}, L{IConsumer}, and L{IPushProducer} that
      work for a single HTTP/2 connection, while tightly cleaving to the
      interface provided by those interfaces. It does this by having a tight
      coupling to L{H2Connection}, which allows associating many of the
      functions of L{ITransport}, L{IConsumer}, and L{IPushProducer} to
      objects on a stream-specific level.

      @ivar streamID: The numerical stream ID that this object corresponds to.
      @type streamID: L{int}

      @ivar producing: Whether this stream is currently allowed to produce data
          to its consumer.
      @type producing: L{bool}

      @ivar command: The HTTP verb used on the request.
      @type command: L{unicode}

      @ivar path: The HTTP path used on the request.
      @type path: L{unicode}

      @ivar producer: The object producing the response, if any.
      @type producer: L{IProducer}

      @ivar hasStreamingProducer: L{True} if the registered producer was
          registered as a streaming (push) producer, L{False} if it was a pull
          producer, and L{None} when no producer is registered.
      @type hasStreamingProducer: L{bool} or L{None}

      @ivar site: The L{twisted.web.server.Site} object this stream belongs to,
          if any.
      @type site: L{twisted.web.server.Site}

      @ivar factory: The L{twisted.web.http.HTTPFactory} object that constructed
          this stream's parent connection.
      @type factory: L{twisted.web.http.HTTPFactory}

      @ivar _producerProducing: Whether the producer stored in producer is
          currently producing data.
      @type _producerProducing: L{bool}

      @ivar _inboundDataBuffer: Any data that has been received from the network
          but has not yet been received by the consumer.
      @type _inboundDataBuffer: A L{collections.deque} containing L{bytes}

      @ivar _conn: A reference to the connection this stream belongs to.
      @type _conn: L{H2Connection}

      @ivar _request: A request object that this stream corresponds to.
      @type _request: L{twisted.web.iweb.IRequest}

      @ivar _buffer: A buffer containing data produced by the producer that could
          not be sent on the network at this time.
      @type _buffer: L{io.BytesIO}
      """
      # We need a transport property for t.w.h.Request, but HTTP/2 doesn't want
      # to expose it. So we just set it to None.
      transport = None

      def __init__(self, streamID, connection, headers,
                   requestFactory, site, factory):
          """
          Initialize this HTTP/2 stream.

          @param streamID: The numerical stream ID that this object corresponds
              to.
          @type streamID: L{int}

          @param connection: The HTTP/2 connection this stream belongs to.
          @type connection: L{H2Connection}

          @param headers: The HTTP/2 request headers.
          @type headers: A L{list} of L{tuple}s of header name and header value,
              both as L{bytes}.

          @param requestFactory: A function that builds appropriate request
              request objects.
          @type requestFactory: A callable that returns a
              L{twisted.web.iweb.IRequest}.

          @param site: The L{twisted.web.server.Site} object this stream belongs
              to, if any.
          @type site: L{twisted.web.server.Site}

          @param factory: The L{twisted.web.http.HTTPFactory} object that
              constructed this stream's parent connection.
          @type factory: L{twisted.web.http.HTTPFactory}
          """
          self.streamID = streamID
          self.site = site
          self.factory = factory
          self.producing = True
          self.command = None
          self.path = None
          self.producer = None
          self._producerProducing = False
          # registerProducer and unregisterProducer read and write
          # hasStreamingProducer, so that is the name that must exist from
          # construction onwards.
          self.hasStreamingProducer = None
          # Legacy spelling, kept so that anything that looked at the
          # underscore-prefixed attribute still finds it.
          self._hasStreamingProducer = None
          self._inboundDataBuffer = deque()
          self._conn = connection
          self._request = requestFactory(self, queued=False)
          self._buffer = io.BytesIO()

          self._convertHeaders(headers)

      def _convertHeaders(self, headers):
          """
          This method converts the HTTP/2 header set into something that looks
          like HTTP/1.1. In particular, it strips the 'special' headers and adds
          a Host: header.

          If no Content-Length header was seen, the request is told the body
          length is 0 for GET and HEAD requests and L{None} otherwise. Cookies
          are then parsed, and a 100 Continue response is sent if the first
          Expect header asks for one.

          @param headers: The HTTP/2 header set.
          @type headers: A L{list} of L{tuple}s of header name and header value,
              both as L{bytes}.
          """
          gotLength = False

          for header in headers:
              if not header[0].startswith(b':'):
                  gotLength = (
                      _addHeaderToRequest(self._request, header) or gotLength
                  )
              elif header[0] == b':method':
                  self.command = header[1]
              elif header[0] == b':path':
                  self.path = header[1]
              elif header[0] == b':authority':
                  # This is essentially the Host: header from HTTP/1.1
                  _addHeaderToRequest(self._request, (b'host', header[1]))

          if not gotLength:
              if self.command in (b'GET', b'HEAD'):
                  self._request.gotLength(0)
              else:
                  self._request.gotLength(None)

          self._request.parseCookies()
          expectContinue = self._request.requestHeaders.getRawHeaders(b'expect')
          if expectContinue and expectContinue[0].lower() == b'100-continue':
              self._send100Continue()

      # Methods called by the H2Connection
      def receiveDataChunk(self, data, flowControlledLength):
          """
          Called when the connection has received a chunk of data from the
          underlying transport. If the stream has been registered with a
          consumer, and is currently able to push data, immediately passes it
          through. Otherwise, buffers the chunk until we can start producing.

          @param data: The chunk of data that was received.
          @type data: L{bytes}

          @param flowControlledLength: The total flow controlled length of this
              chunk, which is used when we want to re-open the window. May be
              different to C{len(data)}.
          @type flowControlledLength: L{int}
          """
          if not self.producing:
              # Buffer data.
              self._inboundDataBuffer.append((data, flowControlledLength))
          else:
              self._request.handleContentChunk(data)
              self._conn.openStreamWindow(self.streamID, flowControlledLength)

      def requestComplete(self):
          """
          Called by the L{H2Connection} when the all data for a request has been
          received. Currently, with the legacy L{twisted.web.http.Request}
          object, just calls requestReceived unless the producer wants us to be
          quiet, in which case an end-of-stream marker is queued behind any
          buffered data instead.
          """
          if self.producing:
              self._request.requestReceived(self.command, self.path, b'HTTP/2')
          else:
              self._inboundDataBuffer.append((_END_STREAM_SENTINEL, None))

      def connectionLost(self, reason):
          """
          Called by the L{H2Connection} when a connection is lost or a stream is
          reset.

          @param reason: The reason the connection was lost. It is handed
              unchanged to the request's C{connectionLost}.
          @type reason: An exception instance when L{H2Connection} rejects a
              bad request (it passes C{ConnectionLost}); not necessarily a
              L{str}.
          """
          self._request.connectionLost(reason)

      def windowUpdated(self):
          """
          Called by the L{H2Connection} when this stream's flow control window
          has been opened.
          """
          # If we don't have a producer, we have no-one to tell.
          if not self.producer:
              return

          # If we're not blocked on flow control, we don't care.
          if self._producerProducing:
              return

          # We check whether the stream's flow control window is actually above
          # 0, and then, if a producer is registered and we still have space in
          # the window, we unblock it.
          remainingWindow = self._conn.remainingOutboundWindow(self.streamID)
          if not remainingWindow > 0:
              return

          # We have a producer and space in the window, so that producer can
          # start producing again!
          self._producerProducing = True
          self.producer.resumeProducing()

      def flowControlBlocked(self):
          """
          Called by the L{H2Connection} when this stream's flow control window
          has been exhausted.
          """
          if not self.producer:
              return

          if self._producerProducing:
              self.producer.pauseProducing()
              self._producerProducing = False

      # Methods called by the consumer (usually an IRequest).
      def writeHeaders(self, version, code, reason, headers):
          """
          Called by the consumer to write headers to the stream.

          @param version: The HTTP version.
          @type version: L{bytes}

          @param code: The status code.
          @type code: L{int}

          @param reason: The reason phrase. Ignored in HTTP/2.
          @type reason: L{bytes}

          @param headers: The HTTP response headers.
          @type headers: Any iterable of two-tuples of L{bytes}, representing
              header names and header values.
          """
          self._conn.writeHeaders(version, code, reason, headers, self.streamID)

      def requestDone(self, request):
          """
          Called by a consumer to clean up whatever permanent state is in use.

          @param request: The request calling the method.
          @type request: L{twisted.web.iweb.IRequest}
          """
          self._conn.endRequest(self.streamID)

      def _send100Continue(self):
          """
          Sends a 100 Continue response, used to signal to clients that further
          processing will be performed.
          """
          self._conn._send100Continue(self.streamID)

      def _respondToBadRequestAndDisconnect(self):
          """
          This is a quick and dirty way of responding to bad requests.

          As described by HTTP standard we should be patient and accept the
          whole request from the client before sending a polite bad request
          response, even in the case when clients send tons of data.

          Unlike in the HTTP/1.1 case, this does not actually disconnect the
          underlying transport: there's no need. This instead just sends a 400
          response and terminates the stream.
          """
          self._conn._respondToBadRequestAndDisconnect(self.streamID)

      # Implementation: ITransport
      def write(self, data):
          """
          Write a single chunk of data into a data frame.

          @param data: The data chunk to send.
          @type data: L{bytes}
          """
          self._conn.writeDataToStream(self.streamID, data)

      def writeSequence(self, iovec):
          """
          Write a sequence of chunks of data into data frames.

          @param iovec: A sequence of chunks to send.
          @type iovec: An iterable of L{bytes} chunks.
          """
          for chunk in iovec:
              self.write(chunk)

      def loseConnection(self):
          """
          Close the connection after writing all pending data.
          """
          self._conn.endRequest(self.streamID)

      def abortConnection(self):
          """
          Forcefully abort the connection by sending a RstStream frame.
          """
          self._conn.abortRequest(self.streamID)

      def getPeer(self):
          """
          Get information about the peer.
          """
          return self._conn.getPeer()

      def getHost(self):
          """
          Similar to getPeer, but for this side of the connection.
          """
          return self._conn.getHost()

      def isSecure(self):
          """
          Returns L{True} if this channel is using a secure transport.

          @returns: L{True} if this channel is secure.
          @rtype: L{bool}
          """
          return self._conn._isSecure()

      # Implementation: IConsumer
      def registerProducer(self, producer, streaming):
          """
          Register to receive data from a producer.

          This sets self to be a consumer for a producer. When this object runs
          out of data (as when a send(2) call on a socket succeeds in moving the
          last data from a userspace buffer into a kernelspace buffer), it will
          ask the producer to resumeProducing().

          For L{IPullProducer} providers, C{resumeProducing} will be called once
          each time data is required.

          For L{IPushProducer} providers, C{pauseProducing} will be called
          whenever the write buffer fills up and C{resumeProducing} will only be
          called when it empties.

          @param producer: The producer to register.
          @type producer: L{IProducer} provider

          @param streaming: L{True} if C{producer} provides L{IPushProducer},
              L{False} if C{producer} provides L{IPullProducer}.
          @type streaming: L{bool}

          @raise ValueError: If a producer is already registered.

          @return: L{None}
          """
          if self.producer:
              raise ValueError(
                  "registering producer %s before previous one (%s) was "
                  "unregistered" % (producer, self.producer))

          if not streaming:
              # Pull producers are wrapped so that the rest of this class only
              # ever has to drive a push-style producer.
              self.hasStreamingProducer = False
              producer = _PullToPush(producer, self)
              producer.startStreaming()
          else:
              self.hasStreamingProducer = True

          self.producer = producer
          self._producerProducing = True

      def unregisterProducer(self):
          """
          @see: L{IConsumer.unregisterProducer}
          """
          # When the producer is unregistered, we're done.
          if self.producer is not None and not self.hasStreamingProducer:
              # A pull producer was wrapped at registration time; stop the
              # wrapper that has been driving it.
              self.producer.stopStreaming()

          self._producerProducing = False
          self.producer = None
          self.hasStreamingProducer = None

      # Implementation: IPushProducer
      def stopProducing(self):
          """
          @see: L{IProducer.stopProducing}
          """
          self.producing = False
          self.abortConnection()

      def pauseProducing(self):
          """
          @see: L{IPushProducer.pauseProducing}
          """
          self.producing = False

      def resumeProducing(self):
          """
          @see: L{IPushProducer.resumeProducing}

          Delivers buffered inbound data to the request for as long as this
          stream remains allowed to produce, then re-opens the stream's flow
          control window by the total flow controlled length delivered.
          """
          self.producing = True
          consumedLength = 0

          while self.producing and self._inboundDataBuffer:
              # Allow for pauseProducing to be called in response to a call to
              # resumeProducing.
              chunk, flowControlledLength = self._inboundDataBuffer.popleft()

              if chunk is _END_STREAM_SENTINEL:
                  self.requestComplete()
              else:
                  consumedLength += flowControlledLength
                  self._request.handleContentChunk(chunk)

          self._conn.openStreamWindow(self.streamID, consumedLength)
  1057. def _addHeaderToRequest(request, header):
  1058. """
  1059. Add a header tuple to a request header object.
  1060. @param request: The request to add the header tuple to.
  1061. @type request: L{twisted.web.http.Request}
  1062. @param header: The header tuple to add to the request.
  1063. @type header: A L{tuple} with two elements, the header name and header
  1064. value, both as L{bytes}.
  1065. @return: If the header being added was the C{Content-Length} header.
  1066. @rtype: L{bool}
  1067. """
  1068. requestHeaders = request.requestHeaders
  1069. name, value = header
  1070. values = requestHeaders.getRawHeaders(name)
  1071. if values is not None:
  1072. values.append(value)
  1073. else:
  1074. requestHeaders.setRawHeaders(name, [value])
  1075. if name == b'content-length':
  1076. request.gotLength(int(value))
  1077. return True
  1078. return False