controlbuf.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "errors"
  22. "fmt"
  23. "net"
  24. "runtime"
  25. "strconv"
  26. "sync"
  27. "sync/atomic"
  28. "golang.org/x/net/http2"
  29. "golang.org/x/net/http2/hpack"
  30. "google.golang.org/grpc/internal/grpclog"
  31. "google.golang.org/grpc/internal/grpcutil"
  32. "google.golang.org/grpc/status"
  33. )
  34. var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
  35. e.SetMaxDynamicTableSizeLimit(v)
  36. }
  37. type itemNode struct {
  38. it interface{}
  39. next *itemNode
  40. }
  41. type itemList struct {
  42. head *itemNode
  43. tail *itemNode
  44. }
  45. func (il *itemList) enqueue(i interface{}) {
  46. n := &itemNode{it: i}
  47. if il.tail == nil {
  48. il.head, il.tail = n, n
  49. return
  50. }
  51. il.tail.next = n
  52. il.tail = n
  53. }
  54. // peek returns the first item in the list without removing it from the
  55. // list.
  56. func (il *itemList) peek() interface{} {
  57. return il.head.it
  58. }
  59. func (il *itemList) dequeue() interface{} {
  60. if il.head == nil {
  61. return nil
  62. }
  63. i := il.head.it
  64. il.head = il.head.next
  65. if il.head == nil {
  66. il.tail = nil
  67. }
  68. return i
  69. }
  70. func (il *itemList) dequeueAll() *itemNode {
  71. h := il.head
  72. il.head, il.tail = nil, nil
  73. return h
  74. }
  75. func (il *itemList) isEmpty() bool {
  76. return il.head == nil
  77. }
