http2_client.go 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "math"
  24. "net"
  25. "net/http"
  26. "path/filepath"
  27. "strconv"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "time"
  32. "golang.org/x/net/http2"
  33. "golang.org/x/net/http2/hpack"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/internal/channelz"
  37. icredentials "google.golang.org/grpc/internal/credentials"
  38. "google.golang.org/grpc/internal/grpclog"
  39. "google.golang.org/grpc/internal/grpcsync"
  40. "google.golang.org/grpc/internal/grpcutil"
  41. imetadata "google.golang.org/grpc/internal/metadata"
  42. istatus "google.golang.org/grpc/internal/status"
  43. "google.golang.org/grpc/internal/syscall"
  44. "google.golang.org/grpc/internal/transport/networktype"
  45. "google.golang.org/grpc/keepalive"
  46. "google.golang.org/grpc/metadata"
  47. "google.golang.org/grpc/peer"
  48. "google.golang.org/grpc/resolver"
  49. "google.golang.org/grpc/stats"
  50. "google.golang.org/grpc/status"
  51. )
// clientConnectionCounter counts the number of connections a client has
// initiated (equal to the number of http2Clients created). Must be accessed
// atomically.
//
// newHTTP2Client bumps it with atomic.AddUint64 and stores the resulting
// value in the new transport's connectionID field.
var clientConnectionCounter uint64
// http2Client implements the ClientTransport interface with HTTP2.
//
// Instances are built by newHTTP2Client, which dials the address, optionally
// runs the transport-credentials handshake, writes the client preface and
// initial SETTINGS, and starts the reader, writer (loopy) and, when enabled,
// keepalive goroutines.
type http2Client struct {
	lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
	// ctx is derived (with cancel) from the ctx handed to newHTTP2Client and
	// is further decorated there with peer information and stats-handler tags.
	ctx context.Context
	// cancel cancels ctx; newHTTP2Client calls it when construction fails.
	cancel context.CancelFunc
	ctxDone <-chan struct{} // Cache the ctx.Done() chan.
	// userAgent is sent as the "user-agent" header of every stream.
	userAgent string
	// address contains the resolver returned address for this transport.
	// If the `ServerName` field is set, it takes precedence over `CallHdr.Host`
	// passed to `NewStream`, when determining the :authority header.
	address resolver.Address
	// md is metadata taken from the resolver address at construction time; its
	// non-reserved entries are appended to the headers of every stream.
	md metadata.MD
	conn net.Conn // underlying communication channel
	// loopy is assigned by the writer goroutine started at the end of
	// newHTTP2Client.
	loopy *loopyWriter
	// remoteAddr and localAddr are captured from conn at construction time.
	remoteAddr net.Addr
	localAddr net.Addr
	authInfo credentials.AuthInfo // auth info about the connection
	readerDone chan struct{} // sync point to enable testing.
	writerDone chan struct{} // sync point to enable testing.
	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
	// that the server sent GoAway on this transport.
	goAway chan struct{}
	framer *framer
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	// Do not access controlBuf with mu held.
	controlBuf *controlBuffer
	fc *trInFlow
	// The scheme used: https if TLS is on, http otherwise.
	scheme string
	// isSecure is set to true by newHTTP2Client when transport credentials
	// were configured and the handshake succeeded.
	isSecure bool
	// perRPCCreds holds the dial-time per-RPC credentials, including the one
	// contributed by a credentials bundle, if any.
	perRPCCreds []credentials.PerRPCCredentials
	// kp holds the keepalive parameters after defaults were applied to zero
	// Time/Timeout values.
	kp keepalive.ClientParameters
	// keepaliveEnabled is true when kp.Time is not infinity.
	keepaliveEnabled bool
	statsHandlers []stats.Handler
	// initialWindowSize is advertised via SETTINGS_INITIAL_WINDOW_SIZE when it
	// differs from the default window size.
	initialWindowSize int32
	// configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
	maxSendHeaderListSize *uint32
	// bdpEst is only set when neither the connection nor the stream window
	// size was explicitly configured to at least the default window size.
	bdpEst *bdpEstimator
	maxConcurrentStreams uint32
	streamQuota int64
	streamsQuotaAvailable chan struct{}
	waitingStreams uint32
	// nextID is initialised to 1 by newHTTP2Client.
	nextID uint32
	// registeredCompressors is the value of grpcutil.RegisteredCompressors()
	// at construction time; it is the basis of the grpc-accept-encoding header.
	registeredCompressors string
	// Do not access controlBuf with mu held.
	mu sync.Mutex // guard the following variables
	state transportState
	activeStreams map[uint32]*Stream
	// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
	prevGoAwayID uint32
	// goAwayReason records the http2.ErrCode and debug data received with the
	// GoAway frame.
	goAwayReason GoAwayReason
	// goAwayDebugMessage contains a detailed human readable string about a
	// GoAway frame, useful for error messages.
	goAwayDebugMessage string
	// A condition variable used to signal when the keepalive goroutine should
	// go dormant. The condition for dormancy is based on the number of active
	// streams and the `PermitWithoutStream` keepalive client parameter. And
	// since the number of active streams is guarded by the above mutex, we use
	// the same for this condition variable as well.
	kpDormancyCond *sync.Cond
	// A boolean to track whether the keepalive goroutine is dormant or not.
	// This is checked before attempting to signal the above condition
	// variable.
	kpDormant bool
	// Fields below are for channelz metric collection.
	channelzID *channelz.Identifier
	czData *channelzData
	// onClose is the callback supplied to newHTTP2Client.
	onClose func(GoAwayReason)
	bufferPool *bufferPool
	// connectionID is the value of clientConnectionCounter after it was
	// incremented for this transport.
	connectionID uint64
	logger *grpclog.PrefixLogger
}
  131. func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
  132. address := addr.Addr
  133. networkType, ok := networktype.Get(addr)
  134. if fn != nil {
  135. // Special handling for unix scheme with custom dialer. Back in the day,
  136. // we did not have a unix resolver and therefore targets with a unix
  137. // scheme would end up using the passthrough resolver. So, user's used a
  138. // custom dialer in this case and expected the original dial target to
  139. // be passed to the custom dialer. Now, we have a unix resolver. But if
  140. // a custom dialer is specified, we want to retain the old behavior in
  141. // terms of the address being passed to the custom dialer.
  142. if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
  143. // Supported unix targets are either "unix://absolute-path" or
  144. // "unix:relative-path".
  145. if filepath.IsAbs(address) {
  146. return fn(ctx, "unix://"+address)
  147. }
  148. return fn(ctx, "unix:"+address)
  149. }
  150. return fn(ctx, address)
  151. }
  152. if !ok {
  153. networkType, address = parseDialTarget(address)
  154. }
  155. if networkType == "tcp" && useProxy {
  156. return proxyDial(ctx, address, grpcUA)
  157. }
  158. return (&net.Dialer{}).DialContext(ctx, networkType, address)
  159. }
  160. func isTemporary(err error) bool {
  161. switch err := err.(type) {
  162. case interface {
  163. Temporary() bool
  164. }:
  165. return err.Temporary()
  166. case interface {
  167. Timeout() bool
  168. }:
  169. // Timeouts may be resolved upon retry, and are thus treated as
  170. // temporary.
  171. return err.Timeout()
  172. }
  173. return true
  174. }
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
//
// connectCtx governs the connection-establishment phase: it is passed to the
// dialer and the credentials handshake, and if it expires before this function
// returns the connection is hard closed. ctx is the parent of the transport's
// own context. onClose is stored on the returned transport.
//
// The deferred functions below run in reverse order of registration: first the
// wait for the reader's preface result (closing the transport on error), then
// the shutdown of the connectCtx monitor goroutine, then closing conn on error,
// and finally cancelling the transport context on error.
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
	scheme := "http"
	ctx, cancel := context.WithCancel(ctx)
	// Release the transport context if construction fails. err is the named
	// result, so this observes the error actually returned.
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	// gRPC, resolver, balancer etc. can specify arbitrary data in the
	// Attributes field of resolver.Address, which is shoved into connectCtx
	// and passed to the dialer and credential handshaker. This makes it possible for
	// address specific arbitrary data to reach custom dialers and credential handshakers.
	connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})

	conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
	if err != nil {
		// Dial errors are reported as temporary unless the caller asked to
		// fail on non-temporary dial errors, in which case isTemporary decides.
		if opts.FailOnNonTempDialError {
			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
		}
		return nil, connectionErrorf(true, err, "transport: Error while dialing: %v", err)
	}

	// Any further errors will close the underlying connection
	defer func(conn net.Conn) {
		if err != nil {
			conn.Close()
		}
	}(conn)

	// The following defer and goroutine monitor the connectCtx for cancelation
	// and deadline. On context expiration, the connection is hard closed and
	// this function will naturally fail as a result. Otherwise, the defer
	// waits for the goroutine to exit to prevent the context from being
	// monitored (and to prevent the connection from ever being closed) after
	// returning from this function.
	ctxMonitorDone := grpcsync.NewEvent()
	newClientCtx, newClientDone := context.WithCancel(connectCtx)
	defer func() {
		newClientDone() // Awaken the goroutine below if connectCtx hasn't expired.
		<-ctxMonitorDone.Done() // Wait for the goroutine below to exit.
	}()
	go func(conn net.Conn) {
		defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
		<-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
		if err := connectCtx.Err(); err != nil {
			// connectCtx expired before exiting the function. Hard close the connection.
			if logger.V(logLevel) {
				logger.Infof("Aborting due to connect deadline expiring: %v", err)
			}
			conn.Close()
		}
	}(conn)

	kp := opts.KeepaliveParams
	// Validate keepalive parameters.
	if kp.Time == 0 {
		kp.Time = defaultClientKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultClientKeepaliveTimeout
	}
	keepaliveEnabled := false
	if kp.Time != infinity {
		// Keepalive is on: also apply the keepalive timeout as the TCP user
		// timeout of the connection.
		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
		keepaliveEnabled = true
	}
	var (
		isSecure bool
		authInfo credentials.AuthInfo
	)
	// A credentials bundle, when present, overrides the transport credentials
	// and contributes an additional per-RPC credential.
	transportCreds := opts.TransportCredentials
	perRPCCreds := opts.PerRPCCredentials

	if b := opts.CredsBundle; b != nil {
		if t := b.TransportCredentials(); t != nil {
			transportCreds = t
		}
		if t := b.PerRPCCredentials(); t != nil {
			perRPCCreds = append(perRPCCreds, t)
		}
	}
	if transportCreds != nil {
		// The handshake may wrap conn; from here on conn refers to the
		// connection returned by the handshaker.
		conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
		if err != nil {
			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
		}
		// Refuse to proceed if a per-RPC credential requires transport
		// security but the handshake reports a known security level below
		// PrivacyAndIntegrity. AuthInfo values that do not expose
		// GetCommonAuthInfo, or that report InvalidSecurityLevel, pass.
		for _, cd := range perRPCCreds {
			if cd.RequireTransportSecurity() {
				if ci, ok := authInfo.(interface {
					GetCommonAuthInfo() credentials.CommonAuthInfo
				}); ok {
					secLevel := ci.GetCommonAuthInfo().SecurityLevel
					if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
						return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
					}
				}
			}
		}
		isSecure = true
		if transportCreds.Info().SecurityProtocol == "tls" {
			scheme = "https"
		}
	}
	// The dynamic (BDP-estimated) window is used only if neither the
	// connection window nor the stream window (checked further below) was
	// configured to at least the default window size.
	dynamicWindow := true
	icwz := int32(initialWindowSize)
	if opts.InitialConnWindowSize >= defaultWindowSize {
		icwz = opts.InitialConnWindowSize
		dynamicWindow = false
	}
	writeBufSize := opts.WriteBufferSize
	readBufSize := opts.ReadBufferSize
	maxHeaderListSize := defaultClientMaxHeaderListSize
	if opts.MaxHeaderListSize != nil {
		maxHeaderListSize = *opts.MaxHeaderListSize
	}
	t := &http2Client{
		ctx: ctx,
		ctxDone: ctx.Done(), // Cache Done chan.
		cancel: cancel,
		userAgent: opts.UserAgent,
		registeredCompressors: grpcutil.RegisteredCompressors(),
		address: addr,
		conn: conn,
		remoteAddr: conn.RemoteAddr(),
		localAddr: conn.LocalAddr(),
		authInfo: authInfo,
		readerDone: make(chan struct{}),
		writerDone: make(chan struct{}),
		goAway: make(chan struct{}),
		framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
		fc: &trInFlow{limit: uint32(icwz)},
		scheme: scheme,
		activeStreams: make(map[uint32]*Stream),
		isSecure: isSecure,
		perRPCCreds: perRPCCreds,
		kp: kp,
		statsHandlers: opts.StatsHandlers,
		initialWindowSize: initialWindowSize,
		nextID: 1,
		maxConcurrentStreams: defaultMaxStreamsClient,
		streamQuota: defaultMaxStreamsClient,
		streamsQuotaAvailable: make(chan struct{}, 1),
		czData: new(channelzData),
		keepaliveEnabled: keepaliveEnabled,
		bufferPool: newBufferPool(),
		onClose: onClose,
	}
	t.logger = prefixLoggerForClientTransport(t)
	// Add peer information to the http2client context.
	t.ctx = peer.NewContext(t.ctx, t.getPeer())

	// Metadata attached to the address: the Metadata field (when it holds a
	// *metadata.MD) takes precedence over metadata found via imetadata.Get.
	if md, ok := addr.Metadata.(*metadata.MD); ok {
		t.md = *md
	} else if md := imetadata.Get(addr); md != nil {
		t.md = md
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	if opts.InitialWindowSize >= defaultWindowSize {
		t.initialWindowSize = opts.InitialWindowSize
		dynamicWindow = false
	}
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp: initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	// Let every stats handler tag the connection context and observe the
	// beginning of the connection.
	for _, sh := range t.statsHandlers {
		t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr: t.localAddr,
		})
		connBegin := &stats.ConnBegin{
			Client: true,
		}
		sh.HandleConn(t.ctx, connBegin)
	}
	t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
	if err != nil {
		return nil, err
	}
	if t.keepaliveEnabled {
		t.kpDormancyCond = sync.NewCond(&t.mu)
		go t.keepalive()
	}

	// Start the reader goroutine for incoming messages. Each transport has a
	// dedicated goroutine which reads HTTP2 frames from the network. Then it
	// dispatches the frame to the corresponding stream entity. When the
	// server preface is received, readerErrCh is closed. If an error occurs
	// first, an error is pushed to the channel. This must be checked before
	// returning from this function.
	readerErrCh := make(chan error, 1)
	go t.reader(readerErrCh)
	defer func() {
		// On an otherwise successful return, adopt the reader's verdict on the
		// server preface as the function result.
		if err == nil {
			err = <-readerErrCh
		}
		if err != nil {
			t.Close(err)
		}
	}()

	// Send connection preface to server.
	n, err := t.conn.Write(clientPreface)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
		return nil, err
	}
	if n != len(clientPreface) {
		err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
		return nil, err
	}
	// Only settings that differ from the protocol defaults are advertised.
	var ss []http2.Setting

	if t.initialWindowSize != defaultWindowSize {
		ss = append(ss, http2.Setting{
			ID: http2.SettingInitialWindowSize,
			Val: uint32(t.initialWindowSize),
		})
	}
	if opts.MaxHeaderListSize != nil {
		ss = append(ss, http2.Setting{
			ID: http2.SettingMaxHeaderListSize,
			Val: *opts.MaxHeaderListSize,
		})
	}
	err = t.framer.fr.WriteSettings(ss...)
	if err != nil {
		err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
		return nil, err
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		// err is shadowed in this if statement; the explicit return below still
		// assigns the named result.
		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
			err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
			return nil, err
		}
	}

	t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)

	// Push the preface, SETTINGS and optional WINDOW_UPDATE onto the wire.
	if err := t.framer.writer.Flush(); err != nil {
		return nil, err
	}
	// Start the writer goroutine; writerDone is closed once loopy.run returns.
	go func() {
		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
		t.loopy.run()
		close(t.writerDone)
	}()
	return t, nil
}
  421. func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
  422. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
  423. s := &Stream{
  424. ct: t,
  425. done: make(chan struct{}),
  426. method: callHdr.Method,
  427. sendCompress: callHdr.SendCompress,
  428. buf: newRecvBuffer(),
  429. headerChan: make(chan struct{}),
  430. contentSubtype: callHdr.ContentSubtype,
  431. doneFunc: callHdr.DoneFunc,
  432. }
  433. s.wq = newWriteQuota(defaultWriteQuota, s.done)
  434. s.requestRead = func(n int) {
  435. t.adjustWindow(s, uint32(n))
  436. }
  437. // The client side stream context should have exactly the same life cycle with the user provided context.
  438. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
  439. // So we use the original context here instead of creating a copy.
  440. s.ctx = ctx
  441. s.trReader = &transportReader{
  442. reader: &recvBufferReader{
  443. ctx: s.ctx,
  444. ctxDone: s.ctx.Done(),
  445. recv: s.buf,
  446. closeStream: func(err error) {
  447. t.CloseStream(s, err)
  448. },
  449. freeBuffer: t.bufferPool.put,
  450. },
  451. windowHandler: func(n int) {
  452. t.updateWindow(s, uint32(n))
  453. },
  454. }
  455. return s
  456. }
  457. func (t *http2Client) getPeer() *peer.Peer {
  458. return &peer.Peer{
  459. Addr: t.remoteAddr,
  460. AuthInfo: t.authInfo, // Can be nil
  461. }
  462. }
  463. func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
  464. aud := t.createAudience(callHdr)
  465. ri := credentials.RequestInfo{
  466. Method: callHdr.Method,
  467. AuthInfo: t.authInfo,
  468. }
  469. ctxWithRequestInfo := icredentials.NewRequestInfoContext(ctx, ri)
  470. authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
  471. if err != nil {
  472. return nil, err
  473. }
  474. callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
  475. if err != nil {
  476. return nil, err
  477. }
  478. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  479. // first and create a slice of that exact size.
  480. // Make the slice of certain predictable size to reduce allocations made by append.
  481. hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
  482. hfLen += len(authData) + len(callAuthData)
  483. headerFields := make([]hpack.HeaderField, 0, hfLen)
  484. headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
  485. headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
  486. headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
  487. headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
  488. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(callHdr.ContentSubtype)})
  489. headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
  490. headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
  491. if callHdr.PreviousAttempts > 0 {
  492. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
  493. }
  494. registeredCompressors := t.registeredCompressors
  495. if callHdr.SendCompress != "" {
  496. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
  497. // Include the outgoing compressor name when compressor is not registered
  498. // via encoding.RegisterCompressor. This is possible when client uses
  499. // WithCompressor dial option.
  500. if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) {
  501. if registeredCompressors != "" {
  502. registeredCompressors += ","
  503. }
  504. registeredCompressors += callHdr.SendCompress
  505. }
  506. }
  507. if registeredCompressors != "" {
  508. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors})
  509. }
  510. if dl, ok := ctx.Deadline(); ok {
  511. // Send out timeout regardless its value. The server can detect timeout context by itself.
  512. // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
  513. timeout := time.Until(dl)
  514. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
  515. }
  516. for k, v := range authData {
  517. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  518. }
  519. for k, v := range callAuthData {
  520. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  521. }
  522. if b := stats.OutgoingTags(ctx); b != nil {
  523. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
  524. }
  525. if b := stats.OutgoingTrace(ctx); b != nil {
  526. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
  527. }
  528. if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
  529. var k string
  530. for k, vv := range md {
  531. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  532. if isReservedHeader(k) {
  533. continue
  534. }
  535. for _, v := range vv {
  536. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  537. }
  538. }
  539. for _, vv := range added {
  540. for i, v := range vv {
  541. if i%2 == 0 {
  542. k = strings.ToLower(v)
  543. continue
  544. }
  545. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  546. if isReservedHeader(k) {
  547. continue
  548. }
  549. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  550. }
  551. }
  552. }
  553. for k, vv := range t.md {
  554. if isReservedHeader(k) {
  555. continue
  556. }
  557. for _, v := range vv {
  558. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  559. }
  560. }
  561. return headerFields, nil
  562. }
  563. func (t *http2Client) createAudience(callHdr *CallHdr) string {
  564. // Create an audience string only if needed.
  565. if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
  566. return ""
  567. }
  568. // Construct URI required to get auth request metadata.
  569. // Omit port if it is the default one.
  570. host := strings.TrimSuffix(callHdr.Host, ":443")
  571. pos := strings.LastIndex(callHdr.Method, "/")
  572. if pos == -1 {
  573. pos = len(callHdr.Method)
  574. }
  575. return "https://" + host + callHdr.Method[:pos]
  576. }
  577. func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
  578. if len(t.perRPCCreds) == 0 {
  579. return nil, nil
  580. }
  581. authData := map[string]string{}
  582. for _, c := range t.perRPCCreds {
  583. data, err := c.GetRequestMetadata(ctx, audience)
  584. if err != nil {
  585. if st, ok := status.FromError(err); ok {
  586. // Restrict the code to the list allowed by gRFC A54.
  587. if istatus.IsRestrictedControlPlaneCode(st) {
  588. err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
  589. }
  590. return nil, err
  591. }
  592. return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", err)
  593. }
  594. for k, v := range data {
  595. // Capital header names are illegal in HTTP/2.
  596. k = strings.ToLower(k)
  597. authData[k] = v
  598. }
  599. }
  600. return authData, nil
  601. }
  602. func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
  603. var callAuthData map[string]string
  604. // Check if credentials.PerRPCCredentials were provided via call options.
  605. // Note: if these credentials are provided both via dial options and call
  606. // options, then both sets of credentials will be applied.
  607. if callCreds := callHdr.Creds; callCreds != nil {
  608. if callCreds.RequireTransportSecurity() {
  609. ri, _ := credentials.RequestInfoFromContext(ctx)
  610. if !t.isSecure || credentials.CheckSecurityLevel(ri.AuthInfo, credentials.PrivacyAndIntegrity) != nil {
  611. return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
  612. }
  613. }
  614. data, err := callCreds.GetRequestMetadata(ctx, audience)
  615. if err != nil {
  616. if st, ok := status.FromError(err); ok {
  617. // Restrict the code to the list allowed by gRFC A54.
  618. if istatus.IsRestrictedControlPlaneCode(st) {
  619. err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
  620. }
  621. return nil, err
  622. }
  623. return nil, status.Errorf(codes.Internal, "transport: per-RPC creds failed due to error: %v", err)
  624. }
  625. callAuthData = make(map[string]string, len(data))
  626. for k, v := range data {
  627. // Capital header names are illegal in HTTP/2
  628. k = strings.ToLower(k)
  629. callAuthData[k] = v
  630. }
  631. }
  632. return callAuthData, nil
  633. }
  634. // NewStreamError wraps an error and reports additional information. Typically
  635. // NewStream errors result in transparent retry, as they mean nothing went onto
  636. // the wire. However, there are two notable exceptions:
  637. //
  638. // 1. If the stream headers violate the max header list size allowed by the
  639. // server. It's possible this could succeed on another transport, even if
  640. // it's unlikely, but do not transparently retry.
  641. // 2. If the credentials errored when requesting their headers. In this case,
  642. // it's possible a retry can fix the problem, but indefinitely transparently
  643. // retrying is not appropriate as it is likely the credentials, if they can
  644. // eventually succeed, would need I/O to do so.
  645. type NewStreamError struct {
  646. Err error
  647. AllowTransparentRetry bool
  648. }
  649. func (e NewStreamError) Error() string {
  650. return e.Err.Error()
  651. }
  652. // NewStream creates a stream and registers it into the transport as "active"
  653. // streams. All non-nil errors returned will be *NewStreamError.
  654. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
  655. ctx = peer.NewContext(ctx, t.getPeer())
  656. // ServerName field of the resolver returned address takes precedence over
  657. // Host field of CallHdr to determine the :authority header. This is because,
  658. // the ServerName field takes precedence for server authentication during
  659. // TLS handshake, and the :authority header should match the value used
  660. // for server authentication.
  661. if t.address.ServerName != "" {
  662. newCallHdr := *callHdr
  663. newCallHdr.Host = t.address.ServerName
  664. callHdr = &newCallHdr
  665. }
  666. headerFields, err := t.createHeaderFields(ctx, callHdr)
  667. if err != nil {
  668. return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
  669. }
  670. s := t.newStream(ctx, callHdr)
  671. cleanup := func(err error) {
  672. if s.swapState(streamDone) == streamDone {
  673. // If it was already done, return.
  674. return
  675. }
  676. // The stream was unprocessed by the server.
  677. atomic.StoreUint32(&s.unprocessed, 1)
  678. s.write(recvMsg{err: err})
  679. close(s.done)
  680. // If headerChan isn't closed, then close it.
  681. if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
  682. close(s.headerChan)
  683. }
  684. }
  685. hdr := &headerFrame{
  686. hf: headerFields,
  687. endStream: false,
  688. initStream: func(id uint32) error {
  689. t.mu.Lock()
  690. // TODO: handle transport closure in loopy instead and remove this
  691. // initStream is never called when transport is draining.
  692. if t.state == closing {
  693. t.mu.Unlock()
  694. cleanup(ErrConnClosing)
  695. return ErrConnClosing
  696. }
  697. if channelz.IsOn() {
  698. atomic.AddInt64(&t.czData.streamsStarted, 1)
  699. atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
  700. }
  701. // If the keepalive goroutine has gone dormant, wake it up.
  702. if t.kpDormant {
  703. t.kpDormancyCond.Signal()
  704. }
  705. t.mu.Unlock()
  706. return nil
  707. },
  708. onOrphaned: cleanup,
  709. wq: s.wq,
  710. }
  711. firstTry := true
  712. var ch chan struct{}
  713. transportDrainRequired := false
  714. checkForStreamQuota := func(it interface{}) bool {
  715. if t.streamQuota <= 0 { // Can go negative if server decreases it.
  716. if firstTry {
  717. t.waitingStreams++
  718. }
  719. ch = t.streamsQuotaAvailable
  720. return false
  721. }
  722. if !firstTry {
  723. t.waitingStreams--
  724. }
  725. t.streamQuota--
  726. h := it.(*headerFrame)
  727. h.streamID = t.nextID
  728. t.nextID += 2
  729. // Drain client transport if nextID > MaxStreamID which signals gRPC that
  730. // the connection is closed and a new one must be created for subsequent RPCs.
  731. transportDrainRequired = t.nextID > MaxStreamID
  732. s.id = h.streamID
  733. s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
  734. t.mu.Lock()
  735. if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
  736. t.mu.Unlock()
  737. return false // Don't create a stream if the transport is already closed.
  738. }
  739. t.activeStreams[s.id] = s
  740. t.mu.Unlock()
  741. if t.streamQuota > 0 && t.waitingStreams > 0 {
  742. select {
  743. case t.streamsQuotaAvailable <- struct{}{}:
  744. default:
  745. }
  746. }
  747. return true
  748. }
  749. var hdrListSizeErr error
  750. checkForHeaderListSize := func(it interface{}) bool {
  751. if t.maxSendHeaderListSize == nil {
  752. return true
  753. }
  754. hdrFrame := it.(*headerFrame)
  755. var sz int64
  756. for _, f := range hdrFrame.hf {
  757. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  758. hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
  759. return false
  760. }
  761. }
  762. return true
  763. }
  764. for {
  765. success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
  766. return checkForHeaderListSize(it) && checkForStreamQuota(it)
  767. }, hdr)
  768. if err != nil {
  769. // Connection closed.
  770. return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
  771. }
  772. if success {
  773. break
  774. }
  775. if hdrListSizeErr != nil {
  776. return nil, &NewStreamError{Err: hdrListSizeErr}
  777. }
  778. firstTry = false
  779. select {
  780. case <-ch:
  781. case <-ctx.Done():
  782. return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
  783. case <-t.goAway:
  784. return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
  785. case <-t.ctx.Done():
  786. return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
  787. }
  788. }
  789. if len(t.statsHandlers) != 0 {
  790. header, ok := metadata.FromOutgoingContext(ctx)
  791. if ok {
  792. header.Set("user-agent", t.userAgent)
  793. } else {
  794. header = metadata.Pairs("user-agent", t.userAgent)
  795. }
  796. for _, sh := range t.statsHandlers {
  797. // Note: The header fields are compressed with hpack after this call returns.
  798. // No WireLength field is set here.
  799. // Note: Creating a new stats object to prevent pollution.
  800. outHeader := &stats.OutHeader{
  801. Client: true,
  802. FullMethod: callHdr.Method,
  803. RemoteAddr: t.remoteAddr,
  804. LocalAddr: t.localAddr,
  805. Compression: callHdr.SendCompress,
  806. Header: header,
  807. }
  808. sh.HandleRPC(s.ctx, outHeader)
  809. }
  810. }
  811. if transportDrainRequired {
  812. if t.logger.V(logLevel) {
  813. t.logger.Infof("Draining transport: t.nextID > MaxStreamID")
  814. }
  815. t.GracefulClose()
  816. }
  817. return s, nil
  818. }
  819. // CloseStream clears the footprint of a stream when the stream is not needed any more.
  820. // This must not be executed in reader's goroutine.
  821. func (t *http2Client) CloseStream(s *Stream, err error) {
  822. var (
  823. rst bool
  824. rstCode http2.ErrCode
  825. )
  826. if err != nil {
  827. rst = true
  828. rstCode = http2.ErrCodeCancel
  829. }
  830. t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
  831. }
  832. func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
  833. // Set stream status to done.
  834. if s.swapState(streamDone) == streamDone {
  835. // If it was already done, return. If multiple closeStream calls
  836. // happen simultaneously, wait for the first to finish.
  837. <-s.done
  838. return
  839. }
  840. // status and trailers can be updated here without any synchronization because the stream goroutine will
  841. // only read it after it sees an io.EOF error from read or write and we'll write those errors
  842. // only after updating this.
  843. s.status = st
  844. if len(mdata) > 0 {
  845. s.trailer = mdata
  846. }
  847. if err != nil {
  848. // This will unblock reads eventually.
  849. s.write(recvMsg{err: err})
  850. }
  851. // If headerChan isn't closed, then close it.
  852. if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
  853. s.noHeaders = true
  854. close(s.headerChan)
  855. }
  856. cleanup := &cleanupStream{
  857. streamID: s.id,
  858. onWrite: func() {
  859. t.mu.Lock()
  860. if t.activeStreams != nil {
  861. delete(t.activeStreams, s.id)
  862. }
  863. t.mu.Unlock()
  864. if channelz.IsOn() {
  865. if eosReceived {
  866. atomic.AddInt64(&t.czData.streamsSucceeded, 1)
  867. } else {
  868. atomic.AddInt64(&t.czData.streamsFailed, 1)
  869. }
  870. }
  871. },
  872. rst: rst,
  873. rstCode: rstCode,
  874. }
  875. addBackStreamQuota := func(interface{}) bool {
  876. t.streamQuota++
  877. if t.streamQuota > 0 && t.waitingStreams > 0 {
  878. select {
  879. case t.streamsQuotaAvailable <- struct{}{}:
  880. default:
  881. }
  882. }
  883. return true
  884. }
  885. t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
  886. // This will unblock write.
  887. close(s.done)
  888. if s.doneFunc != nil {
  889. s.doneFunc()
  890. }
  891. }
  892. // Close kicks off the shutdown process of the transport. This should be called
  893. // only once on a transport. Once it is called, the transport should not be
  894. // accessed any more.
  895. func (t *http2Client) Close(err error) {
  896. t.mu.Lock()
  897. // Make sure we only close once.
  898. if t.state == closing {
  899. t.mu.Unlock()
  900. return
  901. }
  902. if t.logger.V(logLevel) {
  903. t.logger.Infof("Closing: %v", err)
  904. }
  905. // Call t.onClose ASAP to prevent the client from attempting to create new
  906. // streams.
  907. if t.state != draining {
  908. t.onClose(GoAwayInvalid)
  909. }
  910. t.state = closing
  911. streams := t.activeStreams
  912. t.activeStreams = nil
  913. if t.kpDormant {
  914. // If the keepalive goroutine is blocked on this condition variable, we
  915. // should unblock it so that the goroutine eventually exits.
  916. t.kpDormancyCond.Signal()
  917. }
  918. t.mu.Unlock()
  919. t.controlBuf.finish()
  920. t.cancel()
  921. t.conn.Close()
  922. channelz.RemoveEntry(t.channelzID)
  923. // Append info about previous goaways if there were any, since this may be important
  924. // for understanding the root cause for this connection to be closed.
  925. _, goAwayDebugMessage := t.GetGoAwayReason()
  926. var st *status.Status
  927. if len(goAwayDebugMessage) > 0 {
  928. st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
  929. err = st.Err()
  930. } else {
  931. st = status.New(codes.Unavailable, err.Error())
  932. }
  933. // Notify all active streams.
  934. for _, s := range streams {
  935. t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
  936. }
  937. for _, sh := range t.statsHandlers {
  938. connEnd := &stats.ConnEnd{
  939. Client: true,
  940. }
  941. sh.HandleConn(t.ctx, connEnd)
  942. }
  943. }
  944. // GracefulClose sets the state to draining, which prevents new streams from
  945. // being created and causes the transport to be closed when the last active
  946. // stream is closed. If there are no active streams, the transport is closed
  947. // immediately. This does nothing if the transport is already draining or
  948. // closing.
  949. func (t *http2Client) GracefulClose() {
  950. t.mu.Lock()
  951. // Make sure we move to draining only from active.
  952. if t.state == draining || t.state == closing {
  953. t.mu.Unlock()
  954. return
  955. }
  956. if t.logger.V(logLevel) {
  957. t.logger.Infof("GracefulClose called")
  958. }
  959. t.onClose(GoAwayInvalid)
  960. t.state = draining
  961. active := len(t.activeStreams)
  962. t.mu.Unlock()
  963. if active == 0 {
  964. t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
  965. return
  966. }
  967. t.controlBuf.put(&incomingGoAway{})
  968. }
  969. // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
  970. // should proceed only if Write returns nil.
  971. func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  972. if opts.Last {
  973. // If it's the last message, update stream state.
  974. if !s.compareAndSwapState(streamActive, streamWriteDone) {
  975. return errStreamDone
  976. }
  977. } else if s.getState() != streamActive {
  978. return errStreamDone
  979. }
  980. df := &dataFrame{
  981. streamID: s.id,
  982. endStream: opts.Last,
  983. h: hdr,
  984. d: data,
  985. }
  986. if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
  987. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  988. return err
  989. }
  990. }
  991. return t.controlBuf.put(df)
  992. }
  993. func (t *http2Client) getStream(f http2.Frame) *Stream {
  994. t.mu.Lock()
  995. s := t.activeStreams[f.Header().StreamID]
  996. t.mu.Unlock()
  997. return s
  998. }
  999. // adjustWindow sends out extra window update over the initial window size
  1000. // of stream if the application is requesting data larger in size than
  1001. // the window.
  1002. func (t *http2Client) adjustWindow(s *Stream, n uint32) {
  1003. if w := s.fc.maybeAdjust(n); w > 0 {
  1004. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  1005. }
  1006. }
  1007. // updateWindow adjusts the inbound quota for the stream.
  1008. // Window updates will be sent out when the cumulative quota
  1009. // exceeds the corresponding threshold.
  1010. func (t *http2Client) updateWindow(s *Stream, n uint32) {
  1011. if w := s.fc.onRead(n); w > 0 {
  1012. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  1013. }
  1014. }
  1015. // updateFlowControl updates the incoming flow control windows
  1016. // for the transport and the stream based on the current bdp
  1017. // estimation.
  1018. func (t *http2Client) updateFlowControl(n uint32) {
  1019. updateIWS := func(interface{}) bool {
  1020. t.initialWindowSize = int32(n)
  1021. t.mu.Lock()
  1022. for _, s := range t.activeStreams {
  1023. s.fc.newLimit(n)
  1024. }
  1025. t.mu.Unlock()
  1026. return true
  1027. }
  1028. t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
  1029. t.controlBuf.put(&outgoingSettings{
  1030. ss: []http2.Setting{
  1031. {
  1032. ID: http2.SettingInitialWindowSize,
  1033. Val: n,
  1034. },
  1035. },
  1036. })
  1037. }
  1038. func (t *http2Client) handleData(f *http2.DataFrame) {
  1039. size := f.Header().Length
  1040. var sendBDPPing bool
  1041. if t.bdpEst != nil {
  1042. sendBDPPing = t.bdpEst.add(size)
  1043. }
  1044. // Decouple connection's flow control from application's read.
  1045. // An update on connection's flow control should not depend on
  1046. // whether user application has read the data or not. Such a
  1047. // restriction is already imposed on the stream's flow control,
  1048. // and therefore the sender will be blocked anyways.
  1049. // Decoupling the connection flow control will prevent other
  1050. // active(fast) streams from starving in presence of slow or
  1051. // inactive streams.
  1052. //
  1053. if w := t.fc.onData(size); w > 0 {
  1054. t.controlBuf.put(&outgoingWindowUpdate{
  1055. streamID: 0,
  1056. increment: w,
  1057. })
  1058. }
  1059. if sendBDPPing {
  1060. // Avoid excessive ping detection (e.g. in an L7 proxy)
  1061. // by sending a window update prior to the BDP ping.
  1062. if w := t.fc.reset(); w > 0 {
  1063. t.controlBuf.put(&outgoingWindowUpdate{
  1064. streamID: 0,
  1065. increment: w,
  1066. })
  1067. }
  1068. t.controlBuf.put(bdpPing)
  1069. }
  1070. // Select the right stream to dispatch.
  1071. s := t.getStream(f)
  1072. if s == nil {
  1073. return
  1074. }
  1075. if size > 0 {
  1076. if err := s.fc.onData(size); err != nil {
  1077. t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
  1078. return
  1079. }
  1080. if f.Header().Flags.Has(http2.FlagDataPadded) {
  1081. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  1082. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  1083. }
  1084. }
  1085. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  1086. // guarantee f.Data() is consumed before the arrival of next frame.
  1087. // Can this copy be eliminated?
  1088. if len(f.Data()) > 0 {
  1089. buffer := t.bufferPool.get()
  1090. buffer.Reset()
  1091. buffer.Write(f.Data())
  1092. s.write(recvMsg{buffer: buffer})
  1093. }
  1094. }
  1095. // The server has closed the stream without sending trailers. Record that
  1096. // the read direction is closed, and set the status appropriately.
  1097. if f.StreamEnded() {
  1098. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
  1099. }
  1100. }
  1101. func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
  1102. s := t.getStream(f)
  1103. if s == nil {
  1104. return
  1105. }
  1106. if f.ErrCode == http2.ErrCodeRefusedStream {
  1107. // The stream was unprocessed by the server.
  1108. atomic.StoreUint32(&s.unprocessed, 1)
  1109. }
  1110. statusCode, ok := http2ErrConvTab[f.ErrCode]
  1111. if !ok {
  1112. if t.logger.V(logLevel) {
  1113. t.logger.Infof("Received a RST_STREAM frame with code %q, but found no mapped gRPC status", f.ErrCode)
  1114. }
  1115. statusCode = codes.Unknown
  1116. }
  1117. if statusCode == codes.Canceled {
  1118. if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
  1119. // Our deadline was already exceeded, and that was likely the cause
  1120. // of this cancelation. Alter the status code accordingly.
  1121. statusCode = codes.DeadlineExceeded
  1122. }
  1123. }
  1124. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
  1125. }
  1126. func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
  1127. if f.IsAck() {
  1128. return
  1129. }
  1130. var maxStreams *uint32
  1131. var ss []http2.Setting
  1132. var updateFuncs []func()
  1133. f.ForeachSetting(func(s http2.Setting) error {
  1134. switch s.ID {
  1135. case http2.SettingMaxConcurrentStreams:
  1136. maxStreams = new(uint32)
  1137. *maxStreams = s.Val
  1138. case http2.SettingMaxHeaderListSize:
  1139. updateFuncs = append(updateFuncs, func() {
  1140. t.maxSendHeaderListSize = new(uint32)
  1141. *t.maxSendHeaderListSize = s.Val
  1142. })
  1143. default:
  1144. ss = append(ss, s)
  1145. }
  1146. return nil
  1147. })
  1148. if isFirst && maxStreams == nil {
  1149. maxStreams = new(uint32)
  1150. *maxStreams = math.MaxUint32
  1151. }
  1152. sf := &incomingSettings{
  1153. ss: ss,
  1154. }
  1155. if maxStreams != nil {
  1156. updateStreamQuota := func() {
  1157. delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
  1158. t.maxConcurrentStreams = *maxStreams
  1159. t.streamQuota += delta
  1160. if delta > 0 && t.waitingStreams > 0 {
  1161. close(t.streamsQuotaAvailable) // wake all of them up.
  1162. t.streamsQuotaAvailable = make(chan struct{}, 1)
  1163. }
  1164. }
  1165. updateFuncs = append(updateFuncs, updateStreamQuota)
  1166. }
  1167. t.controlBuf.executeAndPut(func(interface{}) bool {
  1168. for _, f := range updateFuncs {
  1169. f()
  1170. }
  1171. return true
  1172. }, sf)
  1173. }
  1174. func (t *http2Client) handlePing(f *http2.PingFrame) {
  1175. if f.IsAck() {
  1176. // Maybe it's a BDP ping.
  1177. if t.bdpEst != nil {
  1178. t.bdpEst.calculate(f.Data)
  1179. }
  1180. return
  1181. }
  1182. pingAck := &ping{ack: true}
  1183. copy(pingAck.data[:], f.Data[:])
  1184. t.controlBuf.put(pingAck)
  1185. }
  1186. func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
  1187. t.mu.Lock()
  1188. if t.state == closing {
  1189. t.mu.Unlock()
  1190. return
  1191. }
  1192. if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
  1193. // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
  1194. // data equal to ASCII "too_many_pings", it should log the occurrence at a log level that is
  1195. // enabled by default and double the configure KEEPALIVE_TIME used for new connections
  1196. // on that channel.
  1197. logger.Errorf("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\".")
  1198. }
  1199. id := f.LastStreamID
  1200. if id > 0 && id%2 == 0 {
  1201. t.mu.Unlock()
  1202. t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
  1203. return
  1204. }
  1205. // A client can receive multiple GoAways from the server (see
  1206. // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
  1207. // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
  1208. // sent after an RTT delay with the ID of the last stream the server will
  1209. // process.
  1210. //
  1211. // Therefore, when we get the first GoAway we don't necessarily close any
  1212. // streams. While in case of second GoAway we close all streams created after
  1213. // the GoAwayId. This way streams that were in-flight while the GoAway from
  1214. // server was being sent don't get killed.
  1215. select {
  1216. case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
  1217. // If there are multiple GoAways the first one should always have an ID greater than the following ones.
  1218. if id > t.prevGoAwayID {
  1219. t.mu.Unlock()
  1220. t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
  1221. return
  1222. }
  1223. default:
  1224. t.setGoAwayReason(f)
  1225. close(t.goAway)
  1226. defer t.controlBuf.put(&incomingGoAway{}) // Defer as t.mu is currently held.
  1227. // Notify the clientconn about the GOAWAY before we set the state to
  1228. // draining, to allow the client to stop attempting to create streams
  1229. // before disallowing new streams on this connection.
  1230. if t.state != draining {
  1231. t.onClose(t.goAwayReason)
  1232. t.state = draining
  1233. }
  1234. }
  1235. // All streams with IDs greater than the GoAwayId
  1236. // and smaller than the previous GoAway ID should be killed.
  1237. upperLimit := t.prevGoAwayID
  1238. if upperLimit == 0 { // This is the first GoAway Frame.
  1239. upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
  1240. }
  1241. t.prevGoAwayID = id
  1242. if len(t.activeStreams) == 0 {
  1243. t.mu.Unlock()
  1244. t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
  1245. return
  1246. }
  1247. streamsToClose := make([]*Stream, 0)
  1248. for streamID, stream := range t.activeStreams {
  1249. if streamID > id && streamID <= upperLimit {
  1250. // The stream was unprocessed by the server.
  1251. if streamID > id && streamID <= upperLimit {
  1252. atomic.StoreUint32(&stream.unprocessed, 1)
  1253. streamsToClose = append(streamsToClose, stream)
  1254. }
  1255. }
  1256. }
  1257. t.mu.Unlock()
  1258. // Called outside t.mu because closeStream can take controlBuf's mu, which
  1259. // could induce deadlock and is not allowed.
  1260. for _, stream := range streamsToClose {
  1261. t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
  1262. }
  1263. }
  1264. // setGoAwayReason sets the value of t.goAwayReason based
  1265. // on the GoAway frame received.
  1266. // It expects a lock on transport's mutex to be held by
  1267. // the caller.
  1268. func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
  1269. t.goAwayReason = GoAwayNoReason
  1270. switch f.ErrCode {
  1271. case http2.ErrCodeEnhanceYourCalm:
  1272. if string(f.DebugData()) == "too_many_pings" {
  1273. t.goAwayReason = GoAwayTooManyPings
  1274. }
  1275. }
  1276. if len(f.DebugData()) == 0 {
  1277. t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
  1278. } else {
  1279. t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
  1280. }
  1281. }
  1282. func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
  1283. t.mu.Lock()
  1284. defer t.mu.Unlock()
  1285. return t.goAwayReason, t.goAwayDebugMessage
  1286. }
  1287. func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  1288. t.controlBuf.put(&incomingWindowUpdate{
  1289. streamID: f.Header().StreamID,
  1290. increment: f.Increment,
  1291. })
  1292. }
  1293. // operateHeaders takes action on the decoded headers.
  1294. func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
  1295. s := t.getStream(frame)
  1296. if s == nil {
  1297. return
  1298. }
  1299. endStream := frame.StreamEnded()
  1300. atomic.StoreUint32(&s.bytesReceived, 1)
  1301. initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
  1302. if !initialHeader && !endStream {
  1303. // As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
  1304. st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
  1305. t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
  1306. return
  1307. }
  1308. // frame.Truncated is set to true when framer detects that the current header
  1309. // list size hits MaxHeaderListSize limit.
  1310. if frame.Truncated {
  1311. se := status.New(codes.Internal, "peer header list size exceeded limit")
  1312. t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream)
  1313. return
  1314. }
  1315. var (
  1316. // If a gRPC Response-Headers has already been received, then it means
  1317. // that the peer is speaking gRPC and we are in gRPC mode.
  1318. isGRPC = !initialHeader
  1319. mdata = make(map[string][]string)
  1320. contentTypeErr = "malformed header: missing HTTP content-type"
  1321. grpcMessage string
  1322. statusGen *status.Status
  1323. recvCompress string
  1324. httpStatusCode *int
  1325. httpStatusErr string
  1326. rawStatusCode = codes.Unknown
  1327. // headerError is set if an error is encountered while parsing the headers
  1328. headerError string
  1329. )
  1330. if initialHeader {
  1331. httpStatusErr = "malformed header: missing HTTP status"
  1332. }
  1333. for _, hf := range frame.Fields {
  1334. switch hf.Name {
  1335. case "content-type":
  1336. if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType {
  1337. contentTypeErr = fmt.Sprintf("transport: received unexpected content-type %q", hf.Value)
  1338. break
  1339. }
  1340. contentTypeErr = ""
  1341. mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
  1342. isGRPC = true
  1343. case "grpc-encoding":
  1344. recvCompress = hf.Value
  1345. case "grpc-status":
  1346. code, err := strconv.ParseInt(hf.Value, 10, 32)
  1347. if err != nil {
  1348. se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
  1349. t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
  1350. return
  1351. }
  1352. rawStatusCode = codes.Code(uint32(code))
  1353. case "grpc-message":
  1354. grpcMessage = decodeGrpcMessage(hf.Value)
  1355. case "grpc-status-details-bin":
  1356. var err error
  1357. statusGen, err = decodeGRPCStatusDetails(hf.Value)
  1358. if err != nil {
  1359. headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)
  1360. }
  1361. case ":status":
  1362. if hf.Value == "200" {
  1363. httpStatusErr = ""
  1364. statusCode := 200
  1365. httpStatusCode = &statusCode
  1366. break
  1367. }
  1368. c, err := strconv.ParseInt(hf.Value, 10, 32)
  1369. if err != nil {
  1370. se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
  1371. t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
  1372. return
  1373. }
  1374. statusCode := int(c)
  1375. httpStatusCode = &statusCode
  1376. httpStatusErr = fmt.Sprintf(
  1377. "unexpected HTTP status code received from server: %d (%s)",
  1378. statusCode,
  1379. http.StatusText(statusCode),
  1380. )
  1381. default:
  1382. if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
  1383. break
  1384. }
  1385. v, err := decodeMetadataHeader(hf.Name, hf.Value)
  1386. if err != nil {
  1387. headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
  1388. logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
  1389. break
  1390. }
  1391. mdata[hf.Name] = append(mdata[hf.Name], v)
  1392. }
  1393. }
  1394. if !isGRPC || httpStatusErr != "" {
  1395. var code = codes.Internal // when header does not include HTTP status, return INTERNAL
  1396. if httpStatusCode != nil {
  1397. var ok bool
  1398. code, ok = HTTPStatusConvTab[*httpStatusCode]
  1399. if !ok {
  1400. code = codes.Unknown
  1401. }
  1402. }
  1403. var errs []string
  1404. if httpStatusErr != "" {
  1405. errs = append(errs, httpStatusErr)
  1406. }
  1407. if contentTypeErr != "" {
  1408. errs = append(errs, contentTypeErr)
  1409. }
  1410. // Verify the HTTP response is a 200.
  1411. se := status.New(code, strings.Join(errs, "; "))
  1412. t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
  1413. return
  1414. }
  1415. if headerError != "" {
  1416. se := status.New(codes.Internal, headerError)
  1417. t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
  1418. return
  1419. }
  1420. isHeader := false
  1421. // If headerChan hasn't been closed yet
  1422. if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
  1423. s.headerValid = true
  1424. if !endStream {
  1425. // HEADERS frame block carries a Response-Headers.
  1426. isHeader = true
  1427. // These values can be set without any synchronization because
  1428. // stream goroutine will read it only after seeing a closed
  1429. // headerChan which we'll close after setting this.
  1430. s.recvCompress = recvCompress
  1431. if len(mdata) > 0 {
  1432. s.header = mdata
  1433. }
  1434. } else {
  1435. // HEADERS frame block carries a Trailers-Only.
  1436. s.noHeaders = true
  1437. }
  1438. close(s.headerChan)
  1439. }
  1440. for _, sh := range t.statsHandlers {
  1441. if isHeader {
  1442. inHeader := &stats.InHeader{
  1443. Client: true,
  1444. WireLength: int(frame.Header().Length),
  1445. Header: metadata.MD(mdata).Copy(),
  1446. Compression: s.recvCompress,
  1447. }
  1448. sh.HandleRPC(s.ctx, inHeader)
  1449. } else {
  1450. inTrailer := &stats.InTrailer{
  1451. Client: true,
  1452. WireLength: int(frame.Header().Length),
  1453. Trailer: metadata.MD(mdata).Copy(),
  1454. }
  1455. sh.HandleRPC(s.ctx, inTrailer)
  1456. }
  1457. }
  1458. if !endStream {
  1459. return
  1460. }
  1461. if statusGen == nil {
  1462. statusGen = status.New(rawStatusCode, grpcMessage)
  1463. }
  1464. // if client received END_STREAM from server while stream was still active, send RST_STREAM
  1465. rst := s.getState() == streamActive
  1466. t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
  1467. }
  1468. // readServerPreface reads and handles the initial settings frame from the
  1469. // server.
  1470. func (t *http2Client) readServerPreface() error {
  1471. frame, err := t.framer.fr.ReadFrame()
  1472. if err != nil {
  1473. return connectionErrorf(true, err, "error reading server preface: %v", err)
  1474. }
  1475. sf, ok := frame.(*http2.SettingsFrame)
  1476. if !ok {
  1477. return connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame)
  1478. }
  1479. t.handleSettings(sf, true)
  1480. return nil
  1481. }
  1482. // reader verifies the server preface and reads all subsequent data from
  1483. // network connection. If the server preface is not read successfully, an
  1484. // error is pushed to errCh; otherwise errCh is closed with no error.
  1485. func (t *http2Client) reader(errCh chan<- error) {
  1486. defer close(t.readerDone)
  1487. if err := t.readServerPreface(); err != nil {
  1488. errCh <- err
  1489. return
  1490. }
  1491. close(errCh)
  1492. if t.keepaliveEnabled {
  1493. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  1494. }
  1495. // loop to keep reading incoming messages on this transport.
  1496. for {
  1497. t.controlBuf.throttle()
  1498. frame, err := t.framer.fr.ReadFrame()
  1499. if t.keepaliveEnabled {
  1500. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  1501. }
  1502. if err != nil {
  1503. // Abort an active stream if the http2.Framer returns a
  1504. // http2.StreamError. This can happen only if the server's response
  1505. // is malformed http2.
  1506. if se, ok := err.(http2.StreamError); ok {
  1507. t.mu.Lock()
  1508. s := t.activeStreams[se.StreamID]
  1509. t.mu.Unlock()
  1510. if s != nil {
  1511. // use error detail to provide better err message
  1512. code := http2ErrConvTab[se.Code]
  1513. errorDetail := t.framer.fr.ErrorDetail()
  1514. var msg string
  1515. if errorDetail != nil {
  1516. msg = errorDetail.Error()
  1517. } else {
  1518. msg = "received invalid frame"
  1519. }
  1520. t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
  1521. }
  1522. continue
  1523. } else {
  1524. // Transport error.
  1525. t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
  1526. return
  1527. }
  1528. }
  1529. switch frame := frame.(type) {
  1530. case *http2.MetaHeadersFrame:
  1531. t.operateHeaders(frame)
  1532. case *http2.DataFrame:
  1533. t.handleData(frame)
  1534. case *http2.RSTStreamFrame:
  1535. t.handleRSTStream(frame)
  1536. case *http2.SettingsFrame:
  1537. t.handleSettings(frame, false)
  1538. case *http2.PingFrame:
  1539. t.handlePing(frame)
  1540. case *http2.GoAwayFrame:
  1541. t.handleGoAway(frame)
  1542. case *http2.WindowUpdateFrame:
  1543. t.handleWindowUpdate(frame)
  1544. default:
  1545. if logger.V(logLevel) {
  1546. logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
  1547. }
  1548. }
  1549. }
  1550. }
  // minTime returns the smaller of the two durations. When a and b are equal,
  // b is returned (which is the same value).
  func minTime(a, b time.Duration) time.Duration {
  	if b <= a {
  		return b
  	}
  	return a
  }
  1557. // keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
  1558. func (t *http2Client) keepalive() {
  1559. p := &ping{data: [8]byte{}}
  1560. // True iff a ping has been sent, and no data has been received since then.
  1561. outstandingPing := false
  1562. // Amount of time remaining before which we should receive an ACK for the
  1563. // last sent ping.
  1564. timeoutLeft := time.Duration(0)
  1565. // Records the last value of t.lastRead before we go block on the timer.
  1566. // This is required to check for read activity since then.
  1567. prevNano := time.Now().UnixNano()
  1568. timer := time.NewTimer(t.kp.Time)
  1569. for {
  1570. select {
  1571. case <-timer.C:
  1572. lastRead := atomic.LoadInt64(&t.lastRead)
  1573. if lastRead > prevNano {
  1574. // There has been read activity since the last time we were here.
  1575. outstandingPing = false
  1576. // Next timer should fire at kp.Time seconds from lastRead time.
  1577. timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
  1578. prevNano = lastRead
  1579. continue
  1580. }
  1581. if outstandingPing && timeoutLeft <= 0 {
  1582. t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
  1583. return
  1584. }
  1585. t.mu.Lock()
  1586. if t.state == closing {
  1587. // If the transport is closing, we should exit from the
  1588. // keepalive goroutine here. If not, we could have a race
  1589. // between the call to Signal() from Close() and the call to
  1590. // Wait() here, whereby the keepalive goroutine ends up
  1591. // blocking on the condition variable which will never be
  1592. // signalled again.
  1593. t.mu.Unlock()
  1594. return
  1595. }
  1596. if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
  1597. // If a ping was sent out previously (because there were active
  1598. // streams at that point) which wasn't acked and its timeout
  1599. // hadn't fired, but we got here and are about to go dormant,
  1600. // we should make sure that we unconditionally send a ping once
  1601. // we awaken.
  1602. outstandingPing = false
  1603. t.kpDormant = true
  1604. t.kpDormancyCond.Wait()
  1605. }
  1606. t.kpDormant = false
  1607. t.mu.Unlock()
  1608. // We get here either because we were dormant and a new stream was
  1609. // created which unblocked the Wait() call, or because the
  1610. // keepalive timer expired. In both cases, we need to send a ping.
  1611. if !outstandingPing {
  1612. if channelz.IsOn() {
  1613. atomic.AddInt64(&t.czData.kpCount, 1)
  1614. }
  1615. t.controlBuf.put(p)
  1616. timeoutLeft = t.kp.Timeout
  1617. outstandingPing = true
  1618. }
  1619. // The amount of time to sleep here is the minimum of kp.Time and
  1620. // timeoutLeft. This will ensure that we wait only for kp.Time
  1621. // before sending out the next ping (for cases where the ping is
  1622. // acked).
  1623. sleepDuration := minTime(t.kp.Time, timeoutLeft)
  1624. timeoutLeft -= sleepDuration
  1625. timer.Reset(sleepDuration)
  1626. case <-t.ctx.Done():
  1627. if !timer.Stop() {
  1628. <-timer.C
  1629. }
  1630. return
  1631. }
  1632. }
  1633. }
  1634. func (t *http2Client) Error() <-chan struct{} {
  1635. return t.ctx.Done()
  1636. }
  1637. func (t *http2Client) GoAway() <-chan struct{} {
  1638. return t.goAway
  1639. }
  1640. func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
  1641. s := channelz.SocketInternalMetric{
  1642. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1643. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1644. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1645. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1646. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1647. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1648. LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1649. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1650. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1651. LocalFlowControlWindow: int64(t.fc.getSize()),
  1652. SocketOptions: channelz.GetSocketOption(t.conn),
  1653. LocalAddr: t.localAddr,
  1654. RemoteAddr: t.remoteAddr,
  1655. // RemoteName :
  1656. }
  1657. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1658. s.Security = au.GetSecurityValue()
  1659. }
  1660. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1661. return &s
  1662. }
  1663. func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
  1664. func (t *http2Client) IncrMsgSent() {
  1665. atomic.AddInt64(&t.czData.msgSent, 1)
  1666. atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
  1667. }
  1668. func (t *http2Client) IncrMsgRecv() {
  1669. atomic.AddInt64(&t.czData.msgRecv, 1)
  1670. atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
  1671. }
  1672. func (t *http2Client) getOutFlowWindow() int64 {
  1673. resp := make(chan uint32, 1)
  1674. timer := time.NewTimer(time.Second)
  1675. defer timer.Stop()
  1676. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1677. select {
  1678. case sz := <-resp:
  1679. return int64(sz)
  1680. case <-t.ctxDone:
  1681. return -1
  1682. case <-timer.C:
  1683. return -2
  1684. }
  1685. }
  1686. func (t *http2Client) stateForTesting() transportState {
  1687. t.mu.Lock()
  1688. defer t.mu.Unlock()
  1689. return t.state
  1690. }