balancer.go

/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package rls implements the RLS LB policy.
package rls

import (
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/backoff"
	"google.golang.org/grpc/internal/balancergroup"
	"google.golang.org/grpc/internal/buffer"
	internalgrpclog "google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/pretty"
	"google.golang.org/grpc/resolver"
)

const (
	// Name is the name of the RLS LB policy.
	//
	// It currently has an experimental suffix which will be removed once
	// end-to-end testing of the policy is completed.
	Name = internal.RLSLoadBalancingPolicyName

	// Default frequency for data cache purging.
	periodicCachePurgeFreq = time.Minute
)

var (
	logger            = grpclog.Component("rls")
	errBalancerClosed = errors.New("rls LB policy is closed")

	// The vars below are defined so they can be overridden in unit tests.

	// Default exponential backoff strategy for data cache entries.
	defaultBackoffStrategy = backoff.Strategy(backoff.DefaultExponential)

	// Ticker used for periodic data cache purging.
	dataCachePurgeTicker = func() *time.Ticker { return time.NewTicker(periodicCachePurgeFreq) }

	// We want every cache entry to live in the cache for at least this
	// duration. If we encounter a cache entry whose minimum expiration time is
	// in the future, we abort the LRU pass, which may temporarily leave the
	// cache too large. This is necessary to ensure that in cases where the
	// cache is too small, when we receive an RLS response, we keep the
	// resulting cache entry around long enough for the pending incoming
	// requests to be re-processed through the new picker. If we didn't do
	// this, we'd risk throwing away each RLS response as we receive it, in
	// which case we would fail to actually route any of our incoming requests.
	minEvictDuration = 5 * time.Second

	// The following functions are no-ops in actual code, but can be overridden
	// in tests to give them visibility into exactly when certain events
	// happen.
	clientConnUpdateHook = func() {}
	dataCachePurgeHook   = func() {}
	resetBackoffHook     = func() {}
)

func init() {
	balancer.Register(&rlsBB{})
}
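
// rlsBB is the balancer builder for the RLS LB policy. It is registered in
// init() under the name held in the Name constant above.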
type rlsBB struct{}

func (rlsBB) Name() string {
	return Name
}
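
// Build constructs a new RLS balancer: it initializes the data cache and the
// BalancerGroup which manages child policies, and spawns the run() goroutine
// which processes all updates for this balancer.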
func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
	lb := &rlsBalancer{
		closed:             grpcsync.NewEvent(),
		done:               grpcsync.NewEvent(),
		cc:                 cc,
		bopts:              opts,
		purgeTicker:        dataCachePurgeTicker(),
		dataCachePurgeHook: dataCachePurgeHook,
		lbCfg:              &lbConfig{},
		pendingMap:         make(map[cacheKey]*backoffState),
		childPolicies:      make(map[string]*childPolicyWrapper),
		updateCh:           buffer.NewUnbounded(),
	}
	lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
	lb.dataCache = newDataCache(maxCacheSize, lb.logger)
	lb.bg = balancergroup.New(cc, opts, lb, lb.logger)
	lb.bg.Start()
	go lb.run()
	return lb
}

// rlsBalancer implements the RLS LB policy.
type rlsBalancer struct {
	closed             *grpcsync.Event // Fires when Close() is invoked. Guarded by stateMu.
	done               *grpcsync.Event // Fires when Close() is done.
	cc                 balancer.ClientConn
	bopts              balancer.BuildOptions
	purgeTicker        *time.Ticker
	dataCachePurgeHook func()
	logger             *internalgrpclog.PrefixLogger

	// If both cacheMu and stateMu need to be acquired, the former must be
	// acquired first to prevent a deadlock. This order restriction exists
	// because, in places where we need to acquire both locks, we always start
	// off reading the cache.

	// cacheMu guards access to the data cache and pending requests map. We
	// cannot use an RWMutex here since even an operation like
	// dataCache.getEntry() modifies the underlying LRU, which is implemented
	// as a doubly linked list.
	cacheMu    sync.Mutex
	dataCache  *dataCache                 // Cache of RLS data.
	pendingMap map[cacheKey]*backoffState // Map of pending RLS requests.

	// stateMu guards access to all LB policy state.
	stateMu            sync.Mutex
	lbCfg              *lbConfig        // Most recently received service config.
	childPolicyBuilder balancer.Builder // Cached child policy builder.
	resolverState      resolver.State   // Cached resolver state.
	ctrlCh             *controlChannel  // Control channel to the RLS server.
	bg                 *balancergroup.BalancerGroup
	childPolicies      map[string]*childPolicyWrapper
	defaultPolicy      *childPolicyWrapper
	// A reference to the most recent picker sent to gRPC as part of a state
	// update is cached in this field so that we can release the reference to
	// the default child policy wrapper when a new picker is created. See
	// sendNewPickerLocked() for details.
	lastPicker *rlsPicker
	// Set during UpdateClientConnState when pushing updates to child policies.
	// Prevents state updates from child policies causing new pickers to be
	// sent up the channel. Cleared after all child policies have processed the
	// updates sent to them, after which a new picker is sent up the channel.
	inhibitPickerUpdates bool

	// Channel on which all updates are pushed. Processed in run().
	updateCh *buffer.Unbounded
}
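
// resumePickerUpdates is pushed on the update channel at the end of a config
// update; handling it clears inhibitPickerUpdates and sends a fresh picker.
// The done channel is closed once that picker has been sent.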
type resumePickerUpdates struct {
	done chan struct{}
}

// childPolicyIDAndState wraps a child policy id and its state update.
type childPolicyIDAndState struct {
	id    string
	state balancer.State
}
type controlChannelReady struct{}

// run is a long-running goroutine which handles all the updates that the
// balancer wishes to handle. The appropriate update handler pushes the update
// onto a channel that this goroutine selects on, so that updates are handled
// asynchronously.
func (b *rlsBalancer) run() {
	// We exit the for loop below only after `Close()` has been invoked.
	// Firing the done event here ensures that Close() returns only after all
	// goroutines are done.
	defer func() { b.done.Fire() }()

	// Wait for the purgeDataCache() goroutine to exit before returning.
	doneCh := make(chan struct{})
	defer func() {
		<-doneCh
	}()
	go b.purgeDataCache(doneCh)

	for {
		select {
		case u, ok := <-b.updateCh.Get():
			if !ok {
				return
			}
			b.updateCh.Load()
			switch update := u.(type) {
			case childPolicyIDAndState:
				b.handleChildPolicyStateUpdate(update.id, update.state)
			case controlChannelReady:
				b.logger.Infof("Resetting backoff state after control channel getting back to READY")
				b.cacheMu.Lock()
				updatePicker := b.dataCache.resetBackoffState(&backoffState{bs: defaultBackoffStrategy})
				b.cacheMu.Unlock()
				if updatePicker {
					b.sendNewPicker()
				}
				resetBackoffHook()
			case resumePickerUpdates:
				b.stateMu.Lock()
				b.logger.Infof("Resuming picker updates after config propagation to child policies")
				b.inhibitPickerUpdates = false
				b.sendNewPickerLocked()
				close(update.done)
				b.stateMu.Unlock()
			default:
				b.logger.Errorf("Unsupported update type %T", update)
			}
		case <-b.closed.Done():
			return
		}
	}
}

// purgeDataCache is a long-running goroutine which periodically deletes
// expired entries. An expired entry is one for which both the expiryTime and
// backoffExpiryTime are in the past.
func (b *rlsBalancer) purgeDataCache(doneCh chan struct{}) {
	defer close(doneCh)

	for {
		select {
		case <-b.closed.Done():
			return
		case <-b.purgeTicker.C:
			b.cacheMu.Lock()
			updatePicker := b.dataCache.evictExpiredEntries()
			b.cacheMu.Unlock()
			if updatePicker {
				b.sendNewPicker()
			}
			b.dataCachePurgeHook()
		}
	}
}
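
// UpdateClientConnState handles a new service config from gRPC. It swaps out
// the control channel if the RLS server name changed, pushes the new config to
// child policies, and resizes the data cache if needed, while inhibiting
// picker updates until all child policies have processed the new config.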
func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
	defer clientConnUpdateHook()

	b.stateMu.Lock()
	if b.closed.HasFired() {
		b.stateMu.Unlock()
		b.logger.Warningf("Received service config after balancer close: %s", pretty.ToJSON(ccs.BalancerConfig))
		return errBalancerClosed
	}

	newCfg := ccs.BalancerConfig.(*lbConfig)
	if b.lbCfg.Equal(newCfg) {
		b.stateMu.Unlock()
		b.logger.Infof("New service config matches existing config")
		return nil
	}

	b.logger.Infof("Delaying picker updates until config is propagated to and processed by child policies")
	b.inhibitPickerUpdates = true

	// When the RLS server name changes, the old control channel needs to be
	// swapped out for a new one. All state associated with the throttling
	// algorithm is stored on a per-control-channel basis; when we swap out
	// channels, we also swap out the throttling state.
	b.handleControlChannelUpdate(newCfg)

	// Any change to the child policy name or configuration needs to be
	// handled by either creating new child policies or pushing updates to
	// existing ones.
	b.resolverState = ccs.ResolverState
	b.handleChildPolicyConfigUpdate(newCfg, &ccs)

	// Resize the cache if the size in the config has changed.
	resizeCache := newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes

	// Update the copy of the config in the LB policy before releasing the lock.
	b.lbCfg = newCfg

	// Enqueue an event which will notify us when the above update has been
	// propagated to all child policies, and the child policies have all
	// processed their updates, and we have sent a picker update.
	done := make(chan struct{})
	b.updateCh.Put(resumePickerUpdates{done: done})
	b.stateMu.Unlock()
	<-done

	if resizeCache {
		// If the new config reduces the size of the data cache, we might have
		// to evict entries to get the cache size down to the newly specified
		// size.
		//
		// We cannot do this operation above (where we compute the
		// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
		// `stateMu` if we are to hold both locks at the same time.
		b.cacheMu.Lock()
		b.dataCache.resize(newCfg.cacheSizeBytes)
		b.cacheMu.Unlock()
	}
	return nil
}

// handleControlChannelUpdate handles updates to service config fields which
// influence the control channel to the RLS server.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) handleControlChannelUpdate(newCfg *lbConfig) {
	if newCfg.lookupService == b.lbCfg.lookupService && newCfg.lookupServiceTimeout == b.lbCfg.lookupServiceTimeout {
		return
	}

	// Create a new control channel and close the existing one.
	b.logger.Infof("Creating control channel to RLS server at: %v", newCfg.lookupService)
	backToReadyFn := func() {
		b.updateCh.Put(controlChannelReady{})
	}
	ctrlCh, err := newControlChannel(newCfg.lookupService, newCfg.controlChannelServiceConfig, newCfg.lookupServiceTimeout, b.bopts, backToReadyFn)
	if err != nil {
		// This is very uncommon and usually represents a non-transient error.
		// There is not much we can do here other than wait for another update
		// which might fix things.
		b.logger.Errorf("Failed to create control channel to %q: %v", newCfg.lookupService, err)
		return
	}
	if b.ctrlCh != nil {
		b.ctrlCh.close()
	}
	b.ctrlCh = ctrlCh
}

// handleChildPolicyConfigUpdate handles updates to service config fields which
// influence child policy configuration.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) handleChildPolicyConfigUpdate(newCfg *lbConfig, ccs *balancer.ClientConnState) {
	// Update the child policy builder first since other steps depend on it.
	if b.childPolicyBuilder == nil || b.childPolicyBuilder.Name() != newCfg.childPolicyName {
		b.logger.Infof("Child policy changed to %q", newCfg.childPolicyName)
		b.childPolicyBuilder = balancer.Get(newCfg.childPolicyName)
		for _, cpw := range b.childPolicies {
			// If the child policy has changed, we need to remove the old
			// policy from the BalancerGroup and add a new one. The
			// BalancerGroup takes care of closing the old one in this case.
			b.bg.Remove(cpw.target)
			b.bg.Add(cpw.target, b.childPolicyBuilder)
		}
	}

	configSentToDefault := false
	if b.lbCfg.defaultTarget != newCfg.defaultTarget {
		// If the default target has changed, create a new childPolicyWrapper
		// for the new target if required. If a new wrapper is created, add it
		// to the childPolicies map and the BalancerGroup.
		b.logger.Infof("Default target in LB config changing from %q to %q", b.lbCfg.defaultTarget, newCfg.defaultTarget)
		cpw := b.childPolicies[newCfg.defaultTarget]
		if cpw == nil {
			cpw = newChildPolicyWrapper(newCfg.defaultTarget)
			b.childPolicies[newCfg.defaultTarget] = cpw
			b.bg.Add(newCfg.defaultTarget, b.childPolicyBuilder)
			b.logger.Infof("Child policy %q added to BalancerGroup", newCfg.defaultTarget)
		}
		if err := b.buildAndPushChildPolicyConfigs(newCfg.defaultTarget, newCfg, ccs); err != nil {
			cpw.lamify(err)
		}

		// If an old default exists, release its reference. If this was the
		// last reference, remove the child policy from the BalancerGroup and
		// remove the corresponding entry from the childPolicies map.
		if b.defaultPolicy != nil {
			if b.defaultPolicy.releaseRef() {
				delete(b.childPolicies, b.lbCfg.defaultTarget)
				b.bg.Remove(b.defaultPolicy.target)
			}
		}
		b.defaultPolicy = cpw
		configSentToDefault = true
	}

	// No change in configuration affecting child policies. Return early.
	if b.lbCfg.childPolicyName == newCfg.childPolicyName && b.lbCfg.childPolicyTargetField == newCfg.childPolicyTargetField && childPolicyConfigEqual(b.lbCfg.childPolicyConfig, newCfg.childPolicyConfig) {
		return
	}

	// If fields affecting child policy configuration have changed, the changes
	// are pushed to the childPolicyWrapper which handles them appropriately.
	for _, cpw := range b.childPolicies {
		if configSentToDefault && cpw.target == newCfg.defaultTarget {
			// The default target has already been taken care of.
			continue
		}
		if err := b.buildAndPushChildPolicyConfigs(cpw.target, newCfg, ccs); err != nil {
			cpw.lamify(err)
		}
	}
}

// buildAndPushChildPolicyConfigs builds the final child policy configuration
// by adding the `targetField` to the base child policy configuration received
// in the RLS LB policy configuration. The `targetField` is set to the target,
// and the resulting configuration is pushed to the child policy through the
// BalancerGroup.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) buildAndPushChildPolicyConfigs(target string, newCfg *lbConfig, ccs *balancer.ClientConnState) error {
	jsonTarget, err := json.Marshal(target)
	if err != nil {
		return fmt.Errorf("failed to marshal child policy target %q: %v", target, err)
	}

	config := newCfg.childPolicyConfig
	targetField := newCfg.childPolicyTargetField
	config[targetField] = jsonTarget
	jsonCfg, err := json.Marshal(config)
	if err != nil {
		return fmt.Errorf("failed to marshal child policy config %+v: %v", config, err)
	}
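
	// The child policy builder is expected to implement
	// balancer.ConfigParser; this is validated when the RLS LB config itself
	// is parsed, so the type assertion below should not yield a nil parser.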
	parser, _ := b.childPolicyBuilder.(balancer.ConfigParser)
	parsedCfg, err := parser.ParseConfig(jsonCfg)
	if err != nil {
		return fmt.Errorf("childPolicy config parsing failed: %v", err)
	}

	state := balancer.ClientConnState{ResolverState: ccs.ResolverState, BalancerConfig: parsedCfg}
	b.logger.Infof("Pushing new state to child policy %q: %+v", target, state)
	if err := b.bg.UpdateClientConnState(target, state); err != nil {
		b.logger.Warningf("UpdateClientConnState(%q, %+v) failed: %v", target, ccs, err)
	}
	return nil
}
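
// ResolverError forwards resolver errors to all child policies through the
// BalancerGroup.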
func (b *rlsBalancer) ResolverError(err error) {
	b.bg.ResolverError(err)
}
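
// UpdateSubConnState forwards SubConn state updates to the BalancerGroup,
// which routes them to the child policy owning the SubConn.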
func (b *rlsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	b.bg.UpdateSubConnState(sc, state)
}
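
// Close shuts the balancer down: it fires the closed event, stops the cache
// purge ticker, closes the control channel, the BalancerGroup and the data
// cache, and then waits for the run() goroutine to exit.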
func (b *rlsBalancer) Close() {
	b.stateMu.Lock()

	b.closed.Fire()
	b.purgeTicker.Stop()
	if b.ctrlCh != nil {
		b.ctrlCh.close()
	}
	b.bg.Close()
	b.stateMu.Unlock()

	b.cacheMu.Lock()
	b.dataCache.stop()
	b.cacheMu.Unlock()

	b.updateCh.Close()

	<-b.done.Done()
}
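
// ExitIdle forwards the call to all child policies through the BalancerGroup.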
func (b *rlsBalancer) ExitIdle() {
	b.bg.ExitIdle()
}

// sendNewPickerLocked pushes a new picker on to the channel.
//
// Note that regardless of what connectivity state is reported, the policy will
// return its own picker, and not a picker that unconditionally queues
// (typically used for IDLE or CONNECTING) or a picker that unconditionally
// fails (typically used for TRANSIENT_FAILURE). This is required because,
// irrespective of the connectivity state, we need to be able to perform RLS
// lookups for incoming RPCs and affect the status of queued RPCs based on the
// receipt of RLS responses.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) sendNewPickerLocked() {
	aggregatedState := b.aggregatedConnectivityState()

	// Acquire a separate reference for the picker. This is required to ensure
	// that the wrapper held by the old picker is not closed when the default
	// target changes in the config, and a new wrapper is created for the new
	// default target. See handleChildPolicyConfigUpdate() for how config
	// changes affecting the default target are handled.
	if b.defaultPolicy != nil {
		b.defaultPolicy.acquireRef()
	}

	picker := &rlsPicker{
		kbm:           b.lbCfg.kbMap,
		origEndpoint:  b.bopts.Target.Endpoint(),
		lb:            b,
		defaultPolicy: b.defaultPolicy,
		ctrlCh:        b.ctrlCh,
		maxAge:        b.lbCfg.maxAge,
		staleAge:      b.lbCfg.staleAge,
		bg:            b.bg,
	}
	picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
	state := balancer.State{
		ConnectivityState: aggregatedState,
		Picker:            picker,
	}

	if !b.inhibitPickerUpdates {
		b.logger.Infof("New balancer.State: %+v", state)
		b.cc.UpdateState(state)
	} else {
		b.logger.Infof("Delaying picker update: %+v", state)
	}
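
	// If a previous picker exists, release the reference it held on the
	// default child policy wrapper; the new picker now holds its own
	// reference, acquired above.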
	if b.lastPicker != nil {
		if b.defaultPolicy != nil {
			b.defaultPolicy.releaseRef()
		}
	}
	b.lastPicker = picker
}
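
// sendNewPicker is a convenience wrapper around sendNewPickerLocked() which
// grabs stateMu and is a no-op if the balancer is already closed.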
func (b *rlsBalancer) sendNewPicker() {
	b.stateMu.Lock()
	defer b.stateMu.Unlock()
	if b.closed.HasFired() {
		return
	}
	b.sendNewPickerLocked()
}

// The aggregated connectivity state reported is determined as follows:
//   - If there is at least one child policy in state READY, the connectivity
//     state is READY.
//   - Otherwise, if there is at least one child policy in state CONNECTING,
//     the connectivity state is CONNECTING.
//   - Otherwise, if there is at least one child policy in state IDLE, the
//     connectivity state is IDLE.
//   - Otherwise, all child policies are in TRANSIENT_FAILURE, and the
//     connectivity state is TRANSIENT_FAILURE.
//
// If the RLS policy has no child policies and no configured default target,
// then we will report connectivity state IDLE.
//
// Caller must hold lb.stateMu.
func (b *rlsBalancer) aggregatedConnectivityState() connectivity.State {
	if len(b.childPolicies) == 0 && b.lbCfg.defaultTarget == "" {
		return connectivity.Idle
	}

	var readyN, connectingN, idleN int
	for _, cpw := range b.childPolicies {
		state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
		switch state.ConnectivityState {
		case connectivity.Ready:
			readyN++
		case connectivity.Connecting:
			connectingN++
		case connectivity.Idle:
			idleN++
		}
	}

	switch {
	case readyN > 0:
		return connectivity.Ready
	case connectingN > 0:
		return connectivity.Connecting
	case idleN > 0:
		return connectivity.Idle
	default:
		return connectivity.TransientFailure
	}
}

// UpdateState is an implementation of the
// balancergroup.BalancerStateAggregator interface. The actual state
// aggregation functionality is handled asynchronously. This method only pushes
// the state update onto the channel that is read and dispatched by the run()
// goroutine.
func (b *rlsBalancer) UpdateState(id string, state balancer.State) {
	b.updateCh.Put(childPolicyIDAndState{id: id, state: state})
}

// handleChildPolicyStateUpdate provides the state aggregator functionality for
// the BalancerGroup.
//
// This method is invoked by the BalancerGroup whenever a child policy sends a
// state update. We cache the child policy's connectivity state and picker for
// two reasons:
//   - to suppress connectivity state transitions from TRANSIENT_FAILURE to
//     states other than READY
//   - to delegate picks to child policies
func (b *rlsBalancer) handleChildPolicyStateUpdate(id string, newState balancer.State) {
	b.stateMu.Lock()
	defer b.stateMu.Unlock()

	cpw := b.childPolicies[id]
	if cpw == nil {
		// All child policies start with an entry in the map. If the ID is not
		// in the map, it has either been removed, or never existed.
		b.logger.Warningf("Received state update %+v for missing child policy %q", newState, id)
		return
	}

	oldState := (*balancer.State)(atomic.LoadPointer(&cpw.state))
	if oldState.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting {
		// Ignore state transitions from TRANSIENT_FAILURE to CONNECTING, and
		// thus fail pending RPCs instead of queuing them indefinitely when all
		// subChannels are failing, even if the subChannels are bouncing back
		// and forth between CONNECTING and TRANSIENT_FAILURE.
		return
	}
	atomic.StorePointer(&cpw.state, unsafe.Pointer(&newState))
	b.logger.Infof("Child policy %q has new state %+v", id, newState)
	b.sendNewPickerLocked()
}

// acquireChildPolicyReferences attempts to acquire references to
// childPolicyWrappers corresponding to the passed in targets. If there is no
// childPolicyWrapper corresponding to one of the targets, a new one is created
// and added to the BalancerGroup.
func (b *rlsBalancer) acquireChildPolicyReferences(targets []string) []*childPolicyWrapper {
	b.stateMu.Lock()
	var newChildPolicies []*childPolicyWrapper
	for _, target := range targets {
		// If the target exists in the LB policy's childPolicies map, a new
		// reference is taken here and added to the new list.
		if cpw := b.childPolicies[target]; cpw != nil {
			cpw.acquireRef()
			newChildPolicies = append(newChildPolicies, cpw)
			continue
		}

		// If the target does not exist in the child policy map, a new child
		// policy wrapper is created and added to the new list.
		cpw := newChildPolicyWrapper(target)
		b.childPolicies[target] = cpw
		b.bg.Add(target, b.childPolicyBuilder)
		b.logger.Infof("Child policy %q added to BalancerGroup", target)
		newChildPolicies = append(newChildPolicies, cpw)
		if err := b.buildAndPushChildPolicyConfigs(target, b.lbCfg, &balancer.ClientConnState{
			ResolverState: b.resolverState,
		}); err != nil {
			cpw.lamify(err)
		}
	}
	b.stateMu.Unlock()
	return newChildPolicies
}

// releaseChildPolicyReferences releases references to childPolicyWrappers
// corresponding to the passed in targets. If the released reference was the
// last one, the child policy is removed from the BalancerGroup.
func (b *rlsBalancer) releaseChildPolicyReferences(targets []string) {
	b.stateMu.Lock()
	for _, target := range targets {
		if cpw := b.childPolicies[target]; cpw.releaseRef() {
			delete(b.childPolicies, cpw.target)
			b.bg.Remove(cpw.target)
		}
	}
	b.stateMu.Unlock()
}