// The following types define the control items that can flow through the
// transport's control buffer. They represent different kinds of control
// tasks, e.g., flow control, settings, stream resets, etc.
  81. // maxQueuedTransportResponseFrames is the most queued "transport response"
  82. // frames we will buffer before preventing new reads from occurring on the
  83. // transport. These are control frames sent in response to client requests,
  84. // such as RST_STREAM due to bad headers or settings acks.
  85. const maxQueuedTransportResponseFrames = 50
  86. type cbItem interface {
  87. isTransportResponseFrame() bool
  88. }
  89. // registerStream is used to register an incoming stream with loopy writer.
  90. type registerStream struct {
  91. streamID uint32
  92. wq *writeQuota
  93. }
  94. func (*registerStream) isTransportResponseFrame() bool { return false }
  95. // headerFrame is also used to register stream on the client-side.
  96. type headerFrame struct {
  97. streamID uint32
  98. hf []hpack.HeaderField
  99. endStream bool // Valid on server side.
  100. initStream func(uint32) error // Used only on the client side.
  101. onWrite func()
  102. wq *writeQuota // write quota for the stream created.
  103. cleanup *cleanupStream // Valid on the server side.
  104. onOrphaned func(error) // Valid on client-side
  105. }
  106. func (h *headerFrame) isTransportResponseFrame() bool {
  107. return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
  108. }
  109. type cleanupStream struct {
  110. streamID uint32
  111. rst bool
  112. rstCode http2.ErrCode
  113. onWrite func()
  114. }
  115. func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
  116. type earlyAbortStream struct {
  117. httpStatus uint32
  118. streamID uint32
  119. contentSubtype string
  120. status *status.Status
  121. rst bool
  122. }
  123. func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
  124. type dataFrame struct {
  125. streamID uint32
  126. endStream bool
  127. h []byte
  128. d []byte
  129. // onEachWrite is called every time
  130. // a part of d is written out.
  131. onEachWrite func()
  132. }
  133. func (*dataFrame) isTransportResponseFrame() bool { return false }
  134. type incomingWindowUpdate struct {
  135. streamID uint32
  136. increment uint32
  137. }
  138. func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
  139. type outgoingWindowUpdate struct {
  140. streamID uint32
  141. increment uint32
  142. }
  143. func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
  144. return false // window updates are throttled by thresholds
  145. }
  146. type incomingSettings struct {
  147. ss []http2.Setting
  148. }
  149. func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
  150. type outgoingSettings struct {
  151. ss []http2.Setting
  152. }
  153. func (*outgoingSettings) isTransportResponseFrame() bool { return false }
  154. type incomingGoAway struct {
  155. }
  156. func (*incomingGoAway) isTransportResponseFrame() bool { return false }
  157. type goAway struct {
  158. code http2.ErrCode
  159. debugData []byte
  160. headsUp bool
  161. closeConn error // if set, loopyWriter will exit, resulting in conn closure
  162. }
  163. func (*goAway) isTransportResponseFrame() bool { return false }
  164. type ping struct {
  165. ack bool
  166. data [8]byte
  167. }
  168. func (*ping) isTransportResponseFrame() bool { return true }
  169. type outFlowControlSizeRequest struct {
  170. resp chan uint32
  171. }
  172. func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
  173. // closeConnection is an instruction to tell the loopy writer to flush the
  174. // framer and exit, which will cause the transport's connection to be closed
  175. // (by the client or server). The transport itself will close after the reader
  176. // encounters the EOF caused by the connection closure.
  177. type closeConnection struct{}
  178. func (closeConnection) isTransportResponseFrame() bool { return false }
  179. type outStreamState int
  180. const (
  181. active outStreamState = iota
  182. empty
  183. waitingOnStreamQuota
  184. )
  185. type outStream struct {
  186. id uint32
  187. state outStreamState
  188. itl *itemList
  189. bytesOutStanding int
  190. wq *writeQuota
  191. next *outStream
  192. prev *outStream
  193. }
  194. func (s *outStream) deleteSelf() {
  195. if s.prev != nil {
  196. s.prev.next = s.next
  197. }
  198. if s.next != nil {
  199. s.next.prev = s.prev
  200. }
  201. s.next, s.prev = nil, nil
  202. }
  203. type outStreamList struct {
  204. // Following are sentinel objects that mark the
  205. // beginning and end of the list. They do not
  206. // contain any item lists. All valid objects are
  207. // inserted in between them.
  208. // This is needed so that an outStream object can
  209. // deleteSelf() in O(1) time without knowing which
  210. // list it belongs to.
  211. head *outStream
  212. tail *outStream
  213. }
  214. func newOutStreamList() *outStreamList {
  215. head, tail := new(outStream), new(outStream)
  216. head.next = tail
  217. tail.prev = head
  218. return &outStreamList{
  219. head: head,
  220. tail: tail,
  221. }
  222. }
  223. func (l *outStreamList) enqueue(s *outStream) {
  224. e := l.tail.prev
  225. e.next = s
  226. s.prev = e
  227. s.next = l.tail
  228. l.tail.prev = s
  229. }
  230. // remove from the beginning of the list.
  231. func (l *outStreamList) dequeue() *outStream {
  232. b := l.head.next
  233. if b == l.tail {
  234. return nil
  235. }
  236. b.deleteSelf()
  237. return b
  238. }
  239. // controlBuffer is a way to pass information to loopy.
  240. // Information is passed as specific struct types called control frames.
  241. // A control frame not only represents data, messages or headers to be sent out
  242. // but can also be used to instruct loopy to update its internal state.
  243. // It shouldn't be confused with an HTTP2 frame, although some of the control frames
  244. // like dataFrame and headerFrame do go out on wire as HTTP2 frames.
  245. type controlBuffer struct {
  246. ch chan struct{}
  247. done <-chan struct{}
  248. mu sync.Mutex
  249. consumerWaiting bool
  250. list *itemList
  251. err error
  252. // transportResponseFrames counts the number of queued items that represent
  253. // the response of an action initiated by the peer. trfChan is created
  254. // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
  255. // closed and nilled when transportResponseFrames drops below the
  256. // threshold. Both fields are protected by mu.
  257. transportResponseFrames int
  258. trfChan atomic.Value // chan struct{}
  259. }
  260. func newControlBuffer(done <-chan struct{}) *controlBuffer {
  261. return &controlBuffer{
  262. ch: make(chan struct{}, 1),
  263. list: &itemList{},
  264. done: done,
  265. }
  266. }
  267. // throttle blocks if there are too many incomingSettings/cleanupStreams in the
  268. // controlbuf.
  269. func (c *controlBuffer) throttle() {
  270. ch, _ := c.trfChan.Load().(chan struct{})
  271. if ch != nil {
  272. select {
  273. case <-ch:
  274. case <-c.done:
  275. }
  276. }
  277. }
  278. func (c *controlBuffer) put(it cbItem) error {
  279. _, err := c.executeAndPut(nil, it)
  280. return err
  281. }
  282. func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
  283. var wakeUp bool
  284. c.mu.Lock()
  285. if c.err != nil {
  286. c.mu.Unlock()
  287. return false, c.err
  288. }
  289. if f != nil {
  290. if !f(it) { // f wasn't successful
  291. c.mu.Unlock()
  292. return false, nil
  293. }
  294. }
  295. if c.consumerWaiting {
  296. wakeUp = true
  297. c.consumerWaiting = false
  298. }
  299. c.list.enqueue(it)
  300. if it.isTransportResponseFrame() {
  301. c.transportResponseFrames++
  302. if c.transportResponseFrames == maxQueuedTransportResponseFrames {
  303. // We are adding the frame that puts us over the threshold; create
  304. // a throttling channel.
  305. c.trfChan.Store(make(chan struct{}))
  306. }
  307. }
  308. c.mu.Unlock()
  309. if wakeUp {
  310. select {
  311. case c.ch <- struct{}{}:
  312. default:
  313. }
  314. }
  315. return true, nil
  316. }
  317. // Note argument f should never be nil.
  318. func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
  319. c.mu.Lock()
  320. if c.err != nil {
  321. c.mu.Unlock()
  322. return false, c.err
  323. }
  324. if !f(it) { // f wasn't successful
  325. c.mu.Unlock()
  326. return false, nil
  327. }
  328. c.mu.Unlock()
  329. return true, nil
  330. }
  331. func (c *controlBuffer) get(block bool) (interface{}, error) {
  332. for {
  333. c.mu.Lock()
  334. if c.err != nil {
  335. c.mu.Unlock()
  336. return nil, c.err
  337. }
  338. if !c.list.isEmpty() {
  339. h := c.list.dequeue().(cbItem)
  340. if h.isTransportResponseFrame() {
  341. if c.transportResponseFrames == maxQueuedTransportResponseFrames {
  342. // We are removing the frame that put us over the
  343. // threshold; close and clear the throttling channel.
  344. ch := c.trfChan.Load().(chan struct{})
  345. close(ch)
  346. c.trfChan.Store((chan struct{})(nil))
  347. }
  348. c.transportResponseFrames--
  349. }
  350. c.mu.Unlock()
  351. return h, nil
  352. }
  353. if !block {
  354. c.mu.Unlock()
  355. return nil, nil
  356. }
  357. c.consumerWaiting = true
  358. c.mu.Unlock()
  359. select {
  360. case <-c.ch:
  361. case <-c.done:
  362. return nil, errors.New("transport closed by client")
  363. }
  364. }
  365. }
  366. func (c *controlBuffer) finish() {
  367. c.mu.Lock()
  368. if c.err != nil {
  369. c.mu.Unlock()
  370. return
  371. }
  372. c.err = ErrConnClosing
  373. // There may be headers for streams in the control buffer.
  374. // These streams need to be cleaned out since the transport
  375. // is still not aware of these yet.
  376. for head := c.list.dequeueAll(); head != nil; head = head.next {
  377. hdr, ok := head.it.(*headerFrame)
  378. if !ok {
  379. continue
  380. }
  381. if hdr.onOrphaned != nil { // It will be nil on the server-side.
  382. hdr.onOrphaned(ErrConnClosing)
  383. }
  384. }
  385. // In case throttle() is currently in flight, it needs to be unblocked.
  386. // Otherwise, the transport may not close, since the transport is closed by
  387. // the reader encountering the connection error.
  388. ch, _ := c.trfChan.Load().(chan struct{})
  389. if ch != nil {
  390. close(ch)
  391. }
  392. c.trfChan.Store((chan struct{})(nil))
  393. c.mu.Unlock()
  394. }
  395. type side int
  396. const (
  397. clientSide side = iota
  398. serverSide
  399. )
  400. // Loopy receives frames from the control buffer.
  401. // Each frame is handled individually; most of the work done by loopy goes
  402. // into handling data frames. Loopy maintains a queue of active streams, and each
  403. // stream maintains a queue of data frames; as loopy receives data frames
  404. // it gets added to the queue of the relevant stream.
  405. // Loopy goes over this list of active streams by processing one node every iteration,
  406. // thereby closely resemebling to a round-robin scheduling over all streams. While
  407. // processing a stream, loopy writes out data bytes from this stream capped by the min
  408. // of http2MaxFrameLen, connection-level flow control and stream-level flow control.
  409. type loopyWriter struct {
  410. side side
  411. cbuf *controlBuffer
  412. sendQuota uint32
  413. oiws uint32 // outbound initial window size.
  414. // estdStreams is map of all established streams that are not cleaned-up yet.
  415. // On client-side, this is all streams whose headers were sent out.
  416. // On server-side, this is all streams whose headers were received.
  417. estdStreams map[uint32]*outStream // Established streams.
  418. // activeStreams is a linked-list of all streams that have data to send and some
  419. // stream-level flow control quota.
  420. // Each of these streams internally have a list of data items(and perhaps trailers
  421. // on the server-side) to be sent out.
  422. activeStreams *outStreamList
  423. framer *framer
  424. hBuf *bytes.Buffer // The buffer for HPACK encoding.
  425. hEnc *hpack.Encoder // HPACK encoder.
  426. bdpEst *bdpEstimator
  427. draining bool
  428. conn net.Conn
  429. logger *grpclog.PrefixLogger
  430. // Side-specific handlers
  431. ssGoAwayHandler func(*goAway) (bool, error)
  432. }
  433. func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
  434. var buf bytes.Buffer
  435. l := &loopyWriter{
  436. side: s,
  437. cbuf: cbuf,
  438. sendQuota: defaultWindowSize,
  439. oiws: defaultWindowSize,
  440. estdStreams: make(map[uint32]*outStream),
  441. activeStreams: newOutStreamList(),
  442. framer: fr,
  443. hBuf: &buf,
  444. hEnc: hpack.NewEncoder(&buf),
  445. bdpEst: bdpEst,
  446. conn: conn,
  447. logger: logger,
  448. }
  449. return l
  450. }
  451. const minBatchSize = 1000
  452. // run should be run in a separate goroutine.
  453. // It reads control frames from controlBuf and processes them by:
  454. // 1. Updating loopy's internal state, or/and
  455. // 2. Writing out HTTP2 frames on the wire.
  456. //
  457. // Loopy keeps all active streams with data to send in a linked-list.
  458. // All streams in the activeStreams linked-list must have both:
  459. // 1. Data to send, and
  460. // 2. Stream level flow control quota available.
  461. //
  462. // In each iteration of run loop, other than processing the incoming control
  463. // frame, loopy calls processData, which processes one node from the
  464. // activeStreams linked-list. This results in writing of HTTP2 frames into an
  465. // underlying write buffer. When there's no more control frames to read from
  466. // controlBuf, loopy flushes the write buffer. As an optimization, to increase
  467. // the batch size for each flush, loopy yields the processor, once if the batch
  468. // size is too low to give stream goroutines a chance to fill it up.
  469. //
  470. // Upon exiting, if the error causing the exit is not an I/O error, run()
  471. // flushes and closes the underlying connection. Otherwise, the connection is
  472. // left open to allow the I/O error to be encountered by the reader instead.
  473. func (l *loopyWriter) run() (err error) {
  474. defer func() {
  475. if l.logger.V(logLevel) {
  476. l.logger.Infof("loopyWriter exiting with error: %v", err)
  477. }
  478. if !isIOError(err) {
  479. l.framer.writer.Flush()
  480. l.conn.Close()
  481. }
  482. l.cbuf.finish()
  483. }()
  484. for {
  485. it, err := l.cbuf.get(true)
  486. if err != nil {
  487. return err
  488. }
  489. if err = l.handle(it); err != nil {
  490. return err
  491. }
  492. if _, err = l.processData(); err != nil {
  493. return err
  494. }
  495. gosched := true
  496. hasdata:
  497. for {
  498. it, err := l.cbuf.get(false)
  499. if err != nil {
  500. return err
  501. }
  502. if it != nil {
  503. if err = l.handle(it); err != nil {
  504. return err
  505. }
  506. if _, err = l.processData(); err != nil {
  507. return err
  508. }
  509. continue hasdata
  510. }
  511. isEmpty, err := l.processData()
  512. if err != nil {
  513. return err
  514. }
  515. if !isEmpty {
  516. continue hasdata
  517. }
  518. if gosched {
  519. gosched = false
  520. if l.framer.writer.offset < minBatchSize {
  521. runtime.Gosched()
  522. continue hasdata
  523. }
  524. }
  525. l.framer.writer.Flush()
  526. break hasdata
  527. }
  528. }
  529. }
  530. func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
  531. return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
  532. }
  533. func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
  534. // Otherwise update the quota.
  535. if w.streamID == 0 {
  536. l.sendQuota += w.increment
  537. return
  538. }
  539. // Find the stream and update it.
  540. if str, ok := l.estdStreams[w.streamID]; ok {
  541. str.bytesOutStanding -= int(w.increment)
  542. if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
  543. str.state = active
  544. l.activeStreams.enqueue(str)
  545. return
  546. }
  547. }
  548. }
  549. func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
  550. return l.framer.fr.WriteSettings(s.ss...)
  551. }
  552. func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
  553. l.applySettings(s.ss)
  554. return l.framer.fr.WriteSettingsAck()
  555. }
  556. func (l *loopyWriter) registerStreamHandler(h *registerStream) {
  557. str := &outStream{
  558. id: h.streamID,
  559. state: empty,
  560. itl: &itemList{},
  561. wq: h.wq,
  562. }
  563. l.estdStreams[h.streamID] = str
  564. }
  565. func (l *loopyWriter) headerHandler(h *headerFrame) error {
  566. if l.side == serverSide {
  567. str, ok := l.estdStreams[h.streamID]
  568. if !ok {
  569. if l.logger.V(logLevel) {
  570. l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
  571. }
  572. return nil
  573. }
  574. // Case 1.A: Server is responding back with headers.
  575. if !h.endStream {
  576. return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
  577. }
  578. // else: Case 1.B: Server wants to close stream.
  579. if str.state != empty { // either active or waiting on stream quota.
  580. // add it str's list of items.
  581. str.itl.enqueue(h)
  582. return nil
  583. }
  584. if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
  585. return err
  586. }
  587. return l.cleanupStreamHandler(h.cleanup)
  588. }
  589. // Case 2: Client wants to originate stream.
  590. str := &outStream{
  591. id: h.streamID,
  592. state: empty,
  593. itl: &itemList{},
  594. wq: h.wq,
  595. }
  596. return l.originateStream(str, h)
  597. }
  598. func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
  599. // l.draining is set when handling GoAway. In which case, we want to avoid
  600. // creating new streams.
  601. if l.draining {
  602. // TODO: provide a better error with the reason we are in draining.
  603. hdr.onOrphaned(errStreamDrain)
  604. return nil
  605. }
  606. if err := hdr.initStream(str.id); err != nil {
  607. return err
  608. }
  609. if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
  610. return err
  611. }
  612. l.estdStreams[str.id] = str
  613. return nil
  614. }
  615. func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
  616. if onWrite != nil {
  617. onWrite()
  618. }
  619. l.hBuf.Reset()
  620. for _, f := range hf {
  621. if err := l.hEnc.WriteField(f); err != nil {
  622. if l.logger.V(logLevel) {
  623. l.logger.Warningf("Encountered error while encoding headers: %v", err)
  624. }
  625. }
  626. }
  627. var (
  628. err error
  629. endHeaders, first bool
  630. )
  631. first = true
  632. for !endHeaders {
  633. size := l.hBuf.Len()
  634. if size > http2MaxFrameLen {
  635. size = http2MaxFrameLen
  636. } else {
  637. endHeaders = true
  638. }
  639. if first {
  640. first = false
  641. err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
  642. StreamID: streamID,
  643. BlockFragment: l.hBuf.Next(size),
  644. EndStream: endStream,
  645. EndHeaders: endHeaders,
  646. })
  647. } else {
  648. err = l.framer.fr.WriteContinuation(
  649. streamID,
  650. endHeaders,
  651. l.hBuf.Next(size),
  652. )
  653. }
  654. if err != nil {
  655. return err
  656. }
  657. }
  658. return nil
  659. }
  660. func (l *loopyWriter) preprocessData(df *dataFrame) {
  661. str, ok := l.estdStreams[df.streamID]
  662. if !ok {
  663. return
  664. }
  665. // If we got data for a stream it means that
  666. // stream was originated and the headers were sent out.
  667. str.itl.enqueue(df)
  668. if str.state == empty {
  669. str.state = active
  670. l.activeStreams.enqueue(str)
  671. }
  672. }
  673. func (l *loopyWriter) pingHandler(p *ping) error {
  674. if !p.ack {
  675. l.bdpEst.timesnap(p.data)
  676. }
  677. return l.framer.fr.WritePing(p.ack, p.data)
  678. }
  679. func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
  680. o.resp <- l.sendQuota
  681. }
  682. func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
  683. c.onWrite()
  684. if str, ok := l.estdStreams[c.streamID]; ok {
  685. // On the server side it could be a trailers-only response or
  686. // a RST_STREAM before stream initialization thus the stream might
  687. // not be established yet.
  688. delete(l.estdStreams, c.streamID)
  689. str.deleteSelf()
  690. }
  691. if c.rst { // If RST_STREAM needs to be sent.
  692. if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
  693. return err
  694. }
  695. }
  696. if l.draining && len(l.estdStreams) == 0 {
  697. // Flush and close the connection; we are done with it.
  698. return errors.New("finished processing active streams while in draining mode")
  699. }
  700. return nil
  701. }
  702. func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
  703. if l.side == clientSide {
  704. return errors.New("earlyAbortStream not handled on client")
  705. }
  706. // In case the caller forgets to set the http status, default to 200.
  707. if eas.httpStatus == 0 {
  708. eas.httpStatus = 200
  709. }
  710. headerFields := []hpack.HeaderField{
  711. {Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
  712. {Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
  713. {Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
  714. {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
  715. }
  716. if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
  717. return err
  718. }
  719. if eas.rst {
  720. if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
  721. return err
  722. }
  723. }
  724. return nil
  725. }
  726. func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
  727. if l.side == clientSide {
  728. l.draining = true
  729. if len(l.estdStreams) == 0 {
  730. // Flush and close the connection; we are done with it.
  731. return errors.New("received GOAWAY with no active streams")
  732. }
  733. }
  734. return nil
  735. }
  736. func (l *loopyWriter) goAwayHandler(g *goAway) error {
  737. // Handling of outgoing GoAway is very specific to side.
  738. if l.ssGoAwayHandler != nil {
  739. draining, err := l.ssGoAwayHandler(g)
  740. if err != nil {
  741. return err
  742. }
  743. l.draining = draining
  744. }
  745. return nil
  746. }
  747. func (l *loopyWriter) handle(i interface{}) error {
  748. switch i := i.(type) {
  749. case *incomingWindowUpdate:
  750. l.incomingWindowUpdateHandler(i)
  751. case *outgoingWindowUpdate:
  752. return l.outgoingWindowUpdateHandler(i)
  753. case *incomingSettings:
  754. return l.incomingSettingsHandler(i)
  755. case *outgoingSettings:
  756. return l.outgoingSettingsHandler(i)
  757. case *headerFrame:
  758. return l.headerHandler(i)
  759. case *registerStream:
  760. l.registerStreamHandler(i)
  761. case *cleanupStream:
  762. return l.cleanupStreamHandler(i)
  763. case *earlyAbortStream:
  764. return l.earlyAbortStreamHandler(i)
  765. case *incomingGoAway:
  766. return l.incomingGoAwayHandler(i)
  767. case *dataFrame:
  768. l.preprocessData(i)
  769. case *ping:
  770. return l.pingHandler(i)
  771. case *goAway:
  772. return l.goAwayHandler(i)
  773. case *outFlowControlSizeRequest:
  774. l.outFlowControlSizeRequestHandler(i)
  775. case closeConnection:
  776. // Just return a non-I/O error and run() will flush and close the
  777. // connection.
  778. return ErrConnClosing
  779. default:
  780. return fmt.Errorf("transport: unknown control message type %T", i)
  781. }
  782. return nil
  783. }
  784. func (l *loopyWriter) applySettings(ss []http2.Setting) {
  785. for _, s := range ss {
  786. switch s.ID {
  787. case http2.SettingInitialWindowSize:
  788. o := l.oiws
  789. l.oiws = s.Val
  790. if o < l.oiws {
  791. // If the new limit is greater make all depleted streams active.
  792. for _, stream := range l.estdStreams {
  793. if stream.state == waitingOnStreamQuota {
  794. stream.state = active
  795. l.activeStreams.enqueue(stream)
  796. }
  797. }
  798. }
  799. case http2.SettingHeaderTableSize:
  800. updateHeaderTblSize(l.hEnc, s.Val)
  801. }
  802. }
  803. }
  804. // processData removes the first stream from active streams, writes out at most 16KB
  805. // of its data and then puts it at the end of activeStreams if there's still more data
  806. // to be sent and stream has some stream-level flow control.
  807. func (l *loopyWriter) processData() (bool, error) {
  808. if l.sendQuota == 0 {
  809. return true, nil
  810. }
  811. str := l.activeStreams.dequeue() // Remove the first stream.
  812. if str == nil {
  813. return true, nil
  814. }
  815. dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
  816. // A data item is represented by a dataFrame, since it later translates into
  817. // multiple HTTP2 data frames.
  818. // Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
  819. // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
  820. // maximum possible HTTP2 frame size.
  821. if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
  822. // Client sends out empty data frame with endStream = true
  823. if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
  824. return false, err
  825. }
  826. str.itl.dequeue() // remove the empty data item from stream
  827. if str.itl.isEmpty() {
  828. str.state = empty
  829. } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
  830. if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
  831. return false, err
  832. }
  833. if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
  834. return false, err
  835. }
  836. } else {
  837. l.activeStreams.enqueue(str)
  838. }
  839. return false, nil
  840. }
  841. var (
  842. buf []byte
  843. )
  844. // Figure out the maximum size we can send
  845. maxSize := http2MaxFrameLen
  846. if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
  847. str.state = waitingOnStreamQuota
  848. return false, nil
  849. } else if maxSize > strQuota {
  850. maxSize = strQuota
  851. }
  852. if maxSize > int(l.sendQuota) { // connection-level flow control.
  853. maxSize = int(l.sendQuota)
  854. }
  855. // Compute how much of the header and data we can send within quota and max frame length
  856. hSize := min(maxSize, len(dataItem.h))
  857. dSize := min(maxSize-hSize, len(dataItem.d))
  858. if hSize != 0 {
  859. if dSize == 0 {
  860. buf = dataItem.h
  861. } else {
  862. // We can add some data to grpc message header to distribute bytes more equally across frames.
  863. // Copy on the stack to avoid generating garbage
  864. var localBuf [http2MaxFrameLen]byte
  865. copy(localBuf[:hSize], dataItem.h)
  866. copy(localBuf[hSize:], dataItem.d[:dSize])
  867. buf = localBuf[:hSize+dSize]
  868. }
  869. } else {
  870. buf = dataItem.d
  871. }
  872. size := hSize + dSize
  873. // Now that outgoing flow controls are checked we can replenish str's write quota
  874. str.wq.replenish(size)
  875. var endStream bool
  876. // If this is the last data message on this stream and all of it can be written in this iteration.
  877. if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
  878. endStream = true
  879. }
  880. if dataItem.onEachWrite != nil {
  881. dataItem.onEachWrite()
  882. }
  883. if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
  884. return false, err
  885. }
  886. str.bytesOutStanding += size
  887. l.sendQuota -= uint32(size)
  888. dataItem.h = dataItem.h[hSize:]
  889. dataItem.d = dataItem.d[dSize:]
  890. if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
  891. str.itl.dequeue()
  892. }
  893. if str.itl.isEmpty() {
  894. str.state = empty
  895. } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
  896. if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
  897. return false, err
  898. }
  899. if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
  900. return false, err
  901. }
  902. } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
  903. str.state = waitingOnStreamQuota
  904. } else { // Otherwise add it back to the list of active streams.
  905. l.activeStreams.enqueue(str)
  906. }
  907. return false, nil
  908. }
  909. func min(a, b int) int {
  910. if a < b {
  911. return a
  912. }
  913. return b
  914. }