// stream.go (53 KB)

  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "errors"
  22. "io"
  23. "math"
  24. "strconv"
  25. "sync"
  26. "time"
  27. "golang.org/x/net/trace"
  28. "google.golang.org/grpc/balancer"
  29. "google.golang.org/grpc/codes"
  30. "google.golang.org/grpc/encoding"
  31. "google.golang.org/grpc/internal/balancerload"
  32. "google.golang.org/grpc/internal/binarylog"
  33. "google.golang.org/grpc/internal/channelz"
  34. "google.golang.org/grpc/internal/grpcrand"
  35. "google.golang.org/grpc/internal/grpcutil"
  36. imetadata "google.golang.org/grpc/internal/metadata"
  37. iresolver "google.golang.org/grpc/internal/resolver"
  38. "google.golang.org/grpc/internal/serviceconfig"
  39. istatus "google.golang.org/grpc/internal/status"
  40. "google.golang.org/grpc/internal/transport"
  41. "google.golang.org/grpc/metadata"
  42. "google.golang.org/grpc/peer"
  43. "google.golang.org/grpc/stats"
  44. "google.golang.org/grpc/status"
  45. )
// StreamHandler defines the handler called by gRPC server to complete the
// execution of a streaming RPC.
//
// The handler is given srv as an opaque value together with the ServerStream
// of the RPC being served, and reports the outcome of the RPC through its
// error result.
// NOTE(review): srv is presumably the registered service implementation;
// confirm against the server registration code.
//
// If a StreamHandler returns an error, it should either be produced by the
// status package, or be one of the context errors. Otherwise, gRPC will use
// codes.Unknown as the status code and err.Error() as the status message of the
// RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error
// StreamDesc represents a streaming RPC service's method specification. Used
// on the server when registering services and on the client when initiating
// new streams.
//
// On the client side, ServerStreams and ClientStreams are additionally copied
// into the stats.Begin event that is reported to the stats handlers for every
// attempt of the RPC.
type StreamDesc struct {
	// StreamName and Handler are only used when registering handlers on a
	// server.
	StreamName string        // the name of the method excluding the service
	Handler    StreamHandler // the handler called for the method

	// ServerStreams and ClientStreams are used for registering handlers on a
	// server as well as defining RPC behavior when passed to NewClientStream
	// and ClientConn.NewStream. At least one must be true.
	ServerStreams bool // indicates the server can perform streaming sends
	ClientStreams bool // indicates the client can perform streaming sends
}
// Stream defines the common interface a client or server stream has to satisfy.
//
// It consists of the three methods (Context, SendMsg and RecvMsg) that are
// also declared, with full documentation, on ClientStream.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Context returns the context associated with the stream.
	//
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// SendMsg sends the message m on the stream.
	//
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m interface{}) error
	// RecvMsg receives the next message from the stream into m.
	//
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m interface{}) error
}
// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when a non-nil error is met. It is also not safe to call CloseSend
	// concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled: the clientStream
	// implementation commits the current attempt as the first step of
	// Context.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	//
	// It is not safe to modify the message after calling SendMsg. Tracing
	// libraries and stats handlers may use the message lazily.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
  132. // NewStream creates a new Stream for the client side. This is typically
  133. // called by generated code. ctx is used for the lifetime of the stream.
  134. //
  135. // To ensure resources are not leaked due to the stream returned, one of the following
  136. // actions must be performed:
  137. //
  138. // 1. Call Close on the ClientConn.
  139. // 2. Cancel the context provided.
  140. // 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
  141. // client-streaming RPC, for instance, might use the helper function
  142. // CloseAndRecv (note that CloseSend does not Recv, therefore is not
  143. // guaranteed to release all resources).
  144. // 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
  145. //
  146. // If none of the above happen, a goroutine and a context will be leaked, and grpc
  147. // will not call the optionally-configured stats handler with a stats.End message.
  148. func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
  149. if err := cc.idlenessMgr.onCallBegin(); err != nil {
  150. return nil, err
  151. }
  152. defer cc.idlenessMgr.onCallEnd()
  153. // allow interceptor to see all applicable call options, which means those
  154. // configured as defaults from dial option as well as per-call options
  155. opts = combine(cc.dopts.callOptions, opts)
  156. if cc.dopts.streamInt != nil {
  157. return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
  158. }
  159. return newClientStream(ctx, desc, cc, method, opts...)
  160. }
  161. // NewClientStream is a wrapper for ClientConn.NewStream.
  162. func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
  163. return cc.NewStream(ctx, desc, method, opts...)
  164. }
  165. func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
  166. if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
  167. // validate md
  168. if err := imetadata.Validate(md); err != nil {
  169. return nil, status.Error(codes.Internal, err.Error())
  170. }
  171. // validate added
  172. for _, kvs := range added {
  173. for i := 0; i < len(kvs); i += 2 {
  174. if err := imetadata.ValidatePair(kvs[i], kvs[i+1]); err != nil {
  175. return nil, status.Error(codes.Internal, err.Error())
  176. }
  177. }
  178. }
  179. }
  180. if channelz.IsOn() {
  181. cc.incrCallsStarted()
  182. defer func() {
  183. if err != nil {
  184. cc.incrCallsFailed()
  185. }
  186. }()
  187. }
  188. // Provide an opportunity for the first RPC to see the first service config
  189. // provided by the resolver.
  190. if err := cc.waitForResolvedAddrs(ctx); err != nil {
  191. return nil, err
  192. }
  193. var mc serviceconfig.MethodConfig
  194. var onCommit func()
  195. var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
  196. return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
  197. }
  198. rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
  199. rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
  200. if err != nil {
  201. if st, ok := status.FromError(err); ok {
  202. // Restrict the code to the list allowed by gRFC A54.
  203. if istatus.IsRestrictedControlPlaneCode(st) {
  204. err = status.Errorf(codes.Internal, "config selector returned illegal status: %v", err)
  205. }
  206. return nil, err
  207. }
  208. return nil, toRPCErr(err)
  209. }
  210. if rpcConfig != nil {
  211. if rpcConfig.Context != nil {
  212. ctx = rpcConfig.Context
  213. }
  214. mc = rpcConfig.MethodConfig
  215. onCommit = rpcConfig.OnCommitted
  216. if rpcConfig.Interceptor != nil {
  217. rpcInfo.Context = nil
  218. ns := newStream
  219. newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
  220. cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
  221. if err != nil {
  222. return nil, toRPCErr(err)
  223. }
  224. return cs, nil
  225. }
  226. }
  227. }
  228. return newStream(ctx, func() {})
  229. }
