server.go 64 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "io"
  24. "math"
  25. "net"
  26. "net/http"
  27. "reflect"
  28. "runtime"
  29. "strings"
  30. "sync"
  31. "sync/atomic"
  32. "time"
  33. "golang.org/x/net/trace"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/encoding"
  37. "google.golang.org/grpc/encoding/proto"
  38. "google.golang.org/grpc/grpclog"
  39. "google.golang.org/grpc/internal"
  40. "google.golang.org/grpc/internal/binarylog"
  41. "google.golang.org/grpc/internal/channelz"
  42. "google.golang.org/grpc/internal/grpcsync"
  43. "google.golang.org/grpc/internal/grpcutil"
  44. "google.golang.org/grpc/internal/transport"
  45. "google.golang.org/grpc/keepalive"
  46. "google.golang.org/grpc/metadata"
  47. "google.golang.org/grpc/peer"
  48. "google.golang.org/grpc/stats"
  49. "google.golang.org/grpc/status"
  50. "google.golang.org/grpc/tap"
  51. )
  const (
  	// defaultServerMaxReceiveMessageSize is the receive-size limit, in bytes,
  	// used when the caller does not override it: 4 MiB.
  	defaultServerMaxReceiveMessageSize = 4 << 20
  	// defaultServerMaxSendMessageSize is the send-size limit, in bytes, used
  	// when the caller does not override it.
  	defaultServerMaxSendMessageSize = math.MaxInt32

  	// listenerAddressForServeHTTP is the placeholder key under which
  	// transports created by ServeHTTP() are tracked. Server transports live in
  	// a map keyed on listener address: connections accepted in Serve() use the
  	// real listener address, but ServeHTTP() has no listener, so this dummy
  	// value stands in for one.
  	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
  )
  62. func init() {
  63. internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
  64. return srv.opts.creds
  65. }
  66. internal.DrainServerTransports = func(srv *Server, addr string) {
  67. srv.drainServerTransports(addr)
  68. }
  69. internal.AddGlobalServerOptions = func(opt ...ServerOption) {
  70. globalServerOptions = append(globalServerOptions, opt...)
  71. }
  72. internal.ClearGlobalServerOptions = func() {
  73. globalServerOptions = nil
  74. }
  75. internal.BinaryLogger = binaryLogger
  76. internal.JoinServerOptions = newJoinServerOption
  77. }
  78. var statusOK = status.New(codes.OK, "")
  79. var logger = grpclog.Component("core")
  // methodHandler is the function type stored in MethodDesc.Handler. It is
  // called with the service implementation (srv), the call context, a dec
  // callback and an optional unary interceptor, and yields the response value
  // and an error.
  // NOTE(review): dec presumably decodes the request payload into the value it
  // is given — confirm against the generated handlers.
  type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
  // MethodDesc represents an RPC service's method specification.
  type MethodDesc struct {
  	// MethodName is the name of the method.
  	MethodName string
  	// Handler is the function invoked to serve calls to this method.
  	Handler methodHandler
  }
  // ServiceDesc represents an RPC service's specification.
  type ServiceDesc struct {
  	// ServiceName is the name of the service.
  	ServiceName string
  	// The pointer to the service interface. Used to check whether the user
  	// provided implementation satisfies the interface requirements.
  	HandlerType interface{}
  	// Methods lists the method descriptors of the service.
  	Methods []MethodDesc
  	// Streams lists the stream descriptors of the service.
  	Streams []StreamDesc
  	// Metadata is an opaque value attached to the service description.
  	Metadata interface{}
  }
  // serviceInfo wraps information about a service. It is very similar to
  // ServiceDesc and is constructed from it for internal purposes.
  type serviceInfo struct {
  	// Contains the implementation for the methods in this service.
  	serviceImpl interface{}
  	// methods maps a string key to its method descriptor.
  	// NOTE(review): the key is presumably the method name — confirm in register.
  	methods map[string]*MethodDesc
  	// streams maps a string key to its stream descriptor.
  	// NOTE(review): the key is presumably the stream name — confirm in register.
  	streams map[string]*StreamDesc
  	// mdata is an opaque value; presumably copied from ServiceDesc.Metadata.
  	mdata interface{}
  }
  // serverWorkerData is the unit of work sent over Server.serverWorkerChannel
  // and consumed by handleSingleStream.
  type serverWorkerData struct {
  	// st is the transport the stream arrived on.
  	st transport.ServerTransport
  	// wg is marked Done by handleSingleStream once the stream has been handled.
  	wg *sync.WaitGroup
  	// stream is the stream to handle.
  	stream *transport.Stream
  }
  // Server is a gRPC server to serve RPC requests.
  type Server struct {
  	// opts is the configuration NewServer assembled from the default, global
  	// and caller-supplied server options.
  	opts serverOptions
  	mu sync.Mutex // guards following
  	// lis is a set of listeners.
  	// NOTE(review): presumably the listeners currently passed to Serve — confirm.
  	lis map[net.Listener]bool
  	// conns contains all active server transports. It is a map keyed on a
  	// listener address with the value being the set of active transports
  	// belonging to that listener.
  	conns map[string]map[transport.ServerTransport]bool
  	// serve: once true, register treats further service registration as a
  	// fatal error ("RegisterService after Server.Serve").
  	serve bool
  	// drain: NOTE(review): presumably set while the server is draining its
  	// transports — confirm where it is written.
  	drain bool
  	cv *sync.Cond // signaled when connections close for GracefulStop
  	services map[string]*serviceInfo // service name -> service info
  	// events is the trace event log; NewServer only creates it when
  	// EnableTracing is set, so it may be nil.
  	events trace.EventLog
  	// quit and done are events created by NewServer.
  	// NOTE(review): presumably fired when stopping begins and completes — confirm.
  	quit *grpcsync.Event
  	done *grpcsync.Event
  	channelzRemoveOnce sync.Once
  	serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
  	// channelzID is the identifier returned by channelz.RegisterServer in
  	// NewServer.
  	channelzID *channelz.Identifier
  	czData *channelzData
  	// serverWorkerChannel feeds the serverWorker goroutines. It is created by
  	// initServerWorkers, which NewServer calls only when numServerWorkers > 0.
  	serverWorkerChannel chan *serverWorkerData
  }
  // serverOptions is the full set of configurable server settings. NewServer
  // starts from defaultServerOptions and lets each ServerOption mutate a copy
  // through its apply method.
  type serverOptions struct {
  	creds credentials.TransportCredentials // set by Creds
  	codec baseCodec // set by CustomCodec / ForceServerCodec
  	cp Compressor // set by RPCCompressor
  	dc Decompressor // set by RPCDecompressor
  	unaryInt UnaryServerInterceptor // set by UnaryInterceptor; at most once
  	streamInt StreamServerInterceptor // set by StreamInterceptor; at most once
  	chainUnaryInts []UnaryServerInterceptor // appended to by ChainUnaryInterceptor
  	chainStreamInts []StreamServerInterceptor // appended to by ChainStreamInterceptor
  	binaryLogger binarylog.Logger // set by the unexported binaryLogger option
  	inTapHandle tap.ServerInHandle // set by InTapHandle; at most once
  	statsHandlers []stats.Handler // appended to by StatsHandler; never holds nil
  	maxConcurrentStreams uint32 // set by MaxConcurrentStreams
  	maxReceiveMessageSize int // set by MaxRecvMsgSize / MaxMsgSize
  	maxSendMessageSize int // set by MaxSendMsgSize
  	unknownStreamDesc *StreamDesc // set by UnknownServiceHandler
  	keepaliveParams keepalive.ServerParameters // set by KeepaliveParams
  	keepalivePolicy keepalive.EnforcementPolicy // set by KeepaliveEnforcementPolicy
  	initialWindowSize int32 // set by InitialWindowSize
  	initialConnWindowSize int32 // set by InitialConnWindowSize
  	writeBufferSize int // set by WriteBufferSize
  	readBufferSize int // set by ReadBufferSize
  	connectionTimeout time.Duration // set by ConnectionTimeout
  	maxHeaderListSize *uint32 // set by MaxHeaderListSize; nil when unset
  	headerTableSize *uint32 // set by HeaderTableSize; nil when unset
  	numServerWorkers uint32 // set by NumStreamWorkers; 0 means no worker pool
  }
  159. var defaultServerOptions = serverOptions{
  160. maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
  161. maxSendMessageSize: defaultServerMaxSendMessageSize,
  162. connectionTimeout: 120 * time.Second,
  163. writeBufferSize: defaultWriteBufSize,
  164. readBufferSize: defaultReadBufSize,
  165. }
  // globalServerOptions holds options that NewServer applies to every server
  // before the options passed by its caller. It is modified only through the
  // internal.AddGlobalServerOptions and internal.ClearGlobalServerOptions hooks
  // installed in init.
  var globalServerOptions []ServerOption
  // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
  type ServerOption interface {
  	// apply mutates the given serverOptions in place. NewServer calls it once
  	// per option while building a server's configuration.
  	apply(*serverOptions)
  }
  // EmptyServerOption does not alter the server configuration. It can be embedded
  // in another structure to build custom server options.
  //
  // # Experimental
  //
  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
  // later release.
  type EmptyServerOption struct{}

  // apply implements ServerOption and intentionally does nothing.
  func (EmptyServerOption) apply(*serverOptions) {}
  180. // funcServerOption wraps a function that modifies serverOptions into an
  181. // implementation of the ServerOption interface.
  182. type funcServerOption struct {
  183. f func(*serverOptions)
  184. }
  185. func (fdo *funcServerOption) apply(do *serverOptions) {
  186. fdo.f(do)
  187. }
  188. func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
  189. return &funcServerOption{
  190. f: f,
  191. }
  192. }
  193. // joinServerOption provides a way to combine arbitrary number of server
  194. // options into one.
  195. type joinServerOption struct {
  196. opts []ServerOption
  197. }
  198. func (mdo *joinServerOption) apply(do *serverOptions) {
  199. for _, opt := range mdo.opts {
  200. opt.apply(do)
  201. }
  202. }
  203. func newJoinServerOption(opts ...ServerOption) ServerOption {
  204. return &joinServerOption{opts: opts}
  205. }
  206. // WriteBufferSize determines how much data can be batched before doing a write
  207. // on the wire. The corresponding memory allocation for this buffer will be
  208. // twice the size to keep syscalls low. The default value for this buffer is
  209. // 32KB. Zero or negative values will disable the write buffer such that each
  210. // write will be on underlying connection.
  211. // Note: A Send call may not directly translate to a write.
  212. func WriteBufferSize(s int) ServerOption {
  213. return newFuncServerOption(func(o *serverOptions) {
  214. o.writeBufferSize = s
  215. })
  216. }
  217. // ReadBufferSize lets you set the size of read buffer, this determines how much
  218. // data can be read at most for one read syscall. The default value for this
  219. // buffer is 32KB. Zero or negative values will disable read buffer for a
  220. // connection so data framer can access the underlying conn directly.
  221. func ReadBufferSize(s int) ServerOption {
  222. return newFuncServerOption(func(o *serverOptions) {
  223. o.readBufferSize = s
  224. })
  225. }
  226. // InitialWindowSize returns a ServerOption that sets window size for stream.
  227. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  228. func InitialWindowSize(s int32) ServerOption {
  229. return newFuncServerOption(func(o *serverOptions) {
  230. o.initialWindowSize = s
  231. })
  232. }
  233. // InitialConnWindowSize returns a ServerOption that sets window size for a connection.
  234. // The lower bound for window size is 64K and any value smaller than that will be ignored.
  235. func InitialConnWindowSize(s int32) ServerOption {
  236. return newFuncServerOption(func(o *serverOptions) {
  237. o.initialConnWindowSize = s
  238. })
  239. }
  240. // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
  241. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
  242. if kp.Time > 0 && kp.Time < time.Second {
  243. logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
  244. kp.Time = time.Second
  245. }
  246. return newFuncServerOption(func(o *serverOptions) {
  247. o.keepaliveParams = kp
  248. })
  249. }
  250. // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
  251. func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
  252. return newFuncServerOption(func(o *serverOptions) {
  253. o.keepalivePolicy = kep
  254. })
  255. }
  256. // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
  257. //
  258. // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
  259. //
  260. // Deprecated: register codecs using encoding.RegisterCodec. The server will
  261. // automatically use registered codecs based on the incoming requests' headers.
  262. // See also
  263. // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
  264. // Will be supported throughout 1.x.
  265. func CustomCodec(codec Codec) ServerOption {
  266. return newFuncServerOption(func(o *serverOptions) {
  267. o.codec = codec
  268. })
  269. }
  270. // ForceServerCodec returns a ServerOption that sets a codec for message
  271. // marshaling and unmarshaling.
  272. //
  273. // This will override any lookups by content-subtype for Codecs registered
  274. // with RegisterCodec.
  275. //
  276. // See Content-Type on
  277. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
  278. // more details. Also see the documentation on RegisterCodec and
  279. // CallContentSubtype for more details on the interaction between encoding.Codec
  280. // and content-subtype.
  281. //
  282. // This function is provided for advanced users; prefer to register codecs
  283. // using encoding.RegisterCodec.
  284. // The server will automatically use registered codecs based on the incoming
  285. // requests' headers. See also
  286. // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
  287. // Will be supported throughout 1.x.
  288. //
  289. // # Experimental
  290. //
  291. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  292. // later release.
  293. func ForceServerCodec(codec encoding.Codec) ServerOption {
  294. return newFuncServerOption(func(o *serverOptions) {
  295. o.codec = codec
  296. })
  297. }
  298. // RPCCompressor returns a ServerOption that sets a compressor for outbound
  299. // messages. For backward compatibility, all outbound messages will be sent
  300. // using this compressor, regardless of incoming message compression. By
  301. // default, server messages will be sent using the same compressor with which
  302. // request messages were sent.
  303. //
  304. // Deprecated: use encoding.RegisterCompressor instead. Will be supported
  305. // throughout 1.x.
  306. func RPCCompressor(cp Compressor) ServerOption {
  307. return newFuncServerOption(func(o *serverOptions) {
  308. o.cp = cp
  309. })
  310. }
  311. // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
  312. // messages. It has higher priority than decompressors registered via
  313. // encoding.RegisterCompressor.
  314. //
  315. // Deprecated: use encoding.RegisterCompressor instead. Will be supported
  316. // throughout 1.x.
  317. func RPCDecompressor(dc Decompressor) ServerOption {
  318. return newFuncServerOption(func(o *serverOptions) {
  319. o.dc = dc
  320. })
  321. }
  322. // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  323. // If this is not set, gRPC uses the default limit.
  324. //
  325. // Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
  326. func MaxMsgSize(m int) ServerOption {
  327. return MaxRecvMsgSize(m)
  328. }
  329. // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
  330. // If this is not set, gRPC uses the default 4MB.
  331. func MaxRecvMsgSize(m int) ServerOption {
  332. return newFuncServerOption(func(o *serverOptions) {
  333. o.maxReceiveMessageSize = m
  334. })
  335. }
  336. // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
  337. // If this is not set, gRPC uses the default `math.MaxInt32`.
  338. func MaxSendMsgSize(m int) ServerOption {
  339. return newFuncServerOption(func(o *serverOptions) {
  340. o.maxSendMessageSize = m
  341. })
  342. }
  343. // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
  344. // of concurrent streams to each ServerTransport.
  345. func MaxConcurrentStreams(n uint32) ServerOption {
  346. return newFuncServerOption(func(o *serverOptions) {
  347. o.maxConcurrentStreams = n
  348. })
  349. }
  350. // Creds returns a ServerOption that sets credentials for server connections.
  351. func Creds(c credentials.TransportCredentials) ServerOption {
  352. return newFuncServerOption(func(o *serverOptions) {
  353. o.creds = c
  354. })
  355. }
  356. // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
  357. // server. Only one unary interceptor can be installed. The construction of multiple
  358. // interceptors (e.g., chaining) can be implemented at the caller.
  359. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
  360. return newFuncServerOption(func(o *serverOptions) {
  361. if o.unaryInt != nil {
  362. panic("The unary server interceptor was already set and may not be reset.")
  363. }
  364. o.unaryInt = i
  365. })
  366. }
  367. // ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
  368. // for unary RPCs. The first interceptor will be the outer most,
  369. // while the last interceptor will be the inner most wrapper around the real call.
  370. // All unary interceptors added by this method will be chained.
  371. func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
  372. return newFuncServerOption(func(o *serverOptions) {
  373. o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
  374. })
  375. }
  376. // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
  377. // server. Only one stream interceptor can be installed.
  378. func StreamInterceptor(i StreamServerInterceptor) ServerOption {
  379. return newFuncServerOption(func(o *serverOptions) {
  380. if o.streamInt != nil {
  381. panic("The stream server interceptor was already set and may not be reset.")
  382. }
  383. o.streamInt = i
  384. })
  385. }
  386. // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
  387. // for streaming RPCs. The first interceptor will be the outer most,
  388. // while the last interceptor will be the inner most wrapper around the real call.
  389. // All stream interceptors added by this method will be chained.
  390. func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
  391. return newFuncServerOption(func(o *serverOptions) {
  392. o.chainStreamInts = append(o.chainStreamInts, interceptors...)
  393. })
  394. }
  395. // InTapHandle returns a ServerOption that sets the tap handle for all the server
  396. // transport to be created. Only one can be installed.
  397. //
  398. // # Experimental
  399. //
  400. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  401. // later release.
  402. func InTapHandle(h tap.ServerInHandle) ServerOption {
  403. return newFuncServerOption(func(o *serverOptions) {
  404. if o.inTapHandle != nil {
  405. panic("The tap handle was already set and may not be reset.")
  406. }
  407. o.inTapHandle = h
  408. })
  409. }
  410. // StatsHandler returns a ServerOption that sets the stats handler for the server.
  411. func StatsHandler(h stats.Handler) ServerOption {
  412. return newFuncServerOption(func(o *serverOptions) {
  413. if h == nil {
  414. logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
  415. // Do not allow a nil stats handler, which would otherwise cause
  416. // panics.
  417. return
  418. }
  419. o.statsHandlers = append(o.statsHandlers, h)
  420. })
  421. }
  422. // binaryLogger returns a ServerOption that can set the binary logger for the
  423. // server.
  424. func binaryLogger(bl binarylog.Logger) ServerOption {
  425. return newFuncServerOption(func(o *serverOptions) {
  426. o.binaryLogger = bl
  427. })
  428. }
  429. // UnknownServiceHandler returns a ServerOption that allows for adding a custom
  430. // unknown service handler. The provided method is a bidi-streaming RPC service
  431. // handler that will be invoked instead of returning the "unimplemented" gRPC
  432. // error whenever a request is received for an unregistered service or method.
  433. // The handling function and stream interceptor (if set) have full access to
  434. // the ServerStream, including its Context.
  435. func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
  436. return newFuncServerOption(func(o *serverOptions) {
  437. o.unknownStreamDesc = &StreamDesc{
  438. StreamName: "unknown_service_handler",
  439. Handler: streamHandler,
  440. // We need to assume that the users of the streamHandler will want to use both.
  441. ClientStreams: true,
  442. ServerStreams: true,
  443. }
  444. })
  445. }
  446. // ConnectionTimeout returns a ServerOption that sets the timeout for
  447. // connection establishment (up to and including HTTP/2 handshaking) for all
  448. // new connections. If this is not set, the default is 120 seconds. A zero or
  449. // negative value will result in an immediate timeout.
  450. //
  451. // # Experimental
  452. //
  453. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  454. // later release.
  455. func ConnectionTimeout(d time.Duration) ServerOption {
  456. return newFuncServerOption(func(o *serverOptions) {
  457. o.connectionTimeout = d
  458. })
  459. }
  460. // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
  461. // of header list that the server is prepared to accept.
  462. func MaxHeaderListSize(s uint32) ServerOption {
  463. return newFuncServerOption(func(o *serverOptions) {
  464. o.maxHeaderListSize = &s
  465. })
  466. }
  467. // HeaderTableSize returns a ServerOption that sets the size of dynamic
  468. // header table for stream.
  469. //
  470. // # Experimental
  471. //
  472. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  473. // later release.
  474. func HeaderTableSize(s uint32) ServerOption {
  475. return newFuncServerOption(func(o *serverOptions) {
  476. o.headerTableSize = &s
  477. })
  478. }
  479. // NumStreamWorkers returns a ServerOption that sets the number of worker
  480. // goroutines that should be used to process incoming streams. Setting this to
  481. // zero (default) will disable workers and spawn a new goroutine for each
  482. // stream.
  483. //
  484. // # Experimental
  485. //
  486. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  487. // later release.
  488. func NumStreamWorkers(numServerWorkers uint32) ServerOption {
  489. // TODO: If/when this API gets stabilized (i.e. stream workers become the
  490. // only way streams are processed), change the behavior of the zero value to
  491. // a sane default. Preliminary experiments suggest that a value equal to the
  492. // number of CPUs available is most performant; requires thorough testing.
  493. return newFuncServerOption(func(o *serverOptions) {
  494. o.numServerWorkers = numServerWorkers
  495. })
  496. }
  // serverWorkerResetThreshold is the number of streams a serverWorker handles
  // before it hands over to a freshly spawned goroutine. Replacing itself this
  // way resets the worker's stack, so that large stacks don't live in memory
  // forever. 2^16 should allow each goroutine stack to live for at least a few
  // seconds in a typical workload (assuming a QPS of a few thousand
  // requests/sec).
  const serverWorkerResetThreshold = 1 << 16
  503. // serverWorkers blocks on a *transport.Stream channel forever and waits for
  504. // data to be fed by serveStreams. This allows multiple requests to be
  505. // processed by the same goroutine, removing the need for expensive stack
  506. // re-allocations (see the runtime.morestack problem [1]).
  507. //
  508. // [1] https://github.com/golang/go/issues/18138
  509. func (s *Server) serverWorker() {
  510. for completed := 0; completed < serverWorkerResetThreshold; completed++ {
  511. data, ok := <-s.serverWorkerChannel
  512. if !ok {
  513. return
  514. }
  515. s.handleSingleStream(data)
  516. }
  517. go s.serverWorker()
  518. }
  519. func (s *Server) handleSingleStream(data *serverWorkerData) {
  520. defer data.wg.Done()
  521. s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
  522. }
  523. // initServerWorkers creates worker goroutines and a channel to process incoming
  524. // connections to reduce the time spent overall on runtime.morestack.
  525. func (s *Server) initServerWorkers() {
  526. s.serverWorkerChannel = make(chan *serverWorkerData)
  527. for i := uint32(0); i < s.opts.numServerWorkers; i++ {
  528. go s.serverWorker()
  529. }
  530. }
  // stopServerWorkers closes the worker channel. Each serverWorker goroutine
  // returns once it observes the closed channel.
  func (s *Server) stopServerWorkers() {
  	close(s.serverWorkerChannel)
  }
  534. // NewServer creates a gRPC server which has no service registered and has not
  535. // started to accept requests yet.
  536. func NewServer(opt ...ServerOption) *Server {
  537. opts := defaultServerOptions
  538. for _, o := range globalServerOptions {
  539. o.apply(&opts)
  540. }
  541. for _, o := range opt {
  542. o.apply(&opts)
  543. }
  544. s := &Server{
  545. lis: make(map[net.Listener]bool),
  546. opts: opts,
  547. conns: make(map[string]map[transport.ServerTransport]bool),
  548. services: make(map[string]*serviceInfo),
  549. quit: grpcsync.NewEvent(),
  550. done: grpcsync.NewEvent(),
  551. czData: new(channelzData),
  552. }
  553. chainUnaryServerInterceptors(s)
  554. chainStreamServerInterceptors(s)
  555. s.cv = sync.NewCond(&s.mu)
  556. if EnableTracing {
  557. _, file, line, _ := runtime.Caller(1)
  558. s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
  559. }
  560. if s.opts.numServerWorkers > 0 {
  561. s.initServerWorkers()
  562. }
  563. s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
  564. channelz.Info(logger, s.channelzID, "Server created")
  565. return s
  566. }
  567. // printf records an event in s's event log, unless s has been stopped.
  568. // REQUIRES s.mu is held.
  569. func (s *Server) printf(format string, a ...interface{}) {
  570. if s.events != nil {
  571. s.events.Printf(format, a...)
  572. }
  573. }
  574. // errorf records an error in s's event log, unless s has been stopped.
  575. // REQUIRES s.mu is held.
  576. func (s *Server) errorf(format string, a ...interface{}) {
  577. if s.events != nil {
  578. s.events.Errorf(format, a...)
  579. }
  580. }
