balancer_test.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756
  1. /*
  2. *
  3. * Copyright 2023 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package weightedroundrobin_test
  19. import (
  20. "context"
  21. "encoding/json"
  22. "fmt"
  23. "sync"
  24. "sync/atomic"
  25. "testing"
  26. "time"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/internal"
  29. "google.golang.org/grpc/internal/grpctest"
  30. "google.golang.org/grpc/internal/stubserver"
  31. "google.golang.org/grpc/internal/testutils/roundrobin"
  32. "google.golang.org/grpc/orca"
  33. "google.golang.org/grpc/peer"
  34. "google.golang.org/grpc/resolver"
  35. wrr "google.golang.org/grpc/balancer/weightedroundrobin"
  36. iwrr "google.golang.org/grpc/balancer/weightedroundrobin/internal"
  37. testgrpc "google.golang.org/grpc/interop/grpc_testing"
  38. testpb "google.golang.org/grpc/interop/grpc_testing"
  39. )
  40. type s struct {
  41. grpctest.Tester
  42. }
  43. func Test(t *testing.T) {
  44. grpctest.RunSubTests(t, s{})
  45. }
  // Timing values shared by the tests in this file.
  const (
  	// defaultTestTimeout bounds the context each test creates.
  	defaultTestTimeout = 10 * time.Second
  	// weightUpdatePeriod is how long tests sleep to let new weights take
  	// effect; it equals the ".050s" WeightUpdatePeriod in the LB configs.
  	weightUpdatePeriod = 50 * time.Millisecond
  	// weightExpirationPeriod equals the "60s" WeightExpirationPeriod in the
  	// LB configs.
  	weightExpirationPeriod = 60 * time.Second
  	// oobReportingInterval is added to sleeps that must also wait for a
  	// fresh out-of-band load report to arrive.
  	oobReportingInterval = 10 * time.Millisecond
  )
  50. func init() {
  51. iwrr.AllowAnyWeightUpdatePeriod = true
  52. }
  // boolp returns a pointer to a fresh copy of b; used to fill the pointer-typed
  // fields of iwrr.LBConfig.
  func boolp(b bool) *bool {
  	p := new(bool)
  	*p = b
  	return p
  }
  // float64p returns a pointer to a fresh copy of f; used to fill the
  // pointer-typed fields of iwrr.LBConfig.
  func float64p(f float64) *float64 {
  	p := new(float64)
  	*p = f
  	return p
  }
  // stringp returns a pointer to a fresh copy of s; used to fill the
  // pointer-typed fields of iwrr.LBConfig.
  func stringp(s string) *string {
  	p := new(string)
  	*p = s
  	return p
  }
  56. var (
  57. perCallConfig = iwrr.LBConfig{
  58. EnableOOBLoadReport: boolp(false),
  59. OOBReportingPeriod: stringp("0.005s"),
  60. BlackoutPeriod: stringp("0s"),
  61. WeightExpirationPeriod: stringp("60s"),
  62. WeightUpdatePeriod: stringp(".050s"),
  63. ErrorUtilizationPenalty: float64p(0),
  64. }
  65. oobConfig = iwrr.LBConfig{
  66. EnableOOBLoadReport: boolp(true),
  67. OOBReportingPeriod: stringp("0.005s"),
  68. BlackoutPeriod: stringp("0s"),
  69. WeightExpirationPeriod: stringp("60s"),
  70. WeightUpdatePeriod: stringp(".050s"),
  71. ErrorUtilizationPenalty: float64p(0),
  72. }
  73. )
  74. type testServer struct {
  75. *stubserver.StubServer
  76. oobMetrics orca.ServerMetricsRecorder // Attached to the OOB stream.
  77. callMetrics orca.CallMetricsRecorder // Attached to per-call metrics.
  78. }
  // reportType selects which ORCA load-reporting mechanisms startServer
  // enables on a test backend.
  type reportType int

  const (
  	// reportNone enables neither reporting mechanism.
  	reportNone reportType = iota
  	// reportOOB registers the out-of-band ORCA service only.
  	reportOOB
  	// reportCall enables per-call metrics only.
  	reportCall
  	// reportBoth enables both mechanisms.
  	reportBoth
  )

  // String returns the name of the constant so that subtest names and log lines
  // formatted with %v are readable instead of bare integers. Values outside the
  // declared set are rendered as "reportType(N)".
  func (r reportType) String() string {
  	switch r {
  	case reportNone:
  		return "reportNone"
  	case reportOOB:
  		return "reportOOB"
  	case reportCall:
  		return "reportCall"
  	case reportBoth:
  		return "reportBoth"
  	default:
  		return fmt.Sprintf("reportType(%d)", int(r))
  	}
  }
  86. func startServer(t *testing.T, r reportType) *testServer {
  87. t.Helper()
  88. smr := orca.NewServerMetricsRecorder()
  89. cmr := orca.NewServerMetricsRecorder().(orca.CallMetricsRecorder)
  90. ss := &stubserver.StubServer{
  91. EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  92. if r := orca.CallMetricsRecorderFromContext(ctx); r != nil {
  93. // Copy metrics from what the test set in cmr into r.
  94. sm := cmr.(orca.ServerMetricsProvider).ServerMetrics()
  95. r.SetApplicationUtilization(sm.AppUtilization)
  96. r.SetQPS(sm.QPS)
  97. r.SetEPS(sm.EPS)
  98. }
  99. return &testpb.Empty{}, nil
  100. },
  101. }
  102. var sopts []grpc.ServerOption
  103. if r == reportCall || r == reportBoth {
  104. sopts = append(sopts, orca.CallMetricsServerOption(nil))
  105. }
  106. if r == reportOOB || r == reportBoth {
  107. oso := orca.ServiceOptions{
  108. ServerMetricsProvider: smr,
  109. MinReportingInterval: 10 * time.Millisecond,
  110. }
  111. internal.ORCAAllowAnyMinReportingInterval.(func(so *orca.ServiceOptions))(&oso)
  112. sopts = append(sopts, stubserver.RegisterServiceServerOption(func(s *grpc.Server) {
  113. if err := orca.Register(s, oso); err != nil {
  114. t.Fatalf("Failed to register orca service: %v", err)
  115. }
  116. }))
  117. }
  118. if err := ss.StartServer(sopts...); err != nil {
  119. t.Fatalf("Error starting server: %v", err)
  120. }
  121. t.Cleanup(ss.Stop)
  122. return &testServer{
  123. StubServer: ss,
  124. oobMetrics: smr,
  125. callMetrics: cmr,
  126. }
  127. }
  128. func svcConfig(t *testing.T, wrrCfg iwrr.LBConfig) string {
  129. t.Helper()
  130. m, err := json.Marshal(wrrCfg)
  131. if err != nil {
  132. t.Fatalf("Error marshaling JSON %v: %v", wrrCfg, err)
  133. }
  134. sc := fmt.Sprintf(`{"loadBalancingConfig": [ {%q:%v} ] }`, wrr.Name, string(m))
  135. t.Logf("Marshaled service config: %v", sc)
  136. return sc
  137. }
  138. // Tests basic functionality with one address. With only one address, load
  139. // reporting doesn't affect routing at all.
  140. func (s) TestBalancer_OneAddress(t *testing.T) {
  141. testCases := []struct {
  142. rt reportType
  143. cfg iwrr.LBConfig
  144. }{
  145. {rt: reportNone, cfg: perCallConfig},
  146. {rt: reportCall, cfg: perCallConfig},
  147. {rt: reportOOB, cfg: oobConfig},
  148. }
  149. for _, tc := range testCases {
  150. t.Run(fmt.Sprintf("reportType:%v", tc.rt), func(t *testing.T) {
  151. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  152. defer cancel()
  153. srv := startServer(t, tc.rt)
  154. sc := svcConfig(t, tc.cfg)
  155. if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
  156. t.Fatalf("Error starting client: %v", err)
  157. }
  158. // Perform many RPCs to ensure the LB policy works with 1 address.
  159. for i := 0; i < 100; i++ {
  160. srv.callMetrics.SetQPS(float64(i))
  161. srv.oobMetrics.SetQPS(float64(i))
  162. if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  163. t.Fatalf("Error from EmptyCall: %v", err)
  164. }
  165. time.Sleep(time.Millisecond) // Delay; test will run 100ms and should perform ~10 weight updates
  166. }
  167. })
  168. }
  169. }
  170. // Tests two addresses with ORCA reporting disabled (should fall back to pure
  171. // RR).
  172. func (s) TestBalancer_TwoAddresses_ReportingDisabled(t *testing.T) {
  173. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  174. defer cancel()
  175. srv1 := startServer(t, reportNone)
  176. srv2 := startServer(t, reportNone)
  177. sc := svcConfig(t, perCallConfig)
  178. if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
  179. t.Fatalf("Error starting client: %v", err)
  180. }
  181. addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
  182. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  183. // Perform many RPCs to ensure the LB policy works with 2 addresses.
  184. for i := 0; i < 20; i++ {
  185. roundrobin.CheckRoundRobinRPCs(ctx, srv1.Client, addrs)
  186. }
  187. }
  188. // Tests two addresses with per-call ORCA reporting enabled. Checks the
  189. // backends are called in the appropriate ratios.
  190. func (s) TestBalancer_TwoAddresses_ReportingEnabledPerCall(t *testing.T) {
  191. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  192. defer cancel()
  193. srv1 := startServer(t, reportCall)
  194. srv2 := startServer(t, reportCall)
  195. // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
  196. // disproportionately to srv2 (10:1).
  197. srv1.callMetrics.SetQPS(10.0)
  198. srv1.callMetrics.SetApplicationUtilization(1.0)
  199. srv2.callMetrics.SetQPS(10.0)
  200. srv2.callMetrics.SetApplicationUtilization(.1)
  201. sc := svcConfig(t, perCallConfig)
  202. if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
  203. t.Fatalf("Error starting client: %v", err)
  204. }
  205. addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
  206. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  207. // Call each backend once to ensure the weights have been received.
  208. ensureReached(ctx, t, srv1.Client, 2)
  209. // Wait for the weight update period to allow the new weights to be processed.
  210. time.Sleep(weightUpdatePeriod)
  211. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
  212. }
  213. // Tests two addresses with OOB ORCA reporting enabled. Checks the backends
  214. // are called in the appropriate ratios.
  215. func (s) TestBalancer_TwoAddresses_ReportingEnabledOOB(t *testing.T) {
  216. testCases := []struct {
  217. name string
  218. utilSetter func(orca.ServerMetricsRecorder, float64)
  219. }{{
  220. name: "application_utilization",
  221. utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
  222. smr.SetApplicationUtilization(val)
  223. },
  224. }, {
  225. name: "cpu_utilization",
  226. utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
  227. smr.SetCPUUtilization(val)
  228. },
  229. }, {
  230. name: "application over cpu",
  231. utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
  232. smr.SetApplicationUtilization(val)
  233. smr.SetCPUUtilization(2.0) // ignored because ApplicationUtilization is set
  234. },
  235. }}
  236. for _, tc := range testCases {
  237. t.Run(tc.name, func(t *testing.T) {
  238. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  239. defer cancel()
  240. srv1 := startServer(t, reportOOB)
  241. srv2 := startServer(t, reportOOB)
  242. // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
  243. // disproportionately to srv2 (10:1).
  244. srv1.oobMetrics.SetQPS(10.0)
  245. tc.utilSetter(srv1.oobMetrics, 1.0)
  246. srv2.oobMetrics.SetQPS(10.0)
  247. tc.utilSetter(srv2.oobMetrics, 0.1)
  248. sc := svcConfig(t, oobConfig)
  249. if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
  250. t.Fatalf("Error starting client: %v", err)
  251. }
  252. addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
  253. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  254. // Call each backend once to ensure the weights have been received.
  255. ensureReached(ctx, t, srv1.Client, 2)
  256. // Wait for the weight update period to allow the new weights to be processed.
  257. time.Sleep(weightUpdatePeriod)
  258. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
  259. })
  260. }
  261. }
  262. // Tests two addresses with OOB ORCA reporting enabled, where the reports
  263. // change over time. Checks the backends are called in the appropriate ratios
  264. // before and after modifying the reports.
  265. func (s) TestBalancer_TwoAddresses_UpdateLoads(t *testing.T) {
  266. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  267. defer cancel()
  268. srv1 := startServer(t, reportOOB)
  269. srv2 := startServer(t, reportOOB)
  270. // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
  271. // disproportionately to srv2 (10:1).
  272. srv1.oobMetrics.SetQPS(10.0)
  273. srv1.oobMetrics.SetApplicationUtilization(1.0)
  274. srv2.oobMetrics.SetQPS(10.0)
  275. srv2.oobMetrics.SetApplicationUtilization(.1)
  276. sc := svcConfig(t, oobConfig)
  277. if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
  278. t.Fatalf("Error starting client: %v", err)
  279. }
  280. addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
  281. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  282. // Call each backend once to ensure the weights have been received.
  283. ensureReached(ctx, t, srv1.Client, 2)
  284. // Wait for the weight update period to allow the new weights to be processed.
  285. time.Sleep(weightUpdatePeriod)
  286. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
  287. // Update the loads so srv2 is loaded and srv1 is not; ensure RPCs are
  288. // routed disproportionately to srv1.
  289. srv1.oobMetrics.SetQPS(10.0)
  290. srv1.oobMetrics.SetApplicationUtilization(.1)
  291. srv2.oobMetrics.SetQPS(10.0)
  292. srv2.oobMetrics.SetApplicationUtilization(1.0)
  293. // Wait for the weight update period to allow the new weights to be processed.
  294. time.Sleep(weightUpdatePeriod + oobReportingInterval)
  295. checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 1})
  296. }
  297. // Tests two addresses with OOB ORCA reporting enabled, then with switching to
  298. // per-call reporting. Checks the backends are called in the appropriate
  299. // ratios before and after the change.
  300. func (s) TestBalancer_TwoAddresses_OOBThenPerCall(t *testing.T) {
  301. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  302. defer cancel()
  303. srv1 := startServer(t, reportBoth)
  304. srv2 := startServer(t, reportBoth)
  305. // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
  306. // disproportionately to srv2 (10:1).
  307. srv1.oobMetrics.SetQPS(10.0)
  308. srv1.oobMetrics.SetApplicationUtilization(1.0)
  309. srv2.oobMetrics.SetQPS(10.0)
  310. srv2.oobMetrics.SetApplicationUtilization(.1)
  311. // For per-call metrics (not used initially), srv2 reports that it is
  312. // loaded and srv1 reports low load. After confirming OOB works, switch to
  313. // per-call and confirm the new routing weights are applied.
  314. srv1.callMetrics.SetQPS(10.0)
  315. srv1.callMetrics.SetApplicationUtilization(.1)
  316. srv2.callMetrics.SetQPS(10.0)
  317. srv2.callMetrics.SetApplicationUtilization(1.0)
  318. sc := svcConfig(t, oobConfig)
  319. if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
  320. t.Fatalf("Error starting client: %v", err)
  321. }
  322. addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
  323. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  324. // Call each backend once to ensure the weights have been received.
  325. ensureReached(ctx, t, srv1.Client, 2)
  326. // Wait for the weight update period to allow the new weights to be processed.
  327. time.Sleep(weightUpdatePeriod)
  328. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
  329. // Update to per-call weights.
  330. c := svcConfig(t, perCallConfig)
  331. parsedCfg := srv1.R.CC.ParseServiceConfig(c)
  332. if parsedCfg.Err != nil {
  333. panic(fmt.Sprintf("Error parsing config %q: %v", c, parsedCfg.Err))
  334. }
  335. srv1.R.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parsedCfg})
  336. // Wait for the weight update period to allow the new weights to be processed.
  337. time.Sleep(weightUpdatePeriod)
  338. checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 1})
  339. }
  340. // Tests two addresses with OOB ORCA reporting enabled and a non-zero error
  341. // penalty applied.
  342. func (s) TestBalancer_TwoAddresses_ErrorPenalty(t *testing.T) {
  343. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  344. defer cancel()
  345. srv1 := startServer(t, reportOOB)
  346. srv2 := startServer(t, reportOOB)
  347. // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
  348. // disproportionately to srv2 (10:1). EPS values are set (but ignored
  349. // initially due to ErrorUtilizationPenalty=0). Later EUP will be updated
  350. // to 0.9 which will cause the weights to be equal and RPCs to be routed
  351. // 50/50.
  352. srv1.oobMetrics.SetQPS(10.0)
  353. srv1.oobMetrics.SetApplicationUtilization(1.0)
  354. srv1.oobMetrics.SetEPS(0)
  355. // srv1 weight before: 10.0 / 1.0 = 10.0
  356. // srv1 weight after: 10.0 / 1.0 = 10.0
  357. srv2.oobMetrics.SetQPS(10.0)
  358. srv2.oobMetrics.SetApplicationUtilization(.1)
  359. srv2.oobMetrics.SetEPS(10.0)
  360. // srv2 weight before: 10.0 / 0.1 = 100.0
  361. // srv2 weight after: 10.0 / 1.0 = 10.0
  362. sc := svcConfig(t, oobConfig)
  363. if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
  364. t.Fatalf("Error starting client: %v", err)
  365. }
  366. addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
  367. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  368. // Call each backend once to ensure the weights have been received.
  369. ensureReached(ctx, t, srv1.Client, 2)
  370. // Wait for the weight update period to allow the new weights to be processed.
  371. time.Sleep(weightUpdatePeriod)
  372. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
  373. // Update to include an error penalty in the weights.
  374. newCfg := oobConfig
  375. newCfg.ErrorUtilizationPenalty = float64p(0.9)
  376. c := svcConfig(t, newCfg)
  377. parsedCfg := srv1.R.CC.ParseServiceConfig(c)
  378. if parsedCfg.Err != nil {
  379. panic(fmt.Sprintf("Error parsing config %q: %v", c, parsedCfg.Err))
  380. }
  381. srv1.R.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parsedCfg})
  382. // Wait for the weight update period to allow the new weights to be processed.
  383. time.Sleep(weightUpdatePeriod + oobReportingInterval)
  384. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
  385. }
  386. // Tests that the blackout period causes backends to use 0 as their weight
  387. // (meaning to use the average weight) until the blackout period elapses.
  388. func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) {
  389. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  390. defer cancel()
  391. var mu sync.Mutex
  392. start := time.Now()
  393. now := start
  394. setNow := func(t time.Time) {
  395. mu.Lock()
  396. defer mu.Unlock()
  397. now = t
  398. }
  399. setTimeNow(func() time.Time {
  400. mu.Lock()
  401. defer mu.Unlock()
  402. return now
  403. })
  404. t.Cleanup(func() { setTimeNow(time.Now) })
  405. testCases := []struct {
  406. blackoutPeriodCfg *string
  407. blackoutPeriod time.Duration
  408. }{{
  409. blackoutPeriodCfg: stringp("1s"),
  410. blackoutPeriod: time.Second,
  411. }, {
  412. blackoutPeriodCfg: nil,
  413. blackoutPeriod: 10 * time.Second, // the default
  414. }}
  415. for _, tc := range testCases {
  416. setNow(start)
  417. srv1 := startServer(t, reportOOB)
  418. srv2 := startServer(t, reportOOB)
  419. // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
  420. // disproportionately to srv2 (10:1).
  421. srv1.oobMetrics.SetQPS(10.0)
  422. srv1.oobMetrics.SetApplicationUtilization(1.0)
  423. srv2.oobMetrics.SetQPS(10.0)
  424. srv2.oobMetrics.SetApplicationUtilization(.1)
  425. cfg := oobConfig
  426. cfg.BlackoutPeriod = tc.blackoutPeriodCfg
  427. sc := svcConfig(t, cfg)
  428. if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
  429. t.Fatalf("Error starting client: %v", err)
  430. }
  431. addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
  432. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  433. // Call each backend once to ensure the weights have been received.
  434. ensureReached(ctx, t, srv1.Client, 2)
  435. // Wait for the weight update period to allow the new weights to be processed.
  436. time.Sleep(weightUpdatePeriod)
  437. // During the blackout period (1s) we should route roughly 50/50.
  438. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
  439. // Advance time to right before the blackout period ends and the weights
  440. // should still be zero.
  441. setNow(start.Add(tc.blackoutPeriod - time.Nanosecond))
  442. // Wait for the weight update period to allow the new weights to be processed.
  443. time.Sleep(weightUpdatePeriod)
  444. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
  445. // Advance time to right after the blackout period ends and the weights
  446. // should now activate.
  447. setNow(start.Add(tc.blackoutPeriod))
  448. // Wait for the weight update period to allow the new weights to be processed.
  449. time.Sleep(weightUpdatePeriod)
  450. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
  451. }
  452. }
  453. // Tests that the weight expiration period causes backends to use 0 as their
  454. // weight (meaning to use the average weight) once the expiration period
  455. // elapses.
  456. func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) {
  457. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  458. defer cancel()
  459. var mu sync.Mutex
  460. start := time.Now()
  461. now := start
  462. setNow := func(t time.Time) {
  463. mu.Lock()
  464. defer mu.Unlock()
  465. now = t
  466. }
  467. setTimeNow(func() time.Time {
  468. mu.Lock()
  469. defer mu.Unlock()
  470. return now
  471. })
  472. t.Cleanup(func() { setTimeNow(time.Now) })
  473. srv1 := startServer(t, reportBoth)
  474. srv2 := startServer(t, reportBoth)
  475. // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
  476. // disproportionately to srv2 (10:1). Because the OOB reporting interval
  477. // is 1 minute but the weights expire in 1 second, routing will go to 50/50
  478. // after the weights expire.
  479. srv1.oobMetrics.SetQPS(10.0)
  480. srv1.oobMetrics.SetApplicationUtilization(1.0)
  481. srv2.oobMetrics.SetQPS(10.0)
  482. srv2.oobMetrics.SetApplicationUtilization(.1)
  483. cfg := oobConfig
  484. cfg.OOBReportingPeriod = stringp("60s")
  485. sc := svcConfig(t, cfg)
  486. if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
  487. t.Fatalf("Error starting client: %v", err)
  488. }
  489. addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
  490. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  491. // Call each backend once to ensure the weights have been received.
  492. ensureReached(ctx, t, srv1.Client, 2)
  493. // Wait for the weight update period to allow the new weights to be processed.
  494. time.Sleep(weightUpdatePeriod)
  495. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
  496. // Advance what time.Now returns to the weight expiration time minus 1s to
  497. // ensure all weights are still honored.
  498. setNow(start.Add(weightExpirationPeriod - time.Second))
  499. // Wait for the weight update period to allow the new weights to be processed.
  500. time.Sleep(weightUpdatePeriod)
  501. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
  502. // Advance what time.Now returns to the weight expiration time plus 1s to
  503. // ensure all weights expired and addresses are routed evenly.
  504. setNow(start.Add(weightExpirationPeriod + time.Second))
  505. // Wait for the weight expiration period so the weights have expired.
  506. time.Sleep(weightUpdatePeriod)
  507. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
  508. }
  509. // Tests logic surrounding subchannel management.
  510. func (s) TestBalancer_AddressesChanging(t *testing.T) {
  511. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  512. defer cancel()
  513. srv1 := startServer(t, reportBoth)
  514. srv2 := startServer(t, reportBoth)
  515. srv3 := startServer(t, reportBoth)
  516. srv4 := startServer(t, reportBoth)
  517. // srv1: weight 10
  518. srv1.oobMetrics.SetQPS(10.0)
  519. srv1.oobMetrics.SetApplicationUtilization(1.0)
  520. // srv2: weight 100
  521. srv2.oobMetrics.SetQPS(10.0)
  522. srv2.oobMetrics.SetApplicationUtilization(.1)
  523. // srv3: weight 20
  524. srv3.oobMetrics.SetQPS(20.0)
  525. srv3.oobMetrics.SetApplicationUtilization(1.0)
  526. // srv4: weight 200
  527. srv4.oobMetrics.SetQPS(20.0)
  528. srv4.oobMetrics.SetApplicationUtilization(.1)
  529. sc := svcConfig(t, oobConfig)
  530. if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
  531. t.Fatalf("Error starting client: %v", err)
  532. }
  533. srv2.Client = srv1.Client
  534. addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}, {Addr: srv3.Address}}
  535. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  536. // Call each backend once to ensure the weights have been received.
  537. ensureReached(ctx, t, srv1.Client, 3)
  538. time.Sleep(weightUpdatePeriod)
  539. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv3, 2})
  540. // Add backend 4
  541. addrs = append(addrs, resolver.Address{Addr: srv4.Address})
  542. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  543. time.Sleep(weightUpdatePeriod)
  544. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv3, 2}, srvWeight{srv4, 20})
  545. // Shutdown backend 3. RPCs will no longer be routed to it.
  546. srv3.Stop()
  547. time.Sleep(weightUpdatePeriod)
  548. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv4, 20})
  549. // Remove addresses 2 and 3. RPCs will no longer be routed to 2 either.
  550. addrs = []resolver.Address{{Addr: srv1.Address}, {Addr: srv4.Address}}
  551. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  552. time.Sleep(weightUpdatePeriod)
  553. checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv4, 20})
  554. // Re-add 2 and remove the rest.
  555. addrs = []resolver.Address{{Addr: srv2.Address}}
  556. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  557. time.Sleep(weightUpdatePeriod)
  558. checkWeights(ctx, t, srvWeight{srv2, 10})
  559. // Re-add 4.
  560. addrs = append(addrs, resolver.Address{Addr: srv4.Address})
  561. srv1.R.UpdateState(resolver.State{Addresses: addrs})
  562. time.Sleep(weightUpdatePeriod)
  563. checkWeights(ctx, t, srvWeight{srv2, 10}, srvWeight{srv4, 20})
  564. }
  565. func ensureReached(ctx context.Context, t *testing.T, c testgrpc.TestServiceClient, n int) {
  566. t.Helper()
  567. reached := make(map[string]struct{})
  568. for len(reached) != n {
  569. var peer peer.Peer
  570. if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
  571. t.Fatalf("Error from EmptyCall: %v", err)
  572. }
  573. reached[peer.Addr.String()] = struct{}{}
  574. }
  575. }
  576. type srvWeight struct {
  577. srv *testServer
  578. w int
  579. }
  580. const rrIterations = 100
  581. // checkWeights does rrIterations RPCs and expects the different backends to be
  582. // routed in a ratio as deterimined by the srvWeights passed in. Allows for
  583. // some variance (+/- 2 RPCs per backend).
  584. func checkWeights(ctx context.Context, t *testing.T, sws ...srvWeight) {
  585. t.Helper()
  586. c := sws[0].srv.Client
  587. // Replace the weights with approximate counts of RPCs wanted given the
  588. // iterations performed.
  589. weightSum := 0
  590. for _, sw := range sws {
  591. weightSum += sw.w
  592. }
  593. for i := range sws {
  594. sws[i].w = rrIterations * sws[i].w / weightSum
  595. }
  596. for attempts := 0; attempts < 10; attempts++ {
  597. serverCounts := make(map[string]int)
  598. for i := 0; i < rrIterations; i++ {
  599. var peer peer.Peer
  600. if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
  601. t.Fatalf("Error from EmptyCall: %v; timed out waiting for weighted RR behavior?", err)
  602. }
  603. serverCounts[peer.Addr.String()]++
  604. }
  605. if len(serverCounts) != len(sws) {
  606. continue
  607. }
  608. success := true
  609. for _, sw := range sws {
  610. c := serverCounts[sw.srv.Address]
  611. if c < sw.w-2 || c > sw.w+2 {
  612. success = false
  613. break
  614. }
  615. }
  616. if success {
  617. t.Logf("Passed iteration %v; counts: %v", attempts, serverCounts)
  618. return
  619. }
  620. t.Logf("Failed iteration %v; counts: %v; want %+v", attempts, serverCounts, sws)
  621. time.Sleep(5 * time.Millisecond)
  622. }
  623. t.Fatalf("Failed to route RPCs with proper ratio")
  624. }
  625. func init() {
  626. setTimeNow(time.Now)
  627. iwrr.TimeNow = timeNow
  628. }
  // timeNowFunc holds the clock function currently in effect. It is kept in an
  // atomic.Value so it can be replaced and read without further locking.
  var timeNowFunc atomic.Value // func() time.Time

  // timeNow returns the current time according to the clock installed with
  // setTimeNow. If no clock has been installed yet it falls back to time.Now
  // instead of panicking on the empty atomic.Value.
  func timeNow() time.Time {
  	if f, ok := timeNowFunc.Load().(func() time.Time); ok && f != nil {
  		return f()
  	}
  	return time.Now()
  }

  // setTimeNow installs f as the clock used by timeNow. Passing nil restores
  // the real clock (time.Now) rather than storing a function that would panic
  // when called.
  func setTimeNow(f func() time.Time) {
  	if f == nil {
  		f = time.Now
  	}
  	timeNowFunc.Store(f)
  }
  635. }