// newClientStreamWithParams builds a clientStream for method on cc using the
// already-selected method config mc, and starts its first attempt.
//
// In order, it:
//
//   - derives the call info from the defaults, mc.WaitForReady and the call
//     options (each option's before hook runs against the call info);
//   - derives the stream context from ctx, with mc.Timeout applied when it is
//     set and non-negative;
//   - resolves message size limits, the codec and the outgoing compressor;
//   - assembles the clientStream, including the retry throttler (unless retry
//     is disabled) and any binary-log method loggers;
//   - picks a transport and creates the transport stream through withRetry;
//   - writes a ClientHeader binary-log entry if any method logger is present;
//   - for every descriptor other than unaryStreamDesc, starts a goroutine that
//     finishes the stream when either the ClientConn's context or the stream
//     context is done.
//
// onCommit is stored on the stream and doneFunc is placed in the transport
// call header. If an error is returned, the derived context is canceled
// before returning.
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
	c := defaultCallInfo()
	// The service config's WaitForReady is applied first; the call options
	// run afterwards (see the o.before loop below).
	if mc.WaitForReady != nil {
		c.failFast = !*mc.WaitForReady
	}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	var cancel context.CancelFunc
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	// Relies on the named result err: any error return below releases the
	// derived context.
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	// The size limits are computed after the call options have run, so the
	// values the options stored in c take part in the computation.
	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
		DoneFunc:       doneFunc,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		// The identity encoding needs no compressor; comp stays nil for it.
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.cp != nil {
		callHdr.SendCompress = cc.dopts.cp.Type()
		cp = cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}

	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		firstAttempt: true,
		onCommit:     onCommit,
	}
	// With retry disabled the throttler is left nil.
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}
	// Method loggers can come from the binarylog package and from the
	// binaryLogger dial option; both are kept when present.
	if ml := binarylog.GetMethodLogger(method); ml != nil {
		cs.binlogs = append(cs.binlogs, ml)
	}
	if cc.dopts.binaryLogger != nil {
		if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
			cs.binlogs = append(cs.binlogs, ml)
		}
	}

	// Pick the transport to use and create a new stream on the transport.
	// Assign cs.attempt upon success.
	op := func(a *csAttempt) error {
		if err := a.getTransport(); err != nil {
			return err
		}
		if err := a.newStream(); err != nil {
			return err
		}
		// Because this operation is always called either here (while creating
		// the clientStream) or by the retry code while locked when replaying
		// the operation, it is safe to access cs.attempt directly.
		cs.attempt = a
		return nil
	}
	// The second argument is withRetry's onSuccess callback; it hands op to
	// bufferForRetryLocked so that it can be replayed on a retry.
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		return nil, err
	}

	if len(cs.binlogs) != 0 {
		md, _ := metadata.FromOutgoingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			OnClientSide: true,
			Header:       md,
			MethodName:   method,
			Authority:    cs.cc.authority,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, logEntry)
		}
	}

	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to cleanup when the user closes the
		// ClientConn or cancels the stream context. In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return cs, nil
}
  367. // newAttemptLocked creates a new csAttempt without a transport or stream.
  368. func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
  369. if err := cs.ctx.Err(); err != nil {
  370. return nil, toRPCErr(err)
  371. }
  372. if err := cs.cc.ctx.Err(); err != nil {
  373. return nil, ErrClientConnClosing
  374. }
  375. ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
  376. method := cs.callHdr.Method
  377. var beginTime time.Time
  378. shs := cs.cc.dopts.copts.StatsHandlers
  379. for _, sh := range shs {
  380. ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
  381. beginTime = time.Now()
  382. begin := &stats.Begin{
  383. Client: true,
  384. BeginTime: beginTime,
  385. FailFast: cs.callInfo.failFast,
  386. IsClientStream: cs.desc.ClientStreams,
  387. IsServerStream: cs.desc.ServerStreams,
  388. IsTransparentRetryAttempt: isTransparent,
  389. }
  390. sh.HandleRPC(ctx, begin)
  391. }
  392. var trInfo *traceInfo
  393. if EnableTracing {
  394. trInfo = &traceInfo{
  395. tr: trace.New("grpc.Sent."+methodFamily(method), method),
  396. firstLine: firstLine{
  397. client: true,
  398. },
  399. }
  400. if deadline, ok := ctx.Deadline(); ok {
  401. trInfo.firstLine.deadline = time.Until(deadline)
  402. }
  403. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  404. ctx = trace.NewContext(ctx, trInfo.tr)
  405. }
  406. if cs.cc.parsedTarget.URL.Scheme == "xds" {
  407. // Add extra metadata (metadata that will be added by transport) to context
  408. // so the balancer can see them.
  409. ctx = grpcutil.WithExtraMetadata(ctx, metadata.Pairs(
  410. "content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
  411. ))
  412. }
  413. return &csAttempt{
  414. ctx: ctx,
  415. beginTime: beginTime,
  416. cs: cs,
  417. dc: cs.cc.dopts.dc,
  418. statsHandlers: shs,
  419. trInfo: trInfo,
  420. }, nil
  421. }
  422. func (a *csAttempt) getTransport() error {
  423. cs := a.cs
  424. var err error
  425. a.t, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
  426. if err != nil {
  427. if de, ok := err.(dropError); ok {
  428. err = de.error
  429. a.drop = true
  430. }
  431. return err
  432. }
  433. if a.trInfo != nil {
  434. a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
  435. }
  436. return nil
  437. }
  438. func (a *csAttempt) newStream() error {
  439. cs := a.cs
  440. cs.callHdr.PreviousAttempts = cs.numRetries
  441. // Merge metadata stored in PickResult, if any, with existing call metadata.
  442. // It is safe to overwrite the csAttempt's context here, since all state
  443. // maintained in it are local to the attempt. When the attempt has to be
  444. // retried, a new instance of csAttempt will be created.
  445. if a.pickResult.Metadata != nil {
  446. // We currently do not have a function it the metadata package which
  447. // merges given metadata with existing metadata in a context. Existing
  448. // function `AppendToOutgoingContext()` takes a variadic argument of key
  449. // value pairs.
  450. //
  451. // TODO: Make it possible to retrieve key value pairs from metadata.MD
  452. // in a form passable to AppendToOutgoingContext(), or create a version
  453. // of AppendToOutgoingContext() that accepts a metadata.MD.
  454. md, _ := metadata.FromOutgoingContext(a.ctx)
  455. md = metadata.Join(md, a.pickResult.Metadata)
  456. a.ctx = metadata.NewOutgoingContext(a.ctx, md)
  457. }
  458. s, err := a.t.NewStream(a.ctx, cs.callHdr)
  459. if err != nil {
  460. nse, ok := err.(*transport.NewStreamError)
  461. if !ok {
  462. // Unexpected.
  463. return err
  464. }
  465. if nse.AllowTransparentRetry {
  466. a.allowTransparentRetry = true
  467. }
  468. // Unwrap and convert error.
  469. return toRPCErr(nse.Err)
  470. }
  471. a.s = s
  472. a.p = &parser{r: s}
  473. return nil
  474. }