// ServiceRegistrar wraps a single method that supports service registration. It
// enables users to pass concrete types other than grpc.Server to the service
// registration methods exported by the IDL generated code.
type ServiceRegistrar interface {
	// RegisterService registers a service and its implementation to the
	// concrete type implementing this interface. It may not be called
	// once the server has started serving.
	//
	// desc describes the service and its methods and handlers. impl is the
	// service implementation which is passed to the method handlers.
	RegisterService(desc *ServiceDesc, impl interface{})
}
  592. // RegisterService registers a service and its implementation to the gRPC
  593. // server. It is called from the IDL generated code. This must be called before
  594. // invoking Serve. If ss is non-nil (for legacy code), its type is checked to
  595. // ensure it implements sd.HandlerType.
  596. func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  597. if ss != nil {
  598. ht := reflect.TypeOf(sd.HandlerType).Elem()
  599. st := reflect.TypeOf(ss)
  600. if !st.Implements(ht) {
  601. logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
  602. }
  603. }
  604. s.register(sd, ss)
  605. }
  606. func (s *Server) register(sd *ServiceDesc, ss interface{}) {
  607. s.mu.Lock()
  608. defer s.mu.Unlock()
  609. s.printf("RegisterService(%q)", sd.ServiceName)
  610. if s.serve {
  611. logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
  612. }
  613. if _, ok := s.services[sd.ServiceName]; ok {
  614. logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
  615. }
  616. info := &serviceInfo{
  617. serviceImpl: ss,
  618. methods: make(map[string]*MethodDesc),
  619. streams: make(map[string]*StreamDesc),
  620. mdata: sd.Metadata,
  621. }
  622. for i := range sd.Methods {
  623. d := &sd.Methods[i]
  624. info.methods[d.MethodName] = d
  625. }
  626. for i := range sd.Streams {
  627. d := &sd.Streams[i]
  628. info.streams[d.StreamName] = d
  629. }
  630. s.services[sd.ServiceName] = info
  631. }
