/*
 *
 * Copyright 2022 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package rls

import (
	"errors"
	"fmt"
	"strings"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/rls/internal/keys"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	internalgrpclog "google.golang.org/grpc/internal/grpclog"
	rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

var (
	errRLSThrottled = errors.New("RLS call throttled at client side")

	// Function to compute data cache entry size.
	computeDataCacheEntrySize = dcEntrySize
)

// exitIdler wraps the only method on the BalancerGroup that the picker calls.
type exitIdler interface {
	ExitIdleOne(id string)
}

// rlsPicker selects the subConn to be used for a particular RPC. It does not
// manage subConns directly and delegates to pickers provided by child
// policies.
type rlsPicker struct {
	// The keyBuilder map used to generate RLS keys for the RPC. This is built
	// by the LB policy based on the received ServiceConfig.
	kbm keys.BuilderMap

	// Endpoint from the user's original dial target. Used to set the `host_key`
	// field in `extra_keys`.
	origEndpoint string

	lb *rlsBalancer

	// The picker is given its own copy of the below fields from the RLS LB
	// policy to avoid having to grab the mutex on the latter.
	defaultPolicy *childPolicyWrapper // Child policy for the default target.
	ctrlCh        *controlChannel     // Control channel to the RLS server.
	maxAge        time.Duration       // Cache max age from LB config.
	staleAge      time.Duration       // Cache stale age from LB config.
	bg            exitIdler
	logger        *internalgrpclog.PrefixLogger
}

// isFullMethodNameValid returns true if name is of the form `/service/method`.
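// For example, "/grpc.testing.TestService/UnaryCall" is accepted, while
// "grpc.testing.TestService/UnaryCall" (no leading slash) and "/foo" (only
// one slash) are rejected.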
func isFullMethodNameValid(name string) bool {
	return strings.HasPrefix(name, "/") && strings.Count(name, "/") == 2
}

// Pick makes the routing decision for every outbound RPC.
func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	if name := info.FullMethodName; !isFullMethodNameValid(name) {
		return balancer.PickResult{}, fmt.Errorf("rls: method name %q is not of the form '/service/method'", name)
	}

	// Build the request's keys using the key builders from LB config.
	md, _ := metadata.FromOutgoingContext(info.Ctx)
	reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)

	p.lb.cacheMu.Lock()
	defer p.lb.cacheMu.Unlock()

	// Lookup data cache and pending request map using request path and keys.
	cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
	dcEntry := p.lb.dataCache.getEntry(cacheKey)
	pendingEntry := p.lb.pendingMap[cacheKey]
	now := time.Now()
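
	// The action taken depends on the result of the two lookups above:
	//
	//	data cache | pending request | action
	//	-----------+-----------------+----------------------------------------
	//	miss       | none            | send RLS request; queue pick, or use
	//	           |                 | the default target if throttled
	//	miss       | exists          | queue pick
	//	hit        | none            | delegate to child policies; refresh the
	//	           |                 | entry if it is stale or expired
	//	hit        | exists          | delegate if unexpired, else queue pick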
	switch {
	// No data cache entry. No pending request.
	case dcEntry == nil && pendingEntry == nil:
		throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
		if throttled {
			return p.useDefaultPickIfPossible(info, errRLSThrottled)
		}
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

	// No data cache entry. Pending request exists.
	case dcEntry == nil && pendingEntry != nil:
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

	// Data cache hit. No pending request.
	case dcEntry != nil && pendingEntry == nil:
		if dcEntry.expiryTime.After(now) {
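			// The entry is still valid. If it has gone stale and is not in
			// backoff, proactively refresh it in the background; the pick
			// itself is still served from the cached data.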
			if !dcEntry.staleTime.IsZero() && dcEntry.staleTime.Before(now) && dcEntry.backoffTime.Before(now) {
				p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
			}
			// Delegate to child policies.
			res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
			return res, err
		}

		// We get here only if the data cache entry has expired. If the entry
		// is in backoff, delegate to the default target or fail the pick.
		if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) {
			// Avoid propagating the status code received on control plane RPCs
			// to the data plane, which can lead to unexpected outcomes as we
			// do not control the status code sent by the control plane.
			// Propagating the status message received from the control plane
			// is still fine, as it could be useful for debugging purposes.
			st := dcEntry.status
			return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
		}

		// We get here only if the entry has expired and is not in backoff.
		throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
		if throttled {
			return p.useDefaultPickIfPossible(info, errRLSThrottled)
		}
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable

	// Data cache hit. Pending request exists.
	default:
		if dcEntry.expiryTime.After(now) {
			res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
			return res, err
		}
		// Data cache entry has expired and pending request exists. Queue pick.
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
}

// delegateToChildPoliciesLocked is a helper function which iterates through
// the list of child policy wrappers in a cache entry and attempts to find a
// child policy to which this RPC can be routed. If all child policies are in
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
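//
// Any header data received from the RLS server is attached to the pick result
// as the "x-google-rls-data" metadata header.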
func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
	const rlsDataHeaderName = "x-google-rls-data"
	for i, cpw := range dcEntry.childPolicyWrappers {
		state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
		// Delegate to the child policy if it is not in TRANSIENT_FAILURE, or
		// if it is the last one (which handles the case of delegating to the
		// last child picker if all child policies are in TRANSIENT_FAILURE).
		if state.ConnectivityState != connectivity.TransientFailure || i == len(dcEntry.childPolicyWrappers)-1 {
			// Any header data received from the RLS server is stored in the
			// cache entry and needs to be sent to the actual backend in the
			// X-Google-RLS-Data header.
			res, err := state.Picker.Pick(info)
			if err != nil {
				return res, err
			}
			if res.Metadata == nil {
				res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
			} else {
				res.Metadata.Append(rlsDataHeaderName, dcEntry.headerData)
			}
			return res, nil
		}
	}

	// In the unlikely event that we have a cache entry with no targets, we
	// end up queueing the RPC.
	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

// useDefaultPickIfPossible is a helper method which delegates to the default
// target if one is configured, or fails the pick with the given error.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
	if p.defaultPolicy != nil {
		state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
		return state.Picker.Pick(info)
	}
	return balancer.PickResult{}, errOnNoDefault
}

// sendRouteLookupRequestLocked adds an entry to the pending request map and
// sends out an RLS request using the passed in arguments. Returns a value
// indicating if the request was throttled by the client-side adaptive
// throttler.
func (p *rlsPicker) sendRouteLookupRequestLocked(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string) bool {
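	// If a request for this key is already pending, do not send another one.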
	if p.lb.pendingMap[cacheKey] != nil {
		return false
	}

	p.lb.pendingMap[cacheKey] = bs
	throttled := p.ctrlCh.lookup(reqKeys, reason, staleHeaders, func(targets []string, headerData string, err error) {
		p.handleRouteLookupResponse(cacheKey, targets, headerData, err)
	})
	if throttled {
		delete(p.lb.pendingMap, cacheKey)
	}
	return throttled
}

// handleRouteLookupResponse is the callback invoked by the control channel
// upon receipt of an RLS response. Modifies the data cache and pending
// requests map and sends a new picker.
//
// Acquires the write-lock on the cache. Caller must not hold p.lb.cacheMu.
func (p *rlsPicker) handleRouteLookupResponse(cacheKey cacheKey, targets []string, headerData string, err error) {
	p.logger.Infof("Received RLS response for key %+v with targets %+v, headerData %q, err: %v", cacheKey, targets, headerData, err)

	p.lb.cacheMu.Lock()
	defer func() {
		// Pending request map entry is unconditionally deleted since the
		// request is no longer pending.
		p.logger.Infof("Removing pending request entry for key %+v", cacheKey)
		delete(p.lb.pendingMap, cacheKey)
		p.lb.sendNewPicker()
		p.lb.cacheMu.Unlock()
	}()

	// Lookup the data cache entry or create a new one.
	dcEntry := p.lb.dataCache.getEntry(cacheKey)
	if dcEntry == nil {
		dcEntry = &cacheEntry{}
		if _, ok := p.lb.dataCache.addEntry(cacheKey, dcEntry); !ok {
			// This is a very unlikely case where we are unable to add a
			// data cache entry. Log and leave.
			p.logger.Warningf("Failed to add data cache entry for %+v", cacheKey)
			return
		}
	}

	// For failed requests, the data cache entry is modified as follows:
	// - status is set to the error returned from the control channel
	// - current backoff state is available in the pending entry
	//   - `retries` field is incremented and
	//   - backoff state is moved to the data cache
	// - backoffTime is set to the time indicated by the backoff state
	// - backoffExpiryTime is set to twice the backoff time
	// - backoffTimer is set to fire after backoffTime
	//
	// When a proactive cache refresh fails, this leaves the targets and the
	// expiry time from the old entry unchanged. This means that the old valid
	// entry is used until expiration, and a new picker is sent upon backoff
	// expiry.
	now := time.Now()
	if err != nil {
		dcEntry.status = err
		pendingEntry := p.lb.pendingMap[cacheKey]
		pendingEntry.retries++
		backoffTime := pendingEntry.bs.Backoff(pendingEntry.retries)
		dcEntry.backoffState = pendingEntry
		dcEntry.backoffTime = now.Add(backoffTime)
		dcEntry.backoffExpiryTime = now.Add(2 * backoffTime)
		if dcEntry.backoffState.timer != nil {
			dcEntry.backoffState.timer.Stop()
		}
		dcEntry.backoffState.timer = time.AfterFunc(backoffTime, p.lb.sendNewPicker)
		return
	}

	// For successful requests, the cache entry is modified as follows:
	// - childPolicyWrappers is set to point to the child policy wrappers
	//   associated with the targets specified in the received response
	// - headerData is set to the value received in the response
	// - expiryTime, staleTime and earliestEvictTime are set
	// - status is set to nil (OK status)
	// - backoff state is cleared
	p.setChildPolicyWrappersInCacheEntry(dcEntry, targets)
	dcEntry.headerData = headerData
	dcEntry.expiryTime = now.Add(p.maxAge)
	if p.staleAge != 0 {
		dcEntry.staleTime = now.Add(p.staleAge)
	}
	dcEntry.earliestEvictTime = now.Add(minEvictDuration)
	dcEntry.status = nil
	dcEntry.backoffState = &backoffState{bs: defaultBackoffStrategy}
	dcEntry.backoffTime = time.Time{}
	dcEntry.backoffExpiryTime = time.Time{}
	p.lb.dataCache.updateEntrySize(dcEntry, computeDataCacheEntrySize(cacheKey, dcEntry))
}

// setChildPolicyWrappersInCacheEntry sets up the childPolicyWrappers field in
// the cache entry to point to the child policy wrappers for the targets
// specified in the RLS response.
//
// Caller must hold a write-lock on p.lb.cacheMu.
func (p *rlsPicker) setChildPolicyWrappersInCacheEntry(dcEntry *cacheEntry, newTargets []string) {
	// If the childPolicyWrappers field is already pointing to the right
	// targets, then the field's value does not need to change.
	targetsChanged := true
	func() {
		if cpws := dcEntry.childPolicyWrappers; cpws != nil {
			if len(newTargets) != len(cpws) {
				return
			}
			for i, target := range newTargets {
				if cpws[i].target != target {
					return
				}
			}
			targetsChanged = false
		}
	}()
	if !targetsChanged {
		return
	}

	// If the childPolicyWrappers field is not already set to the right
	// targets, then it must be reset. We construct a new list of child
	// policies and then swap out the old list for the new one.
	newChildPolicies := p.lb.acquireChildPolicyReferences(newTargets)
	oldChildPolicyTargets := make([]string, len(dcEntry.childPolicyWrappers))
	for i, cpw := range dcEntry.childPolicyWrappers {
		oldChildPolicyTargets[i] = cpw.target
	}
	p.lb.releaseChildPolicyReferences(oldChildPolicyTargets)
	dcEntry.childPolicyWrappers = newChildPolicies
}
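
// dcEntrySize computes the size of a data cache entry as the sum of the
// lengths of the cache key (path and keys string) and the header data. For
// example, a key with path "/s/m" and keys "k1=v1" plus headerData "hd"
// yields 4 + 5 + 2 = 11 bytes.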
func dcEntrySize(key cacheKey, entry *cacheEntry) int64 {
	return int64(len(key.path) + len(key.keys) + len(entry.headerData))
}