// clientStream implements a client side Stream.
//
// It holds the per-RPC state that outlives individual attempts: the call
// header and options, the retry bookkeeping, and the currently active
// csAttempt.
type clientStream struct {
	callHdr  *transport.CallHdr // header passed to the transport for every attempt
	opts     []CallOption
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc

	codec baseCodec
	cp    Compressor          // compressor from the dial options, if any
	comp  encoding.Compressor // compressor looked up in the encoding package, if any

	cancel context.CancelFunc // cancels all attempts

	sentLast bool // sent an end stream

	methodConfig *MethodConfig

	ctx context.Context // the application's context, wrapped by stats/tracing

	retryThrottler *retryThrottler // The throttler active when the RPC began.

	binlogs []binarylog.MethodLogger
	// serverHeaderBinlogged records whether the server header has been
	// logged. The server header is logged the first time either of these
	// happens: stream.Header(), stream.Recv().
	//
	// It's only read and used by Recv() and Header(), so it doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	// NOTE(review): the fields below appear to be the ones accessed with mu
	// held (the *Locked helpers touch them); confirm before relying on it.
	mu                      sync.Mutex
	firstAttempt            bool // if true, transparent retry is valid
	numRetries              int  // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int  // retries since pushback; to reset backoff
	finished                bool // TODO: replace with atomic cmpxchg or sync.Once?
	// attempt is the active client stream attempt.
	// The only place where it is written is the newAttemptLocked method and this method never writes nil.
	// So, attempt can be nil only inside newClientStream function when clientStream is first created.
	// One of the first things done after clientStream's creation, is to call newAttemptLocked which either
	// assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked,
	// then newClientStream calls finish on the clientStream and returns. So, finish method is the only
	// place where we need to check if the attempt is nil.
	//
	// NOTE(review): the paragraph above looks stale. In the code next to this
	// declaration, attempt is assigned by the operation that
	// newClientStreamWithParams runs through withRetry (after getTransport
	// and newStream succeed), and newAttemptLocked only returns the new
	// attempt. Confirm and reword.
	attempt *csAttempt
	// TODO(hedging): hedging will have multiple attempts simultaneously.
	committed  bool                       // active attempt committed for retry?
	onCommit   func()                     // invoked by commitAttemptLocked the first time the stream is committed, if non-nil
	buffer     []func(a *csAttempt) error // operations to replay on retry
	bufferSize int                        // current size of buffer
}
// csAttempt implements a single transport stream attempt within a
// clientStream.
//
// An attempt is created by clientStream.newAttemptLocked without a transport
// or stream; getTransport and newStream fill those in.
type csAttempt struct {
	ctx        context.Context           // per-attempt context; newStream may replace it to add pick-result metadata
	cs         *clientStream             // the stream this attempt belongs to
	t          transport.ClientTransport // set by getTransport
	s          *transport.Stream         // set by newStream on success; nil until then
	p          *parser                   // reads from s; set together with s
	pickResult balancer.PickResult       // set by getTransport

	finished  bool
	dc        Decompressor // taken from the dial options when the attempt is created
	decomp    encoding.Compressor
	decompSet bool

	mu sync.Mutex // guards trInfo.tr
	// trInfo may be nil (if EnableTracing is false).
	// trInfo.tr is set when created (if EnableTracing is true),
	// and cleared when the finish method is called.
	trInfo *traceInfo

	statsHandlers []stats.Handler // the handlers that were sent stats.Begin for this attempt
	beginTime     time.Time       // timestamp reported in stats.Begin; zero without stats handlers

	// set for newStream errors that may be transparently retried
	allowTransparentRetry bool
	// set for pick errors that are returned as a status
	drop bool
}
  542. func (cs *clientStream) commitAttemptLocked() {
  543. if !cs.committed && cs.onCommit != nil {
  544. cs.onCommit()
  545. }
  546. cs.committed = true
  547. cs.buffer = nil
  548. }
  549. func (cs *clientStream) commitAttempt() {
  550. cs.mu.Lock()
  551. cs.commitAttemptLocked()
  552. cs.mu.Unlock()
  553. }
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation. If the RPC should be
// retried, the bool indicates whether it is being retried transparently.
//
// err is the error of the failed attempt. It is handed back unchanged
// whenever a retry is ruled out, and is used to derive the status code when
// the attempt never got a transport stream.
//
// For a non-transparent retry the call blocks for the backoff period (or the
// server-provided pushback) before returning (false, nil), and increments
// cs.numRetries. If the stream's context is done first, the status error
// derived from the context error is returned instead.
func (a *csAttempt) shouldRetry(err error) (bool, error) {
	cs := a.cs

	if cs.finished || cs.committed || a.drop {
		// RPC is finished or committed or was dropped by the picker; cannot retry.
		return false, err
	}
	// No transport stream was created and the newStream error was marked as
	// safe for a transparent retry.
	if a.s == nil && a.allowTransparentRetry {
		return true, nil
	}
	// Wait for the trailers.
	unprocessed := false
	if a.s != nil {
		<-a.s.Done()
		unprocessed = a.s.Unprocessed()
	}
	if cs.firstAttempt && unprocessed {
		// First attempt, stream unprocessed: transparently retry.
		return true, nil
	}
	// Everything below is configurable retry, which can be switched off.
	if cs.cc.dopts.disableRetry {
		return false, err
	}

	pushback := 0
	hasPushback := false
	if a.s != nil {
		// Only a trailers-only response is considered for retry.
		if !a.s.TrailersOnly() {
			return false, err
		}

		// TODO(retry): Move down if the spec changes to not check server pushback
		// before considering this a failure for throttling.
		sps := a.s.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			// A value that does not parse as an integer, or a negative one,
			// aborts retrying.
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				channelz.Infof(logger, cs.cc.channelzID, "Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle() // This counts as a failure for throttling.
				return false, err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			channelz.Warningf(logger, cs.cc.channelzID, "Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle() // This counts as a failure for throttling.
			return false, err
		}
	}

	// Prefer the status of the transport stream; fall back to the code
	// carried by err when no stream was created.
	var code codes.Code
	if a.s != nil {
		code = a.s.Status().Code()
	} else {
		code = status.Code(err)
	}

	rp := cs.methodConfig.RetryPolicy
	if rp == nil || !rp.RetryableStatusCodes[code] {
		return false, err
	}

	// Note: the ordering here is important; we count this as a failure
	// only if the code matched a retryable code.
	if cs.retryThrottler.throttle() {
		return false, err
	}
	// numRetries excludes the original attempt, hence the +1.
	if cs.numRetries+1 >= rp.MaxAttempts {
		return false, err
	}

	var dur time.Duration
	if hasPushback {
		// The server dictated the delay; the backoff sequence starts over.
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		// Exponential backoff capped at MaxBackoff, with the actual delay
		// drawn at random below the computed bound.
		fact := math.Pow(rp.BackoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := float64(rp.InitialBackoff) * fact
		if max := float64(rp.MaxBackoff); cur > max {
			cur = max
		}
		// NOTE(review): grpcrand.Int63n presumably mirrors math/rand.Int63n,
		// which panics for a non-positive argument; confirm that retry policy
		// validation keeps int64(cur) >= 1 (e.g. with a multiplier below 1).
		dur = time.Duration(grpcrand.Int63n(int64(cur)))
		cs.numRetriesSincePushback++
	}

	// TODO(dfawley): we could eagerly fail here if dur puts us past the
	// deadline, but unsure if it is worth doing.
	t := time.NewTimer(dur)
	select {
	case <-t.C:
		cs.numRetries++
		return false, nil
	case <-cs.ctx.Done():
		t.Stop()
		return false, status.FromContextError(cs.ctx.Err()).Err()
	}
}
// retryLocked finishes the failed attempt, asks shouldRetry whether another
// attempt is permitted, creates that attempt and replays the buffered ops on
// it. It keeps looping for as long as the replay itself fails and shouldRetry
// allows a further attempt. withRetry calls it with cs.mu held.
//
// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
	for {
		attempt.finish(toRPCErr(lastErr))
		isTransparent, err := attempt.shouldRetry(lastErr)
		if err != nil {
			// Not retrying: commit the stream before reporting the error.
			cs.commitAttemptLocked()
			return err
		}
		cs.firstAttempt = false
		attempt, err = cs.newAttemptLocked(isTransparent)
		if err != nil {
			// Only returns error if the clientconn is closed or the context of
			// the stream is canceled.
			return err
		}
		// Note that the first op in the replay buffer always sets cs.attempt
		// if it is able to pick a transport and create a stream.
		if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
			return nil
		}
		// The replay failed; lastErr now feeds the next loop iteration.
	}
}
  668. func (cs *clientStream) Context() context.Context {
  669. cs.commitAttempt()
  670. // No need to lock before using attempt, since we know it is committed and
  671. // cannot change.
  672. if cs.attempt.s != nil {
  673. return cs.attempt.s.Context()
  674. }
  675. return cs.ctx
  676. }