// MethodInfo contains the information of an RPC including its method name and
// type. GetServiceInfo reports both flags as false for unary methods.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
// ServiceInfo contains unary RPC method info, streaming RPC method info and
// metadata for a service.
type ServiceInfo struct {
	// Methods lists the unary and the streaming methods of the service.
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
  647. // GetServiceInfo returns a map from service names to ServiceInfo.
  648. // Service names include the package names, in the form of <package>.<service>.
  649. func (s *Server) GetServiceInfo() map[string]ServiceInfo {
  650. ret := make(map[string]ServiceInfo)
  651. for n, srv := range s.services {
  652. methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
  653. for m := range srv.methods {
  654. methods = append(methods, MethodInfo{
  655. Name: m,
  656. IsClientStream: false,
  657. IsServerStream: false,
  658. })
  659. }
  660. for m, d := range srv.streams {
  661. methods = append(methods, MethodInfo{
  662. Name: m,
  663. IsClientStream: d.ClientStreams,
  664. IsServerStream: d.ServerStreams,
  665. })
  666. }
  667. ret[n] = ServiceInfo{
  668. Methods: methods,
  669. Metadata: srv.mdata,
  670. }
  671. }
  672. return ret
  673. }
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Serve returns it when called after the server's
// listener map has been cleared.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
// listenSocket wraps a net.Listener together with the channelz identifier
// under which the listener was registered.
type listenSocket struct {
	net.Listener
	// channelzID is set by Serve from channelz.RegisterListenSocket and is
	// removed from channelz again in Close.
	channelzID *channelz.Identifier
}
  681. func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
  682. return &channelz.SocketInternalMetric{
  683. SocketOptions: channelz.GetSocketOption(l.Listener),
  684. LocalAddr: l.Listener.Addr(),
  685. }
  686. }
  687. func (l *listenSocket) Close() error {
  688. err := l.Listener.Close()
  689. channelz.RemoveEntry(l.channelzID)
  690. channelz.Info(logger, l.channelzID, "ListenSocket deleted")
  691. return err
  692. }
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// If the server's listener map has already been cleared, lis is closed and
// ErrServerStopped is returned. Accept errors that report Temporary() are
// retried with an exponential backoff from 5ms up to 1s.
func (s *Server) Serve(lis net.Listener) error {
	// s.mu stays locked from here until the listen socket has been registered
	// with channelz; each early return below releases it explicitly.
	s.mu.Lock()
	s.printf("serving")
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	// Deferred first, so it runs last: after the listener cleanup below.
	defer func() {
		s.serveWG.Done()
		if s.quit.HasFired() {
			// Stop or GracefulStop called; block until done and return nil.
			<-s.done.Done()
		}
	}()

	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	// Close and forget the listener on return, unless it has already been
	// removed from s.lis (or s.lis itself has been cleared).
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var err error
	ls.channelzID, err = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	if err != nil {
		s.mu.Unlock()
		return err
	}
	s.mu.Unlock()
	channelz.Info(logger, ls.channelzID, "ListenSocket created")

	var tempDelay time.Duration // how long to sleep on accept failure
	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Errors exposing Temporary() == true are retried after a delay
			// that doubles on each consecutive failure.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Wait out the backoff, but give up at once if the server is
				// being stopped.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit.Done():
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept failure observed after quit has fired is reported as a
			// clean shutdown rather than as an error.
			if s.quit.HasFired() {
				return nil
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(lis.Addr().String(), rawConn)
			s.serveWG.Done()
		}()
	}
}
  783. // handleRawConn forks a goroutine to handle a just-accepted connection that
  784. // has not had any I/O performed on it yet.
  785. func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
  786. if s.quit.HasFired() {
  787. rawConn.Close()
  788. return
  789. }
  790. rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
  791. // Finish handshaking (HTTP2)
  792. st := s.newHTTP2Transport(rawConn)
  793. rawConn.SetDeadline(time.Time{})
  794. if st == nil {
  795. return
  796. }
  797. if !s.addConn(lisAddr, st) {
  798. return
  799. }
  800. go func() {
  801. s.serveStreams(st)
  802. s.removeConn(lisAddr, st)
  803. }()
  804. }
  805. func (s *Server) drainServerTransports(addr string) {
  806. s.mu.Lock()
  807. conns := s.conns[addr]
  808. for st := range conns {
  809. st.Drain("")
  810. }
  811. s.mu.Unlock()
  812. }
  813. // newHTTP2Transport sets up a http/2 transport (using the
  814. // gRPC http2 server transport in transport/http2_server.go).
  815. func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
  816. config := &transport.ServerConfig{
  817. MaxStreams: s.opts.maxConcurrentStreams,
  818. ConnectionTimeout: s.opts.connectionTimeout,
  819. Credentials: s.opts.creds,
  820. InTapHandle: s.opts.inTapHandle,
  821. StatsHandlers: s.opts.statsHandlers,
  822. KeepaliveParams: s.opts.keepaliveParams,
  823. KeepalivePolicy: s.opts.keepalivePolicy,
  824. InitialWindowSize: s.opts.initialWindowSize,
  825. InitialConnWindowSize: s.opts.initialConnWindowSize,
  826. WriteBufferSize: s.opts.writeBufferSize,
  827. ReadBufferSize: s.opts.readBufferSize,
  828. ChannelzParentID: s.channelzID,
  829. MaxHeaderListSize: s.opts.maxHeaderListSize,
  830. HeaderTableSize: s.opts.headerTableSize,
  831. }
  832. st, err := transport.NewServerTransport(c, config)
  833. if err != nil {
  834. s.mu.Lock()
  835. s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
  836. s.mu.Unlock()
  837. // ErrConnDispatched means that the connection was dispatched away from
  838. // gRPC; those connections should be left open.
  839. if err != credentials.ErrConnDispatched {
  840. // Don't log on ErrConnDispatched and io.EOF to prevent log spam.
  841. if err != io.EOF {
  842. channelz.Info(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
  843. }
  844. c.Close()
  845. }
  846. return nil
  847. }
  848. return st
  849. }
  850. func (s *Server) serveStreams(st transport.ServerTransport) {
  851. defer st.Close(errors.New("finished serving streams for the server transport"))
  852. var wg sync.WaitGroup
  853. st.HandleStreams(func(stream *transport.Stream) {
  854. wg.Add(1)
  855. if s.opts.numServerWorkers > 0 {
  856. data := &serverWorkerData{st: st, wg: &wg, stream: stream}
  857. select {
  858. case s.serverWorkerChannel <- data:
  859. return
  860. default:
  861. // If all stream workers are busy, fallback to the default code path.
  862. }
  863. }
  864. go func() {
  865. defer wg.Done()
  866. s.handleStream(st, stream, s.traceInfo(st, stream))
  867. }()
  868. }, func(ctx context.Context, method string) context.Context {
  869. if !EnableTracing {
  870. return ctx
  871. }
  872. tr := trace.New("grpc.Recv."+methodFamily(method), method)
  873. return trace.NewContext(ctx, tr)
  874. })
  875. wg.Wait()
  876. }
