balancer.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. /*
  2. *
  3. * Copyright 2023 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package weightedroundrobin
  19. import (
  20. "context"
  21. "encoding/json"
  22. "errors"
  23. "fmt"
  24. "sync"
  25. "sync/atomic"
  26. "time"
  27. "unsafe"
  28. "google.golang.org/grpc/balancer"
  29. "google.golang.org/grpc/balancer/base"
  30. "google.golang.org/grpc/balancer/weightedroundrobin/internal"
  31. "google.golang.org/grpc/connectivity"
  32. "google.golang.org/grpc/internal/grpclog"
  33. "google.golang.org/grpc/internal/grpcrand"
  34. iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
  35. "google.golang.org/grpc/orca"
  36. "google.golang.org/grpc/resolver"
  37. "google.golang.org/grpc/serviceconfig"
  38. v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
  39. )
  40. // Name is the name of the weighted round robin balancer.
  41. const Name = "weighted_round_robin_experimental"
  42. func init() {
  43. balancer.Register(bb{})
  44. }
  45. type bb struct{}
  46. func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
  47. b := &wrrBalancer{
  48. cc: cc,
  49. subConns: resolver.NewAddressMap(),
  50. csEvltr: &balancer.ConnectivityStateEvaluator{},
  51. scMap: make(map[balancer.SubConn]*weightedSubConn),
  52. connectivityState: connectivity.Connecting,
  53. }
  54. b.logger = prefixLogger(b)
  55. b.logger.Infof("Created")
  56. return b
  57. }
  // ParseConfig parses the JSON LB policy config for this balancer into an
  // *lbConfig.
  //
  // The defaults documented in gRFC A58 are filled in before js is decoded. A
  // negative errorUtilizationPenalty is rejected. When OOB load reporting is
  // disabled the OOB reporting period is forced to 0, so that later config
  // comparisons do not see spurious period differences. weightUpdatePeriod is
  // raised to at least 100ms unless internal.AllowAnyWeightUpdatePeriod is set.
  func (bb) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
  	minUpdatePeriod := iserviceconfig.Duration(100 * time.Millisecond)

  	cfg := &lbConfig{
  		OOBReportingPeriod:      iserviceconfig.Duration(10 * time.Second),
  		BlackoutPeriod:          iserviceconfig.Duration(10 * time.Second),
  		WeightExpirationPeriod:  iserviceconfig.Duration(3 * time.Minute),
  		WeightUpdatePeriod:      iserviceconfig.Duration(time.Second),
  		ErrorUtilizationPenalty: 1,
  	}
  	if err := json.Unmarshal(js, cfg); err != nil {
  		return nil, fmt.Errorf("wrr: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
  	}
  	if cfg.ErrorUtilizationPenalty < 0 {
  		return nil, fmt.Errorf("wrr: errorUtilizationPenalty must be non-negative")
  	}

  	if !cfg.EnableOOBLoadReport {
  		cfg.OOBReportingPeriod = 0
  	}
  	if cfg.WeightUpdatePeriod < minUpdatePeriod && !internal.AllowAnyWeightUpdatePeriod {
  		cfg.WeightUpdatePeriod = minUpdatePeriod
  	}
  	return cfg, nil
  }

  // Name returns the name under which this balancer builder is registered.
  func (bb) Name() string { return Name }
  // wrrBalancer implements the weighted round robin LB policy.
  //
  // It owns one weightedSubConn per distinct resolved address, tracks their
  // aggregate connectivity state, and publishes a picker whose scheduler is
  // periodically rebuilt from the backends' reported weights.
  type wrrBalancer struct {
  	cc     balancer.ClientConn
  	logger *grpclog.PrefixLogger

  	// The following fields are only accessed on calls into the LB policy, and
  	// do not need a mutex.

  	// cfg is the active config, set by UpdateClientConnState.
  	cfg *lbConfig
  	// subConns holds the active weightedSubConns mapped by address.
  	subConns *resolver.AddressMap
  	// scMap maps each created SubConn to its wrapper. Entries for removed
  	// addresses stay here until the SubConn reports Shutdown.
  	scMap map[balancer.SubConn]*weightedSubConn
  	// connectivityState is the aggregate state. It is normally computed by
  	// csEvltr, and is forced to TransientFailure by ResolverError when no
  	// SubConns exist.
  	connectivityState connectivity.State
  	// csEvltr aggregates per-SubConn state transitions.
  	csEvltr *balancer.ConnectivityStateEvaluator
  	// resolverErr is the last error reported by the resolver; it is cleared at
  	// the start of every UpdateClientConnState call.
  	resolverErr error
  	// connErr is the last SubConn connection error; it is cleared only when a
  	// picker is regenerated while the aggregate state is Ready.
  	connErr error
  	// stopPicker cancels the active picker's background scheduler updates;
  	// nil when no picker goroutine context is outstanding.
  	stopPicker func()
  }
  // UpdateClientConnState applies a new resolver state and LB config.
  //
  // Any previous resolver error is cleared, the config is adopted, and the set of
  // SubConns is reconciled with the resolved addresses. If the resolver produced
  // no addresses, the update is reported as a resolver error and
  // balancer.ErrBadResolverState is returned. With no SubConns left, that error
  // path forces TransientFailure and regenerates the picker. Otherwise a fresh
  // picker is published.
  func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
  	b.logger.Infof("UpdateCCS: %v", ccs)
  	b.resolverErr = nil

  	cfg, ok := ccs.BalancerConfig.(*lbConfig)
  	if !ok {
  		return fmt.Errorf("wrr: received nil or illegal BalancerConfig (type %T): %v", ccs.BalancerConfig, ccs.BalancerConfig)
  	}
  	b.cfg = cfg

  	addrs := ccs.ResolverState.Addresses
  	b.updateAddresses(addrs)
  	if len(addrs) != 0 {
  		b.regeneratePicker()
  		return nil
  	}
  	b.ResolverError(errors.New("resolver produced zero addresses"))
  	return balancer.ErrBadResolverState
  }
  118. func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) {
  119. addrsSet := resolver.NewAddressMap()
  120. // Loop through new address list and create subconns for any new addresses.
  121. for _, addr := range addrs {
  122. if _, ok := addrsSet.Get(addr); ok {
  123. // Redundant address; skip.
  124. continue
  125. }
  126. addrsSet.Set(addr, nil)
  127. var wsc *weightedSubConn
  128. wsci, ok := b.subConns.Get(addr)
  129. if ok {
  130. wsc = wsci.(*weightedSubConn)
  131. } else {
  132. // addr is a new address (not existing in b.subConns).
  133. sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
  134. if err != nil {
  135. b.logger.Warningf("Failed to create new SubConn for address %v: %v", addr, err)
  136. continue
  137. }
  138. wsc = &weightedSubConn{
  139. SubConn: sc,
  140. logger: b.logger,
  141. connectivityState: connectivity.Idle,
  142. // Initially, we set load reports to off, because they are not
  143. // running upon initial weightedSubConn creation.
  144. cfg: &lbConfig{EnableOOBLoadReport: false},
  145. }
  146. b.subConns.Set(addr, wsc)
  147. b.scMap[sc] = wsc
  148. b.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Idle)
  149. sc.Connect()
  150. }
  151. // Update config for existing weightedSubConn or send update for first
  152. // time to new one. Ensures an OOB listener is running if needed
  153. // (and stops the existing one if applicable).
  154. wsc.updateConfig(b.cfg)
  155. }
  156. // Loop through existing subconns and remove ones that are not in addrs.
  157. for _, addr := range b.subConns.Keys() {
  158. if _, ok := addrsSet.Get(addr); ok {
  159. // Existing address also in new address list; skip.
  160. continue
  161. }
  162. // addr was removed by resolver. Remove.
  163. wsci, _ := b.subConns.Get(addr)
  164. wsc := wsci.(*weightedSubConn)
  165. b.cc.RemoveSubConn(wsc.SubConn)
  166. b.subConns.Delete(addr)
  167. }
  168. }
  169. func (b *wrrBalancer) ResolverError(err error) {
  170. b.resolverErr = err
  171. if b.subConns.Len() == 0 {
  172. b.connectivityState = connectivity.TransientFailure
  173. }
  174. if b.connectivityState != connectivity.TransientFailure {
  175. // No need to update the picker since no error is being returned.
  176. return
  177. }
  178. b.regeneratePicker()
  179. }
  180. func (b *wrrBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
  181. wsc := b.scMap[sc]
  182. if wsc == nil {
  183. b.logger.Errorf("UpdateSubConnState called with an unknown SubConn: %p, %v", sc, state)
  184. return
  185. }
  186. if b.logger.V(2) {
  187. logger.Infof("UpdateSubConnState(%+v, %+v)", sc, state)
  188. }
  189. cs := state.ConnectivityState
  190. if cs == connectivity.TransientFailure {
  191. // Save error to be reported via picker.
  192. b.connErr = state.ConnectionError
  193. }
  194. if cs == connectivity.Shutdown {
  195. delete(b.scMap, sc)
  196. // The subconn was removed from b.subConns when the address was removed
  197. // in updateAddresses.
  198. }
  199. oldCS := wsc.updateConnectivityState(cs)
  200. b.connectivityState = b.csEvltr.RecordTransition(oldCS, cs)
  201. // Regenerate picker when one of the following happens:
  202. // - this sc entered or left ready
  203. // - the aggregated state of balancer is TransientFailure
  204. // (may need to update error message)
  205. if (cs == connectivity.Ready) != (oldCS == connectivity.Ready) ||
  206. b.connectivityState == connectivity.TransientFailure {
  207. b.regeneratePicker()
  208. }
  209. }
  210. // Close stops the balancer. It cancels any ongoing scheduler updates and
  211. // stops any ORCA listeners.
  212. func (b *wrrBalancer) Close() {
  213. if b.stopPicker != nil {
  214. b.stopPicker()
  215. b.stopPicker = nil
  216. }
  217. for _, wsc := range b.scMap {
  218. // Ensure any lingering OOB watchers are stopped.
  219. wsc.updateConnectivityState(connectivity.Shutdown)
  220. }
  221. }
  222. // ExitIdle is ignored; we always connect to all backends.
  223. func (b *wrrBalancer) ExitIdle() {}
  224. func (b *wrrBalancer) readySubConns() []*weightedSubConn {
  225. var ret []*weightedSubConn
  226. for _, v := range b.subConns.Values() {
  227. wsc := v.(*weightedSubConn)
  228. if wsc.connectivityState == connectivity.Ready {
  229. ret = append(ret, wsc)
  230. }
  231. }
  232. return ret
  233. }
  // mergeErrors combines the last connection error and the last resolver error
  // into the error carried by the TransientFailure picker. It must only be
  // called while b.connectivityState is TransientFailure. connErr is expected to
  // be non-nil unless there are no SubConns, in which case resolverErr should be.
  func (b *wrrBalancer) mergeErrors() error {
  	switch {
  	case b.connErr == nil:
  		return fmt.Errorf("last resolver error: %v", b.resolverErr)
  	case b.resolverErr == nil:
  		return fmt.Errorf("last connection error: %v", b.connErr)
  	default:
  		return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
  	}
  }
  248. func (b *wrrBalancer) regeneratePicker() {
  249. if b.stopPicker != nil {
  250. b.stopPicker()
  251. b.stopPicker = nil
  252. }
  253. switch b.connectivityState {
  254. case connectivity.TransientFailure:
  255. b.cc.UpdateState(balancer.State{
  256. ConnectivityState: connectivity.TransientFailure,
  257. Picker: base.NewErrPicker(b.mergeErrors()),
  258. })
  259. return
  260. case connectivity.Connecting, connectivity.Idle:
  261. // Idle could happen very briefly if all subconns are Idle and we've
  262. // asked them to connect but they haven't reported Connecting yet.
  263. // Report the same as Connecting since this is temporary.
  264. b.cc.UpdateState(balancer.State{
  265. ConnectivityState: connectivity.Connecting,
  266. Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
  267. })
  268. return
  269. case connectivity.Ready:
  270. b.connErr = nil
  271. }
  272. p := &picker{
  273. v: grpcrand.Uint32(), // start the scheduler at a random point
  274. cfg: b.cfg,
  275. subConns: b.readySubConns(),
  276. }
  277. var ctx context.Context
  278. ctx, b.stopPicker = context.WithCancel(context.Background())
  279. p.start(ctx)
  280. b.cc.UpdateState(balancer.State{
  281. ConnectivityState: b.connectivityState,
  282. Picker: p,
  283. })
  284. }
  // picker is the WRR policy's picker. It uses live-updating backend weights to
  // update the scheduler periodically and ensure picks are routed proportional
  // to those weights.
  type picker struct {
  	// scheduler holds a *scheduler. It is replaced by regenerateScheduler and
  	// read by Pick concurrently, so it is only accessed atomically.
  	scheduler unsafe.Pointer
  	// v is the incrementing value used by the scheduler (advanced via inc).
  	// It is seeded with a random value when the picker is created, so
  	// scheduling starts at a random point. Accessed atomically.
  	v uint32
  	// cfg is the balancer config that was active when the picker was created.
  	cfg *lbConfig
  	// subConns is the snapshot of READY subconns taken at picker creation. The
  	// scheduler's indices (and scWeights' output) are aligned with this slice.
  	subConns []*weightedSubConn
  }
  // scWeights computes the effective weight of every subconn in p.subConns at a
  // single timestamp, using the picker's weight-expiration and blackout periods.
  // The result is index-aligned with p.subConns.
  func (p *picker) scWeights() []float64 {
  	now := internal.TimeNow()
  	expiration := time.Duration(p.cfg.WeightExpirationPeriod)
  	blackout := time.Duration(p.cfg.BlackoutPeriod)

  	weights := make([]float64, 0, len(p.subConns))
  	for _, wsc := range p.subConns {
  		weights = append(weights, wsc.weight(now, expiration, blackout))
  	}
  	return weights
  }
  // inc atomically increments the picker's counter and returns the new value.
  // It is passed to newScheduler.
  func (p *picker) inc() uint32 {
  	next := atomic.AddUint32(&p.v, 1)
  	return next
  }

  // regenerateScheduler builds a scheduler from the subconns' current weights
  // and atomically publishes it for Pick.
  func (p *picker) regenerateScheduler() {
  	weights := p.scWeights()
  	s := newScheduler(weights, p.inc)
  	atomic.StorePointer(&p.scheduler, unsafe.Pointer(&s))
  }
  // start publishes an initial scheduler. When there are at least two
  // subconns, it also launches a goroutine that rebuilds the scheduler every
  // cfg.WeightUpdatePeriod until ctx is canceled.
  func (p *picker) start(ctx context.Context) {
  	p.regenerateScheduler()
  	if len(p.subConns) <= 1 {
  		// Weights cannot change the outcome with zero or one backend.
  		return
  	}
  	go func() {
  		ticker := time.NewTicker(time.Duration(p.cfg.WeightUpdatePeriod))
  		// Release the ticker when the picker is replaced or the balancer closes.
  		// Before Go 1.23, an unstopped ticker is never garbage collected, and a
  		// new one is created for every regenerated picker.
  		defer ticker.Stop()
  		for {
  			select {
  			case <-ctx.Done():
  				return
  			case <-ticker.C:
  				p.regenerateScheduler()
  			}
  		}
  	}()
  }
  329. func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
  330. // Read the scheduler atomically. All scheduler operations are threadsafe,
  331. // and if the scheduler is replaced during this usage, we want to use the
  332. // scheduler that was live when the pick started.
  333. sched := *(*scheduler)(atomic.LoadPointer(&p.scheduler))
  334. pickedSC := p.subConns[sched.nextIndex()]
  335. pr := balancer.PickResult{SubConn: pickedSC.SubConn}
  336. if !p.cfg.EnableOOBLoadReport {
  337. pr.Done = func(info balancer.DoneInfo) {
  338. if load, ok := info.ServerLoad.(*v3orcapb.OrcaLoadReport); ok && load != nil {
  339. pickedSC.OnLoadReport(load)
  340. }
  341. }
  342. }
  343. return pr, nil
  344. }
  // weightedSubConn is the wrapper of a subconn that holds the subconn and its
  // weight (and other parameters relevant to computing the effective weight).
  // When needed, it also tracks connectivity state, listens for metrics updates
  // by implementing the orca.OOBListener interface and manages that listener.
  type weightedSubConn struct {
  	balancer.SubConn
  	logger *grpclog.PrefixLogger

  	// The following fields are only accessed on calls into the LB policy, and
  	// do not need a mutex.

  	// connectivityState is the subconn's state as tracked by the policy. Once
  	// TransientFailure, it ignores later Idle/Connecting reports.
  	connectivityState connectivity.State
  	// stopORCAListener unregisters the ORCA OOB listener. It is nil if OOB
  	// reporting was never enabled or was disabled by updateConfig.
  	stopORCAListener func()

  	// The following fields are accessed asynchronously and are protected by
  	// mu. mu must not be held when calling stopORCAListener or when
  	// registering a new listener. Those calls take the ORCA producer's mutex,
  	// which is held while the listener (OnLoadReport) runs, and the listener
  	// takes mu, so holding mu there could deadlock.
  	mu sync.Mutex
  	// weightVal is the weight computed from the most recent accepted load report.
  	weightVal float64
  	// nonEmptySince is when load reports started arriving again after the last
  	// reset; the zero value means "reset". Used to apply the blackout period.
  	nonEmptySince time.Time
  	// lastUpdated is when the most recent accepted load report arrived.
  	lastUpdated time.Time
  	// cfg is the config currently applied to this subconn.
  	cfg *lbConfig
  }
  // OnLoadReport implements orca.OOBListener; Pick's Done callback also calls it
  // for per-call reports. It turns a load report into a new weight:
  //
  //	weight = qps / (utilization + (eps/qps)*errorUtilizationPenalty)
  //
  // Utilization is the application utilization, falling back to CPU
  // utilization when that is 0. Reports with zero utilization or zero QPS are
  // ignored. The first accepted report after a reset also starts the blackout
  // window by setting nonEmptySince.
  func (w *weightedSubConn) OnLoadReport(load *v3orcapb.OrcaLoadReport) {
  	if w.logger.V(2) {
  		w.logger.Infof("Received load report for subchannel %v: %v", w.SubConn, load)
  	}

  	utilization := load.ApplicationUtilization
  	if utilization == 0 {
  		utilization = load.CpuUtilization
  	}
  	qps := load.RpsFractional
  	if utilization == 0 || qps == 0 {
  		if w.logger.V(2) {
  			w.logger.Infof("Ignoring empty load report for subchannel %v", w.SubConn)
  		}
  		return
  	}

  	w.mu.Lock()
  	defer w.mu.Unlock()

  	errorRate := load.Eps / qps
  	w.weightVal = qps / (utilization + errorRate*w.cfg.ErrorUtilizationPenalty)
  	if w.logger.V(2) {
  		w.logger.Infof("New weight for subchannel %v: %v", w.SubConn, w.weightVal)
  	}
  	now := internal.TimeNow()
  	w.lastUpdated = now
  	if w.nonEmptySince == (time.Time{}) {
  		w.nonEmptySince = now
  	}
  }
  // updateConfig installs cfg as this subconn's config and reconciles the ORCA
  // OOB listener with it.
  //
  // If neither the OOB enablement nor the reporting period changed, the
  // listener is left untouched. Otherwise any running listener is stopped, and
  // a new one is registered when cfg enables OOB reporting. w.mu is released
  // before the listener is touched, because stopping or registering a listener
  // takes the ORCA producer's mutex, which is held while OnLoadReport (which
  // takes w.mu) runs.
  func (w *weightedSubConn) updateConfig(cfg *lbConfig) {
  	w.mu.Lock()
  	prev := w.cfg
  	w.cfg = cfg
  	w.mu.Unlock()

  	period := cfg.OOBReportingPeriod
  	// With OOB reporting disabled the period is always 0, so this covers both
  	// "disabled before and after" and "enabled before and after with the same
  	// period".
  	if cfg.EnableOOBLoadReport == prev.EnableOOBLoadReport && period == prev.OOBReportingPeriod {
  		return
  	}

  	if w.stopORCAListener != nil {
  		w.stopORCAListener()
  	}
  	if cfg.EnableOOBLoadReport {
  		if w.logger.V(2) {
  			w.logger.Infof("Registering ORCA listener for %v with interval %v", w.SubConn, period)
  		}
  		opts := orca.OOBListenerOptions{ReportInterval: time.Duration(period)}
  		w.stopORCAListener = orca.RegisterOOBListener(w.SubConn, w, opts)
  		return
  	}
  	w.stopORCAListener = nil
  }
  // updateConnectivityState applies a state reported for this subconn and
  // returns the state it held before. Side effects by reported state:
  //   - Idle: the subconn is asked to reconnect immediately.
  //   - Ready: nonEmptySince is cleared, so the blackout period applies again
  //     once load data arrives on the new connection. Lingering reports from
  //     the previous connection cannot be ruled out, but fresh reports should
  //     mask them by the time the blackout period ends.
  //   - Shutdown: the ORCA OOB listener, if any, is stopped.
  // A subconn in TransientFailure stays there when it later reports Idle or
  // Connecting. This keeps a fully-down backend set from aggregating to
  // Connecting forever.
  func (w *weightedSubConn) updateConnectivityState(cs connectivity.State) connectivity.State {
  	switch cs {
  	case connectivity.Idle:
  		w.SubConn.Connect()
  	case connectivity.Ready:
  		w.mu.Lock()
  		w.nonEmptySince = time.Time{}
  		w.mu.Unlock()
  	case connectivity.Shutdown:
  		if w.stopORCAListener != nil {
  			w.stopORCAListener()
  		}
  	}

  	prev := w.connectivityState
  	sticky := prev == connectivity.TransientFailure &&
  		(cs == connectivity.Idle || cs == connectivity.Connecting)
  	if !sticky {
  		w.connectivityState = cs
  	}
  	return prev
  }
  // weight returns the subconn's effective weight at time now. It returns 0
  // when the data is expired or still in blackout; such a backend is meant to
  // be treated as having the mean weight of the others. Specifically:
  //   - If the latest load report is at least weightExpirationPeriod old, it
  //     returns 0 and resets nonEmptySince. The blackout period therefore
  //     applies again once reports resume.
  //   - If blackoutPeriod is non-zero and reports have not been arriving
  //     continuously for at least blackoutPeriod, it returns 0.
  func (w *weightedSubConn) weight(now time.Time, weightExpirationPeriod, blackoutPeriod time.Duration) float64 {
  	w.mu.Lock()
  	defer w.mu.Unlock()

  	if expired := now.Sub(w.lastUpdated) >= weightExpirationPeriod; expired {
  		w.nonEmptySince = time.Time{}
  		return 0
  	}
  	if blackoutPeriod == 0 {
  		return w.weightVal
  	}
  	if w.nonEmptySince == (time.Time{}) || now.Sub(w.nonEmptySince) < blackoutPeriod {
  		return 0
  	}
  	return w.weightVal
  }