// withRetry runs op against the current attempt and, while the stream is not
// yet committed, retries it on a fresh attempt (via retryLocked) when it
// fails.
//
// Once the stream is committed, op is run on cs.attempt and its error is
// returned after conversion with toRPCErr. Otherwise onSuccess is invoked,
// with cs.mu held, when op returns nil or returns io.EOF on a stream whose
// status code is OK; op's result is returned unchanged in that case.
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			// toRPCErr is used in case the error from the attempt comes from
			// NewClientStream, which intentionally doesn't return a status
			// error to allow for further inspection; all other errors should
			// already be status errors.
			return toRPCErr(op(cs.attempt))
		}
		if len(cs.buffer) == 0 {
			// For the first op, which controls creation of the stream and
			// assigns cs.attempt, we need to create a new attempt inline
			// before executing the first op. On subsequent ops, the attempt
			// is created immediately before replaying the ops.
			var err error
			if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
				cs.mu.Unlock()
				cs.finish(err)
				return err
			}
		}
		a := cs.attempt
		// op runs without cs.mu held; the lock is re-acquired afterwards.
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		if err == io.EOF {
			// Wait for the stream to be done before its status is read below.
			<-a.s.Done()
		}
		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		// op failed. A nil result from retryLocked means the buffered ops were
		// replayed on a new attempt, so loop around and run op again.
		if err := cs.retryLocked(a, err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}
  722. func (cs *clientStream) Header() (metadata.MD, error) {
  723. var m metadata.MD
  724. noHeader := false
  725. err := cs.withRetry(func(a *csAttempt) error {
  726. var err error
  727. m, err = a.s.Header()
  728. if err == transport.ErrNoHeaders {
  729. noHeader = true
  730. return nil
  731. }
  732. return toRPCErr(err)
  733. }, cs.commitAttemptLocked)
  734. if err != nil {
  735. cs.finish(err)
  736. return nil, err
  737. }
  738. if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && !noHeader {
  739. // Only log if binary log is on and header has not been logged, and
  740. // there is actually headers to log.
  741. logEntry := &binarylog.ServerHeader{
  742. OnClientSide: true,
  743. Header: m,
  744. PeerAddr: nil,
  745. }
  746. if peer, ok := peer.FromContext(cs.Context()); ok {
  747. logEntry.PeerAddr = peer.Addr
  748. }
  749. cs.serverHeaderBinlogged = true
  750. for _, binlog := range cs.binlogs {
  751. binlog.Log(cs.ctx, logEntry)
  752. }
  753. }
  754. return m, nil
  755. }
  756. func (cs *clientStream) Trailer() metadata.MD {
  757. // On RPC failure, we never need to retry, because usage requires that
  758. // RecvMsg() returned a non-nil error before calling this function is valid.
  759. // We would have retried earlier if necessary.
  760. //
  761. // Commit the attempt anyway, just in case users are not following those
  762. // directions -- it will prevent races and should not meaningfully impact
  763. // performance.
  764. cs.commitAttempt()
  765. if cs.attempt.s == nil {
  766. return nil
  767. }
  768. return cs.attempt.s.Trailer()
  769. }
  770. func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
  771. for _, f := range cs.buffer {
  772. if err := f(attempt); err != nil {
  773. return err
  774. }
  775. }
  776. return nil
  777. }
  778. func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
  779. // Note: we still will buffer if retry is disabled (for transparent retries).
  780. if cs.committed {
  781. return
  782. }
  783. cs.bufferSize += sz
  784. if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
  785. cs.commitAttemptLocked()
  786. return
  787. }
  788. cs.buffer = append(cs.buffer, op)
  789. }
  790. func (cs *clientStream) SendMsg(m interface{}) (err error) {
  791. defer func() {
  792. if err != nil && err != io.EOF {
  793. // Call finish on the client stream for errors generated by this SendMsg
  794. // call, as these indicate problems created by this client. (Transport
  795. // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
  796. // error will be returned from RecvMsg eventually in that case, or be
  797. // retried.)
  798. cs.finish(err)
  799. }
  800. }()
  801. if cs.sentLast {
  802. return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
  803. }
  804. if !cs.desc.ClientStreams {
  805. cs.sentLast = true
  806. }
  807. // load hdr, payload, data
  808. hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
  809. if err != nil {
  810. return err
  811. }
  812. // TODO(dfawley): should we be checking len(data) instead?
  813. if len(payload) > *cs.callInfo.maxSendMessageSize {
  814. return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
  815. }
  816. op := func(a *csAttempt) error {
  817. return a.sendMsg(m, hdr, payload, data)
  818. }
  819. err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
  820. if len(cs.binlogs) != 0 && err == nil {
  821. cm := &binarylog.ClientMessage{
  822. OnClientSide: true,
  823. Message: data,
  824. }
  825. for _, binlog := range cs.binlogs {
  826. binlog.Log(cs.ctx, cm)
  827. }
  828. }
  829. return err
  830. }
  831. func (cs *clientStream) RecvMsg(m interface{}) error {
  832. if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
  833. // Call Header() to binary log header if it's not already logged.
  834. cs.Header()
  835. }
  836. var recvInfo *payloadInfo
  837. if len(cs.binlogs) != 0 {
  838. recvInfo = &payloadInfo{}
  839. }
  840. err := cs.withRetry(func(a *csAttempt) error {
  841. return a.recvMsg(m, recvInfo)
  842. }, cs.commitAttemptLocked)
  843. if len(cs.binlogs) != 0 && err == nil {
  844. sm := &binarylog.ServerMessage{
  845. OnClientSide: true,
  846. Message: recvInfo.uncompressedBytes,
  847. }
  848. for _, binlog := range cs.binlogs {
  849. binlog.Log(cs.ctx, sm)
  850. }
  851. }
  852. if err != nil || !cs.desc.ServerStreams {
  853. // err != nil or non-server-streaming indicates end of stream.
  854. cs.finish(err)
  855. if len(cs.binlogs) != 0 {
  856. // finish will not log Trailer. Log Trailer here.
  857. logEntry := &binarylog.ServerTrailer{
  858. OnClientSide: true,
  859. Trailer: cs.Trailer(),
  860. Err: err,
  861. }
  862. if logEntry.Err == io.EOF {
  863. logEntry.Err = nil
  864. }
  865. if peer, ok := peer.FromContext(cs.Context()); ok {
  866. logEntry.PeerAddr = peer.Addr
  867. }
  868. for _, binlog := range cs.binlogs {
  869. binlog.Log(cs.ctx, logEntry)
  870. }
  871. }
  872. }
  873. return err
  874. }
  875. func (cs *clientStream) CloseSend() error {
  876. if cs.sentLast {
  877. // TODO: return an error and finish the stream instead, due to API misuse?
  878. return nil
  879. }
  880. cs.sentLast = true
  881. op := func(a *csAttempt) error {
  882. a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
  883. // Always return nil; io.EOF is the only error that might make sense
  884. // instead, but there is no need to signal the client to call RecvMsg
  885. // as the only use left for the stream after CloseSend is to call
  886. // RecvMsg. This also matches historical behavior.
  887. return nil
  888. }
  889. cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
  890. if len(cs.binlogs) != 0 {
  891. chc := &binarylog.ClientHalfClose{
  892. OnClientSide: true,
  893. }
  894. for _, binlog := range cs.binlogs {
  895. binlog.Log(cs.ctx, chc)
  896. }
  897. }
  898. // We never returned an error here for reasons.
  899. return nil
  900. }
