/*
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package balancergroup

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/roundrobin"
	"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/balancer/stub"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpctest"
	"google.golang.org/grpc/internal/testutils"
	"google.golang.org/grpc/resolver"
)

const (
	defaultTestTimeout      = 5 * time.Second
	defaultTestShortTimeout = 10 * time.Millisecond
)

var (
	rrBuilder        = balancer.Get(roundrobin.Name)
	testBalancerIDs  = []string{"b1", "b2", "b3"}
	testBackendAddrs []resolver.Address
)

const testBackendAddrsCount = 12

func init() {
	for i := 0; i < testBackendAddrsCount; i++ {
		testBackendAddrs = append(testBackendAddrs, resolver.Address{Addr: fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)})
	}

	// Disable caching for all tests. It will be re-enabled in caching specific
	// tests.
	DefaultSubBalancerCloseTimeout = time.Millisecond
}
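
// s wraps grpctest.Tester; the tests in this file are defined as methods on s
// and are run as subtests by Test below.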
type s struct {
	grpctest.Tester
}

func Test(t *testing.T) {
	grpctest.RunSubTests(t, s{})
}

// Create a new balancer group, add balancers and backends, but do not start.
// - b1, weight 2, backends [0,1]
// - b2, weight 1, backends [2,3]
// Start the balancer group and check behavior.
//
// Close the balancer group, then add/remove balancers, change weights, and
// change addresses.
// - b2, weight 3, backends [0,3]
// - b3, weight 1, backends [1,2]
// Start the balancer group again and check behavior.
func (s) TestBalancerGroup_start_close(t *testing.T) {
	cc := testutils.NewTestClientConn(t)
	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
	gator.Start()
	bg := New(cc, balancer.BuildOptions{}, gator, nil)

	// Add two balancers to group and send two resolved addresses to both
	// balancers.
	gator.Add(testBalancerIDs[0], 2)
	bg.Add(testBalancerIDs[0], rrBuilder)
	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
	gator.Add(testBalancerIDs[1], 1)
	bg.Add(testBalancerIDs[1], rrBuilder)
	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})

	bg.Start()

	m1 := make(map[resolver.Address]balancer.SubConn)
	for i := 0; i < 4; i++ {
		addrs := <-cc.NewSubConnAddrsCh
		sc := <-cc.NewSubConnCh
		m1[addrs[0]] = sc
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}

	// Test roundrobin on the last picker.
	p1 := <-cc.NewPickerCh
	want := []balancer.SubConn{
		m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
		m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
		m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}

	gator.Stop()
	bg.Close()
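
	// Close() removes all SubConns from the ClientConn; drain the removal
	// notifications and acknowledge each with a SHUTDOWN state update.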
	for i := 0; i < 4; i++ {
		bg.UpdateSubConnState(<-cc.RemoveSubConnCh, balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
	}

	// Add b3, weight 1, backends [1,2].
	gator.Add(testBalancerIDs[2], 1)
	bg.Add(testBalancerIDs[2], rrBuilder)
	bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:3]}})

	// Remove b1.
	gator.Remove(testBalancerIDs[0])
	bg.Remove(testBalancerIDs[0])

	// Update b2 to weight 3, backends [0,3].
	gator.UpdateWeight(testBalancerIDs[1], 3)
	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3])}})

	gator.Start()
	bg.Start()

	m2 := make(map[resolver.Address]balancer.SubConn)
	for i := 0; i < 4; i++ {
		addrs := <-cc.NewSubConnAddrsCh
		sc := <-cc.NewSubConnCh
		m2[addrs[0]] = sc
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}

	// Test roundrobin on the last picker.
	p2 := <-cc.NewPickerCh
	want = []balancer.SubConn{
		m2[testBackendAddrs[0]], m2[testBackendAddrs[0]], m2[testBackendAddrs[0]],
		m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], m2[testBackendAddrs[3]],
		m2[testBackendAddrs[1]], m2[testBackendAddrs[2]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p2)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}
}

// Test that balancer group start() doesn't deadlock if the balancer calls back
// into balancer group inline when it gets an update.
//
// The potential deadlock can happen if we
// - hold a lock and send updates to the balancer (e.g. update resolved addresses)
// - the balancer calls back (NewSubConn or update picker) inline
//
// The callback will try to hold the same lock again, which will cause a
// deadlock.
//
// This test starts the balancer group with a test balancer that updates the
// picker whenever it gets an address update. start() is expected not to block
// because of a deadlock.
func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
	const balancerName = "stub-TestBalancerGroup_start_close_deadlock"
	stub.Register(balancerName, stub.BalancerFuncs{})
	builder := balancer.Get(balancerName)

	cc := testutils.NewTestClientConn(t)
	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
	gator.Start()
	bg := New(cc, balancer.BuildOptions{}, gator, nil)

	gator.Add(testBalancerIDs[0], 2)
	bg.Add(testBalancerIDs[0], builder)
	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
	gator.Add(testBalancerIDs[1], 1)
	bg.Add(testBalancerIDs[1], builder)
	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})

	bg.Start()
}
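
// replaceDefaultSubBalancerCloseTimeout overrides the package-level
// DefaultSubBalancerCloseTimeout and returns a function which restores the
// previous value, meant to be called in a defer.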
func replaceDefaultSubBalancerCloseTimeout(n time.Duration) func() {
	old := DefaultSubBalancerCloseTimeout
	DefaultSubBalancerCloseTimeout = n
	return func() { DefaultSubBalancerCloseTimeout = old }
}

// initBalancerGroupForCachingTest creates a balancer group and initializes it
// for caching tests.
//
// Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer
// is then removed, so the returned balancer group has one sub-balancer in its
// own map, and one sub-balancer in cache.
func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.TestClientConn, map[resolver.Address]balancer.SubConn) {
	cc := testutils.NewTestClientConn(t)
	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
	gator.Start()
	bg := New(cc, balancer.BuildOptions{}, gator, nil)

	// Add two balancers to group and send two resolved addresses to both
	// balancers.
	gator.Add(testBalancerIDs[0], 2)
	bg.Add(testBalancerIDs[0], rrBuilder)
	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
	gator.Add(testBalancerIDs[1], 1)
	bg.Add(testBalancerIDs[1], rrBuilder)
	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})

	bg.Start()

	m1 := make(map[resolver.Address]balancer.SubConn)
	for i := 0; i < 4; i++ {
		addrs := <-cc.NewSubConnAddrsCh
		sc := <-cc.NewSubConnCh
		m1[addrs[0]] = sc
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}

	// Test roundrobin on the last picker.
	p1 := <-cc.NewPickerCh
	want := []balancer.SubConn{
		m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
		m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
		m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}

	gator.Remove(testBalancerIDs[1])
	bg.Remove(testBalancerIDs[1])
	// Don't wait for SubConns to be removed after the sub-balancer is removed,
	// because they are only removed after the close timeout.
	for i := 0; i < 10; i++ {
		select {
		case <-cc.RemoveSubConnCh:
			t.Fatalf("Got request to remove subconn, want no remove subconn (because subconns were still in cache)")
		default:
		}
		time.Sleep(time.Millisecond)
	}

	// Test roundrobin with only sub-balancer0.
	p2 := <-cc.NewPickerCh
	want = []balancer.SubConn{
		m1[testBackendAddrs[0]], m1[testBackendAddrs[1]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p2)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}

	return gator, bg, cc, m1
}

// Test that if a sub-balancer is removed and re-added within the close
// timeout, the subConns won't be re-created.
func (s) TestBalancerGroup_locality_caching(t *testing.T) {
	defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
	gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)

	// Turn down the subconn for addr2. We shouldn't get a picker update,
	// because sub-balancer1 was removed.
	bg.UpdateSubConnState(addrToSC[testBackendAddrs[2]], balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
	for i := 0; i < 10; i++ {
		select {
		case <-cc.NewPickerCh:
			t.Fatalf("Got new picker, want no new picker (because the sub-balancer was removed)")
		default:
		}
		time.Sleep(time.Millisecond)
	}

	// Sleep, but for less than the close timeout.
	time.Sleep(time.Millisecond * 100)

	// Re-add sub-balancer-1. Because its subconns were in cache, no new
	// subconns should be created. But a new picker will still be generated,
	// with subconn states up to date.
	gator.Add(testBalancerIDs[1], 1)
	bg.Add(testBalancerIDs[1], rrBuilder)

	p3 := <-cc.NewPickerCh
	want := []balancer.SubConn{
		addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]],
		addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]],
		// addr2 is down, b2 only has addr3 in READY state.
		addrToSC[testBackendAddrs[3]], addrToSC[testBackendAddrs[3]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p3)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}

	for i := 0; i < 10; i++ {
		select {
		case <-cc.NewSubConnAddrsCh:
			t.Fatalf("Got new subconn, want no new subconn (because subconns were still in cache)")
		default:
		}
		time.Sleep(time.Millisecond * 10)
	}
}

// Sub-balancers are put in cache when they are removed. If the balancer group
// is closed within the close timeout, all subconns should still be removed
// immediately.
func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) {
	defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
	_, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)

	bg.Close()
	// The balancer group is closed. The subconns should be removed immediately.
	removeTimeout := time.After(time.Millisecond * 500)
	scToRemove := map[balancer.SubConn]int{
		addrToSC[testBackendAddrs[0]]: 1,
		addrToSC[testBackendAddrs[1]]: 1,
		addrToSC[testBackendAddrs[2]]: 1,
		addrToSC[testBackendAddrs[3]]: 1,
	}
	for i := 0; i < len(scToRemove); i++ {
		select {
		case sc := <-cc.RemoveSubConnCh:
			c := scToRemove[sc]
			if c == 0 {
				t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
			}
			scToRemove[sc] = c - 1
		case <-removeTimeout:
			t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
		}
	}
}

// Sub-balancers in cache will be closed if not re-added within timeout, and
// subConns will be removed.
func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing.T) {
	defer replaceDefaultSubBalancerCloseTimeout(time.Second)()
	_, _, cc, addrToSC := initBalancerGroupForCachingTest(t)

	// The sub-balancer is not re-added within timeout. The subconns should be
	// removed.
	removeTimeout := time.After(DefaultSubBalancerCloseTimeout)
	scToRemove := map[balancer.SubConn]int{
		addrToSC[testBackendAddrs[2]]: 1,
		addrToSC[testBackendAddrs[3]]: 1,
	}
	for i := 0; i < len(scToRemove); i++ {
		select {
		case sc := <-cc.RemoveSubConnCh:
			c := scToRemove[sc]
			if c == 0 {
				t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
			}
			scToRemove[sc] = c - 1
		case <-removeTimeout:
			t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
		}
	}
}

// Wrap the rr builder, so it behaves the same, but has a different name.
type noopBalancerBuilderWrapper struct {
	balancer.Builder
}

func init() {
	balancer.Register(&noopBalancerBuilderWrapper{Builder: rrBuilder})
}

func (*noopBalancerBuilderWrapper) Name() string {
	return "noopBalancerBuilderWrapper"
}

// After removing a sub-balancer, re-add it with the same ID, but a different
// balancer builder. Old subconns should be removed, and new subconns should be
// created.
func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing.T) {
	defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
	gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)

	// Re-add sub-balancer-1, but with a different balancer builder. The
	// sub-balancer is still in cache, but can't be reused. This should cause
	// the old sub-balancer's subconns to be removed immediately, and new
	// subconns to be created.
	gator.Add(testBalancerIDs[1], 1)
	bg.Add(testBalancerIDs[1], &noopBalancerBuilderWrapper{rrBuilder})

	// The cached sub-balancer should be closed, and the subconns should be
	// removed immediately.
	removeTimeout := time.After(time.Millisecond * 500)
	scToRemove := map[balancer.SubConn]int{
		addrToSC[testBackendAddrs[2]]: 1,
		addrToSC[testBackendAddrs[3]]: 1,
	}
	for i := 0; i < len(scToRemove); i++ {
		select {
		case sc := <-cc.RemoveSubConnCh:
			c := scToRemove[sc]
			if c == 0 {
				t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
			}
			scToRemove[sc] = c - 1
		case <-removeTimeout:
			t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
		}
	}

	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[4:6]}})
	newSCTimeout := time.After(time.Millisecond * 500)
	scToAdd := map[resolver.Address]int{
		testBackendAddrs[4]: 1,
		testBackendAddrs[5]: 1,
	}
	for i := 0; i < len(scToAdd); i++ {
		select {
		case addr := <-cc.NewSubConnAddrsCh:
			c := scToAdd[addr[0]]
			if c == 0 {
				t.Fatalf("Got newSubConn for %v when there's %d new expected", addr, c)
			}
			scToAdd[addr[0]] = c - 1
			sc := <-cc.NewSubConnCh
			addrToSC[addr[0]] = sc
			bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
			bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
		case <-newSCTimeout:
			t.Fatalf("timeout waiting for subConns (from the new sub-balancer) to be created")
		}
	}

	// Test roundrobin on the new picker.
	p3 := <-cc.NewPickerCh
	want := []balancer.SubConn{
		addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]],
		addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]],
		addrToSC[testBackendAddrs[4]], addrToSC[testBackendAddrs[5]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p3)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}
}

// After removing a sub-balancer, it will be kept in cache. Make sure that this
// sub-balancer's Close is called when the balancer group is closed.
func (s) TestBalancerGroup_CloseStopsBalancerInCache(t *testing.T) {
	const balancerName = "stub-TestBalancerGroup_check_close"
	closed := make(chan struct{})
	stub.Register(balancerName, stub.BalancerFuncs{Close: func(_ *stub.BalancerData) {
		close(closed)
	}})
	builder := balancer.Get(balancerName)

	defer replaceDefaultSubBalancerCloseTimeout(time.Second)()
	gator, bg, _, _ := initBalancerGroupForCachingTest(t)

	// Add a balancer and remove it.
	gator.Add(testBalancerIDs[2], 1)
	bg.Add(testBalancerIDs[2], builder)
	gator.Remove(testBalancerIDs[2])
	bg.Remove(testBalancerIDs[2])

	// Immediately close the balancer group, before the cache timeout.
	bg.Close()

	// Make sure the removed child balancer is closed eventually.
	select {
	case <-closed:
	case <-time.After(time.Second * 2):
		t.Fatalf("timeout waiting for the child balancer in cache to be closed")
	}
}

// TestBalancerGroupBuildOptions verifies that the balancer.BuildOptions passed
// to the balancergroup at creation time is passed to child policies.
func (s) TestBalancerGroupBuildOptions(t *testing.T) {
	const (
		balancerName = "stubBalancer-TestBalancerGroupBuildOptions"
		userAgent    = "ua"
	)

	// Set up the stub balancer so that we can verify the build options passed
	// to it in its UpdateClientConnState method.
	bOpts := balancer.BuildOptions{
		DialCreds:        insecure.NewCredentials(),
		ChannelzParentID: channelz.NewIdentifierForTesting(channelz.RefChannel, 1234, nil),
		CustomUserAgent:  userAgent,
	}
	stub.Register(balancerName, stub.BalancerFuncs{
		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
			if !cmp.Equal(bd.BuildOptions, bOpts) {
				return fmt.Errorf("buildOptions in child balancer: %v, want %v", bd.BuildOptions, bOpts)
			}
			return nil
		},
	})
	cc := testutils.NewTestClientConn(t)
	bg := New(cc, bOpts, nil, nil)
	bg.Start()

	// Add the stub balancer built above as a child policy.
	balancerBuilder := balancer.Get(balancerName)
	bg.Add(testBalancerIDs[0], balancerBuilder)

	// Send an empty clientConn state change. This should trigger the
	// verification of the buildOptions being passed to the child policy.
	if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{}); err != nil {
		t.Fatal(err)
	}
}
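
// TestBalancerExitIdleOne verifies that calling ExitIdleOne on the balancer
// group invokes ExitIdle on the child policy with the given ID.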
func (s) TestBalancerExitIdleOne(t *testing.T) {
	const balancerName = "stub-balancer-test-balancergroup-exit-idle-one"
	exitIdleCh := make(chan struct{}, 1)
	stub.Register(balancerName, stub.BalancerFuncs{
		ExitIdle: func(*stub.BalancerData) {
			exitIdleCh <- struct{}{}
		},
	})
	cc := testutils.NewTestClientConn(t)
	bg := New(cc, balancer.BuildOptions{}, nil, nil)
	bg.Start()
	defer bg.Close()

	// Add the stub balancer built above as a child policy.
	builder := balancer.Get(balancerName)
	bg.Add(testBalancerIDs[0], builder)

	// Call ExitIdle on the child policy.
	bg.ExitIdleOne(testBalancerIDs[0])
	select {
	case <-time.After(time.Second):
		t.Fatal("Timeout when waiting for ExitIdle to be invoked on child policy")
	case <-exitIdleCh:
	}
}

// TestBalancerGracefulSwitch tests the graceful switch functionality for a
// child of the balancer group. At first, the child is configured as a round
// robin load balancer, and thus should behave accordingly. The test then
// gracefully switches this child to a custom type which creates a SubConn only
// for the second address passed in and only ever picks that SubConn. The new
// aggregated picker should reflect this change for the child.
func (s) TestBalancerGracefulSwitch(t *testing.T) {
	cc := testutils.NewTestClientConn(t)
	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
	gator.Start()
	bg := New(cc, balancer.BuildOptions{}, gator, nil)
	gator.Add(testBalancerIDs[0], 1)
	bg.Add(testBalancerIDs[0], rrBuilder)
	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
	bg.Start()

	m1 := make(map[resolver.Address]balancer.SubConn)
	scs := make(map[balancer.SubConn]bool)
	for i := 0; i < 2; i++ {
		addrs := <-cc.NewSubConnAddrsCh
		sc := <-cc.NewSubConnCh
		m1[addrs[0]] = sc
		scs[sc] = true
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}

	p1 := <-cc.NewPickerCh
	want := []balancer.SubConn{
		m1[testBackendAddrs[0]], m1[testBackendAddrs[1]],
	}
	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil {
		t.Fatalf("want %v, got %v", want, err)
	}

	// The balancer type for testBalancerIDs[0] is currently round robin. Now,
	// change it to a balancer with logically different behavior (it creates a
	// SubConn only for the second address in the address list and always picks
	// that SubConn), and verify that the downstream behavior reflects that
	// change.
	bg.UpdateBuilder(testBalancerIDs[0], wrappedPickFirstBalancerBuilder{})
	if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil {
		t.Fatalf("error updating ClientConn state: %v", err)
	}

	// Verify the update was forwarded to the newly created balancer: the
	// wrapped pick first balancer drops the first address.
	addrs := <-cc.NewSubConnAddrsCh
	if addrs[0].Addr != testBackendAddrs[3].Addr {
		t.Fatalf("newSubConn called with wrong address, want: %v, got: %v", testBackendAddrs[3].Addr, addrs[0].Addr)
	}
	sc := <-cc.NewSubConnCh

	// Set the pick first balancer's SubConn to CONNECTING. This causes the
	// pick first balancer to call UpdateState() with CONNECTING, which should
	// not send a picker update back, as the graceful switch process is not yet
	// complete.
	bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
	defer cancel()
	select {
	case <-cc.NewPickerCh:
		t.Fatalf("No new picker should have been sent due to the Graceful Switch process not completing")
	case <-ctx.Done():
	}

	// Set the pick first balancer's SubConn to READY. This causes the pick
	// first balancer to call UpdateState() with READY, which should send a
	// picker update back, as the graceful switch process is complete. This
	// picker should always pick the SubConn created by pick first, which
	// corresponds to address 3.
	bg.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
	p2 := <-cc.NewPickerCh
	pr, err := p2.Pick(balancer.PickInfo{})
	if err != nil {
		t.Fatalf("error picking: %v", err)
	}
	if pr.SubConn != sc {
		t.Fatalf("picker.Pick(), want %v, got %v", sc, pr.SubConn)
	}

	// Completion of the graceful switch for this child should cause the
	// SubConns of the balancer being switched away from to be removed.
	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	for i := 0; i < 2; i++ {
		select {
		case <-ctx.Done():
			t.Fatalf("error waiting for RemoveSubConn()")
		case sc := <-cc.RemoveSubConnCh:
			// The removed SubConn should be one of the two SubConns created
			// earlier, and both should eventually be removed.
			if ok := scs[sc]; ok {
				delete(scs, sc)
				continue
			} else {
				t.Fatalf("RemoveSubConn called for wrong SubConn %v, want in %v", sc, scs)
			}
		}
	}
}
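
// wrappedPickFirstBalancerBuilder builds a pick_first balancer wrapped by
// wrappedPickFirstBalancer, which drops the first resolved address and
// suppresses IDLE picker updates (see below).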
type wrappedPickFirstBalancerBuilder struct{}

func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
	builder := balancer.Get(grpc.PickFirstBalancerName)
	wpfb := &wrappedPickFirstBalancer{
		ClientConn: cc,
	}
	pf := builder.Build(wpfb, opts)
	wpfb.Balancer = pf
	return wpfb
}

func (wrappedPickFirstBalancerBuilder) Name() string {
	return "wrappedPickFirstBalancer"
}
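
// wrappedPickFirstBalancer sits between the balancer group and the embedded
// pick_first balancer: it trims the first address from resolver updates on the
// way down, and swallows IDLE state updates on the way up.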
type wrappedPickFirstBalancer struct {
	balancer.Balancer
	balancer.ClientConn
}
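
// UpdateClientConnState drops the first resolved address before forwarding the
// update to the embedded pick_first balancer, so a SubConn is only created for
// the remaining addresses.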
func (wb *wrappedPickFirstBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
	s.ResolverState.Addresses = s.ResolverState.Addresses[1:]
	return wb.Balancer.UpdateClientConnState(s)
}

func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
	// Eat it if IDLE - allows it to switch over only on a READY SubConn.
	if state.ConnectivityState == connectivity.Idle {
		return
	}
	wb.ClientConn.UpdateState(state)
}