// Compile-time check that *Server implements http.Handler.
var _ http.Handler = (*Server)(nil)
  878. // ServeHTTP implements the Go standard library's http.Handler
  879. // interface by responding to the gRPC request r, by looking up
  880. // the requested gRPC method in the gRPC server s.
  881. //
  882. // The provided HTTP request must have arrived on an HTTP/2
  883. // connection. When using the Go standard library's server,
  884. // practically this means that the Request must also have arrived
  885. // over TLS.
  886. //
  887. // To share one port (such as 443 for https) between gRPC and an
  888. // existing http.Handler, use a root http.Handler such as:
  889. //
  890. // if r.ProtoMajor == 2 && strings.HasPrefix(
  891. // r.Header.Get("Content-Type"), "application/grpc") {
  892. // grpcServer.ServeHTTP(w, r)
  893. // } else {
  894. // yourMux.ServeHTTP(w, r)
  895. // }
  896. //
  897. // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
  898. // separate from grpc-go's HTTP/2 server. Performance and features may vary
  899. // between the two paths. ServeHTTP does not support some gRPC features
  900. // available through grpc-go's HTTP/2 server.
  901. //
  902. // # Experimental
  903. //
  904. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  905. // later release.
  906. func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  907. st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
  908. if err != nil {
  909. // Errors returned from transport.NewServerHandlerTransport have
  910. // already been written to w.
  911. return
  912. }
  913. if !s.addConn(listenerAddressForServeHTTP, st) {
  914. return
  915. }
  916. defer s.removeConn(listenerAddressForServeHTTP, st)
  917. s.serveStreams(st)
  918. }
  919. // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
  920. // If tracing is not enabled, it returns nil.
  921. func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
  922. if !EnableTracing {
  923. return nil
  924. }
  925. tr, ok := trace.FromContext(stream.Context())
  926. if !ok {
  927. return nil
  928. }
  929. trInfo = &traceInfo{
  930. tr: tr,
  931. firstLine: firstLine{
  932. client: false,
  933. remoteAddr: st.RemoteAddr(),
  934. },
  935. }
  936. if dl, ok := stream.Context().Deadline(); ok {
  937. trInfo.firstLine.deadline = time.Until(dl)
  938. }
  939. return trInfo
  940. }
  941. func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
  942. s.mu.Lock()
  943. defer s.mu.Unlock()
  944. if s.conns == nil {
  945. st.Close(errors.New("Server.addConn called when server has already been stopped"))
  946. return false
  947. }
  948. if s.drain {
  949. // Transport added after we drained our existing conns: drain it
  950. // immediately.
  951. st.Drain("")
  952. }
  953. if s.conns[addr] == nil {
  954. // Create a map entry if this is the first connection on this listener.
  955. s.conns[addr] = make(map[transport.ServerTransport]bool)
  956. }
  957. s.conns[addr][st] = true
  958. return true
  959. }
  960. func (s *Server) removeConn(addr string, st transport.ServerTransport) {
  961. s.mu.Lock()
  962. defer s.mu.Unlock()
  963. conns := s.conns[addr]
  964. if conns != nil {
  965. delete(conns, st)
  966. if len(conns) == 0 {
  967. // If the last connection for this address is being removed, also
  968. // remove the map entry corresponding to the address. This is used
  969. // in GracefulStop() when waiting for all connections to be closed.
  970. delete(s.conns, addr)
  971. }
  972. s.cv.Broadcast()
  973. }
  974. }
  975. func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
  976. return &channelz.ServerInternalMetric{
  977. CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
  978. CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
  979. CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
  980. LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
  981. }
  982. }
  983. func (s *Server) incrCallsStarted() {
  984. atomic.AddInt64(&s.czData.callsStarted, 1)
  985. atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
  986. }