// finish ends the clientStream with the given error; io.EOF is treated as
// success (nil). Only the first call has any effect. Under cs.mu it runs the
// callInfo onFinish callbacks, commits the stream, finishes the current
// attempt and, when that attempt has a stream, runs the after hook of every
// CallOption. After releasing the lock it logs a binarylog Cancel entry for
// Canceled errors, reports success to the retry throttler, updates the
// channelz call counters when channelz is on, and calls cs.cancel.
func (cs *clientStream) finish(err error) {
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	cs.mu.Lock()
	if cs.finished {
		cs.mu.Unlock()
		return
	}
	cs.finished = true
	for _, onFinish := range cs.callInfo.onFinish {
		onFinish(err)
	}
	cs.commitAttemptLocked()
	if cs.attempt != nil {
		cs.attempt.finish(err)
		// after functions all rely upon having a stream.
		if cs.attempt.s != nil {
			for _, o := range cs.opts {
				o.after(cs.callInfo, cs.attempt)
			}
		}
	}
	cs.mu.Unlock()
	// For binary logging. only log cancel in finish (could be caused by RPC ctx
	// canceled or ClientConn closed). Trailer will be logged in RecvMsg.
	//
	// Only one of cancel or trailer needs to be logged. In the cases where
	// users don't call RecvMsg, users must have already canceled the RPC.
	if len(cs.binlogs) != 0 && status.Code(err) == codes.Canceled {
		c := &binarylog.Cancel{
			OnClientSide: true,
		}
		for _, binlog := range cs.binlogs {
			binlog.Log(cs.ctx, c)
		}
	}
	if err == nil {
		cs.retryThrottler.successfulRPC()
	}
	if channelz.IsOn() {
		if err != nil {
			cs.cc.incrCallsFailed()
		} else {
			cs.cc.incrCallsSucceeded()
		}
	}
	cs.cancel()
}
  951. func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
  952. cs := a.cs
  953. if a.trInfo != nil {
  954. a.mu.Lock()
  955. if a.trInfo.tr != nil {
  956. a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
  957. }
  958. a.mu.Unlock()
  959. }
  960. if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
  961. if !cs.desc.ClientStreams {
  962. // For non-client-streaming RPCs, we return nil instead of EOF on error
  963. // because the generated code requires it. finish is not called; RecvMsg()
  964. // will call it with the stream's status independently.
  965. return nil
  966. }
  967. return io.EOF
  968. }
  969. for _, sh := range a.statsHandlers {
  970. sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
  971. }
  972. if channelz.IsOn() {
  973. a.t.IncrMsgSent()
  974. }
  975. return nil
  976. }
  977. func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
  978. cs := a.cs
  979. if len(a.statsHandlers) != 0 && payInfo == nil {
  980. payInfo = &payloadInfo{}
  981. }
  982. if !a.decompSet {
  983. // Block until we receive headers containing received message encoding.
  984. if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
  985. if a.dc == nil || a.dc.Type() != ct {
  986. // No configured decompressor, or it does not match the incoming
  987. // message encoding; attempt to find a registered compressor that does.
  988. a.dc = nil
  989. a.decomp = encoding.GetCompressor(ct)
  990. }
  991. } else {
  992. // No compression is used; disable our decompressor.
  993. a.dc = nil
  994. }
  995. // Only initialize this state once per stream.
  996. a.decompSet = true
  997. }
  998. err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
  999. if err != nil {
  1000. if err == io.EOF {
  1001. if statusErr := a.s.Status().Err(); statusErr != nil {
  1002. return statusErr
  1003. }
  1004. return io.EOF // indicates successful end of stream.
  1005. }
  1006. return toRPCErr(err)
  1007. }
  1008. if a.trInfo != nil {
  1009. a.mu.Lock()
  1010. if a.trInfo.tr != nil {
  1011. a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
  1012. }
  1013. a.mu.Unlock()
  1014. }
  1015. for _, sh := range a.statsHandlers {
  1016. sh.HandleRPC(a.ctx, &stats.InPayload{
  1017. Client: true,
  1018. RecvTime: time.Now(),
  1019. Payload: m,
  1020. // TODO truncate large payload.
  1021. Data: payInfo.uncompressedBytes,
  1022. WireLength: payInfo.compressedLength + headerLen,
  1023. CompressedLength: payInfo.compressedLength,
  1024. Length: len(payInfo.uncompressedBytes),
  1025. })
  1026. }
  1027. if channelz.IsOn() {
  1028. a.t.IncrMsgRecv()
  1029. }
  1030. if cs.desc.ServerStreams {
  1031. // Subsequent messages should be received by subsequent RecvMsg calls.
  1032. return nil
  1033. }
  1034. // Special handling for non-server-stream rpcs.
  1035. // This recv expects EOF or errors, so we don't collect inPayload.
  1036. err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
  1037. if err == nil {
  1038. return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
  1039. }
  1040. if err == io.EOF {
  1041. return a.s.Status().Err() // non-server streaming Recv returns nil on success
  1042. }
  1043. return toRPCErr(err)
  1044. }
  1045. func (a *csAttempt) finish(err error) {
  1046. a.mu.Lock()
  1047. if a.finished {
  1048. a.mu.Unlock()
  1049. return
  1050. }
  1051. a.finished = true
  1052. if err == io.EOF {
  1053. // Ending a stream with EOF indicates a success.
  1054. err = nil
  1055. }
  1056. var tr metadata.MD
  1057. if a.s != nil {
  1058. a.t.CloseStream(a.s, err)
  1059. tr = a.s.Trailer()
  1060. }
  1061. if a.pickResult.Done != nil {
  1062. br := false
  1063. if a.s != nil {
  1064. br = a.s.BytesReceived()
  1065. }
  1066. a.pickResult.Done(balancer.DoneInfo{
  1067. Err: err,
  1068. Trailer: tr,
  1069. BytesSent: a.s != nil,
  1070. BytesReceived: br,
  1071. ServerLoad: balancerload.Parse(tr),
  1072. })
  1073. }
  1074. for _, sh := range a.statsHandlers {
  1075. end := &stats.End{
  1076. Client: true,
  1077. BeginTime: a.beginTime,
  1078. EndTime: time.Now(),
  1079. Trailer: tr,
  1080. Error: err,
  1081. }
  1082. sh.HandleRPC(a.ctx, end)
  1083. }
  1084. if a.trInfo != nil && a.trInfo.tr != nil {
  1085. if err == nil {
  1086. a.trInfo.tr.LazyPrintf("RPC: [OK]")
  1087. } else {
  1088. a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
  1089. a.trInfo.tr.SetError()
  1090. }
  1091. a.trInfo.tr.Finish()
  1092. a.trInfo.tr = nil
  1093. }
  1094. a.mu.Unlock()
  1095. }
