control_channel.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. /*
  2. *
  3. * Copyright 2021 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package rls
  19. import (
  20. "context"
  21. "fmt"
  22. "time"
  23. "google.golang.org/grpc"
  24. "google.golang.org/grpc/balancer"
  25. "google.golang.org/grpc/balancer/rls/internal/adaptive"
  26. "google.golang.org/grpc/connectivity"
  27. "google.golang.org/grpc/credentials/insecure"
  28. "google.golang.org/grpc/internal"
  29. internalgrpclog "google.golang.org/grpc/internal/grpclog"
  30. "google.golang.org/grpc/internal/pretty"
  31. rlsgrpc "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
  32. rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
  33. )
  34. var newAdaptiveThrottler = func() adaptiveThrottler { return adaptive.New() }
  // adaptiveThrottler is the subset of an adaptive throttling implementation
  // that the control channel relies on to decide whether an RLS request may be
  // sent at all.
  type adaptiveThrottler interface {
  	// ShouldThrottle reports whether the next RLS request should be dropped on
  	// the client side instead of being sent to the RLS server.
  	ShouldThrottle() bool
  	// RegisterBackendResponse records the outcome of a request that was let
  	// through; throttled indicates whether that outcome counts as throttled.
  	RegisterBackendResponse(throttled bool)
  }
  39. // controlChannel is a wrapper around the gRPC channel to the RLS server
  40. // specified in the service config.
  41. type controlChannel struct {
  42. // rpcTimeout specifies the timeout for the RouteLookup RPC call. The LB
  43. // policy receives this value in its service config.
  44. rpcTimeout time.Duration
  45. // backToReadyFunc is a callback to be invoked when the connectivity state
  46. // changes from READY --> TRANSIENT_FAILURE --> READY.
  47. backToReadyFunc func()
  48. // throttler in an adaptive throttling implementation used to avoid
  49. // hammering the RLS service while it is overloaded or down.
  50. throttler adaptiveThrottler
  51. cc *grpc.ClientConn
  52. client rlsgrpc.RouteLookupServiceClient
  53. logger *internalgrpclog.PrefixLogger
  54. }
  55. // newControlChannel creates a controlChannel to rlsServerName and uses
  56. // serviceConfig, if non-empty, as the default service config for the underlying
  57. // gRPC channel.
  58. func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
  59. ctrlCh := &controlChannel{
  60. rpcTimeout: rpcTimeout,
  61. backToReadyFunc: backToReadyFunc,
  62. throttler: newAdaptiveThrottler(),
  63. }
  64. ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))
  65. dopts, err := ctrlCh.dialOpts(bOpts, serviceConfig)
  66. if err != nil {
  67. return nil, err
  68. }
  69. ctrlCh.cc, err = grpc.Dial(rlsServerName, dopts...)
  70. if err != nil {
  71. return nil, err
  72. }
  73. ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc)
  74. ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)
  75. go ctrlCh.monitorConnectivityState()
  76. return ctrlCh, nil
  77. }
  78. // dialOpts constructs the dial options for the control plane channel.
  79. func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig string) ([]grpc.DialOption, error) {
  80. // The control plane channel will use the same authority as the parent
  81. // channel for server authorization. This ensures that the identity of the
  82. // RLS server and the identity of the backends is the same, so if the RLS
  83. // config is injected by an attacker, it cannot cause leakage of private
  84. // information contained in headers set by the application.
  85. dopts := []grpc.DialOption{grpc.WithAuthority(bOpts.Authority)}
  86. if bOpts.Dialer != nil {
  87. dopts = append(dopts, grpc.WithContextDialer(bOpts.Dialer))
  88. }
  89. // The control channel will use the channel credentials from the parent
  90. // channel, including any call creds associated with the channel creds.
  91. var credsOpt grpc.DialOption
  92. switch {
  93. case bOpts.DialCreds != nil:
  94. credsOpt = grpc.WithTransportCredentials(bOpts.DialCreds.Clone())
  95. case bOpts.CredsBundle != nil:
  96. // The "fallback" mode in google default credentials (which is the only
  97. // type of credentials we expect to be used with RLS) uses TLS/ALTS
  98. // creds for transport and uses the same call creds as that on the
  99. // parent bundle.
  100. bundle, err := bOpts.CredsBundle.NewWithMode(internal.CredsBundleModeFallback)
  101. if err != nil {
  102. return nil, err
  103. }
  104. credsOpt = grpc.WithCredentialsBundle(bundle)
  105. default:
  106. cc.logger.Warningf("no credentials available, using Insecure")
  107. credsOpt = grpc.WithTransportCredentials(insecure.NewCredentials())
  108. }
  109. dopts = append(dopts, credsOpt)
  110. // If the RLS LB policy's configuration specified a service config for the
  111. // control channel, use that and disable service config fetching via the name
  112. // resolver for the control channel.
  113. if serviceConfig != "" {
  114. cc.logger.Infof("Disabling service config from the name resolver and instead using: %s", serviceConfig)
  115. dopts = append(dopts, grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(serviceConfig))
  116. }
  117. return dopts, nil
  118. }
  119. func (cc *controlChannel) monitorConnectivityState() {
  120. cc.logger.Infof("Starting connectivity state monitoring goroutine")
  121. // Since we use two mechanisms to deal with RLS server being down:
  122. // - adaptive throttling for the channel as a whole
  123. // - exponential backoff on a per-request basis
  124. // we need a way to avoid double-penalizing requests by counting failures
  125. // toward both mechanisms when the RLS server is unreachable.
  126. //
  127. // To accomplish this, we monitor the state of the control plane channel. If
  128. // the state has been TRANSIENT_FAILURE since the last time it was in state
  129. // READY, and it then transitions into state READY, we push on a channel
  130. // which is being read by the LB policy.
  131. //
  132. // The LB the policy will iterate through the cache to reset the backoff
  133. // timeouts in all cache entries. Specifically, this means that it will
  134. // reset the backoff state and cancel the pending backoff timer. Note that
  135. // when cancelling the backoff timer, just like when the backoff timer fires
  136. // normally, a new picker is returned to the channel, to force it to
  137. // re-process any wait-for-ready RPCs that may still be queued if we failed
  138. // them while we were in backoff. However, we should optimize this case by
  139. // returning only one new picker, regardless of how many backoff timers are
  140. // cancelled.
  141. // Using the background context is fine here since we check for the ClientConn
  142. // entering SHUTDOWN and return early in that case.
  143. ctx := context.Background()
  144. first := true
  145. for {
  146. // Wait for the control channel to become READY.
  147. for s := cc.cc.GetState(); s != connectivity.Ready; s = cc.cc.GetState() {
  148. if s == connectivity.Shutdown {
  149. return
  150. }
  151. cc.cc.WaitForStateChange(ctx, s)
  152. }
  153. cc.logger.Infof("Connectivity state is READY")
  154. if !first {
  155. cc.logger.Infof("Control channel back to READY")
  156. cc.backToReadyFunc()
  157. }
  158. first = false
  159. // Wait for the control channel to move out of READY.
  160. cc.cc.WaitForStateChange(ctx, connectivity.Ready)
  161. if cc.cc.GetState() == connectivity.Shutdown {
  162. return
  163. }
  164. cc.logger.Infof("Connectivity state is %s", cc.cc.GetState())
  165. }
  166. }
  167. func (cc *controlChannel) close() {
  168. cc.logger.Infof("Closing control channel")
  169. cc.cc.Close()
  170. }
  // lookupCallback receives the outcome of a RouteLookup RPC started by lookup:
  // the targets and header data taken from the response, plus the RPC error,
  // which is nil when the RPC succeeded.
  type lookupCallback func(targets []string, headerData string, err error)
  172. // lookup starts a RouteLookup RPC in a separate goroutine and returns the
  173. // results (and error, if any) in the provided callback.
  174. //
  175. // The returned boolean indicates whether the request was throttled by the
  176. // client-side adaptive throttling algorithm in which case the provided callback
  177. // will not be invoked.
  178. func (cc *controlChannel) lookup(reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string, cb lookupCallback) (throttled bool) {
  179. if cc.throttler.ShouldThrottle() {
  180. cc.logger.Infof("RLS request throttled by client-side adaptive throttling")
  181. return true
  182. }
  183. go func() {
  184. req := &rlspb.RouteLookupRequest{
  185. TargetType: "grpc",
  186. KeyMap: reqKeys,
  187. Reason: reason,
  188. StaleHeaderData: staleHeaders,
  189. }
  190. cc.logger.Infof("Sending RLS request %+v", pretty.ToJSON(req))
  191. ctx, cancel := context.WithTimeout(context.Background(), cc.rpcTimeout)
  192. defer cancel()
  193. resp, err := cc.client.RouteLookup(ctx, req)
  194. cb(resp.GetTargets(), resp.GetHeaderData(), err)
  195. }()
  196. return false
  197. }