roundrobin.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. /*
  2. *
  3. * Copyright 2022 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package roundrobin contains helper functions to check for roundrobin and
  19. // weighted-roundrobin load balancing of RPCs in tests.
  20. package roundrobin
  21. import (
  22. "context"
  23. "fmt"
  24. "math"
  25. "time"
  26. "github.com/google/go-cmp/cmp"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/grpclog"
  29. "google.golang.org/grpc/peer"
  30. "google.golang.org/grpc/resolver"
  31. testgrpc "google.golang.org/grpc/interop/grpc_testing"
  32. testpb "google.golang.org/grpc/interop/grpc_testing"
  33. )
// logger is the grpclog component logger used by the helpers in this file to
// report RPC distributions that do not (yet) match the expected one.
var logger = grpclog.Component("testutils-roundrobin")
  35. // waitForTrafficToReachBackends repeatedly makes RPCs using the provided
  36. // TestServiceClient until RPCs reach all backends specified in addrs, or the
  37. // context expires, in which case a non-nil error is returned.
  38. func waitForTrafficToReachBackends(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
  39. // Make sure connections to all backends are up. We need to do this two
  40. // times (to be sure that round_robin has kicked in) because the channel
  41. // could have been configured with a different LB policy before the switch
  42. // to round_robin. And the previous LB policy could be sharing backends with
  43. // round_robin, and therefore in the first iteration of this loop, RPCs
  44. // could land on backends owned by the previous LB policy.
  45. for j := 0; j < 2; j++ {
  46. for i := 0; i < len(addrs); i++ {
  47. for {
  48. time.Sleep(time.Millisecond)
  49. if ctx.Err() != nil {
  50. return fmt.Errorf("timeout waiting for connection to %q to be up", addrs[i].Addr)
  51. }
  52. var peer peer.Peer
  53. if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
  54. // Some tests remove backends and check if round robin is
  55. // happening across the remaining backends. In such cases,
  56. // RPCs can initially fail on the connection using the
  57. // removed backend. Just keep retrying and eventually the
  58. // connection using the removed backend will shutdown and
  59. // will be removed.
  60. continue
  61. }
  62. if peer.Addr.String() == addrs[i].Addr {
  63. break
  64. }
  65. }
  66. }
  67. }
  68. return nil
  69. }
  70. // CheckRoundRobinRPCs verifies that EmptyCall RPCs on the given ClientConn,
  71. // connected to a server exposing the test.grpc_testing.TestService, are
  72. // roundrobined across the given backend addresses.
  73. //
  74. // Returns a non-nil error if context deadline expires before RPCs start to get
  75. // roundrobined across the given backends.
  76. func CheckRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
  77. if err := waitForTrafficToReachBackends(ctx, client, addrs); err != nil {
  78. return err
  79. }
  80. // At this point, RPCs are getting successfully executed at the backends
  81. // that we care about. To support duplicate addresses (in addrs) and
  82. // backends being removed from the list of addresses passed to the
  83. // roundrobin LB, we do the following:
  84. // 1. Determine the count of RPCs that we expect each of our backends to
  85. // receive per iteration.
  86. // 2. Wait until the same pattern repeats a few times, or the context
  87. // deadline expires.
  88. wantAddrCount := make(map[string]int)
  89. for _, addr := range addrs {
  90. wantAddrCount[addr.Addr]++
  91. }
  92. for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
  93. // Perform 3 more iterations.
  94. var iterations [][]string
  95. for i := 0; i < 3; i++ {
  96. iteration := make([]string, len(addrs))
  97. for c := 0; c < len(addrs); c++ {
  98. var peer peer.Peer
  99. if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
  100. return fmt.Errorf("EmptyCall() = %v, want <nil>", err)
  101. }
  102. iteration[c] = peer.Addr.String()
  103. }
  104. iterations = append(iterations, iteration)
  105. }
  106. // Ensure the first iteration contains all addresses in addrs.
  107. gotAddrCount := make(map[string]int)
  108. for _, addr := range iterations[0] {
  109. gotAddrCount[addr]++
  110. }
  111. if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
  112. logger.Infof("non-roundrobin, got address count in one iteration: %v, want: %v, Diff: %s", gotAddrCount, wantAddrCount, diff)
  113. continue
  114. }
  115. // Ensure all three iterations contain the same addresses.
  116. if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) {
  117. logger.Infof("non-roundrobin, first iter: %v, second iter: %v, third iter: %v", iterations[0], iterations[1], iterations[2])
  118. continue
  119. }
  120. return nil
  121. }
  122. return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v", addrs)
  123. }
  124. // CheckWeightedRoundRobinRPCs verifies that EmptyCall RPCs on the given
  125. // ClientConn, connected to a server exposing the test.grpc_testing.TestService,
  126. // are weighted roundrobined (with randomness) across the given backend
  127. // addresses.
  128. //
  129. // Returns a non-nil error if context deadline expires before RPCs start to get
  130. // roundrobined across the given backends.
  131. func CheckWeightedRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
  132. if err := waitForTrafficToReachBackends(ctx, client, addrs); err != nil {
  133. return err
  134. }
  135. // At this point, RPCs are getting successfully executed at the backends
  136. // that we care about. To take the randomness of the WRR into account, we
  137. // look for approximate distribution instead of exact.
  138. wantAddrCount := make(map[string]int)
  139. for _, addr := range addrs {
  140. wantAddrCount[addr.Addr]++
  141. }
  142. wantRatio := make(map[string]float64)
  143. for addr, count := range wantAddrCount {
  144. wantRatio[addr] = float64(count) / float64(len(addrs))
  145. }
  146. // There is a small possibility that RPCs are reaching backends that we
  147. // don't expect them to reach here. The can happen because:
  148. // - at time T0, the list of backends [A, B, C, D].
  149. // - at time T1, the test updates the list of backends to [A, B, C], and
  150. // immediately starts attempting to check the distribution of RPCs to the
  151. // new backends.
  152. // - there is no way for the test to wait for a new picker to be pushed on
  153. // to the channel (which contains the updated list of backends) before
  154. // starting to attempt the RPC distribution checks.
  155. // - This is usually a transitory state and will eventually fix itself when
  156. // the new picker is pushed on the channel, and RPCs will start getting
  157. // routed to only backends that we care about.
  158. //
  159. // We work around this situation by using two loops. The inner loop contains
  160. // the meat of the calculations, and includes the logic which factors out
  161. // the randomness in weighted roundrobin. If we ever see an RPCs getting
  162. // routed to a backend that we dont expect it to get routed to, we break
  163. // from the inner loop thereby resetting all state and start afresh.
  164. for {
  165. results := make(map[string]float64)
  166. totalCount := float64(0)
  167. InnerLoop:
  168. for {
  169. if ctx.Err() != nil {
  170. return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v", addrs)
  171. }
  172. for i := 0; i < len(addrs); i++ {
  173. var peer peer.Peer
  174. if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
  175. return fmt.Errorf("EmptyCall() = %v, want <nil>", err)
  176. }
  177. if addr := peer.Addr.String(); wantAddrCount[addr] == 0 {
  178. break InnerLoop
  179. }
  180. results[peer.Addr.String()]++
  181. }
  182. totalCount += float64(len(addrs))
  183. gotRatio := make(map[string]float64)
  184. for addr, count := range results {
  185. gotRatio[addr] = count / totalCount
  186. }
  187. if equalApproximate(gotRatio, wantRatio) {
  188. return nil
  189. }
  190. logger.Infof("non-weighted-roundrobin, gotRatio: %v, wantRatio: %v", gotRatio, wantRatio)
  191. }
  192. <-time.After(time.Millisecond)
  193. }
  194. }
  195. func equalApproximate(got, want map[string]float64) bool {
  196. if len(got) != len(want) {
  197. return false
  198. }
  199. opt := cmp.Comparer(func(x, y float64) bool {
  200. delta := math.Abs(x - y)
  201. mean := math.Abs(x+y) / 2.0
  202. return delta/mean < 0.05
  203. })
  204. for addr := range want {
  205. if !cmp.Equal(got[addr], want[addr], opt) {
  206. return false
  207. }
  208. }
  209. return true
  210. }