// incrCallsSucceeded atomically increments the succeeded-calls counter that
// channelzMetric reports.
func (s *Server) incrCallsSucceeded() {
	atomic.AddInt64(&s.czData.callsSucceeded, 1)
}
// incrCallsFailed atomically increments the failed-calls counter that
// channelzMetric reports.
func (s *Server) incrCallsFailed() {
	atomic.AddInt64(&s.czData.callsFailed, 1)
}
  993. func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
  994. data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
  995. if err != nil {
  996. channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
  997. return err
  998. }
  999. compData, err := compress(data, cp, comp)
  1000. if err != nil {
  1001. channelz.Error(logger, s.channelzID, "grpc: server failed to compress response: ", err)
  1002. return err
  1003. }
  1004. hdr, payload := msgHeader(data, compData)
  1005. // TODO(dfawley): should we be checking len(data) instead?
  1006. if len(payload) > s.opts.maxSendMessageSize {
  1007. return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
  1008. }
  1009. err = t.Write(stream, hdr, payload, opts)
  1010. if err == nil {
  1011. for _, sh := range s.opts.statsHandlers {
  1012. sh.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
  1013. }
  1014. }
  1015. return err
  1016. }
  1017. // chainUnaryServerInterceptors chains all unary server interceptors into one.
  1018. func chainUnaryServerInterceptors(s *Server) {
  1019. // Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
  1020. // be executed before any other chained interceptors.
  1021. interceptors := s.opts.chainUnaryInts
  1022. if s.opts.unaryInt != nil {
  1023. interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
  1024. }
  1025. var chainedInt UnaryServerInterceptor
  1026. if len(interceptors) == 0 {
  1027. chainedInt = nil
  1028. } else if len(interceptors) == 1 {
  1029. chainedInt = interceptors[0]
  1030. } else {
  1031. chainedInt = chainUnaryInterceptors(interceptors)
  1032. }
  1033. s.opts.unaryInt = chainedInt
  1034. }
  1035. func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
  1036. return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
  1037. return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
  1038. }
  1039. }
  1040. func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
  1041. if curr == len(interceptors)-1 {
  1042. return finalHandler
  1043. }
  1044. return func(ctx context.Context, req interface{}) (interface{}, error) {
  1045. return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
  1046. }
  1047. }
