|
- /*
- *
- * Copyright 2023 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package weightedroundrobin_test
- import (
- "context"
- "encoding/json"
- "fmt"
- "sync"
- "sync/atomic"
- "testing"
- "time"
- "google.golang.org/grpc"
- "google.golang.org/grpc/internal"
- "google.golang.org/grpc/internal/grpctest"
- "google.golang.org/grpc/internal/stubserver"
- "google.golang.org/grpc/internal/testutils/roundrobin"
- "google.golang.org/grpc/orca"
- "google.golang.org/grpc/peer"
- "google.golang.org/grpc/resolver"
- wrr "google.golang.org/grpc/balancer/weightedroundrobin"
- iwrr "google.golang.org/grpc/balancer/weightedroundrobin/internal"
- testgrpc "google.golang.org/grpc/interop/grpc_testing"
- testpb "google.golang.org/grpc/interop/grpc_testing"
- )
- type s struct {
- grpctest.Tester
- }
- func Test(t *testing.T) {
- grpctest.RunSubTests(t, s{})
- }
- const defaultTestTimeout = 10 * time.Second
- const weightUpdatePeriod = 50 * time.Millisecond
- const weightExpirationPeriod = time.Minute
- const oobReportingInterval = 10 * time.Millisecond
- func init() {
- iwrr.AllowAnyWeightUpdatePeriod = true
- }
- func boolp(b bool) *bool { return &b }
- func float64p(f float64) *float64 { return &f }
- func stringp(s string) *string { return &s }
- var (
- perCallConfig = iwrr.LBConfig{
- EnableOOBLoadReport: boolp(false),
- OOBReportingPeriod: stringp("0.005s"),
- BlackoutPeriod: stringp("0s"),
- WeightExpirationPeriod: stringp("60s"),
- WeightUpdatePeriod: stringp(".050s"),
- ErrorUtilizationPenalty: float64p(0),
- }
- oobConfig = iwrr.LBConfig{
- EnableOOBLoadReport: boolp(true),
- OOBReportingPeriod: stringp("0.005s"),
- BlackoutPeriod: stringp("0s"),
- WeightExpirationPeriod: stringp("60s"),
- WeightUpdatePeriod: stringp(".050s"),
- ErrorUtilizationPenalty: float64p(0),
- }
- )
- type testServer struct {
- *stubserver.StubServer
- oobMetrics orca.ServerMetricsRecorder // Attached to the OOB stream.
- callMetrics orca.CallMetricsRecorder // Attached to per-call metrics.
- }
- type reportType int
- const (
- reportNone reportType = iota
- reportOOB
- reportCall
- reportBoth
- )
- func startServer(t *testing.T, r reportType) *testServer {
- t.Helper()
- smr := orca.NewServerMetricsRecorder()
- cmr := orca.NewServerMetricsRecorder().(orca.CallMetricsRecorder)
- ss := &stubserver.StubServer{
- EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
- if r := orca.CallMetricsRecorderFromContext(ctx); r != nil {
- // Copy metrics from what the test set in cmr into r.
- sm := cmr.(orca.ServerMetricsProvider).ServerMetrics()
- r.SetApplicationUtilization(sm.AppUtilization)
- r.SetQPS(sm.QPS)
- r.SetEPS(sm.EPS)
- }
- return &testpb.Empty{}, nil
- },
- }
- var sopts []grpc.ServerOption
- if r == reportCall || r == reportBoth {
- sopts = append(sopts, orca.CallMetricsServerOption(nil))
- }
- if r == reportOOB || r == reportBoth {
- oso := orca.ServiceOptions{
- ServerMetricsProvider: smr,
- MinReportingInterval: 10 * time.Millisecond,
- }
- internal.ORCAAllowAnyMinReportingInterval.(func(so *orca.ServiceOptions))(&oso)
- sopts = append(sopts, stubserver.RegisterServiceServerOption(func(s *grpc.Server) {
- if err := orca.Register(s, oso); err != nil {
- t.Fatalf("Failed to register orca service: %v", err)
- }
- }))
- }
- if err := ss.StartServer(sopts...); err != nil {
- t.Fatalf("Error starting server: %v", err)
- }
- t.Cleanup(ss.Stop)
- return &testServer{
- StubServer: ss,
- oobMetrics: smr,
- callMetrics: cmr,
- }
- }
- func svcConfig(t *testing.T, wrrCfg iwrr.LBConfig) string {
- t.Helper()
- m, err := json.Marshal(wrrCfg)
- if err != nil {
- t.Fatalf("Error marshaling JSON %v: %v", wrrCfg, err)
- }
- sc := fmt.Sprintf(`{"loadBalancingConfig": [ {%q:%v} ] }`, wrr.Name, string(m))
- t.Logf("Marshaled service config: %v", sc)
- return sc
- }
- // Tests basic functionality with one address. With only one address, load
- // reporting doesn't affect routing at all.
- func (s) TestBalancer_OneAddress(t *testing.T) {
- testCases := []struct {
- rt reportType
- cfg iwrr.LBConfig
- }{
- {rt: reportNone, cfg: perCallConfig},
- {rt: reportCall, cfg: perCallConfig},
- {rt: reportOOB, cfg: oobConfig},
- }
- for _, tc := range testCases {
- t.Run(fmt.Sprintf("reportType:%v", tc.rt), func(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- srv := startServer(t, tc.rt)
- sc := svcConfig(t, tc.cfg)
- if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
- t.Fatalf("Error starting client: %v", err)
- }
- // Perform many RPCs to ensure the LB policy works with 1 address.
- for i := 0; i < 100; i++ {
- srv.callMetrics.SetQPS(float64(i))
- srv.oobMetrics.SetQPS(float64(i))
- if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
- t.Fatalf("Error from EmptyCall: %v", err)
- }
- time.Sleep(time.Millisecond) // Delay; test will run 100ms and should perform ~10 weight updates
- }
- })
- }
- }
- // Tests two addresses with ORCA reporting disabled (should fall back to pure
- // RR).
- func (s) TestBalancer_TwoAddresses_ReportingDisabled(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- srv1 := startServer(t, reportNone)
- srv2 := startServer(t, reportNone)
- sc := svcConfig(t, perCallConfig)
- if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
- t.Fatalf("Error starting client: %v", err)
- }
- addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- // Perform many RPCs to ensure the LB policy works with 2 addresses.
- for i := 0; i < 20; i++ {
- roundrobin.CheckRoundRobinRPCs(ctx, srv1.Client, addrs)
- }
- }
- // Tests two addresses with per-call ORCA reporting enabled. Checks the
- // backends are called in the appropriate ratios.
- func (s) TestBalancer_TwoAddresses_ReportingEnabledPerCall(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- srv1 := startServer(t, reportCall)
- srv2 := startServer(t, reportCall)
- // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
- // disproportionately to srv2 (10:1).
- srv1.callMetrics.SetQPS(10.0)
- srv1.callMetrics.SetApplicationUtilization(1.0)
- srv2.callMetrics.SetQPS(10.0)
- srv2.callMetrics.SetApplicationUtilization(.1)
- sc := svcConfig(t, perCallConfig)
- if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
- t.Fatalf("Error starting client: %v", err)
- }
- addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- // Call each backend once to ensure the weights have been received.
- ensureReached(ctx, t, srv1.Client, 2)
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
- }
- // Tests two addresses with OOB ORCA reporting enabled. Checks the backends
- // are called in the appropriate ratios.
- func (s) TestBalancer_TwoAddresses_ReportingEnabledOOB(t *testing.T) {
- testCases := []struct {
- name string
- utilSetter func(orca.ServerMetricsRecorder, float64)
- }{{
- name: "application_utilization",
- utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
- smr.SetApplicationUtilization(val)
- },
- }, {
- name: "cpu_utilization",
- utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
- smr.SetCPUUtilization(val)
- },
- }, {
- name: "application over cpu",
- utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
- smr.SetApplicationUtilization(val)
- smr.SetCPUUtilization(2.0) // ignored because ApplicationUtilization is set
- },
- }}
- for _, tc := range testCases {
- t.Run(tc.name, func(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- srv1 := startServer(t, reportOOB)
- srv2 := startServer(t, reportOOB)
- // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
- // disproportionately to srv2 (10:1).
- srv1.oobMetrics.SetQPS(10.0)
- tc.utilSetter(srv1.oobMetrics, 1.0)
- srv2.oobMetrics.SetQPS(10.0)
- tc.utilSetter(srv2.oobMetrics, 0.1)
- sc := svcConfig(t, oobConfig)
- if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
- t.Fatalf("Error starting client: %v", err)
- }
- addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- // Call each backend once to ensure the weights have been received.
- ensureReached(ctx, t, srv1.Client, 2)
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
- })
- }
- }
- // Tests two addresses with OOB ORCA reporting enabled, where the reports
- // change over time. Checks the backends are called in the appropriate ratios
- // before and after modifying the reports.
- func (s) TestBalancer_TwoAddresses_UpdateLoads(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- srv1 := startServer(t, reportOOB)
- srv2 := startServer(t, reportOOB)
- // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
- // disproportionately to srv2 (10:1).
- srv1.oobMetrics.SetQPS(10.0)
- srv1.oobMetrics.SetApplicationUtilization(1.0)
- srv2.oobMetrics.SetQPS(10.0)
- srv2.oobMetrics.SetApplicationUtilization(.1)
- sc := svcConfig(t, oobConfig)
- if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
- t.Fatalf("Error starting client: %v", err)
- }
- addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- // Call each backend once to ensure the weights have been received.
- ensureReached(ctx, t, srv1.Client, 2)
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
- // Update the loads so srv2 is loaded and srv1 is not; ensure RPCs are
- // routed disproportionately to srv1.
- srv1.oobMetrics.SetQPS(10.0)
- srv1.oobMetrics.SetApplicationUtilization(.1)
- srv2.oobMetrics.SetQPS(10.0)
- srv2.oobMetrics.SetApplicationUtilization(1.0)
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod + oobReportingInterval)
- checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 1})
- }
- // Tests two addresses with OOB ORCA reporting enabled, then with switching to
- // per-call reporting. Checks the backends are called in the appropriate
- // ratios before and after the change.
- func (s) TestBalancer_TwoAddresses_OOBThenPerCall(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- srv1 := startServer(t, reportBoth)
- srv2 := startServer(t, reportBoth)
- // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
- // disproportionately to srv2 (10:1).
- srv1.oobMetrics.SetQPS(10.0)
- srv1.oobMetrics.SetApplicationUtilization(1.0)
- srv2.oobMetrics.SetQPS(10.0)
- srv2.oobMetrics.SetApplicationUtilization(.1)
- // For per-call metrics (not used initially), srv2 reports that it is
- // loaded and srv1 reports low load. After confirming OOB works, switch to
- // per-call and confirm the new routing weights are applied.
- srv1.callMetrics.SetQPS(10.0)
- srv1.callMetrics.SetApplicationUtilization(.1)
- srv2.callMetrics.SetQPS(10.0)
- srv2.callMetrics.SetApplicationUtilization(1.0)
- sc := svcConfig(t, oobConfig)
- if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
- t.Fatalf("Error starting client: %v", err)
- }
- addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- // Call each backend once to ensure the weights have been received.
- ensureReached(ctx, t, srv1.Client, 2)
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
- // Update to per-call weights.
- c := svcConfig(t, perCallConfig)
- parsedCfg := srv1.R.CC.ParseServiceConfig(c)
- if parsedCfg.Err != nil {
- panic(fmt.Sprintf("Error parsing config %q: %v", c, parsedCfg.Err))
- }
- srv1.R.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parsedCfg})
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 1})
- }
- // Tests two addresses with OOB ORCA reporting enabled and a non-zero error
- // penalty applied.
- func (s) TestBalancer_TwoAddresses_ErrorPenalty(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- srv1 := startServer(t, reportOOB)
- srv2 := startServer(t, reportOOB)
- // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
- // disproportionately to srv2 (10:1). EPS values are set (but ignored
- // initially due to ErrorUtilizationPenalty=0). Later EUP will be updated
- // to 0.9 which will cause the weights to be equal and RPCs to be routed
- // 50/50.
- srv1.oobMetrics.SetQPS(10.0)
- srv1.oobMetrics.SetApplicationUtilization(1.0)
- srv1.oobMetrics.SetEPS(0)
- // srv1 weight before: 10.0 / 1.0 = 10.0
- // srv1 weight after: 10.0 / 1.0 = 10.0
- srv2.oobMetrics.SetQPS(10.0)
- srv2.oobMetrics.SetApplicationUtilization(.1)
- srv2.oobMetrics.SetEPS(10.0)
- // srv2 weight before: 10.0 / 0.1 = 100.0
- // srv2 weight after: 10.0 / 1.0 = 10.0
- sc := svcConfig(t, oobConfig)
- if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
- t.Fatalf("Error starting client: %v", err)
- }
- addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- // Call each backend once to ensure the weights have been received.
- ensureReached(ctx, t, srv1.Client, 2)
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
- // Update to include an error penalty in the weights.
- newCfg := oobConfig
- newCfg.ErrorUtilizationPenalty = float64p(0.9)
- c := svcConfig(t, newCfg)
- parsedCfg := srv1.R.CC.ParseServiceConfig(c)
- if parsedCfg.Err != nil {
- panic(fmt.Sprintf("Error parsing config %q: %v", c, parsedCfg.Err))
- }
- srv1.R.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parsedCfg})
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod + oobReportingInterval)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
- }
- // Tests that the blackout period causes backends to use 0 as their weight
- // (meaning to use the average weight) until the blackout period elapses.
- func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- var mu sync.Mutex
- start := time.Now()
- now := start
- setNow := func(t time.Time) {
- mu.Lock()
- defer mu.Unlock()
- now = t
- }
- setTimeNow(func() time.Time {
- mu.Lock()
- defer mu.Unlock()
- return now
- })
- t.Cleanup(func() { setTimeNow(time.Now) })
- testCases := []struct {
- blackoutPeriodCfg *string
- blackoutPeriod time.Duration
- }{{
- blackoutPeriodCfg: stringp("1s"),
- blackoutPeriod: time.Second,
- }, {
- blackoutPeriodCfg: nil,
- blackoutPeriod: 10 * time.Second, // the default
- }}
- for _, tc := range testCases {
- setNow(start)
- srv1 := startServer(t, reportOOB)
- srv2 := startServer(t, reportOOB)
- // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
- // disproportionately to srv2 (10:1).
- srv1.oobMetrics.SetQPS(10.0)
- srv1.oobMetrics.SetApplicationUtilization(1.0)
- srv2.oobMetrics.SetQPS(10.0)
- srv2.oobMetrics.SetApplicationUtilization(.1)
- cfg := oobConfig
- cfg.BlackoutPeriod = tc.blackoutPeriodCfg
- sc := svcConfig(t, cfg)
- if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
- t.Fatalf("Error starting client: %v", err)
- }
- addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- // Call each backend once to ensure the weights have been received.
- ensureReached(ctx, t, srv1.Client, 2)
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod)
- // During the blackout period (1s) we should route roughly 50/50.
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
- // Advance time to right before the blackout period ends and the weights
- // should still be zero.
- setNow(start.Add(tc.blackoutPeriod - time.Nanosecond))
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
- // Advance time to right after the blackout period ends and the weights
- // should now activate.
- setNow(start.Add(tc.blackoutPeriod))
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
- }
- }
- // Tests that the weight expiration period causes backends to use 0 as their
- // weight (meaning to use the average weight) once the expiration period
- // elapses.
- func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- var mu sync.Mutex
- start := time.Now()
- now := start
- setNow := func(t time.Time) {
- mu.Lock()
- defer mu.Unlock()
- now = t
- }
- setTimeNow(func() time.Time {
- mu.Lock()
- defer mu.Unlock()
- return now
- })
- t.Cleanup(func() { setTimeNow(time.Now) })
- srv1 := startServer(t, reportBoth)
- srv2 := startServer(t, reportBoth)
- // srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
- // disproportionately to srv2 (10:1). Because the OOB reporting interval
- // is 1 minute but the weights expire in 1 second, routing will go to 50/50
- // after the weights expire.
- srv1.oobMetrics.SetQPS(10.0)
- srv1.oobMetrics.SetApplicationUtilization(1.0)
- srv2.oobMetrics.SetQPS(10.0)
- srv2.oobMetrics.SetApplicationUtilization(.1)
- cfg := oobConfig
- cfg.OOBReportingPeriod = stringp("60s")
- sc := svcConfig(t, cfg)
- if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
- t.Fatalf("Error starting client: %v", err)
- }
- addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- // Call each backend once to ensure the weights have been received.
- ensureReached(ctx, t, srv1.Client, 2)
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
- // Advance what time.Now returns to the weight expiration time minus 1s to
- // ensure all weights are still honored.
- setNow(start.Add(weightExpirationPeriod - time.Second))
- // Wait for the weight update period to allow the new weights to be processed.
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
- // Advance what time.Now returns to the weight expiration time plus 1s to
- // ensure all weights expired and addresses are routed evenly.
- setNow(start.Add(weightExpirationPeriod + time.Second))
- // Wait for the weight expiration period so the weights have expired.
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
- }
- // Tests logic surrounding subchannel management.
- func (s) TestBalancer_AddressesChanging(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- srv1 := startServer(t, reportBoth)
- srv2 := startServer(t, reportBoth)
- srv3 := startServer(t, reportBoth)
- srv4 := startServer(t, reportBoth)
- // srv1: weight 10
- srv1.oobMetrics.SetQPS(10.0)
- srv1.oobMetrics.SetApplicationUtilization(1.0)
- // srv2: weight 100
- srv2.oobMetrics.SetQPS(10.0)
- srv2.oobMetrics.SetApplicationUtilization(.1)
- // srv3: weight 20
- srv3.oobMetrics.SetQPS(20.0)
- srv3.oobMetrics.SetApplicationUtilization(1.0)
- // srv4: weight 200
- srv4.oobMetrics.SetQPS(20.0)
- srv4.oobMetrics.SetApplicationUtilization(.1)
- sc := svcConfig(t, oobConfig)
- if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
- t.Fatalf("Error starting client: %v", err)
- }
- srv2.Client = srv1.Client
- addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}, {Addr: srv3.Address}}
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- // Call each backend once to ensure the weights have been received.
- ensureReached(ctx, t, srv1.Client, 3)
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv3, 2})
- // Add backend 4
- addrs = append(addrs, resolver.Address{Addr: srv4.Address})
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv3, 2}, srvWeight{srv4, 20})
- // Shutdown backend 3. RPCs will no longer be routed to it.
- srv3.Stop()
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv4, 20})
- // Remove addresses 2 and 3. RPCs will no longer be routed to 2 either.
- addrs = []resolver.Address{{Addr: srv1.Address}, {Addr: srv4.Address}}
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv4, 20})
- // Re-add 2 and remove the rest.
- addrs = []resolver.Address{{Addr: srv2.Address}}
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv2, 10})
- // Re-add 4.
- addrs = append(addrs, resolver.Address{Addr: srv4.Address})
- srv1.R.UpdateState(resolver.State{Addresses: addrs})
- time.Sleep(weightUpdatePeriod)
- checkWeights(ctx, t, srvWeight{srv2, 10}, srvWeight{srv4, 20})
- }
- func ensureReached(ctx context.Context, t *testing.T, c testgrpc.TestServiceClient, n int) {
- t.Helper()
- reached := make(map[string]struct{})
- for len(reached) != n {
- var peer peer.Peer
- if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
- t.Fatalf("Error from EmptyCall: %v", err)
- }
- reached[peer.Addr.String()] = struct{}{}
- }
- }
- type srvWeight struct {
- srv *testServer
- w int
- }
- const rrIterations = 100
- // checkWeights does rrIterations RPCs and expects the different backends to be
- // routed in a ratio as deterimined by the srvWeights passed in. Allows for
- // some variance (+/- 2 RPCs per backend).
- func checkWeights(ctx context.Context, t *testing.T, sws ...srvWeight) {
- t.Helper()
- c := sws[0].srv.Client
- // Replace the weights with approximate counts of RPCs wanted given the
- // iterations performed.
- weightSum := 0
- for _, sw := range sws {
- weightSum += sw.w
- }
- for i := range sws {
- sws[i].w = rrIterations * sws[i].w / weightSum
- }
- for attempts := 0; attempts < 10; attempts++ {
- serverCounts := make(map[string]int)
- for i := 0; i < rrIterations; i++ {
- var peer peer.Peer
- if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
- t.Fatalf("Error from EmptyCall: %v; timed out waiting for weighted RR behavior?", err)
- }
- serverCounts[peer.Addr.String()]++
- }
- if len(serverCounts) != len(sws) {
- continue
- }
- success := true
- for _, sw := range sws {
- c := serverCounts[sw.srv.Address]
- if c < sw.w-2 || c > sw.w+2 {
- success = false
- break
- }
- }
- if success {
- t.Logf("Passed iteration %v; counts: %v", attempts, serverCounts)
- return
- }
- t.Logf("Failed iteration %v; counts: %v; want %+v", attempts, serverCounts, sws)
- time.Sleep(5 * time.Millisecond)
- }
- t.Fatalf("Failed to route RPCs with proper ratio")
- }
- func init() {
- setTimeNow(time.Now)
- iwrr.TimeNow = timeNow
- }
- var timeNowFunc atomic.Value // func() time.Time
- func timeNow() time.Time {
- return timeNowFunc.Load().(func() time.Time)()
- }
- func setTimeNow(f func() time.Time) {
- timeNowFunc.Store(f)
- }
|