transport.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package transport defines and implements message oriented communication
  19. // channel to complete various transactions (e.g., an RPC). It is meant for
  20. // grpc-internal usage and is not intended to be imported directly by users.
  21. package transport
  22. import (
  23. "bytes"
  24. "context"
  25. "errors"
  26. "fmt"
  27. "io"
  28. "net"
  29. "sync"
  30. "sync/atomic"
  31. "time"
  32. "google.golang.org/grpc/codes"
  33. "google.golang.org/grpc/credentials"
  34. "google.golang.org/grpc/internal/channelz"
  35. "google.golang.org/grpc/keepalive"
  36. "google.golang.org/grpc/metadata"
  37. "google.golang.org/grpc/resolver"
  38. "google.golang.org/grpc/stats"
  39. "google.golang.org/grpc/status"
  40. "google.golang.org/grpc/tap"
  41. )
  42. // ErrNoHeaders is used as a signal that a trailers only response was received,
  43. // and is not a real error.
  44. var ErrNoHeaders = errors.New("stream has no headers")
  45. const logLevel = 2
  46. type bufferPool struct {
  47. pool sync.Pool
  48. }
  49. func newBufferPool() *bufferPool {
  50. return &bufferPool{
  51. pool: sync.Pool{
  52. New: func() interface{} {
  53. return new(bytes.Buffer)
  54. },
  55. },
  56. }
  57. }
  58. func (p *bufferPool) get() *bytes.Buffer {
  59. return p.pool.Get().(*bytes.Buffer)
  60. }
  61. func (p *bufferPool) put(b *bytes.Buffer) {
  62. p.pool.Put(b)
  63. }
  64. // recvMsg represents the received msg from the transport. All transport
  65. // protocol specific info has been removed.
  66. type recvMsg struct {
  67. buffer *bytes.Buffer
  68. // nil: received some data
  69. // io.EOF: stream is completed. data is nil.
  70. // other non-nil error: transport failure. data is nil.
  71. err error
  72. }
  73. // recvBuffer is an unbounded channel of recvMsg structs.
  74. //
  75. // Note: recvBuffer differs from buffer.Unbounded only in the fact that it
  76. // holds a channel of recvMsg structs instead of objects implementing "item"
  77. // interface. recvBuffer is written to much more often and using strict recvMsg
  78. // structs helps avoid allocation in "recvBuffer.put"
  79. type recvBuffer struct {
  80. c chan recvMsg
  81. mu sync.Mutex
  82. backlog []recvMsg
  83. err error
  84. }
  85. func newRecvBuffer() *recvBuffer {
  86. b := &recvBuffer{
  87. c: make(chan recvMsg, 1),
  88. }
  89. return b
  90. }
  91. func (b *recvBuffer) put(r recvMsg) {
  92. b.mu.Lock()
  93. if b.err != nil {
  94. b.mu.Unlock()
  95. // An error had occurred earlier, don't accept more
  96. // data or errors.
  97. return
  98. }
  99. b.err = r.err
  100. if len(b.backlog) == 0 {
  101. select {
  102. case b.c <- r:
  103. b.mu.Unlock()
  104. return
  105. default:
  106. }
  107. }
  108. b.backlog = append(b.backlog, r)
  109. b.mu.Unlock()
  110. }
  111. func (b *recvBuffer) load() {
  112. b.mu.Lock()
  113. if len(b.backlog) > 0 {
  114. select {
  115. case b.c <- b.backlog[0]:
  116. b.backlog[0] = recvMsg{}
  117. b.backlog = b.backlog[1:]
  118. default:
  119. }
  120. }
  121. b.mu.Unlock()
  122. }
  123. // get returns the channel that receives a recvMsg in the buffer.
  124. //
  125. // Upon receipt of a recvMsg, the caller should call load to send another
  126. // recvMsg onto the channel if there is any.
  127. func (b *recvBuffer) get() <-chan recvMsg {
  128. return b.c
  129. }
  130. // recvBufferReader implements io.Reader interface to read the data from
  131. // recvBuffer.
  132. type recvBufferReader struct {
  133. closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
  134. ctx context.Context
  135. ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
  136. recv *recvBuffer
  137. last *bytes.Buffer // Stores the remaining data in the previous calls.
  138. err error
  139. freeBuffer func(*bytes.Buffer)
  140. }
  141. // Read reads the next len(p) bytes from last. If last is drained, it tries to
  142. // read additional data from recv. It blocks if there no additional data available
  143. // in recv. If Read returns any non-nil error, it will continue to return that error.
  144. func (r *recvBufferReader) Read(p []byte) (n int, err error) {
  145. if r.err != nil {
  146. return 0, r.err
  147. }
  148. if r.last != nil {
  149. // Read remaining data left in last call.
  150. copied, _ := r.last.Read(p)
  151. if r.last.Len() == 0 {
  152. r.freeBuffer(r.last)
  153. r.last = nil
  154. }
  155. return copied, nil
  156. }
  157. if r.closeStream != nil {
  158. n, r.err = r.readClient(p)
  159. } else {
  160. n, r.err = r.read(p)
  161. }
  162. return n, r.err
  163. }
  164. func (r *recvBufferReader) read(p []byte) (n int, err error) {
  165. select {
  166. case <-r.ctxDone:
  167. return 0, ContextErr(r.ctx.Err())
  168. case m := <-r.recv.get():
  169. return r.readAdditional(m, p)
  170. }
  171. }
  172. func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
  173. // If the context is canceled, then closes the stream with nil metadata.
  174. // closeStream writes its error parameter to r.recv as a recvMsg.
  175. // r.readAdditional acts on that message and returns the necessary error.
  176. select {
  177. case <-r.ctxDone:
  178. // Note that this adds the ctx error to the end of recv buffer, and
  179. // reads from the head. This will delay the error until recv buffer is
  180. // empty, thus will delay ctx cancellation in Recv().
  181. //
  182. // It's done this way to fix a race between ctx cancel and trailer. The
  183. // race was, stream.Recv() may return ctx error if ctxDone wins the
  184. // race, but stream.Trailer() may return a non-nil md because the stream
  185. // was not marked as done when trailer is received. This closeStream
  186. // call will mark stream as done, thus fix the race.
  187. //
  188. // TODO: delaying ctx error seems like a unnecessary side effect. What
  189. // we really want is to mark the stream as done, and return ctx error
  190. // faster.
  191. r.closeStream(ContextErr(r.ctx.Err()))
  192. m := <-r.recv.get()
  193. return r.readAdditional(m, p)
  194. case m := <-r.recv.get():
  195. return r.readAdditional(m, p)
  196. }
  197. }
  198. func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
  199. r.recv.load()
  200. if m.err != nil {
  201. return 0, m.err
  202. }
  203. copied, _ := m.buffer.Read(p)
  204. if m.buffer.Len() == 0 {
  205. r.freeBuffer(m.buffer)
  206. r.last = nil
  207. } else {
  208. r.last = m.buffer
  209. }
  210. return copied, nil
  211. }
  212. type streamState uint32
  213. const (
  214. streamActive streamState = iota
  215. streamWriteDone // EndStream sent
  216. streamReadDone // EndStream received
  217. streamDone // the entire stream is finished.
  218. )
  219. // Stream represents an RPC in the transport layer.
  220. type Stream struct {
  221. id uint32
  222. st ServerTransport // nil for client side Stream
  223. ct *http2Client // nil for server side Stream
  224. ctx context.Context // the associated context of the stream
  225. cancel context.CancelFunc // always nil for client side Stream
  226. done chan struct{} // closed at the end of stream to unblock writers. On the client side.
  227. doneFunc func() // invoked at the end of stream on client side.
  228. ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
  229. method string // the associated RPC method of the stream
  230. recvCompress string
  231. sendCompress string
  232. buf *recvBuffer
  233. trReader io.Reader
  234. fc *inFlow
  235. wq *writeQuota
  236. // Holds compressor names passed in grpc-accept-encoding metadata from the
  237. // client. This is empty for the client side stream.
  238. clientAdvertisedCompressors string
  239. // Callback to state application's intentions to read data. This
  240. // is used to adjust flow control, if needed.
  241. requestRead func(int)
  242. headerChan chan struct{} // closed to indicate the end of header metadata.
  243. headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
  244. // headerValid indicates whether a valid header was received. Only
  245. // meaningful after headerChan is closed (always call waitOnHeader() before
  246. // reading its value). Not valid on server side.
  247. headerValid bool
  248. // hdrMu protects header and trailer metadata on the server-side.
  249. hdrMu sync.Mutex
  250. // On client side, header keeps the received header metadata.
  251. //
  252. // On server side, header keeps the header set by SetHeader(). The complete
  253. // header will merged into this after t.WriteHeader() is called.
  254. header metadata.MD
  255. trailer metadata.MD // the key-value map of trailer metadata.
  256. noHeaders bool // set if the client never received headers (set only after the stream is done).
  257. // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
  258. headerSent uint32
  259. state streamState
  260. // On client-side it is the status error received from the server.
  261. // On server-side it is unused.
  262. status *status.Status
  263. bytesReceived uint32 // indicates whether any bytes have been received on this stream
  264. unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
  265. // contentSubtype is the content-subtype for requests.
  266. // this must be lowercase or the behavior is undefined.
  267. contentSubtype string
  268. }
  269. // isHeaderSent is only valid on the server-side.
  270. func (s *Stream) isHeaderSent() bool {
  271. return atomic.LoadUint32(&s.headerSent) == 1
  272. }
  273. // updateHeaderSent updates headerSent and returns true
  274. // if it was alreay set. It is valid only on server-side.
  275. func (s *Stream) updateHeaderSent() bool {
  276. return atomic.SwapUint32(&s.headerSent, 1) == 1
  277. }
  278. func (s *Stream) swapState(st streamState) streamState {
  279. return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
  280. }
  281. func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
  282. return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
  283. }
  284. func (s *Stream) getState() streamState {
  285. return streamState(atomic.LoadUint32((*uint32)(&s.state)))
  286. }
  287. func (s *Stream) waitOnHeader() {
  288. if s.headerChan == nil {
  289. // On the server headerChan is always nil since a stream originates
  290. // only after having received headers.
  291. return
  292. }
  293. select {
  294. case <-s.ctx.Done():
  295. // Close the stream to prevent headers/trailers from changing after
  296. // this function returns.
  297. s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
  298. // headerChan could possibly not be closed yet if closeStream raced
  299. // with operateHeaders; wait until it is closed explicitly here.
  300. <-s.headerChan
  301. case <-s.headerChan:
  302. }
  303. }
  304. // RecvCompress returns the compression algorithm applied to the inbound
  305. // message. It is empty string if there is no compression applied.
  306. func (s *Stream) RecvCompress() string {
  307. s.waitOnHeader()
  308. return s.recvCompress
  309. }
  310. // SetSendCompress sets the compression algorithm to the stream.
  311. func (s *Stream) SetSendCompress(name string) error {
  312. if s.isHeaderSent() || s.getState() == streamDone {
  313. return errors.New("transport: set send compressor called after headers sent or stream done")
  314. }
  315. s.sendCompress = name
  316. return nil
  317. }
  318. // SendCompress returns the send compressor name.
  319. func (s *Stream) SendCompress() string {
  320. return s.sendCompress
  321. }
  322. // ClientAdvertisedCompressors returns the compressor names advertised by the
  323. // client via grpc-accept-encoding header.
  324. func (s *Stream) ClientAdvertisedCompressors() string {
  325. return s.clientAdvertisedCompressors
  326. }
  327. // Done returns a channel which is closed when it receives the final status
  328. // from the server.
  329. func (s *Stream) Done() <-chan struct{} {
  330. return s.done
  331. }
  332. // Header returns the header metadata of the stream.
  333. //
  334. // On client side, it acquires the key-value pairs of header metadata once it is
  335. // available. It blocks until i) the metadata is ready or ii) there is no header
  336. // metadata or iii) the stream is canceled/expired.
  337. //
  338. // On server side, it returns the out header after t.WriteHeader is called. It
  339. // does not block and must not be called until after WriteHeader.
  340. func (s *Stream) Header() (metadata.MD, error) {
  341. if s.headerChan == nil {
  342. // On server side, return the header in stream. It will be the out
  343. // header after t.WriteHeader is called.
  344. return s.header.Copy(), nil
  345. }
  346. s.waitOnHeader()
  347. if !s.headerValid {
  348. return nil, s.status.Err()
  349. }
  350. if s.noHeaders {
  351. return nil, ErrNoHeaders
  352. }
  353. return s.header.Copy(), nil
  354. }
  355. // TrailersOnly blocks until a header or trailers-only frame is received and
  356. // then returns true if the stream was trailers-only. If the stream ends
  357. // before headers are received, returns true, nil. Client-side only.
  358. func (s *Stream) TrailersOnly() bool {
  359. s.waitOnHeader()
  360. return s.noHeaders
  361. }
  362. // Trailer returns the cached trailer metedata. Note that if it is not called
  363. // after the entire stream is done, it could return an empty MD. Client
  364. // side only.
  365. // It can be safely read only after stream has ended that is either read
  366. // or write have returned io.EOF.
  367. func (s *Stream) Trailer() metadata.MD {
  368. c := s.trailer.Copy()
  369. return c
  370. }
  371. // ContentSubtype returns the content-subtype for a request. For example, a
  372. // content-subtype of "proto" will result in a content-type of
  373. // "application/grpc+proto". This will always be lowercase. See
  374. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
  375. // more details.
  376. func (s *Stream) ContentSubtype() string {
  377. return s.contentSubtype
  378. }
  379. // Context returns the context of the stream.
  380. func (s *Stream) Context() context.Context {
  381. return s.ctx
  382. }
  383. // Method returns the method for the stream.
  384. func (s *Stream) Method() string {
  385. return s.method
  386. }
  387. // Status returns the status received from the server.
  388. // Status can be read safely only after the stream has ended,
  389. // that is, after Done() is closed.
  390. func (s *Stream) Status() *status.Status {
  391. return s.status
  392. }
  393. // SetHeader sets the header metadata. This can be called multiple times.
  394. // Server side only.
  395. // This should not be called in parallel to other data writes.
  396. func (s *Stream) SetHeader(md metadata.MD) error {
  397. if md.Len() == 0 {
  398. return nil
  399. }
  400. if s.isHeaderSent() || s.getState() == streamDone {
  401. return ErrIllegalHeaderWrite
  402. }
  403. s.hdrMu.Lock()
  404. s.header = metadata.Join(s.header, md)
  405. s.hdrMu.Unlock()
  406. return nil
  407. }
  408. // SendHeader sends the given header metadata. The given metadata is
  409. // combined with any metadata set by previous calls to SetHeader and
  410. // then written to the transport stream.
  411. func (s *Stream) SendHeader(md metadata.MD) error {
  412. return s.st.WriteHeader(s, md)
  413. }
  414. // SetTrailer sets the trailer metadata which will be sent with the RPC status
  415. // by the server. This can be called multiple times. Server side only.
  416. // This should not be called parallel to other data writes.
  417. func (s *Stream) SetTrailer(md metadata.MD) error {
  418. if md.Len() == 0 {
  419. return nil
  420. }
  421. if s.getState() == streamDone {
  422. return ErrIllegalHeaderWrite
  423. }
  424. s.hdrMu.Lock()
  425. s.trailer = metadata.Join(s.trailer, md)
  426. s.hdrMu.Unlock()
  427. return nil
  428. }
  429. func (s *Stream) write(m recvMsg) {
  430. s.buf.put(m)
  431. }
  432. // Read reads all p bytes from the wire for this stream.
  433. func (s *Stream) Read(p []byte) (n int, err error) {
  434. // Don't request a read if there was an error earlier
  435. if er := s.trReader.(*transportReader).er; er != nil {
  436. return 0, er
  437. }
  438. s.requestRead(len(p))
  439. return io.ReadFull(s.trReader, p)
  440. }
  441. // tranportReader reads all the data available for this Stream from the transport and
  442. // passes them into the decoder, which converts them into a gRPC message stream.
  443. // The error is io.EOF when the stream is done or another non-nil error if
  444. // the stream broke.
  445. type transportReader struct {
  446. reader io.Reader
  447. // The handler to control the window update procedure for both this
  448. // particular stream and the associated transport.
  449. windowHandler func(int)
  450. er error
  451. }
  452. func (t *transportReader) Read(p []byte) (n int, err error) {
  453. n, err = t.reader.Read(p)
  454. if err != nil {
  455. t.er = err
  456. return
  457. }
  458. t.windowHandler(n)
  459. return
  460. }
  461. // BytesReceived indicates whether any bytes have been received on this stream.
  462. func (s *Stream) BytesReceived() bool {
  463. return atomic.LoadUint32(&s.bytesReceived) == 1
  464. }
  465. // Unprocessed indicates whether the server did not process this stream --
  466. // i.e. it sent a refused stream or GOAWAY including this stream ID.
  467. func (s *Stream) Unprocessed() bool {
  468. return atomic.LoadUint32(&s.unprocessed) == 1
  469. }
  470. // GoString is implemented by Stream so context.String() won't
  471. // race when printing %#v.
  472. func (s *Stream) GoString() string {
  473. return fmt.Sprintf("<stream: %p, %v>", s, s.method)
  474. }
  475. // state of transport
  476. type transportState int
  477. const (
  478. reachable transportState = iota
  479. closing
  480. draining
  481. )
  482. // ServerConfig consists of all the configurations to establish a server transport.
  483. type ServerConfig struct {
  484. MaxStreams uint32
  485. ConnectionTimeout time.Duration
  486. Credentials credentials.TransportCredentials
  487. InTapHandle tap.ServerInHandle
  488. StatsHandlers []stats.Handler
  489. KeepaliveParams keepalive.ServerParameters
  490. KeepalivePolicy keepalive.EnforcementPolicy
  491. InitialWindowSize int32
  492. InitialConnWindowSize int32
  493. WriteBufferSize int
  494. ReadBufferSize int
  495. ChannelzParentID *channelz.Identifier
  496. MaxHeaderListSize *uint32
  497. HeaderTableSize *uint32
  498. }
  499. // ConnectOptions covers all relevant options for communicating with the server.
  500. type ConnectOptions struct {
  501. // UserAgent is the application user agent.
  502. UserAgent string
  503. // Dialer specifies how to dial a network address.
  504. Dialer func(context.Context, string) (net.Conn, error)
  505. // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
  506. FailOnNonTempDialError bool
  507. // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
  508. PerRPCCredentials []credentials.PerRPCCredentials
  509. // TransportCredentials stores the Authenticator required to setup a client
  510. // connection. Only one of TransportCredentials and CredsBundle is non-nil.
  511. TransportCredentials credentials.TransportCredentials
  512. // CredsBundle is the credentials bundle to be used. Only one of
  513. // TransportCredentials and CredsBundle is non-nil.
  514. CredsBundle credentials.Bundle
  515. // KeepaliveParams stores the keepalive parameters.
  516. KeepaliveParams keepalive.ClientParameters
  517. // StatsHandlers stores the handler for stats.
  518. StatsHandlers []stats.Handler
  519. // InitialWindowSize sets the initial window size for a stream.
  520. InitialWindowSize int32
  521. // InitialConnWindowSize sets the initial window size for a connection.
  522. InitialConnWindowSize int32
  523. // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
  524. WriteBufferSize int
  525. // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
  526. ReadBufferSize int
  527. // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
  528. ChannelzParentID *channelz.Identifier
  529. // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
  530. MaxHeaderListSize *uint32
  531. // UseProxy specifies if a proxy should be used.
  532. UseProxy bool
  533. }
  534. // NewClientTransport establishes the transport with the required ConnectOptions
  535. // and returns it to the caller.
  536. func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
  537. return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
  538. }
  539. // Options provides additional hints and information for message
  540. // transmission.
  541. type Options struct {
  542. // Last indicates whether this write is the last piece for
  543. // this stream.
  544. Last bool
  545. }
  546. // CallHdr carries the information of a particular RPC.
  547. type CallHdr struct {
  548. // Host specifies the peer's host.
  549. Host string
  550. // Method specifies the operation to perform.
  551. Method string
  552. // SendCompress specifies the compression algorithm applied on
  553. // outbound message.
  554. SendCompress string
  555. // Creds specifies credentials.PerRPCCredentials for a call.
  556. Creds credentials.PerRPCCredentials
  557. // ContentSubtype specifies the content-subtype for a request. For example, a
  558. // content-subtype of "proto" will result in a content-type of
  559. // "application/grpc+proto". The value of ContentSubtype must be all
  560. // lowercase, otherwise the behavior is undefined. See
  561. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  562. // for more details.
  563. ContentSubtype string
  564. PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
  565. DoneFunc func() // called when the stream is finished
  566. }
  567. // ClientTransport is the common interface for all gRPC client-side transport
  568. // implementations.
  569. type ClientTransport interface {
  570. // Close tears down this transport. Once it returns, the transport
  571. // should not be accessed any more. The caller must make sure this
  572. // is called only once.
  573. Close(err error)
  574. // GracefulClose starts to tear down the transport: the transport will stop
  575. // accepting new RPCs and NewStream will return error. Once all streams are
  576. // finished, the transport will close.
  577. //
  578. // It does not block.
  579. GracefulClose()
  580. // Write sends the data for the given stream. A nil stream indicates
  581. // the write is to be performed on the transport as a whole.
  582. Write(s *Stream, hdr []byte, data []byte, opts *Options) error
  583. // NewStream creates a Stream for an RPC.
  584. NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
  585. // CloseStream clears the footprint of a stream when the stream is
  586. // not needed any more. The err indicates the error incurred when
  587. // CloseStream is called. Must be called when a stream is finished
  588. // unless the associated transport is closing.
  589. CloseStream(stream *Stream, err error)
  590. // Error returns a channel that is closed when some I/O error
  591. // happens. Typically the caller should have a goroutine to monitor
  592. // this in order to take action (e.g., close the current transport
  593. // and create a new one) in error case. It should not return nil
  594. // once the transport is initiated.
  595. Error() <-chan struct{}
  596. // GoAway returns a channel that is closed when ClientTransport
  597. // receives the draining signal from the server (e.g., GOAWAY frame in
  598. // HTTP/2).
  599. GoAway() <-chan struct{}
  600. // GetGoAwayReason returns the reason why GoAway frame was received, along
  601. // with a human readable string with debug info.
  602. GetGoAwayReason() (GoAwayReason, string)
  603. // RemoteAddr returns the remote network address.
  604. RemoteAddr() net.Addr
  605. // IncrMsgSent increments the number of message sent through this transport.
  606. IncrMsgSent()
  607. // IncrMsgRecv increments the number of message received through this transport.
  608. IncrMsgRecv()
  609. }
  610. // ServerTransport is the common interface for all gRPC server-side transport
  611. // implementations.
  612. //
  613. // Methods may be called concurrently from multiple goroutines, but
  614. // Write methods for a given Stream will be called serially.
  615. type ServerTransport interface {
  616. // HandleStreams receives incoming streams using the given handler.
  617. HandleStreams(func(*Stream), func(context.Context, string) context.Context)
  618. // WriteHeader sends the header metadata for the given stream.
  619. // WriteHeader may not be called on all streams.
  620. WriteHeader(s *Stream, md metadata.MD) error
  621. // Write sends the data for the given stream.
  622. // Write may not be called on all streams.
  623. Write(s *Stream, hdr []byte, data []byte, opts *Options) error
  624. // WriteStatus sends the status of a stream to the client. WriteStatus is
  625. // the final call made on a stream and always occurs.
  626. WriteStatus(s *Stream, st *status.Status) error
  627. // Close tears down the transport. Once it is called, the transport
  628. // should not be accessed any more. All the pending streams and their
  629. // handlers will be terminated asynchronously.
  630. Close(err error)
  631. // RemoteAddr returns the remote network address.
  632. RemoteAddr() net.Addr
  633. // Drain notifies the client this ServerTransport stops accepting new RPCs.
  634. Drain(debugData string)
  635. // IncrMsgSent increments the number of message sent through this transport.
  636. IncrMsgSent()
  637. // IncrMsgRecv increments the number of message received through this transport.
  638. IncrMsgRecv()
  639. }
  640. // connectionErrorf creates an ConnectionError with the specified error description.
  641. func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
  642. return ConnectionError{
  643. Desc: fmt.Sprintf(format, a...),
  644. temp: temp,
  645. err: e,
  646. }
  647. }
  648. // ConnectionError is an error that results in the termination of the
  649. // entire connection and the retry of all the active streams.
  650. type ConnectionError struct {
  651. Desc string
  652. temp bool
  653. err error
  654. }
  655. func (e ConnectionError) Error() string {
  656. return fmt.Sprintf("connection error: desc = %q", e.Desc)
  657. }
  658. // Temporary indicates if this connection error is temporary or fatal.
  659. func (e ConnectionError) Temporary() bool {
  660. return e.temp
  661. }
  662. // Origin returns the original error of this connection error.
  663. func (e ConnectionError) Origin() error {
  664. // Never return nil error here.
  665. // If the original error is nil, return itself.
  666. if e.err == nil {
  667. return e
  668. }
  669. return e.err
  670. }
  671. // Unwrap returns the original error of this connection error or nil when the
  672. // origin is nil.
  673. func (e ConnectionError) Unwrap() error {
  674. return e.err
  675. }
  676. var (
  677. // ErrConnClosing indicates that the transport is closing.
  678. ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
  679. // errStreamDrain indicates that the stream is rejected because the
  680. // connection is draining. This could be caused by goaway or balancer
  681. // removing the address.
  682. errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
  683. // errStreamDone is returned from write at the client side to indiacte application
  684. // layer of an error.
  685. errStreamDone = errors.New("the stream is done")
  686. // StatusGoAway indicates that the server sent a GOAWAY that included this
  687. // stream's ID in unprocessed RPCs.
  688. statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
  689. )
  690. // GoAwayReason contains the reason for the GoAway frame received.
  691. type GoAwayReason uint8
  692. const (
  693. // GoAwayInvalid indicates that no GoAway frame is received.
  694. GoAwayInvalid GoAwayReason = 0
  695. // GoAwayNoReason is the default value when GoAway frame is received.
  696. GoAwayNoReason GoAwayReason = 1
  697. // GoAwayTooManyPings indicates that a GoAway frame with
  698. // ErrCodeEnhanceYourCalm was received and that the debug data said
  699. // "too_many_pings".
  700. GoAwayTooManyPings GoAwayReason = 2
  701. )
  702. // channelzData is used to store channelz related data for http2Client and http2Server.
  703. // These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
  704. // operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
  705. // Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
  706. type channelzData struct {
  707. kpCount int64
  708. // The number of streams that have started, including already finished ones.
  709. streamsStarted int64
  710. // Client side: The number of streams that have ended successfully by receiving
  711. // EoS bit set frame from server.
  712. // Server side: The number of streams that have ended successfully by sending
  713. // frame with EoS bit set.
  714. streamsSucceeded int64
  715. streamsFailed int64
  716. // lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
  717. // instead of time.Time since it's more costly to atomically update time.Time variable than int64
  718. // variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
  719. lastStreamCreatedTime int64
  720. msgSent int64
  721. msgRecv int64
  722. lastMsgSentTime int64
  723. lastMsgRecvTime int64
  724. }
  725. // ContextErr converts the error from context package into a status error.
  726. func ContextErr(err error) error {
  727. switch err {
  728. case context.DeadlineExceeded:
  729. return status.Error(codes.DeadlineExceeded, err.Error())
  730. case context.Canceled:
  731. return status.Error(codes.Canceled, err.Error())
  732. }
  733. return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
  734. }