// processUnaryRPC handles one unary RPC on stream: it notifies stats handlers,
// tracing and channelz about the start and end of the call, reports to any
// binary loggers, selects the compressors, receives and decompresses the
// request, invokes md.Handler on info.serviceImpl, and finally writes the
// response and the status through t.
//
// The named result err is read by the deferred function, which uses it to
// classify the call as failed or succeeded; io.EOF is not treated as a
// failure there.
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
	shs := s.opts.statsHandlers
	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
		if channelz.IsOn() {
			s.incrCallsStarted()
		}
		// NOTE: a fresh stats.Begin is created for every handler, so after the
		// loop statsBegin refers to the one given to the last handler; the
		// deferred function below uses its BeginTime for all handlers.
		var statsBegin *stats.Begin
		for _, sh := range shs {
			beginTime := time.Now()
			statsBegin = &stats.Begin{
				BeginTime:      beginTime,
				IsClientStream: false,
				IsServerStream: false,
			}
			sh.HandleRPC(stream.Context(), statsBegin)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&trInfo.firstLine, false)
		}
		// The deferred error handling for tracing, stats handler and channelz are
		// combined into one function to reduce stack usage -- a defer takes ~56-64
		// bytes on the stack, so overflowing the stack will require a stack
		// re-allocation, which is expensive.
		//
		// To maintain behavior similar to separate deferred statements, statements
		// should be executed in the reverse order. That is, tracing first, stats
		// handler second, and channelz last. Note that panics *within* defers will
		// lead to different behavior, but that's an acceptable compromise; that
		// would be undefined behavior territory anyway.
		defer func() {
			if trInfo != nil {
				if err != nil && err != io.EOF {
					trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					trInfo.tr.SetError()
				}
				trInfo.tr.Finish()
			}

			for _, sh := range shs {
				end := &stats.End{
					BeginTime: statsBegin.BeginTime,
					EndTime:   time.Now(),
				}
				if err != nil && err != io.EOF {
					end.Error = toRPCErr(err)
				}
				sh.HandleRPC(stream.Context(), end)
			}

			if channelz.IsOn() {
				if err != nil && err != io.EOF {
					s.incrCallsFailed()
				} else {
					s.incrCallsSucceeded()
				}
			}
		}()
	}
	// Collect the binary loggers for this method: the one from the binarylog
	// package, plus the one from the server's own binary logger if configured.
	var binlogs []binarylog.MethodLogger
	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
		binlogs = append(binlogs, ml)
	}
	if s.opts.binaryLogger != nil {
		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
			binlogs = append(binlogs, ml)
		}
	}
	if len(binlogs) != 0 {
		ctx := stream.Context()
		// NOTE: this md shadows the *MethodDesc parameter for the rest of this
		// block; here it is the incoming metadata.
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = time.Until(deadline)
			// A deadline that has already passed is logged as a zero timeout.
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		for _, binlog := range binlogs {
			binlog.Log(ctx, logEntry)
		}
	}

	// comp and cp are used for compression.  decomp and dc are used for
	// decompression.  If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor
	var sendCompressorName string

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		sendCompressorName = cp.Type()
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			sendCompressorName = comp.Name()
		}
	}

	if sendCompressorName != "" {
		if err := stream.SetSendCompress(sendCompressorName); err != nil {
			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
		}
	}

	// payInfo is only allocated when a stats handler or a binary logger is
	// present.
	var payInfo *payloadInfo
	if len(shs) != 0 || len(binlogs) != 0 {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	// df is handed to the method handler; it unmarshals the received bytes
	// into v and then reports the incoming payload to stats handlers, binary
	// loggers and the trace.
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		// payInfo is non-nil whenever shs is non-empty (see its allocation
		// above), so dereferencing it inside this loop is safe.
		for _, sh := range shs {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime:         time.Now(),
				Payload:          v,
				Length:           len(d),
				WireLength:       payInfo.compressedLength + headerLen,
				CompressedLength: payInfo.compressedLength,
				Data:             d,
			})
		}
		if len(binlogs) != 0 {
			cm := &binarylog.ClientMessage{
				Message: d,
			}
			for _, binlog := range binlogs {
				binlog.Log(stream.Context(), cm)
			}
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert non-status application error to a status error with code
			// Unknown, but handle context errors specifically.
			appStatus = status.FromContextError(appErr)
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if len(binlogs) != 0 {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				sh := &binarylog.ServerHeader{
					Header: h,
				}
				for _, binlog := range binlogs {
					binlog.Log(stream.Context(), sh)
				}
			}
			st := &binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range binlogs {
				binlog.Log(stream.Context(), st)
			}
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	// Last: true marks this write as the final message on the stream.
	opts := &transport.Options{Last: true}

	// Server handler could have set new compressor by calling SetSendCompressor.
	// In case it is set, we need to use it for compressing outbound message.
	if stream.SendCompress() != sendCompressorName {
		comp = encoding.GetCompressor(stream.SendCompress())
	}
	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		if sts, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, sts); e != nil {
				channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if len(binlogs) != 0 {
			h, _ := stream.Header()
			sh := &binarylog.ServerHeader{
				Header: h,
			}
			// NOTE(review): appErr is nil on this path, so the trailer is
			// logged without the sendResponse error — confirm this is intended.
			st := &binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			}
			for _, binlog := range binlogs {
				binlog.Log(stream.Context(), sh)
				binlog.Log(stream.Context(), st)
			}
		}
		return err
	}
	if len(binlogs) != 0 {
		h, _ := stream.Header()
		sh := &binarylog.ServerHeader{
			Header: h,
		}
		sm := &binarylog.ServerMessage{
			Message: reply,
		}
		for _, binlog := range binlogs {
			binlog.Log(stream.Context(), sh)
			binlog.Log(stream.Context(), sm)
		}
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	if len(binlogs) != 0 {
		st := &binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		}
		for _, binlog := range binlogs {
			binlog.Log(stream.Context(), st)
		}
	}
	return t.WriteStatus(stream, statusOK)
}
  1330. // chainStreamServerInterceptors chains all stream server interceptors into one.
  1331. func chainStreamServerInterceptors(s *Server) {
  1332. // Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
  1333. // be executed before any other chained interceptors.
  1334. interceptors := s.opts.chainStreamInts
  1335. if s.opts.streamInt != nil {
  1336. interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
  1337. }
  1338. var chainedInt StreamServerInterceptor
  1339. if len(interceptors) == 0 {
  1340. chainedInt = nil
  1341. } else if len(interceptors) == 1 {
  1342. chainedInt = interceptors[0]
  1343. } else {
  1344. chainedInt = chainStreamInterceptors(interceptors)
  1345. }
  1346. s.opts.streamInt = chainedInt
  1347. }
  1348. func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
  1349. return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
  1350. return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
  1351. }
  1352. }
  1353. func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
  1354. if curr == len(interceptors)-1 {
  1355. return finalHandler
  1356. }
  1357. return func(srv interface{}, stream ServerStream) error {
  1358. return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
  1359. }
  1360. }
  1361. func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
  1362. if channelz.IsOn() {
  1363. s.incrCallsStarted()
  1364. }
  1365. shs := s.opts.statsHandlers
  1366. var statsBegin *stats.Begin
  1367. if len(shs) != 0 {
  1368. beginTime := time.Now()
  1369. statsBegin = &stats.Begin{
  1370. BeginTime: beginTime,
  1371. IsClientStream: sd.ClientStreams,
  1372. IsServerStream: sd.ServerStreams,
  1373. }
  1374. for _, sh := range shs {
  1375. sh.HandleRPC(stream.Context(), statsBegin)
  1376. }
  1377. }
  1378. ctx := NewContextWithServerTransportStream(stream.Context(), stream)
  1379. ss := &serverStream{
  1380. ctx: ctx,
  1381. t: t,
  1382. s: stream,
  1383. p: &parser{r: stream},
  1384. codec: s.getCodec(stream.ContentSubtype()),
  1385. maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
  1386. maxSendMessageSize: s.opts.maxSendMessageSize,
  1387. trInfo: trInfo,
  1388. statsHandler: shs,
  1389. }
  1390. if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
  1391. // See comment in processUnaryRPC on defers.
  1392. defer func() {
  1393. if trInfo != nil {
  1394. ss.mu.Lock()
  1395. if err != nil && err != io.EOF {
  1396. ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1397. ss.trInfo.tr.SetError()
  1398. }
  1399. ss.trInfo.tr.Finish()
  1400. ss.trInfo.tr = nil
  1401. ss.mu.Unlock()
  1402. }
  1403. if len(shs) != 0 {
  1404. end := &stats.End{
  1405. BeginTime: statsBegin.BeginTime,
  1406. EndTime: time.Now(),
  1407. }
  1408. if err != nil && err != io.EOF {
  1409. end.Error = toRPCErr(err)
  1410. }
  1411. for _, sh := range shs {
  1412. sh.HandleRPC(stream.Context(), end)
  1413. }
  1414. }
  1415. if channelz.IsOn() {
  1416. if err != nil && err != io.EOF {
  1417. s.incrCallsFailed()
  1418. } else {
  1419. s.incrCallsSucceeded()
  1420. }
  1421. }
  1422. }()
  1423. }
  1424. if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
  1425. ss.binlogs = append(ss.binlogs, ml)
  1426. }
  1427. if s.opts.binaryLogger != nil {
  1428. if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
  1429. ss.binlogs = append(ss.binlogs, ml)
  1430. }
  1431. }
  1432. if len(ss.binlogs) != 0 {
  1433. md, _ := metadata.FromIncomingContext(ctx)
  1434. logEntry := &binarylog.ClientHeader{
  1435. Header: md,
  1436. MethodName: stream.Method(),
  1437. PeerAddr: nil,
  1438. }
  1439. if deadline, ok := ctx.Deadline(); ok {
  1440. logEntry.Timeout = time.Until(deadline)
  1441. if logEntry.Timeout < 0 {
  1442. logEntry.Timeout = 0
  1443. }
  1444. }
  1445. if a := md[":authority"]; len(a) > 0 {
  1446. logEntry.Authority = a[0]
  1447. }
  1448. if peer, ok := peer.FromContext(ss.Context()); ok {
  1449. logEntry.PeerAddr = peer.Addr
  1450. }
  1451. for _, binlog := range ss.binlogs {
  1452. binlog.Log(stream.Context(), logEntry)
  1453. }
  1454. }
  1455. // If dc is set and matches the stream's compression, use it. Otherwise, try
  1456. // to find a matching registered compressor for decomp.
  1457. if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
  1458. ss.dc = s.opts.dc
  1459. } else if rc != "" && rc != encoding.Identity {
  1460. ss.decomp = encoding.GetCompressor(rc)
  1461. if ss.decomp == nil {
  1462. st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
  1463. t.WriteStatus(ss.s, st)
  1464. return st.Err()
  1465. }
  1466. }
  1467. // If cp is set, use it. Otherwise, attempt to compress the response using
  1468. // the incoming message compression method.
  1469. //
  1470. // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  1471. if s.opts.cp != nil {
  1472. ss.cp = s.opts.cp
  1473. ss.sendCompressorName = s.opts.cp.Type()
  1474. } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
  1475. // Legacy compressor not specified; attempt to respond with same encoding.
  1476. ss.comp = encoding.GetCompressor(rc)
  1477. if ss.comp != nil {
  1478. ss.sendCompressorName = rc
  1479. }
  1480. }
  1481. if ss.sendCompressorName != "" {
  1482. if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
  1483. return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
  1484. }
  1485. }
  1486. ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)
  1487. if trInfo != nil {
  1488. trInfo.tr.LazyLog(&trInfo.firstLine, false)
  1489. }
  1490. var appErr error
  1491. var server interface{}
  1492. if info != nil {
  1493. server = info.serviceImpl
  1494. }
  1495. if s.opts.streamInt == nil {
  1496. appErr = sd.Handler(server, ss)
  1497. } else {
  1498. info := &StreamServerInfo{
  1499. FullMethod: stream.Method(),
  1500. IsClientStream: sd.ClientStreams,
  1501. IsServerStream: sd.ServerStreams,
  1502. }
  1503. appErr = s.opts.streamInt(server, ss, info, sd.Handler)
  1504. }
  1505. if appErr != nil {
  1506. appStatus, ok := status.FromError(appErr)
  1507. if !ok {
  1508. // Convert non-status application error to a status error with code
  1509. // Unknown, but handle context errors specifically.
  1510. appStatus = status.FromContextError(appErr)
  1511. appErr = appStatus.Err()
  1512. }
  1513. if trInfo != nil {
  1514. ss.mu.Lock()
  1515. ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  1516. ss.trInfo.tr.SetError()
  1517. ss.mu.Unlock()
  1518. }
  1519. if len(ss.binlogs) != 0 {
  1520. st := &binarylog.ServerTrailer{
  1521. Trailer: ss.s.Trailer(),
  1522. Err: appErr,
  1523. }
  1524. for _, binlog := range ss.binlogs {
  1525. binlog.Log(stream.Context(), st)
  1526. }
  1527. }
  1528. t.WriteStatus(ss.s, appStatus)
  1529. // TODO: Should we log an error from WriteStatus here and below?
  1530. return appErr
  1531. }
  1532. if trInfo != nil {
  1533. ss.mu.Lock()
  1534. ss.trInfo.tr.LazyLog(stringer("OK"), false)
  1535. ss.mu.Unlock()
  1536. }
  1537. if len(ss.binlogs) != 0 {
  1538. st := &binarylog.ServerTrailer{
  1539. Trailer: ss.s.Trailer(),
  1540. Err: appErr,
  1541. }
  1542. for _, binlog := range ss.binlogs {
  1543. binlog.Log(stream.Context(), st)
  1544. }
  1545. }
  1546. return t.WriteStatus(ss.s, statusOK)
  1547. }
  1548. func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  1549. sm := stream.Method()
  1550. if sm != "" && sm[0] == '/' {
  1551. sm = sm[1:]
  1552. }
  1553. pos := strings.LastIndex(sm, "/")
  1554. if pos == -1 {
  1555. if trInfo != nil {
  1556. trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
  1557. trInfo.tr.SetError()
  1558. }
  1559. errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
  1560. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1561. if trInfo != nil {
  1562. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1563. trInfo.tr.SetError()
  1564. }
  1565. channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
  1566. }
  1567. if trInfo != nil {
  1568. trInfo.tr.Finish()
  1569. }
  1570. return
  1571. }
  1572. service := sm[:pos]
  1573. method := sm[pos+1:]
  1574. srv, knownService := s.services[service]
  1575. if knownService {
  1576. if md, ok := srv.methods[method]; ok {
  1577. s.processUnaryRPC(t, stream, srv, md, trInfo)
  1578. return
  1579. }
  1580. if sd, ok := srv.streams[method]; ok {
  1581. s.processStreamingRPC(t, stream, srv, sd, trInfo)
  1582. return
  1583. }
  1584. }
  1585. // Unknown service, or known server unknown method.
  1586. if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  1587. s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
  1588. return
  1589. }
  1590. var errDesc string
  1591. if !knownService {
  1592. errDesc = fmt.Sprintf("unknown service %v", service)
  1593. } else {
  1594. errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
  1595. }
  1596. if trInfo != nil {
  1597. trInfo.tr.LazyPrintf("%s", errDesc)
  1598. trInfo.tr.SetError()
  1599. }
  1600. if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1601. if trInfo != nil {
  1602. trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
  1603. trInfo.tr.SetError()
  1604. }
  1605. channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
  1606. }
  1607. if trInfo != nil {
  1608. trInfo.tr.Finish()
  1609. }
  1610. }
