123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- /*
- *
- * Copyright 2021 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package rls
- import (
- "context"
- "fmt"
- "time"
- "google.golang.org/grpc"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/balancer/rls/internal/adaptive"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/internal"
- internalgrpclog "google.golang.org/grpc/internal/grpclog"
- "google.golang.org/grpc/internal/pretty"
- rlsgrpc "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
- rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
- )
- var newAdaptiveThrottler = func() adaptiveThrottler { return adaptive.New() }
// adaptiveThrottler is the subset of throttling functionality that the control
// channel depends on.
type adaptiveThrottler interface {
	// ShouldThrottle reports whether the next request should be dropped on
	// the client side instead of being sent.
	ShouldThrottle() bool
	// RegisterBackendResponse records the outcome of a request with the
	// throttler.
	RegisterBackendResponse(throttled bool)
}
- // controlChannel is a wrapper around the gRPC channel to the RLS server
- // specified in the service config.
- type controlChannel struct {
- // rpcTimeout specifies the timeout for the RouteLookup RPC call. The LB
- // policy receives this value in its service config.
- rpcTimeout time.Duration
- // backToReadyFunc is a callback to be invoked when the connectivity state
- // changes from READY --> TRANSIENT_FAILURE --> READY.
- backToReadyFunc func()
- // throttler in an adaptive throttling implementation used to avoid
- // hammering the RLS service while it is overloaded or down.
- throttler adaptiveThrottler
- cc *grpc.ClientConn
- client rlsgrpc.RouteLookupServiceClient
- logger *internalgrpclog.PrefixLogger
- }
- // newControlChannel creates a controlChannel to rlsServerName and uses
- // serviceConfig, if non-empty, as the default service config for the underlying
- // gRPC channel.
- func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
- ctrlCh := &controlChannel{
- rpcTimeout: rpcTimeout,
- backToReadyFunc: backToReadyFunc,
- throttler: newAdaptiveThrottler(),
- }
- ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))
- dopts, err := ctrlCh.dialOpts(bOpts, serviceConfig)
- if err != nil {
- return nil, err
- }
- ctrlCh.cc, err = grpc.Dial(rlsServerName, dopts...)
- if err != nil {
- return nil, err
- }
- ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc)
- ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)
- go ctrlCh.monitorConnectivityState()
- return ctrlCh, nil
- }
- // dialOpts constructs the dial options for the control plane channel.
- func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig string) ([]grpc.DialOption, error) {
- // The control plane channel will use the same authority as the parent
- // channel for server authorization. This ensures that the identity of the
- // RLS server and the identity of the backends is the same, so if the RLS
- // config is injected by an attacker, it cannot cause leakage of private
- // information contained in headers set by the application.
- dopts := []grpc.DialOption{grpc.WithAuthority(bOpts.Authority)}
- if bOpts.Dialer != nil {
- dopts = append(dopts, grpc.WithContextDialer(bOpts.Dialer))
- }
- // The control channel will use the channel credentials from the parent
- // channel, including any call creds associated with the channel creds.
- var credsOpt grpc.DialOption
- switch {
- case bOpts.DialCreds != nil:
- credsOpt = grpc.WithTransportCredentials(bOpts.DialCreds.Clone())
- case bOpts.CredsBundle != nil:
- // The "fallback" mode in google default credentials (which is the only
- // type of credentials we expect to be used with RLS) uses TLS/ALTS
- // creds for transport and uses the same call creds as that on the
- // parent bundle.
- bundle, err := bOpts.CredsBundle.NewWithMode(internal.CredsBundleModeFallback)
- if err != nil {
- return nil, err
- }
- credsOpt = grpc.WithCredentialsBundle(bundle)
- default:
- cc.logger.Warningf("no credentials available, using Insecure")
- credsOpt = grpc.WithTransportCredentials(insecure.NewCredentials())
- }
- dopts = append(dopts, credsOpt)
- // If the RLS LB policy's configuration specified a service config for the
- // control channel, use that and disable service config fetching via the name
- // resolver for the control channel.
- if serviceConfig != "" {
- cc.logger.Infof("Disabling service config from the name resolver and instead using: %s", serviceConfig)
- dopts = append(dopts, grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(serviceConfig))
- }
- return dopts, nil
- }
- func (cc *controlChannel) monitorConnectivityState() {
- cc.logger.Infof("Starting connectivity state monitoring goroutine")
- // Since we use two mechanisms to deal with RLS server being down:
- // - adaptive throttling for the channel as a whole
- // - exponential backoff on a per-request basis
- // we need a way to avoid double-penalizing requests by counting failures
- // toward both mechanisms when the RLS server is unreachable.
- //
- // To accomplish this, we monitor the state of the control plane channel. If
- // the state has been TRANSIENT_FAILURE since the last time it was in state
- // READY, and it then transitions into state READY, we push on a channel
- // which is being read by the LB policy.
- //
- // The LB the policy will iterate through the cache to reset the backoff
- // timeouts in all cache entries. Specifically, this means that it will
- // reset the backoff state and cancel the pending backoff timer. Note that
- // when cancelling the backoff timer, just like when the backoff timer fires
- // normally, a new picker is returned to the channel, to force it to
- // re-process any wait-for-ready RPCs that may still be queued if we failed
- // them while we were in backoff. However, we should optimize this case by
- // returning only one new picker, regardless of how many backoff timers are
- // cancelled.
- // Using the background context is fine here since we check for the ClientConn
- // entering SHUTDOWN and return early in that case.
- ctx := context.Background()
- first := true
- for {
- // Wait for the control channel to become READY.
- for s := cc.cc.GetState(); s != connectivity.Ready; s = cc.cc.GetState() {
- if s == connectivity.Shutdown {
- return
- }
- cc.cc.WaitForStateChange(ctx, s)
- }
- cc.logger.Infof("Connectivity state is READY")
- if !first {
- cc.logger.Infof("Control channel back to READY")
- cc.backToReadyFunc()
- }
- first = false
- // Wait for the control channel to move out of READY.
- cc.cc.WaitForStateChange(ctx, connectivity.Ready)
- if cc.cc.GetState() == connectivity.Shutdown {
- return
- }
- cc.logger.Infof("Connectivity state is %s", cc.cc.GetState())
- }
- }
- func (cc *controlChannel) close() {
- cc.logger.Infof("Closing control channel")
- cc.cc.Close()
- }
// lookupCallback is the signature of the function through which lookup
// delivers the outcome of a RouteLookup RPC: the targets and header data taken
// from the response, and the error returned by the RPC, if any.
type lookupCallback func(targets []string, headerData string, err error)
- // lookup starts a RouteLookup RPC in a separate goroutine and returns the
- // results (and error, if any) in the provided callback.
- //
- // The returned boolean indicates whether the request was throttled by the
- // client-side adaptive throttling algorithm in which case the provided callback
- // will not be invoked.
- func (cc *controlChannel) lookup(reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string, cb lookupCallback) (throttled bool) {
- if cc.throttler.ShouldThrottle() {
- cc.logger.Infof("RLS request throttled by client-side adaptive throttling")
- return true
- }
- go func() {
- req := &rlspb.RouteLookupRequest{
- TargetType: "grpc",
- KeyMap: reqKeys,
- Reason: reason,
- StaleHeaderData: staleHeaders,
- }
- cc.logger.Infof("Sending RLS request %+v", pretty.ToJSON(req))
- ctx, cancel := context.WithTimeout(context.Background(), cc.rpcTimeout)
- defer cancel()
- resp, err := cc.client.RouteLookup(ctx, req)
- cb(resp.GetTargets(), resp.GetHeaderData(), err)
- }()
- return false
- }
|