123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- /*
- *
- * Copyright 2020 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package xds
- import (
- "context"
- "errors"
- "fmt"
- "net"
- "sync"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/credentials"
- "google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal"
- "google.golang.org/grpc/internal/buffer"
- "google.golang.org/grpc/internal/envconfig"
- internalgrpclog "google.golang.org/grpc/internal/grpclog"
- "google.golang.org/grpc/internal/grpcsync"
- iresolver "google.golang.org/grpc/internal/resolver"
- "google.golang.org/grpc/internal/transport"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/status"
- "google.golang.org/grpc/xds/internal/server"
- "google.golang.org/grpc/xds/internal/xdsclient"
- "google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
- "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
- )
- const serverPrefix = "[xds-server %p] "
- var (
- // These new functions will be overridden in unit tests.
- newXDSClient = func() (xdsclient.XDSClient, func(), error) {
- return xdsclient.New()
- }
- newGRPCServer = func(opts ...grpc.ServerOption) grpcServer {
- return grpc.NewServer(opts...)
- }
- grpcGetServerCreds = internal.GetServerCredentials.(func(*grpc.Server) credentials.TransportCredentials)
- drainServerTransports = internal.DrainServerTransports.(func(*grpc.Server, string))
- logger = grpclog.Component("xds")
- )
- // grpcServer contains methods from grpc.Server which are used by the
- // GRPCServer type here. This is useful for overriding in unit tests.
- type grpcServer interface {
- RegisterService(*grpc.ServiceDesc, interface{})
- Serve(net.Listener) error
- Stop()
- GracefulStop()
- GetServiceInfo() map[string]grpc.ServiceInfo
- }
- // GRPCServer wraps a gRPC server and provides server-side xDS functionality, by
- // communication with a management server using xDS APIs. It implements the
- // grpc.ServiceRegistrar interface and can be passed to service registration
- // functions in IDL generated code.
- type GRPCServer struct {
- gs grpcServer
- quit *grpcsync.Event
- logger *internalgrpclog.PrefixLogger
- xdsCredsInUse bool
- opts *serverOptions
- // clientMu is used only in initXDSClient(), which is called at the
- // beginning of Serve(), where we have to decide if we have to create a
- // client or use an existing one.
- clientMu sync.Mutex
- xdsC xdsclient.XDSClient
- xdsClientClose func()
- }
- // NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts.
- // The underlying gRPC server has no service registered and has not started to
- // accept requests yet.
- func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
- newOpts := []grpc.ServerOption{
- grpc.ChainUnaryInterceptor(xdsUnaryInterceptor),
- grpc.ChainStreamInterceptor(xdsStreamInterceptor),
- }
- newOpts = append(newOpts, opts...)
- s := &GRPCServer{
- gs: newGRPCServer(newOpts...),
- quit: grpcsync.NewEvent(),
- }
- s.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, s))
- s.logger.Infof("Created xds.GRPCServer")
- s.handleServerOptions(opts)
- // We type assert our underlying gRPC server to the real grpc.Server here
- // before trying to retrieve the configured credentials. This approach
- // avoids performing the same type assertion in the grpc package which
- // provides the implementation for internal.GetServerCredentials, and allows
- // us to use a fake gRPC server in tests.
- if gs, ok := s.gs.(*grpc.Server); ok {
- creds := grpcGetServerCreds(gs)
- if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() {
- s.xdsCredsInUse = true
- }
- }
- s.logger.Infof("xDS credentials in use: %v", s.xdsCredsInUse)
- return s
- }
- // handleServerOptions iterates through the list of server options passed in by
- // the user, and handles the xDS server specific options.
- func (s *GRPCServer) handleServerOptions(opts []grpc.ServerOption) {
- so := s.defaultServerOptions()
- for _, opt := range opts {
- if o, ok := opt.(*serverOption); ok {
- o.apply(so)
- }
- }
- s.opts = so
- }
- func (s *GRPCServer) defaultServerOptions() *serverOptions {
- return &serverOptions{
- // A default serving mode change callback which simply logs at the
- // default-visible log level. This will be used if the application does not
- // register a mode change callback.
- //
- // Note that this means that `s.opts.modeCallback` will never be nil and can
- // safely be invoked directly from `handleServingModeChanges`.
- modeCallback: s.loggingServerModeChangeCallback,
- }
- }
- func (s *GRPCServer) loggingServerModeChangeCallback(addr net.Addr, args ServingModeChangeArgs) {
- switch args.Mode {
- case connectivity.ServingModeServing:
- s.logger.Errorf("Listener %q entering mode: %q", addr.String(), args.Mode)
- case connectivity.ServingModeNotServing:
- s.logger.Errorf("Listener %q entering mode: %q due to error: %v", addr.String(), args.Mode, args.Err)
- }
- }
- // RegisterService registers a service and its implementation to the underlying
- // gRPC server. It is called from the IDL generated code. This must be called
- // before invoking Serve.
- func (s *GRPCServer) RegisterService(sd *grpc.ServiceDesc, ss interface{}) {
- s.gs.RegisterService(sd, ss)
- }
- // GetServiceInfo returns a map from service names to ServiceInfo.
- // Service names include the package names, in the form of <package>.<service>.
- func (s *GRPCServer) GetServiceInfo() map[string]grpc.ServiceInfo {
- return s.gs.GetServiceInfo()
- }
- // initXDSClient creates a new xdsClient if there is no existing one available.
- func (s *GRPCServer) initXDSClient() error {
- s.clientMu.Lock()
- defer s.clientMu.Unlock()
- if s.xdsC != nil {
- return nil
- }
- newXDSClient := newXDSClient
- if s.opts.bootstrapContentsForTesting != nil {
- // Bootstrap file contents may be specified as a server option for tests.
- newXDSClient = func() (xdsclient.XDSClient, func(), error) {
- return xdsclient.NewWithBootstrapContentsForTesting(s.opts.bootstrapContentsForTesting)
- }
- }
- client, close, err := newXDSClient()
- if err != nil {
- return fmt.Errorf("xds: failed to create xds-client: %v", err)
- }
- s.xdsC = client
- s.xdsClientClose = close
- return nil
- }
- // Serve gets the underlying gRPC server to accept incoming connections on the
- // listener lis, which is expected to be listening on a TCP port.
- //
- // A connection to the management server, to receive xDS configuration, is
- // initiated here.
- //
- // Serve will return a non-nil error unless Stop or GracefulStop is called.
- func (s *GRPCServer) Serve(lis net.Listener) error {
- s.logger.Infof("Serve() passed a net.Listener on %s", lis.Addr().String())
- if _, ok := lis.Addr().(*net.TCPAddr); !ok {
- return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr())
- }
- // If this is the first time Serve() is being called, we need to initialize
- // our xdsClient. If not, we can use the existing one.
- if err := s.initXDSClient(); err != nil {
- return err
- }
- cfg := s.xdsC.BootstrapConfig()
- if cfg == nil {
- return errors.New("bootstrap configuration is empty")
- }
- // If xds credentials were specified by the user, but bootstrap configs do
- // not contain any certificate provider configuration, it is better to fail
- // right now rather than failing when attempting to create certificate
- // providers after receiving an LDS response with security configuration.
- if s.xdsCredsInUse {
- if len(cfg.CertProviderConfigs) == 0 {
- return errors.New("xds: certificate_providers config missing in bootstrap file")
- }
- }
- // The server listener resource name template from the bootstrap
- // configuration contains a template for the name of the Listener resource
- // to subscribe to for a gRPC server. If the token `%s` is present in the
- // string, it will be replaced with the server's listening "IP:port" (e.g.,
- // "0.0.0.0:8080", "[::]:8080"). The absence of a template will be treated
- // as an error since we do not have any default value for this.
- if cfg.ServerListenerResourceNameTemplate == "" {
- return errors.New("missing server_listener_resource_name_template in the bootstrap configuration")
- }
- name := bootstrap.PopulateResourceTemplate(cfg.ServerListenerResourceNameTemplate, lis.Addr().String())
- modeUpdateCh := buffer.NewUnbounded()
- go func() {
- s.handleServingModeChanges(modeUpdateCh)
- }()
- // Create a listenerWrapper which handles all functionality required by
- // this particular instance of Serve().
- lw, goodUpdateCh := server.NewListenerWrapper(server.ListenerWrapperParams{
- Listener: lis,
- ListenerResourceName: name,
- XDSCredsInUse: s.xdsCredsInUse,
- XDSClient: s.xdsC,
- ModeCallback: func(addr net.Addr, mode connectivity.ServingMode, err error) {
- modeUpdateCh.Put(&modeChangeArgs{
- addr: addr,
- mode: mode,
- err: err,
- })
- },
- DrainCallback: func(addr net.Addr) {
- if gs, ok := s.gs.(*grpc.Server); ok {
- drainServerTransports(gs, addr.String())
- }
- },
- })
- // Block until a good LDS response is received or the server is stopped.
- select {
- case <-s.quit.Done():
- // Since the listener has not yet been handed over to gs.Serve(), we
- // need to explicitly close the listener. Cancellation of the xDS watch
- // is handled by the listenerWrapper.
- lw.Close()
- modeUpdateCh.Close()
- return nil
- case <-goodUpdateCh:
- }
- return s.gs.Serve(lw)
- }
- // modeChangeArgs wraps argument required for invoking mode change callback.
- type modeChangeArgs struct {
- addr net.Addr
- mode connectivity.ServingMode
- err error
- }
- // handleServingModeChanges runs as a separate goroutine, spawned from Serve().
- // It reads a channel on to which mode change arguments are pushed, and in turn
- // invokes the user registered callback. It also calls an internal method on the
- // underlying grpc.Server to gracefully close existing connections, if the
- // listener moved to a "not-serving" mode.
- func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
- for {
- select {
- case <-s.quit.Done():
- return
- case u, ok := <-updateCh.Get():
- if !ok {
- return
- }
- updateCh.Load()
- args := u.(*modeChangeArgs)
- if args.mode == connectivity.ServingModeNotServing {
- // We type assert our underlying gRPC server to the real
- // grpc.Server here before trying to initiate the drain
- // operation. This approach avoids performing the same type
- // assertion in the grpc package which provides the
- // implementation for internal.GetServerCredentials, and allows
- // us to use a fake gRPC server in tests.
- if gs, ok := s.gs.(*grpc.Server); ok {
- drainServerTransports(gs, args.addr.String())
- }
- }
- // The XdsServer API will allow applications to register a "serving state"
- // callback to be invoked when the server begins serving and when the
- // server encounters errors that force it to be "not serving". If "not
- // serving", the callback must be provided error information, for
- // debugging use by developers - A36.
- s.opts.modeCallback(args.addr, ServingModeChangeArgs{
- Mode: args.mode,
- Err: args.err,
- })
- }
- }
- }
- // Stop stops the underlying gRPC server. It immediately closes all open
- // connections. It cancels all active RPCs on the server side and the
- // corresponding pending RPCs on the client side will get notified by connection
- // errors.
- func (s *GRPCServer) Stop() {
- s.quit.Fire()
- s.gs.Stop()
- if s.xdsC != nil {
- s.xdsClientClose()
- }
- }
- // GracefulStop stops the underlying gRPC server gracefully. It stops the server
- // from accepting new connections and RPCs and blocks until all the pending RPCs
- // are finished.
- func (s *GRPCServer) GracefulStop() {
- s.quit.Fire()
- s.gs.GracefulStop()
- if s.xdsC != nil {
- s.xdsClientClose()
- }
- }
- // routeAndProcess routes the incoming RPC to a configured route in the route
- // table and also processes the RPC by running the incoming RPC through any HTTP
- // Filters configured.
- func routeAndProcess(ctx context.Context) error {
- conn := transport.GetConnection(ctx)
- cw, ok := conn.(interface {
- VirtualHosts() []xdsresource.VirtualHostWithInterceptors
- })
- if !ok {
- return errors.New("missing virtual hosts in incoming context")
- }
- mn, ok := grpc.Method(ctx)
- if !ok {
- return errors.New("missing method name in incoming context")
- }
- md, ok := metadata.FromIncomingContext(ctx)
- if !ok {
- return errors.New("missing metadata in incoming context")
- }
- // A41 added logic to the core grpc implementation to guarantee that once
- // the RPC gets to this point, there will be a single, unambiguous authority
- // present in the header map.
- authority := md.Get(":authority")
- vh := xdsresource.FindBestMatchingVirtualHostServer(authority[0], cw.VirtualHosts())
- if vh == nil {
- return status.Error(codes.Unavailable, "the incoming RPC did not match a configured Virtual Host")
- }
- var rwi *xdsresource.RouteWithInterceptors
- rpcInfo := iresolver.RPCInfo{
- Context: ctx,
- Method: mn,
- }
- for _, r := range vh.Routes {
- if r.M.Match(rpcInfo) {
- // "NonForwardingAction is expected for all Routes used on server-side; a route with an inappropriate action causes
- // RPCs matching that route to fail with UNAVAILABLE." - A36
- if r.ActionType != xdsresource.RouteActionNonForwardingAction {
- return status.Error(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding")
- }
- rwi = &r
- break
- }
- }
- if rwi == nil {
- return status.Error(codes.Unavailable, "the incoming RPC did not match a configured Route")
- }
- for _, interceptor := range rwi.Interceptors {
- if err := interceptor.AllowRPC(ctx); err != nil {
- return status.Errorf(codes.PermissionDenied, "Incoming RPC is not allowed: %v", err)
- }
- }
- return nil
- }
- // xdsUnaryInterceptor is the unary interceptor added to the gRPC server to
- // perform any xDS specific functionality on unary RPCs.
- func xdsUnaryInterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
- if envconfig.XDSRBAC {
- if err := routeAndProcess(ctx); err != nil {
- return nil, err
- }
- }
- return handler(ctx, req)
- }
- // xdsStreamInterceptor is the stream interceptor added to the gRPC server to
- // perform any xDS specific functionality on streaming RPCs.
- func xdsStreamInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- if envconfig.XDSRBAC {
- if err := routeAndProcess(ss.Context()); err != nil {
- return err
- }
- }
- return handler(srv, ss)
- }
|