server.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. /*
  2. *
  3. * Copyright 2020 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package xds
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "net"
  24. "sync"
  25. "google.golang.org/grpc"
  26. "google.golang.org/grpc/codes"
  27. "google.golang.org/grpc/connectivity"
  28. "google.golang.org/grpc/credentials"
  29. "google.golang.org/grpc/grpclog"
  30. "google.golang.org/grpc/internal"
  31. "google.golang.org/grpc/internal/buffer"
  32. "google.golang.org/grpc/internal/envconfig"
  33. internalgrpclog "google.golang.org/grpc/internal/grpclog"
  34. "google.golang.org/grpc/internal/grpcsync"
  35. iresolver "google.golang.org/grpc/internal/resolver"
  36. "google.golang.org/grpc/internal/transport"
  37. "google.golang.org/grpc/metadata"
  38. "google.golang.org/grpc/status"
  39. "google.golang.org/grpc/xds/internal/server"
  40. "google.golang.org/grpc/xds/internal/xdsclient"
  41. "google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
  42. "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
  43. )
  44. const serverPrefix = "[xds-server %p] "
  45. var (
  46. // These new functions will be overridden in unit tests.
  47. newXDSClient = func() (xdsclient.XDSClient, func(), error) {
  48. return xdsclient.New()
  49. }
  50. newGRPCServer = func(opts ...grpc.ServerOption) grpcServer {
  51. return grpc.NewServer(opts...)
  52. }
  53. grpcGetServerCreds = internal.GetServerCredentials.(func(*grpc.Server) credentials.TransportCredentials)
  54. drainServerTransports = internal.DrainServerTransports.(func(*grpc.Server, string))
  55. logger = grpclog.Component("xds")
  56. )
  57. // grpcServer contains methods from grpc.Server which are used by the
  58. // GRPCServer type here. This is useful for overriding in unit tests.
  59. type grpcServer interface {
  60. RegisterService(*grpc.ServiceDesc, interface{})
  61. Serve(net.Listener) error
  62. Stop()
  63. GracefulStop()
  64. GetServiceInfo() map[string]grpc.ServiceInfo
  65. }
  66. // GRPCServer wraps a gRPC server and provides server-side xDS functionality, by
  67. // communication with a management server using xDS APIs. It implements the
  68. // grpc.ServiceRegistrar interface and can be passed to service registration
  69. // functions in IDL generated code.
  70. type GRPCServer struct {
  71. gs grpcServer
  72. quit *grpcsync.Event
  73. logger *internalgrpclog.PrefixLogger
  74. xdsCredsInUse bool
  75. opts *serverOptions
  76. // clientMu is used only in initXDSClient(), which is called at the
  77. // beginning of Serve(), where we have to decide if we have to create a
  78. // client or use an existing one.
  79. clientMu sync.Mutex
  80. xdsC xdsclient.XDSClient
  81. xdsClientClose func()
  82. }
  83. // NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts.
  84. // The underlying gRPC server has no service registered and has not started to
  85. // accept requests yet.
  86. func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
  87. newOpts := []grpc.ServerOption{
  88. grpc.ChainUnaryInterceptor(xdsUnaryInterceptor),
  89. grpc.ChainStreamInterceptor(xdsStreamInterceptor),
  90. }
  91. newOpts = append(newOpts, opts...)
  92. s := &GRPCServer{
  93. gs: newGRPCServer(newOpts...),
  94. quit: grpcsync.NewEvent(),
  95. }
  96. s.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, s))
  97. s.logger.Infof("Created xds.GRPCServer")
  98. s.handleServerOptions(opts)
  99. // We type assert our underlying gRPC server to the real grpc.Server here
  100. // before trying to retrieve the configured credentials. This approach
  101. // avoids performing the same type assertion in the grpc package which
  102. // provides the implementation for internal.GetServerCredentials, and allows
  103. // us to use a fake gRPC server in tests.
  104. if gs, ok := s.gs.(*grpc.Server); ok {
  105. creds := grpcGetServerCreds(gs)
  106. if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() {
  107. s.xdsCredsInUse = true
  108. }
  109. }
  110. s.logger.Infof("xDS credentials in use: %v", s.xdsCredsInUse)
  111. return s
  112. }
  113. // handleServerOptions iterates through the list of server options passed in by
  114. // the user, and handles the xDS server specific options.
  115. func (s *GRPCServer) handleServerOptions(opts []grpc.ServerOption) {
  116. so := s.defaultServerOptions()
  117. for _, opt := range opts {
  118. if o, ok := opt.(*serverOption); ok {
  119. o.apply(so)
  120. }
  121. }
  122. s.opts = so
  123. }
  124. func (s *GRPCServer) defaultServerOptions() *serverOptions {
  125. return &serverOptions{
  126. // A default serving mode change callback which simply logs at the
  127. // default-visible log level. This will be used if the application does not
  128. // register a mode change callback.
  129. //
  130. // Note that this means that `s.opts.modeCallback` will never be nil and can
  131. // safely be invoked directly from `handleServingModeChanges`.
  132. modeCallback: s.loggingServerModeChangeCallback,
  133. }
  134. }
  135. func (s *GRPCServer) loggingServerModeChangeCallback(addr net.Addr, args ServingModeChangeArgs) {
  136. switch args.Mode {
  137. case connectivity.ServingModeServing:
  138. s.logger.Errorf("Listener %q entering mode: %q", addr.String(), args.Mode)
  139. case connectivity.ServingModeNotServing:
  140. s.logger.Errorf("Listener %q entering mode: %q due to error: %v", addr.String(), args.Mode, args.Err)
  141. }
  142. }
  143. // RegisterService registers a service and its implementation to the underlying
  144. // gRPC server. It is called from the IDL generated code. This must be called
  145. // before invoking Serve.
  146. func (s *GRPCServer) RegisterService(sd *grpc.ServiceDesc, ss interface{}) {
  147. s.gs.RegisterService(sd, ss)
  148. }
  149. // GetServiceInfo returns a map from service names to ServiceInfo.
  150. // Service names include the package names, in the form of <package>.<service>.
  151. func (s *GRPCServer) GetServiceInfo() map[string]grpc.ServiceInfo {
  152. return s.gs.GetServiceInfo()
  153. }
  154. // initXDSClient creates a new xdsClient if there is no existing one available.
  155. func (s *GRPCServer) initXDSClient() error {
  156. s.clientMu.Lock()
  157. defer s.clientMu.Unlock()
  158. if s.xdsC != nil {
  159. return nil
  160. }
  161. newXDSClient := newXDSClient
  162. if s.opts.bootstrapContentsForTesting != nil {
  163. // Bootstrap file contents may be specified as a server option for tests.
  164. newXDSClient = func() (xdsclient.XDSClient, func(), error) {
  165. return xdsclient.NewWithBootstrapContentsForTesting(s.opts.bootstrapContentsForTesting)
  166. }
  167. }
  168. client, close, err := newXDSClient()
  169. if err != nil {
  170. return fmt.Errorf("xds: failed to create xds-client: %v", err)
  171. }
  172. s.xdsC = client
  173. s.xdsClientClose = close
  174. return nil
  175. }
  176. // Serve gets the underlying gRPC server to accept incoming connections on the
  177. // listener lis, which is expected to be listening on a TCP port.
  178. //
  179. // A connection to the management server, to receive xDS configuration, is
  180. // initiated here.
  181. //
  182. // Serve will return a non-nil error unless Stop or GracefulStop is called.
  183. func (s *GRPCServer) Serve(lis net.Listener) error {
  184. s.logger.Infof("Serve() passed a net.Listener on %s", lis.Addr().String())
  185. if _, ok := lis.Addr().(*net.TCPAddr); !ok {
  186. return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr())
  187. }
  188. // If this is the first time Serve() is being called, we need to initialize
  189. // our xdsClient. If not, we can use the existing one.
  190. if err := s.initXDSClient(); err != nil {
  191. return err
  192. }
  193. cfg := s.xdsC.BootstrapConfig()
  194. if cfg == nil {
  195. return errors.New("bootstrap configuration is empty")
  196. }
  197. // If xds credentials were specified by the user, but bootstrap configs do
  198. // not contain any certificate provider configuration, it is better to fail
  199. // right now rather than failing when attempting to create certificate
  200. // providers after receiving an LDS response with security configuration.
  201. if s.xdsCredsInUse {
  202. if len(cfg.CertProviderConfigs) == 0 {
  203. return errors.New("xds: certificate_providers config missing in bootstrap file")
  204. }
  205. }
  206. // The server listener resource name template from the bootstrap
  207. // configuration contains a template for the name of the Listener resource
  208. // to subscribe to for a gRPC server. If the token `%s` is present in the
  209. // string, it will be replaced with the server's listening "IP:port" (e.g.,
  210. // "0.0.0.0:8080", "[::]:8080"). The absence of a template will be treated
  211. // as an error since we do not have any default value for this.
  212. if cfg.ServerListenerResourceNameTemplate == "" {
  213. return errors.New("missing server_listener_resource_name_template in the bootstrap configuration")
  214. }
  215. name := bootstrap.PopulateResourceTemplate(cfg.ServerListenerResourceNameTemplate, lis.Addr().String())
  216. modeUpdateCh := buffer.NewUnbounded()
  217. go func() {
  218. s.handleServingModeChanges(modeUpdateCh)
  219. }()
  220. // Create a listenerWrapper which handles all functionality required by
  221. // this particular instance of Serve().
  222. lw, goodUpdateCh := server.NewListenerWrapper(server.ListenerWrapperParams{
  223. Listener: lis,
  224. ListenerResourceName: name,
  225. XDSCredsInUse: s.xdsCredsInUse,
  226. XDSClient: s.xdsC,
  227. ModeCallback: func(addr net.Addr, mode connectivity.ServingMode, err error) {
  228. modeUpdateCh.Put(&modeChangeArgs{
  229. addr: addr,
  230. mode: mode,
  231. err: err,
  232. })
  233. },
  234. DrainCallback: func(addr net.Addr) {
  235. if gs, ok := s.gs.(*grpc.Server); ok {
  236. drainServerTransports(gs, addr.String())
  237. }
  238. },
  239. })
  240. // Block until a good LDS response is received or the server is stopped.
  241. select {
  242. case <-s.quit.Done():
  243. // Since the listener has not yet been handed over to gs.Serve(), we
  244. // need to explicitly close the listener. Cancellation of the xDS watch
  245. // is handled by the listenerWrapper.
  246. lw.Close()
  247. modeUpdateCh.Close()
  248. return nil
  249. case <-goodUpdateCh:
  250. }
  251. return s.gs.Serve(lw)
  252. }
  253. // modeChangeArgs wraps argument required for invoking mode change callback.
  254. type modeChangeArgs struct {
  255. addr net.Addr
  256. mode connectivity.ServingMode
  257. err error
  258. }
  259. // handleServingModeChanges runs as a separate goroutine, spawned from Serve().
  260. // It reads a channel on to which mode change arguments are pushed, and in turn
  261. // invokes the user registered callback. It also calls an internal method on the
  262. // underlying grpc.Server to gracefully close existing connections, if the
  263. // listener moved to a "not-serving" mode.
  264. func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
  265. for {
  266. select {
  267. case <-s.quit.Done():
  268. return
  269. case u, ok := <-updateCh.Get():
  270. if !ok {
  271. return
  272. }
  273. updateCh.Load()
  274. args := u.(*modeChangeArgs)
  275. if args.mode == connectivity.ServingModeNotServing {
  276. // We type assert our underlying gRPC server to the real
  277. // grpc.Server here before trying to initiate the drain
  278. // operation. This approach avoids performing the same type
  279. // assertion in the grpc package which provides the
  280. // implementation for internal.GetServerCredentials, and allows
  281. // us to use a fake gRPC server in tests.
  282. if gs, ok := s.gs.(*grpc.Server); ok {
  283. drainServerTransports(gs, args.addr.String())
  284. }
  285. }
  286. // The XdsServer API will allow applications to register a "serving state"
  287. // callback to be invoked when the server begins serving and when the
  288. // server encounters errors that force it to be "not serving". If "not
  289. // serving", the callback must be provided error information, for
  290. // debugging use by developers - A36.
  291. s.opts.modeCallback(args.addr, ServingModeChangeArgs{
  292. Mode: args.mode,
  293. Err: args.err,
  294. })
  295. }
  296. }
  297. }
  298. // Stop stops the underlying gRPC server. It immediately closes all open
  299. // connections. It cancels all active RPCs on the server side and the
  300. // corresponding pending RPCs on the client side will get notified by connection
  301. // errors.
  302. func (s *GRPCServer) Stop() {
  303. s.quit.Fire()
  304. s.gs.Stop()
  305. if s.xdsC != nil {
  306. s.xdsClientClose()
  307. }
  308. }
  309. // GracefulStop stops the underlying gRPC server gracefully. It stops the server
  310. // from accepting new connections and RPCs and blocks until all the pending RPCs
  311. // are finished.
  312. func (s *GRPCServer) GracefulStop() {
  313. s.quit.Fire()
  314. s.gs.GracefulStop()
  315. if s.xdsC != nil {
  316. s.xdsClientClose()
  317. }
  318. }
  319. // routeAndProcess routes the incoming RPC to a configured route in the route
  320. // table and also processes the RPC by running the incoming RPC through any HTTP
  321. // Filters configured.
  322. func routeAndProcess(ctx context.Context) error {
  323. conn := transport.GetConnection(ctx)
  324. cw, ok := conn.(interface {
  325. VirtualHosts() []xdsresource.VirtualHostWithInterceptors
  326. })
  327. if !ok {
  328. return errors.New("missing virtual hosts in incoming context")
  329. }
  330. mn, ok := grpc.Method(ctx)
  331. if !ok {
  332. return errors.New("missing method name in incoming context")
  333. }
  334. md, ok := metadata.FromIncomingContext(ctx)
  335. if !ok {
  336. return errors.New("missing metadata in incoming context")
  337. }
  338. // A41 added logic to the core grpc implementation to guarantee that once
  339. // the RPC gets to this point, there will be a single, unambiguous authority
  340. // present in the header map.
  341. authority := md.Get(":authority")
  342. vh := xdsresource.FindBestMatchingVirtualHostServer(authority[0], cw.VirtualHosts())
  343. if vh == nil {
  344. return status.Error(codes.Unavailable, "the incoming RPC did not match a configured Virtual Host")
  345. }
  346. var rwi *xdsresource.RouteWithInterceptors
  347. rpcInfo := iresolver.RPCInfo{
  348. Context: ctx,
  349. Method: mn,
  350. }
  351. for _, r := range vh.Routes {
  352. if r.M.Match(rpcInfo) {
  353. // "NonForwardingAction is expected for all Routes used on server-side; a route with an inappropriate action causes
  354. // RPCs matching that route to fail with UNAVAILABLE." - A36
  355. if r.ActionType != xdsresource.RouteActionNonForwardingAction {
  356. return status.Error(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding")
  357. }
  358. rwi = &r
  359. break
  360. }
  361. }
  362. if rwi == nil {
  363. return status.Error(codes.Unavailable, "the incoming RPC did not match a configured Route")
  364. }
  365. for _, interceptor := range rwi.Interceptors {
  366. if err := interceptor.AllowRPC(ctx); err != nil {
  367. return status.Errorf(codes.PermissionDenied, "Incoming RPC is not allowed: %v", err)
  368. }
  369. }
  370. return nil
  371. }
  372. // xdsUnaryInterceptor is the unary interceptor added to the gRPC server to
  373. // perform any xDS specific functionality on unary RPCs.
  374. func xdsUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
  375. if envconfig.XDSRBAC {
  376. if err := routeAndProcess(ctx); err != nil {
  377. return nil, err
  378. }
  379. }
  380. return handler(ctx, req)
  381. }
  382. // xdsStreamInterceptor is the stream interceptor added to the gRPC server to
  383. // perform any xDS specific functionality on streaming RPCs.
  384. func xdsStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  385. if envconfig.XDSRBAC {
  386. if err := routeAndProcess(ss.Context()); err != nil {
  387. return err
  388. }
  389. }
  390. return handler(srv, ss)
  391. }