http2_server.go 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "context"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "math"
  26. "net"
  27. "net/http"
  28. "strconv"
  29. "sync"
  30. "sync/atomic"
  31. "time"
  32. "github.com/golang/protobuf/proto"
  33. "golang.org/x/net/http2"
  34. "golang.org/x/net/http2/hpack"
  35. "google.golang.org/grpc/internal/grpclog"
  36. "google.golang.org/grpc/internal/grpcutil"
  37. "google.golang.org/grpc/internal/pretty"
  38. "google.golang.org/grpc/internal/syscall"
  39. "google.golang.org/grpc/codes"
  40. "google.golang.org/grpc/credentials"
  41. "google.golang.org/grpc/internal/channelz"
  42. "google.golang.org/grpc/internal/grpcrand"
  43. "google.golang.org/grpc/internal/grpcsync"
  44. "google.golang.org/grpc/keepalive"
  45. "google.golang.org/grpc/metadata"
  46. "google.golang.org/grpc/peer"
  47. "google.golang.org/grpc/stats"
  48. "google.golang.org/grpc/status"
  49. "google.golang.org/grpc/tap"
  50. )
  51. var (
  52. // ErrIllegalHeaderWrite indicates that setting header is illegal because of
  53. // the stream's state.
  54. ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
  55. // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
  56. // than the limit set by peer.
  57. ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
  58. )
  59. // serverConnectionCounter counts the number of connections a server has seen
  60. // (equal to the number of http2Servers created). Must be accessed atomically.
  61. var serverConnectionCounter uint64
  62. // http2Server implements the ServerTransport interface with HTTP2.
  63. type http2Server struct {
  64. lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
  65. ctx context.Context
  66. done chan struct{}
  67. conn net.Conn
  68. loopy *loopyWriter
  69. readerDone chan struct{} // sync point to enable testing.
  70. writerDone chan struct{} // sync point to enable testing.
  71. remoteAddr net.Addr
  72. localAddr net.Addr
  73. authInfo credentials.AuthInfo // auth info about the connection
  74. inTapHandle tap.ServerInHandle
  75. framer *framer
  76. // The max number of concurrent streams.
  77. maxStreams uint32
  78. // controlBuf delivers all the control related tasks (e.g., window
  79. // updates, reset streams, and various settings) to the controller.
  80. controlBuf *controlBuffer
  81. fc *trInFlow
  82. stats []stats.Handler
  83. // Keepalive and max-age parameters for the server.
  84. kp keepalive.ServerParameters
  85. // Keepalive enforcement policy.
  86. kep keepalive.EnforcementPolicy
  87. // The time instance last ping was received.
  88. lastPingAt time.Time
  89. // Number of times the client has violated keepalive ping policy so far.
  90. pingStrikes uint8
  91. // Flag to signify that number of ping strikes should be reset to 0.
  92. // This is set whenever data or header frames are sent.
  93. // 1 means yes.
  94. resetPingStrikes uint32 // Accessed atomically.
  95. initialWindowSize int32
  96. bdpEst *bdpEstimator
  97. maxSendHeaderListSize *uint32
  98. mu sync.Mutex // guard the following
  99. // drainEvent is initialized when Drain() is called the first time. After
  100. // which the server writes out the first GoAway(with ID 2^31-1) frame. Then
  101. // an independent goroutine will be launched to later send the second
  102. // GoAway. During this time we don't want to write another first GoAway(with
  103. // ID 2^31 -1) frame. Thus call to Drain() will be a no-op if drainEvent is
  104. // already initialized since draining is already underway.
  105. drainEvent *grpcsync.Event
  106. state transportState
  107. activeStreams map[uint32]*Stream
  108. // idle is the time instant when the connection went idle.
  109. // This is either the beginning of the connection or when the number of
  110. // RPCs go down to 0.
  111. // When the connection is busy, this value is set to 0.
  112. idle time.Time
  113. // Fields below are for channelz metric collection.
  114. channelzID *channelz.Identifier
  115. czData *channelzData
  116. bufferPool *bufferPool
  117. connectionID uint64
  118. // maxStreamMu guards the maximum stream ID
  119. // This lock may not be taken if mu is already held.
  120. maxStreamMu sync.Mutex
  121. maxStreamID uint32 // max stream ID ever seen
  122. logger *grpclog.PrefixLogger
  123. }
  124. // NewServerTransport creates a http2 transport with conn and configuration
  125. // options from config.
  126. //
  127. // It returns a non-nil transport and a nil error on success. On failure, it
  128. // returns a nil transport and a non-nil error. For a special case where the
  129. // underlying conn gets closed before the client preface could be read, it
  130. // returns a nil transport and a nil error.
  131. func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
  132. var authInfo credentials.AuthInfo
  133. rawConn := conn
  134. if config.Credentials != nil {
  135. var err error
  136. conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
  137. if err != nil {
  138. // ErrConnDispatched means that the connection was dispatched away
  139. // from gRPC; those connections should be left open. io.EOF means
  140. // the connection was closed before handshaking completed, which can
  141. // happen naturally from probers. Return these errors directly.
  142. if err == credentials.ErrConnDispatched || err == io.EOF {
  143. return nil, err
  144. }
  145. return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  146. }
  147. }
  148. writeBufSize := config.WriteBufferSize
  149. readBufSize := config.ReadBufferSize
  150. maxHeaderListSize := defaultServerMaxHeaderListSize
  151. if config.MaxHeaderListSize != nil {
  152. maxHeaderListSize = *config.MaxHeaderListSize
  153. }
  154. framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
  155. // Send initial settings as connection preface to client.
  156. isettings := []http2.Setting{{
  157. ID: http2.SettingMaxFrameSize,
  158. Val: http2MaxFrameLen,
  159. }}
  160. // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
  161. // permitted in the HTTP2 spec.
  162. maxStreams := config.MaxStreams
  163. if maxStreams == 0 {
  164. maxStreams = math.MaxUint32
  165. } else {
  166. isettings = append(isettings, http2.Setting{
  167. ID: http2.SettingMaxConcurrentStreams,
  168. Val: maxStreams,
  169. })
  170. }
  171. dynamicWindow := true
  172. iwz := int32(initialWindowSize)
  173. if config.InitialWindowSize >= defaultWindowSize {
  174. iwz = config.InitialWindowSize
  175. dynamicWindow = false
  176. }
  177. icwz := int32(initialWindowSize)
  178. if config.InitialConnWindowSize >= defaultWindowSize {
  179. icwz = config.InitialConnWindowSize
  180. dynamicWindow = false
  181. }
  182. if iwz != defaultWindowSize {
  183. isettings = append(isettings, http2.Setting{
  184. ID: http2.SettingInitialWindowSize,
  185. Val: uint32(iwz)})
  186. }
  187. if config.MaxHeaderListSize != nil {
  188. isettings = append(isettings, http2.Setting{
  189. ID: http2.SettingMaxHeaderListSize,
  190. Val: *config.MaxHeaderListSize,
  191. })
  192. }
  193. if config.HeaderTableSize != nil {
  194. isettings = append(isettings, http2.Setting{
  195. ID: http2.SettingHeaderTableSize,
  196. Val: *config.HeaderTableSize,
  197. })
  198. }
  199. if err := framer.fr.WriteSettings(isettings...); err != nil {
  200. return nil, connectionErrorf(false, err, "transport: %v", err)
  201. }
  202. // Adjust the connection flow control window if needed.
  203. if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  204. if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
  205. return nil, connectionErrorf(false, err, "transport: %v", err)
  206. }
  207. }
  208. kp := config.KeepaliveParams
  209. if kp.MaxConnectionIdle == 0 {
  210. kp.MaxConnectionIdle = defaultMaxConnectionIdle
  211. }
  212. if kp.MaxConnectionAge == 0 {
  213. kp.MaxConnectionAge = defaultMaxConnectionAge
  214. }
  215. // Add a jitter to MaxConnectionAge.
  216. kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
  217. if kp.MaxConnectionAgeGrace == 0 {
  218. kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
  219. }
  220. if kp.Time == 0 {
  221. kp.Time = defaultServerKeepaliveTime
  222. }
  223. if kp.Timeout == 0 {
  224. kp.Timeout = defaultServerKeepaliveTimeout
  225. }
  226. if kp.Time != infinity {
  227. if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
  228. return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
  229. }
  230. }
  231. kep := config.KeepalivePolicy
  232. if kep.MinTime == 0 {
  233. kep.MinTime = defaultKeepalivePolicyMinTime
  234. }
  235. done := make(chan struct{})
  236. t := &http2Server{
  237. ctx: setConnection(context.Background(), rawConn),
  238. done: done,
  239. conn: conn,
  240. remoteAddr: conn.RemoteAddr(),
  241. localAddr: conn.LocalAddr(),
  242. authInfo: authInfo,
  243. framer: framer,
  244. readerDone: make(chan struct{}),
  245. writerDone: make(chan struct{}),
  246. maxStreams: maxStreams,
  247. inTapHandle: config.InTapHandle,
  248. fc: &trInFlow{limit: uint32(icwz)},
  249. state: reachable,
  250. activeStreams: make(map[uint32]*Stream),
  251. stats: config.StatsHandlers,
  252. kp: kp,
  253. idle: time.Now(),
  254. kep: kep,
  255. initialWindowSize: iwz,
  256. czData: new(channelzData),
  257. bufferPool: newBufferPool(),
  258. }
  259. t.logger = prefixLoggerForServerTransport(t)
  260. // Add peer information to the http2server context.
  261. t.ctx = peer.NewContext(t.ctx, t.getPeer())
  262. t.controlBuf = newControlBuffer(t.done)
  263. if dynamicWindow {
  264. t.bdpEst = &bdpEstimator{
  265. bdp: initialWindowSize,
  266. updateFlowControl: t.updateFlowControl,
  267. }
  268. }
  269. for _, sh := range t.stats {
  270. t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
  271. RemoteAddr: t.remoteAddr,
  272. LocalAddr: t.localAddr,
  273. })
  274. connBegin := &stats.ConnBegin{}
  275. sh.HandleConn(t.ctx, connBegin)
  276. }
  277. t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
  278. if err != nil {
  279. return nil, err
  280. }
  281. t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
  282. t.framer.writer.Flush()
  283. defer func() {
  284. if err != nil {
  285. t.Close(err)
  286. }
  287. }()
  288. // Check the validity of client preface.
  289. preface := make([]byte, len(clientPreface))
  290. if _, err := io.ReadFull(t.conn, preface); err != nil {
  291. // In deployments where a gRPC server runs behind a cloud load balancer
  292. // which performs regular TCP level health checks, the connection is
  293. // closed immediately by the latter. Returning io.EOF here allows the
  294. // grpc server implementation to recognize this scenario and suppress
  295. // logging to reduce spam.
  296. if err == io.EOF {
  297. return nil, io.EOF
  298. }
  299. return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
  300. }
  301. if !bytes.Equal(preface, clientPreface) {
  302. return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
  303. }
  304. frame, err := t.framer.fr.ReadFrame()
  305. if err == io.EOF || err == io.ErrUnexpectedEOF {
  306. return nil, err
  307. }
  308. if err != nil {
  309. return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
  310. }
  311. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  312. sf, ok := frame.(*http2.SettingsFrame)
  313. if !ok {
  314. return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
  315. }
  316. t.handleSettings(sf)
  317. go func() {
  318. t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
  319. t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
  320. t.loopy.run()
  321. close(t.writerDone)
  322. }()
  323. go t.keepalive()
  324. return t, nil
  325. }
  326. // operateHeaders takes action on the decoded headers. Returns an error if fatal
  327. // error encountered and transport needs to close, otherwise returns nil.
  328. func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) error {
  329. // Acquire max stream ID lock for entire duration
  330. t.maxStreamMu.Lock()
  331. defer t.maxStreamMu.Unlock()
  332. streamID := frame.Header().StreamID
  333. // frame.Truncated is set to true when framer detects that the current header
  334. // list size hits MaxHeaderListSize limit.
  335. if frame.Truncated {
  336. t.controlBuf.put(&cleanupStream{
  337. streamID: streamID,
  338. rst: true,
  339. rstCode: http2.ErrCodeFrameSize,
  340. onWrite: func() {},
  341. })
  342. return nil
  343. }
  344. if streamID%2 != 1 || streamID <= t.maxStreamID {
  345. // illegal gRPC stream id.
  346. return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
  347. }
  348. t.maxStreamID = streamID
  349. buf := newRecvBuffer()
  350. s := &Stream{
  351. id: streamID,
  352. st: t,
  353. buf: buf,
  354. fc: &inFlow{limit: uint32(t.initialWindowSize)},
  355. }
  356. var (
  357. // if false, content-type was missing or invalid
  358. isGRPC = false
  359. contentType = ""
  360. mdata = make(metadata.MD, len(frame.Fields))
  361. httpMethod string
  362. // these are set if an error is encountered while parsing the headers
  363. protocolError bool
  364. headerError *status.Status
  365. timeoutSet bool
  366. timeout time.Duration
  367. )
  368. for _, hf := range frame.Fields {
  369. switch hf.Name {
  370. case "content-type":
  371. contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
  372. if !validContentType {
  373. contentType = hf.Value
  374. break
  375. }
  376. mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
  377. s.contentSubtype = contentSubtype
  378. isGRPC = true
  379. case "grpc-accept-encoding":
  380. mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
  381. if hf.Value == "" {
  382. continue
  383. }
  384. compressors := hf.Value
  385. if s.clientAdvertisedCompressors != "" {
  386. compressors = s.clientAdvertisedCompressors + "," + compressors
  387. }
  388. s.clientAdvertisedCompressors = compressors
  389. case "grpc-encoding":
  390. s.recvCompress = hf.Value
  391. case ":method":
  392. httpMethod = hf.Value
  393. case ":path":
  394. s.method = hf.Value
  395. case "grpc-timeout":
  396. timeoutSet = true
  397. var err error
  398. if timeout, err = decodeTimeout(hf.Value); err != nil {
  399. headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
  400. }
  401. // "Transports must consider requests containing the Connection header
  402. // as malformed." - A41
  403. case "connection":
  404. if t.logger.V(logLevel) {
  405. t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
  406. }
  407. protocolError = true
  408. default:
  409. if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
  410. break
  411. }
  412. v, err := decodeMetadataHeader(hf.Name, hf.Value)
  413. if err != nil {
  414. headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
  415. t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
  416. break
  417. }
  418. mdata[hf.Name] = append(mdata[hf.Name], v)
  419. }
  420. }
  421. // "If multiple Host headers or multiple :authority headers are present, the
  422. // request must be rejected with an HTTP status code 400 as required by Host
  423. // validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM
  424. // with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
  425. // error, this takes precedence over a client not speaking gRPC.
  426. if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
  427. errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
  428. if t.logger.V(logLevel) {
  429. t.logger.Infof("Aborting the stream early: %v", errMsg)
  430. }
  431. t.controlBuf.put(&earlyAbortStream{
  432. httpStatus: http.StatusBadRequest,
  433. streamID: streamID,
  434. contentSubtype: s.contentSubtype,
  435. status: status.New(codes.Internal, errMsg),
  436. rst: !frame.StreamEnded(),
  437. })
  438. return nil
  439. }
  440. if protocolError {
  441. t.controlBuf.put(&cleanupStream{
  442. streamID: streamID,
  443. rst: true,
  444. rstCode: http2.ErrCodeProtocol,
  445. onWrite: func() {},
  446. })
  447. return nil
  448. }
  449. if !isGRPC {
  450. t.controlBuf.put(&earlyAbortStream{
  451. httpStatus: http.StatusUnsupportedMediaType,
  452. streamID: streamID,
  453. contentSubtype: s.contentSubtype,
  454. status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
  455. rst: !frame.StreamEnded(),
  456. })
  457. return nil
  458. }
  459. if headerError != nil {
  460. t.controlBuf.put(&earlyAbortStream{
  461. httpStatus: http.StatusBadRequest,
  462. streamID: streamID,
  463. contentSubtype: s.contentSubtype,
  464. status: headerError,
  465. rst: !frame.StreamEnded(),
  466. })
  467. return nil
  468. }
  469. // "If :authority is missing, Host must be renamed to :authority." - A41
  470. if len(mdata[":authority"]) == 0 {
  471. // No-op if host isn't present, no eventual :authority header is a valid
  472. // RPC.
  473. if host, ok := mdata["host"]; ok {
  474. mdata[":authority"] = host
  475. delete(mdata, "host")
  476. }
  477. } else {
  478. // "If :authority is present, Host must be discarded" - A41
  479. delete(mdata, "host")
  480. }
  481. if frame.StreamEnded() {
  482. // s is just created by the caller. No lock needed.
  483. s.state = streamReadDone
  484. }
  485. if timeoutSet {
  486. s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
  487. } else {
  488. s.ctx, s.cancel = context.WithCancel(t.ctx)
  489. }
  490. // Attach the received metadata to the context.
  491. if len(mdata) > 0 {
  492. s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
  493. if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
  494. s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
  495. }
  496. if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
  497. s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
  498. }
  499. }
  500. t.mu.Lock()
  501. if t.state != reachable {
  502. t.mu.Unlock()
  503. s.cancel()
  504. return nil
  505. }
  506. if uint32(len(t.activeStreams)) >= t.maxStreams {
  507. t.mu.Unlock()
  508. t.controlBuf.put(&cleanupStream{
  509. streamID: streamID,
  510. rst: true,
  511. rstCode: http2.ErrCodeRefusedStream,
  512. onWrite: func() {},
  513. })
  514. s.cancel()
  515. return nil
  516. }
  517. if httpMethod != http.MethodPost {
  518. t.mu.Unlock()
  519. errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
  520. if t.logger.V(logLevel) {
  521. t.logger.Infof("Aborting the stream early: %v", errMsg)
  522. }
  523. t.controlBuf.put(&earlyAbortStream{
  524. httpStatus: 405,
  525. streamID: streamID,
  526. contentSubtype: s.contentSubtype,
  527. status: status.New(codes.Internal, errMsg),
  528. rst: !frame.StreamEnded(),
  529. })
  530. s.cancel()
  531. return nil
  532. }
  533. if t.inTapHandle != nil {
  534. var err error
  535. if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method}); err != nil {
  536. t.mu.Unlock()
  537. if t.logger.V(logLevel) {
  538. t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
  539. }
  540. stat, ok := status.FromError(err)
  541. if !ok {
  542. stat = status.New(codes.PermissionDenied, err.Error())
  543. }
  544. t.controlBuf.put(&earlyAbortStream{
  545. httpStatus: 200,
  546. streamID: s.id,
  547. contentSubtype: s.contentSubtype,
  548. status: stat,
  549. rst: !frame.StreamEnded(),
  550. })
  551. return nil
  552. }
  553. }
  554. t.activeStreams[streamID] = s
  555. if len(t.activeStreams) == 1 {
  556. t.idle = time.Time{}
  557. }
  558. t.mu.Unlock()
  559. if channelz.IsOn() {
  560. atomic.AddInt64(&t.czData.streamsStarted, 1)
  561. atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
  562. }
  563. s.requestRead = func(n int) {
  564. t.adjustWindow(s, uint32(n))
  565. }
  566. s.ctx = traceCtx(s.ctx, s.method)
  567. for _, sh := range t.stats {
  568. s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
  569. inHeader := &stats.InHeader{
  570. FullMethod: s.method,
  571. RemoteAddr: t.remoteAddr,
  572. LocalAddr: t.localAddr,
  573. Compression: s.recvCompress,
  574. WireLength: int(frame.Header().Length),
  575. Header: mdata.Copy(),
  576. }
  577. sh.HandleRPC(s.ctx, inHeader)
  578. }
  579. s.ctxDone = s.ctx.Done()
  580. s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
  581. s.trReader = &transportReader{
  582. reader: &recvBufferReader{
  583. ctx: s.ctx,
  584. ctxDone: s.ctxDone,
  585. recv: s.buf,
  586. freeBuffer: t.bufferPool.put,
  587. },
  588. windowHandler: func(n int) {
  589. t.updateWindow(s, uint32(n))
  590. },
  591. }
  592. // Register the stream with loopy.
  593. t.controlBuf.put(&registerStream{
  594. streamID: s.id,
  595. wq: s.wq,
  596. })
  597. handle(s)
  598. return nil
  599. }
  600. // HandleStreams receives incoming streams using the given handler. This is
  601. // typically run in a separate goroutine.
  602. // traceCtx attaches trace to ctx and returns the new context.
  603. func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
  604. defer close(t.readerDone)
  605. for {
  606. t.controlBuf.throttle()
  607. frame, err := t.framer.fr.ReadFrame()
  608. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  609. if err != nil {
  610. if se, ok := err.(http2.StreamError); ok {
  611. if t.logger.V(logLevel) {
  612. t.logger.Warningf("Encountered http2.StreamError: %v", se)
  613. }
  614. t.mu.Lock()
  615. s := t.activeStreams[se.StreamID]
  616. t.mu.Unlock()
  617. if s != nil {
  618. t.closeStream(s, true, se.Code, false)
  619. } else {
  620. t.controlBuf.put(&cleanupStream{
  621. streamID: se.StreamID,
  622. rst: true,
  623. rstCode: se.Code,
  624. onWrite: func() {},
  625. })
  626. }
  627. continue
  628. }
  629. if err == io.EOF || err == io.ErrUnexpectedEOF {
  630. t.Close(err)
  631. return
  632. }
  633. t.Close(err)
  634. return
  635. }
  636. switch frame := frame.(type) {
  637. case *http2.MetaHeadersFrame:
  638. if err := t.operateHeaders(frame, handle, traceCtx); err != nil {
  639. t.Close(err)
  640. break
  641. }
  642. case *http2.DataFrame:
  643. t.handleData(frame)
  644. case *http2.RSTStreamFrame:
  645. t.handleRSTStream(frame)
  646. case *http2.SettingsFrame:
  647. t.handleSettings(frame)
  648. case *http2.PingFrame:
  649. t.handlePing(frame)
  650. case *http2.WindowUpdateFrame:
  651. t.handleWindowUpdate(frame)
  652. case *http2.GoAwayFrame:
  653. // TODO: Handle GoAway from the client appropriately.
  654. default:
  655. if t.logger.V(logLevel) {
  656. t.logger.Infof("Received unsupported frame type %T", frame)
  657. }
  658. }
  659. }
  660. }
  661. func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
  662. t.mu.Lock()
  663. defer t.mu.Unlock()
  664. if t.activeStreams == nil {
  665. // The transport is closing.
  666. return nil, false
  667. }
  668. s, ok := t.activeStreams[f.Header().StreamID]
  669. if !ok {
  670. // The stream is already done.
  671. return nil, false
  672. }
  673. return s, true
  674. }
  675. // adjustWindow sends out extra window update over the initial window size
  676. // of stream if the application is requesting data larger in size than
  677. // the window.
  678. func (t *http2Server) adjustWindow(s *Stream, n uint32) {
  679. if w := s.fc.maybeAdjust(n); w > 0 {
  680. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  681. }
  682. }
  683. // updateWindow adjusts the inbound quota for the stream and the transport.
  684. // Window updates will deliver to the controller for sending when
  685. // the cumulative quota exceeds the corresponding threshold.
  686. func (t *http2Server) updateWindow(s *Stream, n uint32) {
  687. if w := s.fc.onRead(n); w > 0 {
  688. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
  689. increment: w,
  690. })
  691. }
  692. }
  693. // updateFlowControl updates the incoming flow control windows
  694. // for the transport and the stream based on the current bdp
  695. // estimation.
  696. func (t *http2Server) updateFlowControl(n uint32) {
  697. t.mu.Lock()
  698. for _, s := range t.activeStreams {
  699. s.fc.newLimit(n)
  700. }
  701. t.initialWindowSize = int32(n)
  702. t.mu.Unlock()
  703. t.controlBuf.put(&outgoingWindowUpdate{
  704. streamID: 0,
  705. increment: t.fc.newLimit(n),
  706. })
  707. t.controlBuf.put(&outgoingSettings{
  708. ss: []http2.Setting{
  709. {
  710. ID: http2.SettingInitialWindowSize,
  711. Val: n,
  712. },
  713. },
  714. })
  715. }
  716. func (t *http2Server) handleData(f *http2.DataFrame) {
  717. size := f.Header().Length
  718. var sendBDPPing bool
  719. if t.bdpEst != nil {
  720. sendBDPPing = t.bdpEst.add(size)
  721. }
  722. // Decouple connection's flow control from application's read.
  723. // An update on connection's flow control should not depend on
  724. // whether user application has read the data or not. Such a
  725. // restriction is already imposed on the stream's flow control,
  726. // and therefore the sender will be blocked anyways.
  727. // Decoupling the connection flow control will prevent other
  728. // active(fast) streams from starving in presence of slow or
  729. // inactive streams.
  730. if w := t.fc.onData(size); w > 0 {
  731. t.controlBuf.put(&outgoingWindowUpdate{
  732. streamID: 0,
  733. increment: w,
  734. })
  735. }
  736. if sendBDPPing {
  737. // Avoid excessive ping detection (e.g. in an L7 proxy)
  738. // by sending a window update prior to the BDP ping.
  739. if w := t.fc.reset(); w > 0 {
  740. t.controlBuf.put(&outgoingWindowUpdate{
  741. streamID: 0,
  742. increment: w,
  743. })
  744. }
  745. t.controlBuf.put(bdpPing)
  746. }
  747. // Select the right stream to dispatch.
  748. s, ok := t.getStream(f)
  749. if !ok {
  750. return
  751. }
  752. if s.getState() == streamReadDone {
  753. t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
  754. return
  755. }
  756. if size > 0 {
  757. if err := s.fc.onData(size); err != nil {
  758. t.closeStream(s, true, http2.ErrCodeFlowControl, false)
  759. return
  760. }
  761. if f.Header().Flags.Has(http2.FlagDataPadded) {
  762. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  763. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  764. }
  765. }
  766. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  767. // guarantee f.Data() is consumed before the arrival of next frame.
  768. // Can this copy be eliminated?
  769. if len(f.Data()) > 0 {
  770. buffer := t.bufferPool.get()
  771. buffer.Reset()
  772. buffer.Write(f.Data())
  773. s.write(recvMsg{buffer: buffer})
  774. }
  775. }
  776. if f.StreamEnded() {
  777. // Received the end of stream from the client.
  778. s.compareAndSwapState(streamActive, streamReadDone)
  779. s.write(recvMsg{err: io.EOF})
  780. }
  781. }
  782. func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
  783. // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
  784. if s, ok := t.getStream(f); ok {
  785. t.closeStream(s, false, 0, false)
  786. return
  787. }
  788. // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
  789. t.controlBuf.put(&cleanupStream{
  790. streamID: f.Header().StreamID,
  791. rst: false,
  792. rstCode: 0,
  793. onWrite: func() {},
  794. })
  795. }
  796. func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
  797. if f.IsAck() {
  798. return
  799. }
  800. var ss []http2.Setting
  801. var updateFuncs []func()
  802. f.ForeachSetting(func(s http2.Setting) error {
  803. switch s.ID {
  804. case http2.SettingMaxHeaderListSize:
  805. updateFuncs = append(updateFuncs, func() {
  806. t.maxSendHeaderListSize = new(uint32)
  807. *t.maxSendHeaderListSize = s.Val
  808. })
  809. default:
  810. ss = append(ss, s)
  811. }
  812. return nil
  813. })
  814. t.controlBuf.executeAndPut(func(interface{}) bool {
  815. for _, f := range updateFuncs {
  816. f()
  817. }
  818. return true
  819. }, &incomingSettings{
  820. ss: ss,
  821. })
  822. }
  823. const (
  824. maxPingStrikes = 2
  825. defaultPingTimeout = 2 * time.Hour
  826. )
  827. func (t *http2Server) handlePing(f *http2.PingFrame) {
  828. if f.IsAck() {
  829. if f.Data == goAwayPing.data && t.drainEvent != nil {
  830. t.drainEvent.Fire()
  831. return
  832. }
  833. // Maybe it's a BDP ping.
  834. if t.bdpEst != nil {
  835. t.bdpEst.calculate(f.Data)
  836. }
  837. return
  838. }
  839. pingAck := &ping{ack: true}
  840. copy(pingAck.data[:], f.Data[:])
  841. t.controlBuf.put(pingAck)
  842. now := time.Now()
  843. defer func() {
  844. t.lastPingAt = now
  845. }()
  846. // A reset ping strikes means that we don't need to check for policy
  847. // violation for this ping and the pingStrikes counter should be set
  848. // to 0.
  849. if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
  850. t.pingStrikes = 0
  851. return
  852. }
  853. t.mu.Lock()
  854. ns := len(t.activeStreams)
  855. t.mu.Unlock()
  856. if ns < 1 && !t.kep.PermitWithoutStream {
  857. // Keepalive shouldn't be active thus, this new ping should
  858. // have come after at least defaultPingTimeout.
  859. if t.lastPingAt.Add(defaultPingTimeout).After(now) {
  860. t.pingStrikes++
  861. }
  862. } else {
  863. // Check if keepalive policy is respected.
  864. if t.lastPingAt.Add(t.kep.MinTime).After(now) {
  865. t.pingStrikes++
  866. }
  867. }
  868. if t.pingStrikes > maxPingStrikes {
  869. // Send goaway and close the connection.
  870. t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
  871. }
  872. }
  873. func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  874. t.controlBuf.put(&incomingWindowUpdate{
  875. streamID: f.Header().StreamID,
  876. increment: f.Increment,
  877. })
  878. }
  879. func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
  880. for k, vv := range md {
  881. if isReservedHeader(k) {
  882. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  883. continue
  884. }
  885. for _, v := range vv {
  886. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  887. }
  888. }
  889. return headerFields
  890. }
  891. func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
  892. if t.maxSendHeaderListSize == nil {
  893. return true
  894. }
  895. hdrFrame := it.(*headerFrame)
  896. var sz int64
  897. for _, f := range hdrFrame.hf {
  898. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  899. if t.logger.V(logLevel) {
  900. t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
  901. }
  902. return false
  903. }
  904. }
  905. return true
  906. }
  907. func (t *http2Server) streamContextErr(s *Stream) error {
  908. select {
  909. case <-t.done:
  910. return ErrConnClosing
  911. default:
  912. }
  913. return ContextErr(s.ctx.Err())
  914. }
  915. // WriteHeader sends the header metadata md back to the client.
  916. func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
  917. s.hdrMu.Lock()
  918. defer s.hdrMu.Unlock()
  919. if s.getState() == streamDone {
  920. return t.streamContextErr(s)
  921. }
  922. if s.updateHeaderSent() {
  923. return ErrIllegalHeaderWrite
  924. }
  925. if md.Len() > 0 {
  926. if s.header.Len() > 0 {
  927. s.header = metadata.Join(s.header, md)
  928. } else {
  929. s.header = md
  930. }
  931. }
  932. if err := t.writeHeaderLocked(s); err != nil {
  933. return status.Convert(err).Err()
  934. }
  935. return nil
  936. }
  937. func (t *http2Server) setResetPingStrikes() {
  938. atomic.StoreUint32(&t.resetPingStrikes, 1)
  939. }
  940. func (t *http2Server) writeHeaderLocked(s *Stream) error {
  941. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  942. // first and create a slice of that exact size.
  943. headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
  944. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  945. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
  946. if s.sendCompress != "" {
  947. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
  948. }
  949. headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
  950. success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
  951. streamID: s.id,
  952. hf: headerFields,
  953. endStream: false,
  954. onWrite: t.setResetPingStrikes,
  955. })
  956. if !success {
  957. if err != nil {
  958. return err
  959. }
  960. t.closeStream(s, true, http2.ErrCodeInternal, false)
  961. return ErrHeaderListSizeLimitViolation
  962. }
  963. for _, sh := range t.stats {
  964. // Note: Headers are compressed with hpack after this call returns.
  965. // No WireLength field is set here.
  966. outHeader := &stats.OutHeader{
  967. Header: s.header.Copy(),
  968. Compression: s.sendCompress,
  969. }
  970. sh.HandleRPC(s.Context(), outHeader)
  971. }
  972. return nil
  973. }
  974. // WriteStatus sends stream status to the client and terminates the stream.
  975. // There is no further I/O operations being able to perform on this stream.
  976. // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
  977. // OK is adopted.
  978. func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
  979. s.hdrMu.Lock()
  980. defer s.hdrMu.Unlock()
  981. if s.getState() == streamDone {
  982. return nil
  983. }
  984. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  985. // first and create a slice of that exact size.
  986. headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
  987. if !s.updateHeaderSent() { // No headers have been sent.
  988. if len(s.header) > 0 { // Send a separate header frame.
  989. if err := t.writeHeaderLocked(s); err != nil {
  990. return err
  991. }
  992. } else { // Send a trailer only response.
  993. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  994. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
  995. }
  996. }
  997. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
  998. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
  999. if p := st.Proto(); p != nil && len(p.Details) > 0 {
  1000. stBytes, err := proto.Marshal(p)
  1001. if err != nil {
  1002. // TODO: return error instead, when callers are able to handle it.
  1003. t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
  1004. } else {
  1005. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
  1006. }
  1007. }
  1008. // Attach the trailer metadata.
  1009. headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
  1010. trailingHeader := &headerFrame{
  1011. streamID: s.id,
  1012. hf: headerFields,
  1013. endStream: true,
  1014. onWrite: t.setResetPingStrikes,
  1015. }
  1016. success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
  1017. if !success {
  1018. if err != nil {
  1019. return err
  1020. }
  1021. t.closeStream(s, true, http2.ErrCodeInternal, false)
  1022. return ErrHeaderListSizeLimitViolation
  1023. }
  1024. // Send a RST_STREAM after the trailers if the client has not already half-closed.
  1025. rst := s.getState() == streamActive
  1026. t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
  1027. for _, sh := range t.stats {
  1028. // Note: The trailer fields are compressed with hpack after this call returns.
  1029. // No WireLength field is set here.
  1030. sh.HandleRPC(s.Context(), &stats.OutTrailer{
  1031. Trailer: s.trailer.Copy(),
  1032. })
  1033. }
  1034. return nil
  1035. }
  1036. // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
  1037. // is returns if it fails (e.g., framing error, transport error).
  1038. func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  1039. if !s.isHeaderSent() { // Headers haven't been written yet.
  1040. if err := t.WriteHeader(s, nil); err != nil {
  1041. return err
  1042. }
  1043. } else {
  1044. // Writing headers checks for this condition.
  1045. if s.getState() == streamDone {
  1046. return t.streamContextErr(s)
  1047. }
  1048. }
  1049. df := &dataFrame{
  1050. streamID: s.id,
  1051. h: hdr,
  1052. d: data,
  1053. onEachWrite: t.setResetPingStrikes,
  1054. }
  1055. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  1056. return t.streamContextErr(s)
  1057. }
  1058. return t.controlBuf.put(df)
  1059. }
  1060. // keepalive running in a separate goroutine does the following:
  1061. // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
  1062. // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
  1063. // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
  1064. // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
  1065. // after an additional duration of keepalive.Timeout.
  1066. func (t *http2Server) keepalive() {
  1067. p := &ping{}
  1068. // True iff a ping has been sent, and no data has been received since then.
  1069. outstandingPing := false
  1070. // Amount of time remaining before which we should receive an ACK for the
  1071. // last sent ping.
  1072. kpTimeoutLeft := time.Duration(0)
  1073. // Records the last value of t.lastRead before we go block on the timer.
  1074. // This is required to check for read activity since then.
  1075. prevNano := time.Now().UnixNano()
  1076. // Initialize the different timers to their default values.
  1077. idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
  1078. ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
  1079. kpTimer := time.NewTimer(t.kp.Time)
  1080. defer func() {
  1081. // We need to drain the underlying channel in these timers after a call
  1082. // to Stop(), only if we are interested in resetting them. Clearly we
  1083. // are not interested in resetting them here.
  1084. idleTimer.Stop()
  1085. ageTimer.Stop()
  1086. kpTimer.Stop()
  1087. }()
  1088. for {
  1089. select {
  1090. case <-idleTimer.C:
  1091. t.mu.Lock()
  1092. idle := t.idle
  1093. if idle.IsZero() { // The connection is non-idle.
  1094. t.mu.Unlock()
  1095. idleTimer.Reset(t.kp.MaxConnectionIdle)
  1096. continue
  1097. }
  1098. val := t.kp.MaxConnectionIdle - time.Since(idle)
  1099. t.mu.Unlock()
  1100. if val <= 0 {
  1101. // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
  1102. // Gracefully close the connection.
  1103. t.Drain("max_idle")
  1104. return
  1105. }
  1106. idleTimer.Reset(val)
  1107. case <-ageTimer.C:
  1108. t.Drain("max_age")
  1109. ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
  1110. select {
  1111. case <-ageTimer.C:
  1112. // Close the connection after grace period.
  1113. if t.logger.V(logLevel) {
  1114. t.logger.Infof("Closing server transport due to maximum connection age")
  1115. }
  1116. t.controlBuf.put(closeConnection{})
  1117. case <-t.done:
  1118. }
  1119. return
  1120. case <-kpTimer.C:
  1121. lastRead := atomic.LoadInt64(&t.lastRead)
  1122. if lastRead > prevNano {
  1123. // There has been read activity since the last time we were
  1124. // here. Setup the timer to fire at kp.Time seconds from
  1125. // lastRead time and continue.
  1126. outstandingPing = false
  1127. kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
  1128. prevNano = lastRead
  1129. continue
  1130. }
  1131. if outstandingPing && kpTimeoutLeft <= 0 {
  1132. t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
  1133. return
  1134. }
  1135. if !outstandingPing {
  1136. if channelz.IsOn() {
  1137. atomic.AddInt64(&t.czData.kpCount, 1)
  1138. }
  1139. t.controlBuf.put(p)
  1140. kpTimeoutLeft = t.kp.Timeout
  1141. outstandingPing = true
  1142. }
  1143. // The amount of time to sleep here is the minimum of kp.Time and
  1144. // timeoutLeft. This will ensure that we wait only for kp.Time
  1145. // before sending out the next ping (for cases where the ping is
  1146. // acked).
  1147. sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
  1148. kpTimeoutLeft -= sleepDuration
  1149. kpTimer.Reset(sleepDuration)
  1150. case <-t.done:
  1151. return
  1152. }
  1153. }
  1154. }
  1155. // Close starts shutting down the http2Server transport.
  1156. // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
  1157. // could cause some resource issue. Revisit this later.
  1158. func (t *http2Server) Close(err error) {
  1159. t.mu.Lock()
  1160. if t.state == closing {
  1161. t.mu.Unlock()
  1162. return
  1163. }
  1164. if t.logger.V(logLevel) {
  1165. t.logger.Infof("Closing: %v", err)
  1166. }
  1167. t.state = closing
  1168. streams := t.activeStreams
  1169. t.activeStreams = nil
  1170. t.mu.Unlock()
  1171. t.controlBuf.finish()
  1172. close(t.done)
  1173. if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
  1174. t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
  1175. }
  1176. channelz.RemoveEntry(t.channelzID)
  1177. // Cancel all active streams.
  1178. for _, s := range streams {
  1179. s.cancel()
  1180. }
  1181. for _, sh := range t.stats {
  1182. connEnd := &stats.ConnEnd{}
  1183. sh.HandleConn(t.ctx, connEnd)
  1184. }
  1185. }
  1186. // deleteStream deletes the stream s from transport's active streams.
  1187. func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
  1188. t.mu.Lock()
  1189. if _, ok := t.activeStreams[s.id]; ok {
  1190. delete(t.activeStreams, s.id)
  1191. if len(t.activeStreams) == 0 {
  1192. t.idle = time.Now()
  1193. }
  1194. }
  1195. t.mu.Unlock()
  1196. if channelz.IsOn() {
  1197. if eosReceived {
  1198. atomic.AddInt64(&t.czData.streamsSucceeded, 1)
  1199. } else {
  1200. atomic.AddInt64(&t.czData.streamsFailed, 1)
  1201. }
  1202. }
  1203. }
  1204. // finishStream closes the stream and puts the trailing headerFrame into controlbuf.
  1205. func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
  1206. // In case stream sending and receiving are invoked in separate
  1207. // goroutines (e.g., bi-directional streaming), cancel needs to be
  1208. // called to interrupt the potential blocking on other goroutines.
  1209. s.cancel()
  1210. oldState := s.swapState(streamDone)
  1211. if oldState == streamDone {
  1212. // If the stream was already done, return.
  1213. return
  1214. }
  1215. hdr.cleanup = &cleanupStream{
  1216. streamID: s.id,
  1217. rst: rst,
  1218. rstCode: rstCode,
  1219. onWrite: func() {
  1220. t.deleteStream(s, eosReceived)
  1221. },
  1222. }
  1223. t.controlBuf.put(hdr)
  1224. }
  1225. // closeStream clears the footprint of a stream when the stream is not needed any more.
  1226. func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
  1227. // In case stream sending and receiving are invoked in separate
  1228. // goroutines (e.g., bi-directional streaming), cancel needs to be
  1229. // called to interrupt the potential blocking on other goroutines.
  1230. s.cancel()
  1231. s.swapState(streamDone)
  1232. t.deleteStream(s, eosReceived)
  1233. t.controlBuf.put(&cleanupStream{
  1234. streamID: s.id,
  1235. rst: rst,
  1236. rstCode: rstCode,
  1237. onWrite: func() {},
  1238. })
  1239. }
  1240. func (t *http2Server) RemoteAddr() net.Addr {
  1241. return t.remoteAddr
  1242. }
  1243. func (t *http2Server) Drain(debugData string) {
  1244. t.mu.Lock()
  1245. defer t.mu.Unlock()
  1246. if t.drainEvent != nil {
  1247. return
  1248. }
  1249. t.drainEvent = grpcsync.NewEvent()
  1250. t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
  1251. }
  1252. var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
  1253. // Handles outgoing GoAway and returns true if loopy needs to put itself
  1254. // in draining mode.
  1255. func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
  1256. t.maxStreamMu.Lock()
  1257. t.mu.Lock()
  1258. if t.state == closing { // TODO(mmukhi): This seems unnecessary.
  1259. t.mu.Unlock()
  1260. t.maxStreamMu.Unlock()
  1261. // The transport is closing.
  1262. return false, ErrConnClosing
  1263. }
  1264. if !g.headsUp {
  1265. // Stop accepting more streams now.
  1266. t.state = draining
  1267. sid := t.maxStreamID
  1268. retErr := g.closeConn
  1269. if len(t.activeStreams) == 0 {
  1270. retErr = errors.New("second GOAWAY written and no active streams left to process")
  1271. }
  1272. t.mu.Unlock()
  1273. t.maxStreamMu.Unlock()
  1274. if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
  1275. return false, err
  1276. }
  1277. if retErr != nil {
  1278. return false, retErr
  1279. }
  1280. return true, nil
  1281. }
  1282. t.mu.Unlock()
  1283. t.maxStreamMu.Unlock()
  1284. // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
  1285. // Follow that with a ping and wait for the ack to come back or a timer
  1286. // to expire. During this time accept new streams since they might have
  1287. // originated before the GoAway reaches the client.
  1288. // After getting the ack or timer expiration send out another GoAway this
  1289. // time with an ID of the max stream server intends to process.
  1290. if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
  1291. return false, err
  1292. }
  1293. if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
  1294. return false, err
  1295. }
  1296. go func() {
  1297. timer := time.NewTimer(time.Minute)
  1298. defer timer.Stop()
  1299. select {
  1300. case <-t.drainEvent.Done():
  1301. case <-timer.C:
  1302. case <-t.done:
  1303. return
  1304. }
  1305. t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
  1306. }()
  1307. return false, nil
  1308. }
  1309. func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
  1310. s := channelz.SocketInternalMetric{
  1311. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1312. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1313. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1314. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1315. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1316. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1317. LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1318. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1319. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1320. LocalFlowControlWindow: int64(t.fc.getSize()),
  1321. SocketOptions: channelz.GetSocketOption(t.conn),
  1322. LocalAddr: t.localAddr,
  1323. RemoteAddr: t.remoteAddr,
  1324. // RemoteName :
  1325. }
  1326. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1327. s.Security = au.GetSecurityValue()
  1328. }
  1329. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1330. return &s
  1331. }
  1332. func (t *http2Server) IncrMsgSent() {
  1333. atomic.AddInt64(&t.czData.msgSent, 1)
  1334. atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
  1335. }
  1336. func (t *http2Server) IncrMsgRecv() {
  1337. atomic.AddInt64(&t.czData.msgRecv, 1)
  1338. atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
  1339. }
  1340. func (t *http2Server) getOutFlowWindow() int64 {
  1341. resp := make(chan uint32, 1)
  1342. timer := time.NewTimer(time.Second)
  1343. defer timer.Stop()
  1344. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1345. select {
  1346. case sz := <-resp:
  1347. return int64(sz)
  1348. case <-t.done:
  1349. return -1
  1350. case <-timer.C:
  1351. return -2
  1352. }
  1353. }
  1354. func (t *http2Server) getPeer() *peer.Peer {
  1355. return &peer.Peer{
  1356. Addr: t.remoteAddr,
  1357. AuthInfo: t.authInfo, // Can be nil
  1358. }
  1359. }
  1360. func getJitter(v time.Duration) time.Duration {
  1361. if v == infinity {
  1362. return 0
  1363. }
  1364. // Generate a jitter between +/- 10% of the value.
  1365. r := int64(v / 10)
  1366. j := grpcrand.Int63n(2*r) - r
  1367. return time.Duration(j)
  1368. }
  1369. type connectionKey struct{}
  1370. // GetConnection gets the connection from the context.
  1371. func GetConnection(ctx context.Context) net.Conn {
  1372. conn, _ := ctx.Value(connectionKey{}).(net.Conn)
  1373. return conn
  1374. }
  1375. // SetConnection adds the connection to the context to be able to get
  1376. // information about the destination ip and port for an incoming RPC. This also
  1377. // allows any unary or streaming interceptors to see the connection.
  1378. func setConnection(ctx context.Context, conn net.Conn) context.Context {
  1379. return context.WithValue(ctx, connectionKey{}, conn)
  1380. }