picker.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. /*
  2. *
  3. * Copyright 2020 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package clusterimpl
  19. import (
  20. "google.golang.org/grpc/balancer"
  21. "google.golang.org/grpc/codes"
  22. "google.golang.org/grpc/connectivity"
  23. "google.golang.org/grpc/internal/wrr"
  24. "google.golang.org/grpc/status"
  25. "google.golang.org/grpc/xds/internal/xdsclient"
  26. "google.golang.org/grpc/xds/internal/xdsclient/load"
  27. v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
  28. )
// NewRandomWRR is the constructor used by newDropper to build the weighted
// round-robin that decides whether an RPC is dropped. It's exported so that
// tests can override it.
var NewRandomWRR = wrr.NewRandom
  32. const million = 1000000
// dropper decides, for a single drop category, whether an RPC should be
// dropped.
type dropper struct {
	// category is the name passed to the load reporter when this dropper
	// drops an RPC.
	category string
	// w holds two weighted items: true (drop) and false (don't drop).
	w wrr.WRR
}
  37. // greatest common divisor (GCD) via Euclidean algorithm
  38. func gcd(a, b uint32) uint32 {
  39. for b != 0 {
  40. t := b
  41. b = a % b
  42. a = t
  43. }
  44. return a
  45. }
  46. func newDropper(c DropConfig) *dropper {
  47. w := NewRandomWRR()
  48. gcdv := gcd(c.RequestsPerMillion, million)
  49. // Return true for RequestPerMillion, false for the rest.
  50. w.Add(true, int64(c.RequestsPerMillion/gcdv))
  51. w.Add(false, int64((million-c.RequestsPerMillion)/gcdv))
  52. return &dropper{
  53. category: c.Category,
  54. w: w,
  55. }
  56. }
  57. func (d *dropper) drop() (ret bool) {
  58. return d.w.Next().(bool)
  59. }
// Metric names under which the CPU and memory utilization fields of a
// backend's ORCA load report are passed to loadReporter.CallServerLoad.
const (
	serverLoadCPUName    = "cpu_utilization"
	serverLoadMemoryName = "mem_utilization"
)
// loadReporter wraps the methods from the loadStore that are used here.
type loadReporter interface {
	// CallStarted is called by the picker once a pick has succeeded, with
	// the locality ID string of the picked SubConn (empty if unknown).
	CallStarted(locality string)
	// CallFinished is called when the RPC's Done callback runs, with the
	// error carried by the DoneInfo.
	CallFinished(locality string, err error)
	// CallServerLoad is called once per metric found in the ORCA load report
	// attached to a finished RPC.
	CallServerLoad(locality, name string, val float64)
	// CallDropped is called when the picker drops an RPC. Note that the
	// picker passes the drop category here (or "" for circuit-breaking
	// drops), not a locality ID.
	CallDropped(locality string)
}
// picker implements RPC drop, circuit breaking drop and load reporting.
type picker struct {
	// drops holds the per-category droppers; Pick consults them in order.
	drops []*dropper
	// s is the wrapped balancer state: Pick reads its connectivity state and
	// delegates to its picker.
	s balancer.State
	// loadStore receives load and drop reports. Pick skips all reporting
	// when it is nil.
	loadStore loadReporter
	// counter tracks in-flight requests for circuit breaking. Pick skips
	// circuit breaking when it is nil.
	counter *xdsclient.ClusterRequestsCounter
	// countMax is the limit passed to counter.StartRequest.
	countMax uint32
}
  79. func newPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter) *picker {
  80. return &picker{
  81. drops: config.drops,
  82. s: s,
  83. loadStore: loadStore,
  84. counter: config.requestCounter,
  85. countMax: config.requestCountMax,
  86. }
  87. }
  88. func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
  89. // Don't drop unless the inner picker is READY. Similar to
  90. // https://github.com/grpc/grpc-go/issues/2622.
  91. if d.s.ConnectivityState == connectivity.Ready {
  92. // Check if this RPC should be dropped by category.
  93. for _, dp := range d.drops {
  94. if dp.drop() {
  95. if d.loadStore != nil {
  96. d.loadStore.CallDropped(dp.category)
  97. }
  98. return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped")
  99. }
  100. }
  101. }
  102. // Check if this RPC should be dropped by circuit breaking.
  103. if d.counter != nil {
  104. if err := d.counter.StartRequest(d.countMax); err != nil {
  105. // Drops by circuit breaking are reported with empty category. They
  106. // will be reported only in total drops, but not in per category.
  107. if d.loadStore != nil {
  108. d.loadStore.CallDropped("")
  109. }
  110. return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error())
  111. }
  112. }
  113. var lIDStr string
  114. pr, err := d.s.Picker.Pick(info)
  115. if scw, ok := pr.SubConn.(*scWrapper); ok {
  116. // This OK check also covers the case err!=nil, because SubConn will be
  117. // nil.
  118. pr.SubConn = scw.SubConn
  119. var e error
  120. // If locality ID isn't found in the wrapper, an empty locality ID will
  121. // be used.
  122. lIDStr, e = scw.localityID().ToString()
  123. if e != nil {
  124. logger.Infof("failed to marshal LocalityID: %#v, loads won't be reported", scw.localityID())
  125. }
  126. }
  127. if err != nil {
  128. if d.counter != nil {
  129. // Release one request count if this pick fails.
  130. d.counter.EndRequest()
  131. }
  132. return pr, err
  133. }
  134. if d.loadStore != nil {
  135. d.loadStore.CallStarted(lIDStr)
  136. oldDone := pr.Done
  137. pr.Done = func(info balancer.DoneInfo) {
  138. if oldDone != nil {
  139. oldDone(info)
  140. }
  141. d.loadStore.CallFinished(lIDStr, info.Err)
  142. load, ok := info.ServerLoad.(*v3orcapb.OrcaLoadReport)
  143. if !ok || load == nil {
  144. return
  145. }
  146. d.loadStore.CallServerLoad(lIDStr, serverLoadCPUName, load.CpuUtilization)
  147. d.loadStore.CallServerLoad(lIDStr, serverLoadMemoryName, load.MemUtilization)
  148. for n, c := range load.RequestCost {
  149. d.loadStore.CallServerLoad(lIDStr, n, c)
  150. }
  151. for n, c := range load.Utilization {
  152. d.loadStore.CallServerLoad(lIDStr, n, c)
  153. }
  154. }
  155. }
  156. if d.counter != nil {
  157. // Update Done() so that when the RPC finishes, the request count will
  158. // be released.
  159. oldDone := pr.Done
  160. pr.Done = func(doneInfo balancer.DoneInfo) {
  161. d.counter.EndRequest()
  162. if oldDone != nil {
  163. oldDone(doneInfo)
  164. }
  165. }
  166. }
  167. return pr, err
  168. }