// streamKey is the context key under which a ServerTransportStream is stored
// by NewContextWithServerTransportStream and looked up by
// ServerTransportStreamFromContext. Being an unexported empty struct type, it
// cannot collide with context keys defined by other packages.
type streamKey struct{}
  1613. // NewContextWithServerTransportStream creates a new context from ctx and
  1614. // attaches stream to it.
  1615. //
  1616. // # Experimental
  1617. //
  1618. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  1619. // later release.
  1620. func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
  1621. return context.WithValue(ctx, streamKey{}, stream)
  1622. }
// ServerTransportStream is a minimal interface that a transport stream must
// implement. This can be used to mock an actual transport stream for tests of
// handler code that use, for example, grpc.SetHeader (which requires some
// stream to be in context).
//
// See also NewContextWithServerTransportStream.
//
// # Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type ServerTransportStream interface {
	// Method returns the method string of the RPC; the package-level Method
	// function returns this value for the stream found in a context.
	Method() string
	// SetHeader is what the package-level SetHeader function delegates to
	// when given non-empty metadata.
	SetHeader(md metadata.MD) error
	// SendHeader is what the package-level SendHeader function delegates to.
	SendHeader(md metadata.MD) error
	// SetTrailer is what the package-level SetTrailer function delegates to
	// when given non-empty metadata.
	SetTrailer(md metadata.MD) error
}
  1640. // ServerTransportStreamFromContext returns the ServerTransportStream saved in
  1641. // ctx. Returns nil if the given context has no stream associated with it
  1642. // (which implies it is not an RPC invocation context).
  1643. //
  1644. // # Experimental
  1645. //
  1646. // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  1647. // later release.
  1648. func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
  1649. s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
  1650. return s
  1651. }
  1652. // Stop stops the gRPC server. It immediately closes all open
  1653. // connections and listeners.
  1654. // It cancels all active RPCs on the server side and the corresponding
  1655. // pending RPCs on the client side will get notified by connection
  1656. // errors.
  1657. func (s *Server) Stop() {
  1658. s.quit.Fire()
  1659. defer func() {
  1660. s.serveWG.Wait()
  1661. s.done.Fire()
  1662. }()
  1663. s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
  1664. s.mu.Lock()
  1665. listeners := s.lis
  1666. s.lis = nil
  1667. conns := s.conns
  1668. s.conns = nil
  1669. // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
  1670. s.cv.Broadcast()
  1671. s.mu.Unlock()
  1672. for lis := range listeners {
  1673. lis.Close()
  1674. }
  1675. for _, cs := range conns {
  1676. for st := range cs {
  1677. st.Close(errors.New("Server.Stop called"))
  1678. }
  1679. }
  1680. if s.opts.numServerWorkers > 0 {
  1681. s.stopServerWorkers()
  1682. }
  1683. s.mu.Lock()
  1684. if s.events != nil {
  1685. s.events.Finish()
  1686. s.events = nil
  1687. }
  1688. s.mu.Unlock()
  1689. }
  1690. // GracefulStop stops the gRPC server gracefully. It stops the server from
  1691. // accepting new connections and RPCs and blocks until all the pending RPCs are
  1692. // finished.
  1693. func (s *Server) GracefulStop() {
  1694. s.quit.Fire()
  1695. defer s.done.Fire()
  1696. s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelzID) })
  1697. s.mu.Lock()
  1698. if s.conns == nil {
  1699. s.mu.Unlock()
  1700. return
  1701. }
  1702. for lis := range s.lis {
  1703. lis.Close()
  1704. }
  1705. s.lis = nil
  1706. if !s.drain {
  1707. for _, conns := range s.conns {
  1708. for st := range conns {
  1709. st.Drain("graceful_stop")
  1710. }
  1711. }
  1712. s.drain = true
  1713. }
  1714. // Wait for serving threads to be ready to exit. Only then can we be sure no
  1715. // new conns will be created.
  1716. s.mu.Unlock()
  1717. s.serveWG.Wait()
  1718. s.mu.Lock()
  1719. for len(s.conns) != 0 {
  1720. s.cv.Wait()
  1721. }
  1722. s.conns = nil
  1723. if s.events != nil {
  1724. s.events.Finish()
  1725. s.events = nil
  1726. }
  1727. s.mu.Unlock()
  1728. }
  1729. // contentSubtype must be lowercase
  1730. // cannot return nil
  1731. func (s *Server) getCodec(contentSubtype string) baseCodec {
  1732. if s.opts.codec != nil {
  1733. return s.opts.codec
  1734. }
  1735. if contentSubtype == "" {
  1736. return encoding.GetCodec(proto.Name)
  1737. }
  1738. codec := encoding.GetCodec(contentSubtype)
  1739. if codec == nil {
  1740. return encoding.GetCodec(proto.Name)
  1741. }
  1742. return codec
  1743. }
  1744. // SetHeader sets the header metadata to be sent from the server to the client.
  1745. // The context provided must be the context passed to the server's handler.
  1746. //
  1747. // Streaming RPCs should prefer the SetHeader method of the ServerStream.
  1748. //
  1749. // When called multiple times, all the provided metadata will be merged. All
  1750. // the metadata will be sent out when one of the following happens:
  1751. //
  1752. // - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
  1753. // - The first response message is sent. For unary handlers, this occurs when
  1754. // the handler returns; for streaming handlers, this can happen when stream's
  1755. // SendMsg method is called.
  1756. // - An RPC status is sent out (error or success). This occurs when the handler
  1757. // returns.
  1758. //
  1759. // SetHeader will fail if called after any of the events above.
  1760. //
  1761. // The error returned is compatible with the status package. However, the
  1762. // status code will often not match the RPC status as seen by the client
  1763. // application, and therefore, should not be relied upon for this purpose.
  1764. func SetHeader(ctx context.Context, md metadata.MD) error {
  1765. if md.Len() == 0 {
  1766. return nil
  1767. }
  1768. stream := ServerTransportStreamFromContext(ctx)
  1769. if stream == nil {
  1770. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1771. }
  1772. return stream.SetHeader(md)
  1773. }
  1774. // SendHeader sends header metadata. It may be called at most once, and may not
  1775. // be called after any event that causes headers to be sent (see SetHeader for
  1776. // a complete list). The provided md and headers set by SetHeader() will be
  1777. // sent.
  1778. //
  1779. // The error returned is compatible with the status package. However, the
  1780. // status code will often not match the RPC status as seen by the client
  1781. // application, and therefore, should not be relied upon for this purpose.
  1782. func SendHeader(ctx context.Context, md metadata.MD) error {
  1783. stream := ServerTransportStreamFromContext(ctx)
  1784. if stream == nil {
  1785. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1786. }
  1787. if err := stream.SendHeader(md); err != nil {
  1788. return toRPCErr(err)
  1789. }
  1790. return nil
  1791. }
  1792. // SetSendCompressor sets a compressor for outbound messages from the server.
  1793. // It must not be called after any event that causes headers to be sent
  1794. // (see ServerStream.SetHeader for the complete list). Provided compressor is
  1795. // used when below conditions are met:
  1796. //
  1797. // - compressor is registered via encoding.RegisterCompressor
  1798. // - compressor name must exist in the client advertised compressor names
  1799. // sent in grpc-accept-encoding header. Use ClientSupportedCompressors to
  1800. // get client supported compressor names.
  1801. //
  1802. // The context provided must be the context passed to the server's handler.
  1803. // It must be noted that compressor name encoding.Identity disables the
  1804. // outbound compression.
  1805. // By default, server messages will be sent using the same compressor with
  1806. // which request messages were sent.
  1807. //
  1808. // It is not safe to call SetSendCompressor concurrently with SendHeader and
  1809. // SendMsg.
  1810. //
  1811. // # Experimental
  1812. //
  1813. // Notice: This function is EXPERIMENTAL and may be changed or removed in a
  1814. // later release.
  1815. func SetSendCompressor(ctx context.Context, name string) error {
  1816. stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
  1817. if !ok || stream == nil {
  1818. return fmt.Errorf("failed to fetch the stream from the given context")
  1819. }
  1820. if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
  1821. return fmt.Errorf("unable to set send compressor: %w", err)
  1822. }
  1823. return stream.SetSendCompress(name)
  1824. }
  1825. // ClientSupportedCompressors returns compressor names advertised by the client
  1826. // via grpc-accept-encoding header.
  1827. //
  1828. // The context provided must be the context passed to the server's handler.
  1829. //
  1830. // # Experimental
  1831. //
  1832. // Notice: This function is EXPERIMENTAL and may be changed or removed in a
  1833. // later release.
  1834. func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
  1835. stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
  1836. if !ok || stream == nil {
  1837. return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
  1838. }
  1839. return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil
  1840. }
  1841. // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  1842. // When called more than once, all the provided metadata will be merged.
  1843. //
  1844. // The error returned is compatible with the status package. However, the
  1845. // status code will often not match the RPC status as seen by the client
  1846. // application, and therefore, should not be relied upon for this purpose.
  1847. func SetTrailer(ctx context.Context, md metadata.MD) error {
  1848. if md.Len() == 0 {
  1849. return nil
  1850. }
  1851. stream := ServerTransportStreamFromContext(ctx)
  1852. if stream == nil {
  1853. return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  1854. }
  1855. return stream.SetTrailer(md)
  1856. }
  1857. // Method returns the method string for the server context. The returned
  1858. // string is in the format of "/service/method".
  1859. func Method(ctx context.Context) (string, bool) {
  1860. s := ServerTransportStreamFromContext(ctx)
  1861. if s == nil {
  1862. return "", false
  1863. }
  1864. return s.Method(), true
  1865. }
  1866. type channelzServer struct {
  1867. s *Server
  1868. }
  1869. func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
  1870. return c.s.channelzMetric()
  1871. }
  1872. // validateSendCompressor returns an error when given compressor name cannot be
  1873. // handled by the server or the client based on the advertised compressors.
  1874. func validateSendCompressor(name, clientCompressors string) error {
  1875. if name == encoding.Identity {
  1876. return nil
  1877. }
  1878. if !grpcutil.IsCompressorNameRegistered(name) {
  1879. return fmt.Errorf("compressor not registered %q", name)
  1880. }
  1881. for _, c := range strings.Split(clientCompressors, ",") {
  1882. if c == name {
  1883. return nil // found match
  1884. }
  1885. }
  1886. return fmt.Errorf("client does not support compressor %q", name)
  1887. }