// newNonRetryClientStream creates a ClientStream with the specified transport,
// on the given addrConn.
//
// It's expected that the given transport is either the same one in addrConn, or
// is already closed. To avoid race, transport is specified separately, instead
// of using ac.transport.
//
// Main difference between this and ClientConn.NewStream:
//   - no retry
//   - no service config (or wait for service config)
//   - no tracing or stats
//
// An error is returned when t is nil, when a CallOption's before hook fails,
// when setCallInfoCodec fails, when the requested compressor is not
// registered, or when the transport cannot create the stream. The child
// context created here is canceled on every error return.
func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) {
	if t == nil {
		// TODO: return RPC error here?
		return nil, errors.New("transport provided is nil")
	}
	// defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
	c := &callInfo{}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	ctx, cancel := context.WithCancel(ctx)
	// The named result err is inspected on return so that every failure path
	// below releases the child context.
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           ac.cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if ac.cc.dopts.cp != nil {
		callHdr.SendCompress = ac.cc.dopts.cp.Type()
		cp = ac.cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}

	// Use a special addrConnStream to avoid retry.
	as := &addrConnStream{
		callHdr:  callHdr,
		ac:       ac,
		ctx:      ctx,
		cancel:   cancel,
		opts:     opts,
		callInfo: c,
		desc:     desc,
		codec:    c.codec,
		cp:       cp,
		comp:     comp,
		t:        t,
	}

	s, err := as.t.NewStream(as.ctx, as.callHdr)
	if err != nil {
		err = toRPCErr(err)
		return nil, err
	}
	as.s = s
	as.p = &parser{r: s}
	ac.incrCallsStarted()
	if desc != unaryStreamDesc {
		// Listen on stream context to cleanup when the stream context is
		// canceled. Also listen for the addrConn's context in case the
		// addrConn is closed or reconnects to a different address. In all
		// other cases, an error should already be injected into the recv
		// buffer by the transport, which the client will eventually receive,
		// and then we will cancel the stream's context in
		// addrConnStream.finish.
		go func() {
			// ac.ctx is read under ac.mu.
			ac.mu.Lock()
			acCtx := ac.ctx
			ac.mu.Unlock()
			select {
			case <-acCtx.Done():
				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
			case <-ctx.Done():
				as.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return as, nil
}
// addrConnStream is the stream type returned by newNonRetryClientStream. It
// is bound to the single transport t and performs no retries.
type addrConnStream struct {
	// s is the stream created on t by newNonRetryClientStream.
	s  *transport.Stream
	ac *addrConn
	// callHdr is the header passed to t.NewStream when s was created.
	callHdr *transport.CallHdr
	// cancel is called by finish.
	cancel   context.CancelFunc
	opts     []CallOption
	callInfo *callInfo
	t        transport.ClientTransport
	ctx      context.Context
	// sentLast is set by CloseSend, and by SendMsg on non-client-streaming
	// RPCs; SendMsg fails once it is set.
	sentLast bool
	desc     *StreamDesc
	codec    baseCodec
	cp       Compressor
	comp     encoding.Compressor
	// decompSet records that RecvMsg has already resolved dc/decomp from the
	// encoding reported by the stream.
	decompSet bool
	dc        Decompressor
	decomp    encoding.Compressor
	p         *parser
	// mu is held by finish while it checks and sets finished.
	mu       sync.Mutex
	finished bool
}
  1226. func (as *addrConnStream) Header() (metadata.MD, error) {
  1227. m, err := as.s.Header()
  1228. if err != nil {
  1229. as.finish(toRPCErr(err))
  1230. }
  1231. return m, err
  1232. }
  1233. func (as *addrConnStream) Trailer() metadata.MD {
  1234. return as.s.Trailer()
  1235. }
  1236. func (as *addrConnStream) CloseSend() error {
  1237. if as.sentLast {
  1238. // TODO: return an error and finish the stream instead, due to API misuse?
  1239. return nil
  1240. }
  1241. as.sentLast = true
  1242. as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
  1243. // Always return nil; io.EOF is the only error that might make sense
  1244. // instead, but there is no need to signal the client to call RecvMsg
  1245. // as the only use left for the stream after CloseSend is to call
  1246. // RecvMsg. This also matches historical behavior.
  1247. return nil
  1248. }
  1249. func (as *addrConnStream) Context() context.Context {
  1250. return as.s.Context()
  1251. }
  1252. func (as *addrConnStream) SendMsg(m interface{}) (err error) {
  1253. defer func() {
  1254. if err != nil && err != io.EOF {
  1255. // Call finish on the client stream for errors generated by this SendMsg
  1256. // call, as these indicate problems created by this client. (Transport
  1257. // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
  1258. // error will be returned from RecvMsg eventually in that case, or be
  1259. // retried.)
  1260. as.finish(err)
  1261. }
  1262. }()
  1263. if as.sentLast {
  1264. return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
  1265. }
  1266. if !as.desc.ClientStreams {
  1267. as.sentLast = true
  1268. }
  1269. // load hdr, payload, data
  1270. hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp)
  1271. if err != nil {
  1272. return err
  1273. }
  1274. // TODO(dfawley): should we be checking len(data) instead?
  1275. if len(payld) > *as.callInfo.maxSendMessageSize {
  1276. return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
  1277. }
  1278. if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
  1279. if !as.desc.ClientStreams {
  1280. // For non-client-streaming RPCs, we return nil instead of EOF on error
  1281. // because the generated code requires it. finish is not called; RecvMsg()
  1282. // will call it with the stream's status independently.
  1283. return nil
  1284. }
  1285. return io.EOF
  1286. }
  1287. if channelz.IsOn() {
  1288. as.t.IncrMsgSent()
  1289. }
  1290. return nil
  1291. }
  1292. func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
  1293. defer func() {
  1294. if err != nil || !as.desc.ServerStreams {
  1295. // err != nil or non-server-streaming indicates end of stream.
  1296. as.finish(err)
  1297. }
  1298. }()
  1299. if !as.decompSet {
  1300. // Block until we receive headers containing received message encoding.
  1301. if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
  1302. if as.dc == nil || as.dc.Type() != ct {
  1303. // No configured decompressor, or it does not match the incoming
  1304. // message encoding; attempt to find a registered compressor that does.
  1305. as.dc = nil
  1306. as.decomp = encoding.GetCompressor(ct)
  1307. }
  1308. } else {
  1309. // No compression is used; disable our decompressor.
  1310. as.dc = nil
  1311. }
  1312. // Only initialize this state once per stream.
  1313. as.decompSet = true
  1314. }
  1315. err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
  1316. if err != nil {
  1317. if err == io.EOF {
  1318. if statusErr := as.s.Status().Err(); statusErr != nil {
  1319. return statusErr
  1320. }
  1321. return io.EOF // indicates successful end of stream.
  1322. }
  1323. return toRPCErr(err)
  1324. }
  1325. if channelz.IsOn() {
  1326. as.t.IncrMsgRecv()
  1327. }
  1328. if as.desc.ServerStreams {
  1329. // Subsequent messages should be received by subsequent RecvMsg calls.
  1330. return nil
  1331. }
  1332. // Special handling for non-server-stream rpcs.
  1333. // This recv expects EOF or errors, so we don't collect inPayload.
  1334. err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
  1335. if err == nil {
  1336. return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
  1337. }
  1338. if err == io.EOF {
  1339. return as.s.Status().Err() // non-server streaming Recv returns nil on success
  1340. }
  1341. return toRPCErr(err)
  1342. }
  1343. func (as *addrConnStream) finish(err error) {
  1344. as.mu.Lock()
  1345. if as.finished {
  1346. as.mu.Unlock()
  1347. return
  1348. }
  1349. as.finished = true
  1350. if err == io.EOF {
  1351. // Ending a stream with EOF indicates a success.
  1352. err = nil
  1353. }
  1354. if as.s != nil {
  1355. as.t.CloseStream(as.s, err)
  1356. }
  1357. if err != nil {
  1358. as.ac.incrCallsFailed()
  1359. } else {
  1360. as.ac.incrCallsSucceeded()
  1361. }
  1362. as.cancel()
  1363. as.mu.Unlock()
  1364. }
