picker.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. /*
  2. *
  3. * Copyright 2022 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package rls
  19. import (
  20. "errors"
  21. "fmt"
  22. "strings"
  23. "sync/atomic"
  24. "time"
  25. "google.golang.org/grpc/balancer"
  26. "google.golang.org/grpc/balancer/rls/internal/keys"
  27. "google.golang.org/grpc/codes"
  28. "google.golang.org/grpc/connectivity"
  29. internalgrpclog "google.golang.org/grpc/internal/grpclog"
  30. rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
  31. "google.golang.org/grpc/metadata"
  32. "google.golang.org/grpc/status"
  33. )
  34. var (
  35. errRLSThrottled = errors.New("RLS call throttled at client side")
  36. // Function to compute data cache entry size.
  37. computeDataCacheEntrySize = dcEntrySize
  38. )
  // exitIdler is the narrow slice of the BalancerGroup API that the picker
// depends on: asking one child, identified by id, to exit idle. Keeping the
// dependency this small lets the picker hold any implementation of it.
type exitIdler interface {
	ExitIdleOne(id string)
}
  43. // rlsPicker selects the subConn to be used for a particular RPC. It does not
  44. // manage subConns directly and delegates to pickers provided by child policies.
  45. type rlsPicker struct {
  46. // The keyBuilder map used to generate RLS keys for the RPC. This is built
  47. // by the LB policy based on the received ServiceConfig.
  48. kbm keys.BuilderMap
  49. // Endpoint from the user's original dial target. Used to set the `host_key`
  50. // field in `extra_keys`.
  51. origEndpoint string
  52. lb *rlsBalancer
  53. // The picker is given its own copy of the below fields from the RLS LB policy
  54. // to avoid having to grab the mutex on the latter.
  55. defaultPolicy *childPolicyWrapper // Child policy for the default target.
  56. ctrlCh *controlChannel // Control channel to the RLS server.
  57. maxAge time.Duration // Cache max age from LB config.
  58. staleAge time.Duration // Cache stale age from LB config.
  59. bg exitIdler
  60. logger *internalgrpclog.PrefixLogger
  61. }
  // isFullMethodNameValid reports whether name looks like `/service/method`:
// it must begin with a slash and contain exactly one more slash after that.
// Empty service or method segments are not rejected.
func isFullMethodNameValid(name string) bool {
	if len(name) == 0 || name[0] != '/' {
		return false
	}
	return strings.Count(name[1:], "/") == 1
}
  66. // Pick makes the routing decision for every outbound RPC.
  67. func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
  68. if name := info.FullMethodName; !isFullMethodNameValid(name) {
  69. return balancer.PickResult{}, fmt.Errorf("rls: method name %q is not of the form '/service/method", name)
  70. }
  71. // Build the request's keys using the key builders from LB config.
  72. md, _ := metadata.FromOutgoingContext(info.Ctx)
  73. reqKeys := p.kbm.RLSKey(md, p.origEndpoint, info.FullMethodName)
  74. p.lb.cacheMu.Lock()
  75. defer p.lb.cacheMu.Unlock()
  76. // Lookup data cache and pending request map using request path and keys.
  77. cacheKey := cacheKey{path: info.FullMethodName, keys: reqKeys.Str}
  78. dcEntry := p.lb.dataCache.getEntry(cacheKey)
  79. pendingEntry := p.lb.pendingMap[cacheKey]
  80. now := time.Now()
  81. switch {
  82. // No data cache entry. No pending request.
  83. case dcEntry == nil && pendingEntry == nil:
  84. throttled := p.sendRouteLookupRequestLocked(cacheKey, &backoffState{bs: defaultBackoffStrategy}, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
  85. if throttled {
  86. return p.useDefaultPickIfPossible(info, errRLSThrottled)
  87. }
  88. return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
  89. // No data cache entry. Pending request exits.
  90. case dcEntry == nil && pendingEntry != nil:
  91. return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
  92. // Data cache hit. No pending request.
  93. case dcEntry != nil && pendingEntry == nil:
  94. if dcEntry.expiryTime.After(now) {
  95. if !dcEntry.staleTime.IsZero() && dcEntry.staleTime.Before(now) && dcEntry.backoffTime.Before(now) {
  96. p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_STALE, dcEntry.headerData)
  97. }
  98. // Delegate to child policies.
  99. res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
  100. return res, err
  101. }
  102. // We get here only if the data cache entry has expired. If entry is in
  103. // backoff, delegate to default target or fail the pick.
  104. if dcEntry.backoffState != nil && dcEntry.backoffTime.After(now) {
  105. // Avoid propagating the status code received on control plane RPCs to the
  106. // data plane which can lead to unexpected outcomes as we do not control
  107. // the status code sent by the control plane. Propagating the status
  108. // message received from the control plane is still fine, as it could be
  109. // useful for debugging purposes.
  110. st := dcEntry.status
  111. return p.useDefaultPickIfPossible(info, status.Error(codes.Unavailable, fmt.Sprintf("most recent error from RLS server: %v", st.Error())))
  112. }
  113. // We get here only if the entry has expired and is not in backoff.
  114. throttled := p.sendRouteLookupRequestLocked(cacheKey, dcEntry.backoffState, reqKeys.Map, rlspb.RouteLookupRequest_REASON_MISS, "")
  115. if throttled {
  116. return p.useDefaultPickIfPossible(info, errRLSThrottled)
  117. }
  118. return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
  119. // Data cache hit. Pending request exists.
  120. default:
  121. if dcEntry.expiryTime.After(now) {
  122. res, err := p.delegateToChildPoliciesLocked(dcEntry, info)
  123. return res, err
  124. }
  125. // Data cache entry has expired and pending request exists. Queue pick.
  126. return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
  127. }
  128. }
  129. // delegateToChildPoliciesLocked is a helper function which iterates through the
  130. // list of child policy wrappers in a cache entry and attempts to find a child
  131. // policy to which this RPC can be routed to. If all child policies are in
  132. // TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
  133. func (p *rlsPicker) delegateToChildPoliciesLocked(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
  134. const rlsDataHeaderName = "x-google-rls-data"
  135. for i, cpw := range dcEntry.childPolicyWrappers {
  136. state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
  137. // Delegate to the child policy if it is not in TRANSIENT_FAILURE, or if
  138. // it is the last one (which handles the case of delegating to the last
  139. // child picker if all child polcies are in TRANSIENT_FAILURE).
  140. if state.ConnectivityState != connectivity.TransientFailure || i == len(dcEntry.childPolicyWrappers)-1 {
  141. // Any header data received from the RLS server is stored in the
  142. // cache entry and needs to be sent to the actual backend in the
  143. // X-Google-RLS-Data header.
  144. res, err := state.Picker.Pick(info)
  145. if err != nil {
  146. return res, err
  147. }
  148. if res.Metadata == nil {
  149. res.Metadata = metadata.Pairs(rlsDataHeaderName, dcEntry.headerData)
  150. } else {
  151. res.Metadata.Append(rlsDataHeaderName, dcEntry.headerData)
  152. }
  153. return res, nil
  154. }
  155. }
  156. // In the unlikely event that we have a cache entry with no targets, we end up
  157. // queueing the RPC.
  158. return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
  159. }
  160. // useDefaultPickIfPossible is a helper method which delegates to the default
  161. // target if one is configured, or fails the pick with the given error.
  162. func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
  163. if p.defaultPolicy != nil {
  164. state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
  165. return state.Picker.Pick(info)
  166. }
  167. return balancer.PickResult{}, errOnNoDefault
  168. }
  169. // sendRouteLookupRequestLocked adds an entry to the pending request map and
  170. // sends out an RLS request using the passed in arguments. Returns a value
  171. // indicating if the request was throttled by the client-side adaptive
  172. // throttler.
  173. func (p *rlsPicker) sendRouteLookupRequestLocked(cacheKey cacheKey, bs *backoffState, reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string) bool {
  174. if p.lb.pendingMap[cacheKey] != nil {
  175. return false
  176. }
  177. p.lb.pendingMap[cacheKey] = bs
  178. throttled := p.ctrlCh.lookup(reqKeys, reason, staleHeaders, func(targets []string, headerData string, err error) {
  179. p.handleRouteLookupResponse(cacheKey, targets, headerData, err)
  180. })
  181. if throttled {
  182. delete(p.lb.pendingMap, cacheKey)
  183. }
  184. return throttled
  185. }
  186. // handleRouteLookupResponse is the callback invoked by the control channel upon
  187. // receipt of an RLS response. Modifies the data cache and pending requests map
  188. // and sends a new picker.
  189. //
  190. // Acquires the write-lock on the cache. Caller must not hold p.lb.cacheMu.
  191. func (p *rlsPicker) handleRouteLookupResponse(cacheKey cacheKey, targets []string, headerData string, err error) {
  192. p.logger.Infof("Received RLS response for key %+v with targets %+v, headerData %q, err: %v", cacheKey, targets, headerData, err)
  193. p.lb.cacheMu.Lock()
  194. defer func() {
  195. // Pending request map entry is unconditionally deleted since the request is
  196. // no longer pending.
  197. p.logger.Infof("Removing pending request entry for key %+v", cacheKey)
  198. delete(p.lb.pendingMap, cacheKey)
  199. p.lb.sendNewPicker()
  200. p.lb.cacheMu.Unlock()
  201. }()
  202. // Lookup the data cache entry or create a new one.
  203. dcEntry := p.lb.dataCache.getEntry(cacheKey)
  204. if dcEntry == nil {
  205. dcEntry = &cacheEntry{}
  206. if _, ok := p.lb.dataCache.addEntry(cacheKey, dcEntry); !ok {
  207. // This is a very unlikely case where we are unable to add a
  208. // data cache entry. Log and leave.
  209. p.logger.Warningf("Failed to add data cache entry for %+v", cacheKey)
  210. return
  211. }
  212. }
  213. // For failed requests, the data cache entry is modified as follows:
  214. // - status is set to error returned from the control channel
  215. // - current backoff state is available in the pending entry
  216. // - `retries` field is incremented and
  217. // - backoff state is moved to the data cache
  218. // - backoffTime is set to the time indicated by the backoff state
  219. // - backoffExpirationTime is set to twice the backoff time
  220. // - backoffTimer is set to fire after backoffTime
  221. //
  222. // When a proactive cache refresh fails, this would leave the targets and the
  223. // expiry time from the old entry unchanged. And this mean that the old valid
  224. // entry would be used until expiration, and a new picker would be sent upon
  225. // backoff expiry.
  226. now := time.Now()
  227. if err != nil {
  228. dcEntry.status = err
  229. pendingEntry := p.lb.pendingMap[cacheKey]
  230. pendingEntry.retries++
  231. backoffTime := pendingEntry.bs.Backoff(pendingEntry.retries)
  232. dcEntry.backoffState = pendingEntry
  233. dcEntry.backoffTime = now.Add(backoffTime)
  234. dcEntry.backoffExpiryTime = now.Add(2 * backoffTime)
  235. if dcEntry.backoffState.timer != nil {
  236. dcEntry.backoffState.timer.Stop()
  237. }
  238. dcEntry.backoffState.timer = time.AfterFunc(backoffTime, p.lb.sendNewPicker)
  239. return
  240. }
  241. // For successful requests, the cache entry is modified as follows:
  242. // - childPolicyWrappers is set to point to the child policy wrappers
  243. // associated with the targets specified in the received response
  244. // - headerData is set to the value received in the response
  245. // - expiryTime, stateTime and earliestEvictionTime are set
  246. // - status is set to nil (OK status)
  247. // - backoff state is cleared
  248. p.setChildPolicyWrappersInCacheEntry(dcEntry, targets)
  249. dcEntry.headerData = headerData
  250. dcEntry.expiryTime = now.Add(p.maxAge)
  251. if p.staleAge != 0 {
  252. dcEntry.staleTime = now.Add(p.staleAge)
  253. }
  254. dcEntry.earliestEvictTime = now.Add(minEvictDuration)
  255. dcEntry.status = nil
  256. dcEntry.backoffState = &backoffState{bs: defaultBackoffStrategy}
  257. dcEntry.backoffTime = time.Time{}
  258. dcEntry.backoffExpiryTime = time.Time{}
  259. p.lb.dataCache.updateEntrySize(dcEntry, computeDataCacheEntrySize(cacheKey, dcEntry))
  260. }
  261. // setChildPolicyWrappersInCacheEntry sets up the childPolicyWrappers field in
  262. // the cache entry to point to the child policy wrappers for the targets
  263. // specified in the RLS response.
  264. //
  265. // Caller must hold a write-lock on p.lb.cacheMu.
  266. func (p *rlsPicker) setChildPolicyWrappersInCacheEntry(dcEntry *cacheEntry, newTargets []string) {
  267. // If the childPolicyWrappers field is already pointing to the right targets,
  268. // then the field's value does not need to change.
  269. targetsChanged := true
  270. func() {
  271. if cpws := dcEntry.childPolicyWrappers; cpws != nil {
  272. if len(newTargets) != len(cpws) {
  273. return
  274. }
  275. for i, target := range newTargets {
  276. if cpws[i].target != target {
  277. return
  278. }
  279. }
  280. targetsChanged = false
  281. }
  282. }()
  283. if !targetsChanged {
  284. return
  285. }
  286. // If the childPolicyWrappers field is not already set to the right targets,
  287. // then it must be reset. We construct a new list of child policies and
  288. // then swap out the old list for the new one.
  289. newChildPolicies := p.lb.acquireChildPolicyReferences(newTargets)
  290. oldChildPolicyTargets := make([]string, len(dcEntry.childPolicyWrappers))
  291. for i, cpw := range dcEntry.childPolicyWrappers {
  292. oldChildPolicyTargets[i] = cpw.target
  293. }
  294. p.lb.releaseChildPolicyReferences(oldChildPolicyTargets)
  295. dcEntry.childPolicyWrappers = newChildPolicies
  296. }
  297. func dcEntrySize(key cacheKey, entry *cacheEntry) int64 {
  298. return int64(len(key.path) + len(key.keys) + len(entry.headerData))
  299. }