// ServerStream defines the server-side behavior of a streaming RPC.
//
// Errors returned from ServerStream methods are compatible with the status
// package.  However, the status code will often not match the RPC status as
// seen by the client application, and therefore, should not be relied upon for
// this purpose.
type ServerStream interface {
	// SetHeader sets the header metadata. It may be called multiple times.
	// When called multiple times, all the provided metadata will be merged.
	// All the metadata will be sent out when one of the following happens:
	//  - ServerStream.SendHeader() is called;
	//  - The first response is sent out;
	//  - An RPC status is sent out (error or success).
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata.
	// The provided md and headers set by SetHeader() will be sent.
	// It fails if called multiple times.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
	// When called more than once, all the provided metadata will be merged.
	SetTrailer(metadata.MD)
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message. On error, SendMsg aborts the stream and the
	// error is returned directly.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the client. An
	// untimely stream closure may result in lost messages.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	//
	// It is not safe to modify the message after calling SendMsg. Tracing
	// libraries and stats handlers may use the message lazily.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the client has performed a CloseSend. On
	// any non-EOF error, the stream is aborted and the error contains the
	// RPC status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
// serverStream implements a server side Stream.
type serverStream struct {
	ctx   context.Context
	t     transport.ServerTransport
	s     *transport.Stream
	p     *parser
	codec baseCodec

	cp     Compressor
	dc     Decompressor
	comp   encoding.Compressor
	decomp encoding.Compressor

	// sendCompressorName is the compressor name last observed from
	// s.SendCompress(); SendMsg refreshes comp when the stream reports a
	// different name.
	sendCompressorName string

	maxReceiveMessageSize int
	// maxSendMessageSize is the payload size limit enforced by SendMsg.
	maxSendMessageSize int
	trInfo             *traceInfo

	statsHandler []stats.Handler

	binlogs []binarylog.MethodLogger
	// serverHeaderBinlogged indicates whether server header has been logged. It
	// will happen when one of the following two happens: stream.SendHeader(),
	// stream.Send().
	//
	// It's only checked in send and sendHeader, doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}
  1442. func (ss *serverStream) Context() context.Context {
  1443. return ss.ctx
  1444. }
  1445. func (ss *serverStream) SetHeader(md metadata.MD) error {
  1446. if md.Len() == 0 {
  1447. return nil
  1448. }
  1449. err := imetadata.Validate(md)
  1450. if err != nil {
  1451. return status.Error(codes.Internal, err.Error())
  1452. }
  1453. return ss.s.SetHeader(md)
  1454. }
  1455. func (ss *serverStream) SendHeader(md metadata.MD) error {
  1456. err := imetadata.Validate(md)
  1457. if err != nil {
  1458. return status.Error(codes.Internal, err.Error())
  1459. }
  1460. err = ss.t.WriteHeader(ss.s, md)
  1461. if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
  1462. h, _ := ss.s.Header()
  1463. sh := &binarylog.ServerHeader{
  1464. Header: h,
  1465. }
  1466. ss.serverHeaderBinlogged = true
  1467. for _, binlog := range ss.binlogs {
  1468. binlog.Log(ss.ctx, sh)
  1469. }
  1470. }
  1471. return err
  1472. }
  1473. func (ss *serverStream) SetTrailer(md metadata.MD) {
  1474. if md.Len() == 0 {
  1475. return
  1476. }
  1477. if err := imetadata.Validate(md); err != nil {
  1478. logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
  1479. }
  1480. ss.s.SetTrailer(md)
  1481. }
  1482. func (ss *serverStream) SendMsg(m interface{}) (err error) {
  1483. defer func() {
  1484. if ss.trInfo != nil {
  1485. ss.mu.Lock()
  1486. if ss.trInfo.tr != nil {
  1487. if err == nil {
  1488. ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
  1489. } else {
  1490. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1491. ss.trInfo.tr.SetError()
  1492. }
  1493. }
  1494. ss.mu.Unlock()
  1495. }
  1496. if err != nil && err != io.EOF {
  1497. st, _ := status.FromError(toRPCErr(err))
  1498. ss.t.WriteStatus(ss.s, st)
  1499. // Non-user specified status was sent out. This should be an error
  1500. // case (as a server side Cancel maybe).
  1501. //
  1502. // This is not handled specifically now. User will return a final
  1503. // status from the service handler, we will log that error instead.
  1504. // This behavior is similar to an interceptor.
  1505. }
  1506. if channelz.IsOn() && err == nil {
  1507. ss.t.IncrMsgSent()
  1508. }
  1509. }()
  1510. // Server handler could have set new compressor by calling SetSendCompressor.
  1511. // In case it is set, we need to use it for compressing outbound message.
  1512. if sendCompressorsName := ss.s.SendCompress(); sendCompressorsName != ss.sendCompressorName {
  1513. ss.comp = encoding.GetCompressor(sendCompressorsName)
  1514. ss.sendCompressorName = sendCompressorsName
  1515. }
  1516. // load hdr, payload, data
  1517. hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp)
  1518. if err != nil {
  1519. return err
  1520. }
  1521. // TODO(dfawley): should we be checking len(data) instead?
  1522. if len(payload) > ss.maxSendMessageSize {
  1523. return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
  1524. }
  1525. if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
  1526. return toRPCErr(err)
  1527. }
  1528. if len(ss.binlogs) != 0 {
  1529. if !ss.serverHeaderBinlogged {
  1530. h, _ := ss.s.Header()
  1531. sh := &binarylog.ServerHeader{
  1532. Header: h,
  1533. }
  1534. ss.serverHeaderBinlogged = true
  1535. for _, binlog := range ss.binlogs {
  1536. binlog.Log(ss.ctx, sh)
  1537. }
  1538. }
  1539. sm := &binarylog.ServerMessage{
  1540. Message: data,
  1541. }
  1542. for _, binlog := range ss.binlogs {
  1543. binlog.Log(ss.ctx, sm)
  1544. }
  1545. }
  1546. if len(ss.statsHandler) != 0 {
  1547. for _, sh := range ss.statsHandler {
  1548. sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
  1549. }
  1550. }
  1551. return nil
  1552. }
  1553. func (ss *serverStream) RecvMsg(m interface{}) (err error) {
  1554. defer func() {
  1555. if ss.trInfo != nil {
  1556. ss.mu.Lock()
  1557. if ss.trInfo.tr != nil {
  1558. if err == nil {
  1559. ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
  1560. } else if err != io.EOF {
  1561. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1562. ss.trInfo.tr.SetError()
  1563. }
  1564. }
  1565. ss.mu.Unlock()
  1566. }
  1567. if err != nil && err != io.EOF {
  1568. st, _ := status.FromError(toRPCErr(err))
  1569. ss.t.WriteStatus(ss.s, st)
  1570. // Non-user specified status was sent out. This should be an error
  1571. // case (as a server side Cancel maybe).
  1572. //
  1573. // This is not handled specifically now. User will return a final
  1574. // status from the service handler, we will log that error instead.
  1575. // This behavior is similar to an interceptor.
  1576. }
  1577. if channelz.IsOn() && err == nil {
  1578. ss.t.IncrMsgRecv()
  1579. }
  1580. }()
  1581. var payInfo *payloadInfo
  1582. if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
  1583. payInfo = &payloadInfo{}
  1584. }
  1585. if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
  1586. if err == io.EOF {
  1587. if len(ss.binlogs) != 0 {
  1588. chc := &binarylog.ClientHalfClose{}
  1589. for _, binlog := range ss.binlogs {
  1590. binlog.Log(ss.ctx, chc)
  1591. }
  1592. }
  1593. return err
  1594. }
  1595. if err == io.ErrUnexpectedEOF {
  1596. err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
  1597. }
  1598. return toRPCErr(err)
  1599. }
  1600. if len(ss.statsHandler) != 0 {
  1601. for _, sh := range ss.statsHandler {
  1602. sh.HandleRPC(ss.s.Context(), &stats.InPayload{
  1603. RecvTime: time.Now(),
  1604. Payload: m,
  1605. // TODO truncate large payload.
  1606. Data: payInfo.uncompressedBytes,
  1607. Length: len(payInfo.uncompressedBytes),
  1608. WireLength: payInfo.compressedLength + headerLen,
  1609. CompressedLength: payInfo.compressedLength,
  1610. })
  1611. }
  1612. }
  1613. if len(ss.binlogs) != 0 {
  1614. cm := &binarylog.ClientMessage{
  1615. Message: payInfo.uncompressedBytes,
  1616. }
  1617. for _, binlog := range ss.binlogs {
  1618. binlog.Log(ss.ctx, cm)
  1619. }
  1620. }
  1621. return nil
  1622. }
  1623. // MethodFromServerStream returns the method string for the input stream.
  1624. // The returned string is in the format of "/service/method".
  1625. func MethodFromServerStream(stream ServerStream) (string, bool) {
  1626. return Method(stream.Context())
  1627. }
  1628. // prepareMsg returns the hdr, payload and data
  1629. // using the compressors passed or using the
  1630. // passed preparedmsg
  1631. func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) {
  1632. if preparedMsg, ok := m.(*PreparedMsg); ok {
  1633. return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil
  1634. }
  1635. // The input interface is not a prepared msg.
  1636. // Marshal and Compress the data at this point
  1637. data, err = encode(codec, m)
  1638. if err != nil {
  1639. return nil, nil, nil, err
  1640. }
  1641. compData, err := compress(data, cp, comp)
  1642. if err != nil {
  1643. return nil, nil, nil, err
  1644. }
  1645. hdr, payload = msgHeader(data, compData)
  1646. return hdr, payload, data, nil
  1647. }