123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372 |
- /*
- *
- * Copyright 2020 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package testutils
- import (
- "context"
- "errors"
- "fmt"
- "testing"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/resolver"
- )
- // TestSubConnsCount is the number of TestSubConns initialized as part of
- // package init.
- const TestSubConnsCount = 16
- // testingLogger wraps the logging methods from testing.T.
- type testingLogger interface {
- Log(args ...interface{})
- Logf(format string, args ...interface{})
- }
- // TestSubConns contains a list of SubConns to be used in tests.
- var TestSubConns []*TestSubConn
- func init() {
- for i := 0; i < TestSubConnsCount; i++ {
- TestSubConns = append(TestSubConns, &TestSubConn{
- id: fmt.Sprintf("sc%d", i),
- ConnectCh: make(chan struct{}, 1),
- })
- }
- }
- // TestSubConn implements the SubConn interface, to be used in tests.
- type TestSubConn struct {
- id string
- ConnectCh chan struct{}
- }
- // UpdateAddresses is a no-op.
- func (tsc *TestSubConn) UpdateAddresses([]resolver.Address) {}
- // Connect is a no-op.
- func (tsc *TestSubConn) Connect() {
- select {
- case tsc.ConnectCh <- struct{}{}:
- default:
- }
- }
- // GetOrBuildProducer is a no-op.
- func (tsc *TestSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
- return nil, nil
- }
- // String implements stringer to print human friendly error message.
- func (tsc *TestSubConn) String() string {
- return tsc.id
- }
- // TestClientConn is a mock balancer.ClientConn used in tests.
- type TestClientConn struct {
- logger testingLogger
- NewSubConnAddrsCh chan []resolver.Address // the last 10 []Address to create subconn.
- NewSubConnCh chan balancer.SubConn // the last 10 subconn created.
- RemoveSubConnCh chan balancer.SubConn // the last 10 subconn removed.
- UpdateAddressesAddrsCh chan []resolver.Address // last updated address via UpdateAddresses().
- NewPickerCh chan balancer.Picker // the last picker updated.
- NewStateCh chan connectivity.State // the last state.
- ResolveNowCh chan resolver.ResolveNowOptions // the last ResolveNow().
- subConnIdx int
- }
- // NewTestClientConn creates a TestClientConn.
- func NewTestClientConn(t *testing.T) *TestClientConn {
- return &TestClientConn{
- logger: t,
- NewSubConnAddrsCh: make(chan []resolver.Address, 10),
- NewSubConnCh: make(chan balancer.SubConn, 10),
- RemoveSubConnCh: make(chan balancer.SubConn, 10),
- UpdateAddressesAddrsCh: make(chan []resolver.Address, 1),
- NewPickerCh: make(chan balancer.Picker, 1),
- NewStateCh: make(chan connectivity.State, 1),
- ResolveNowCh: make(chan resolver.ResolveNowOptions, 1),
- }
- }
- // NewSubConn creates a new SubConn.
- func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) {
- sc := TestSubConns[tcc.subConnIdx]
- tcc.subConnIdx++
- tcc.logger.Logf("testClientConn: NewSubConn(%v, %+v) => %s", a, o, sc)
- select {
- case tcc.NewSubConnAddrsCh <- a:
- default:
- }
- select {
- case tcc.NewSubConnCh <- sc:
- default:
- }
- return sc, nil
- }
- // RemoveSubConn removes the SubConn.
- func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) {
- tcc.logger.Logf("testClientConn: RemoveSubConn(%s)", sc)
- select {
- case tcc.RemoveSubConnCh <- sc:
- default:
- }
- }
- // UpdateAddresses updates the addresses on the SubConn.
- func (tcc *TestClientConn) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
- tcc.logger.Logf("testClientConn: UpdateAddresses(%v, %+v)", sc, addrs)
- select {
- case tcc.UpdateAddressesAddrsCh <- addrs:
- default:
- }
- }
- // UpdateState updates connectivity state and picker.
- func (tcc *TestClientConn) UpdateState(bs balancer.State) {
- tcc.logger.Logf("testClientConn: UpdateState(%v)", bs)
- select {
- case <-tcc.NewStateCh:
- default:
- }
- tcc.NewStateCh <- bs.ConnectivityState
- select {
- case <-tcc.NewPickerCh:
- default:
- }
- tcc.NewPickerCh <- bs.Picker
- }
- // ResolveNow panics.
- func (tcc *TestClientConn) ResolveNow(o resolver.ResolveNowOptions) {
- select {
- case <-tcc.ResolveNowCh:
- default:
- }
- tcc.ResolveNowCh <- o
- }
- // Target panics.
- func (tcc *TestClientConn) Target() string {
- panic("not implemented")
- }
- // WaitForErrPicker waits until an error picker is pushed to this ClientConn.
- // Returns error if the provided context expires or a non-error picker is pushed
- // to the ClientConn.
- func (tcc *TestClientConn) WaitForErrPicker(ctx context.Context) error {
- select {
- case <-ctx.Done():
- return errors.New("timeout when waiting for an error picker")
- case picker := <-tcc.NewPickerCh:
- if _, perr := picker.Pick(balancer.PickInfo{}); perr == nil {
- return fmt.Errorf("balancer returned a picker which is not an error picker")
- }
- }
- return nil
- }
- // WaitForPickerWithErr waits until an error picker is pushed to this
- // ClientConn with the error matching the wanted error. Returns an error if
- // the provided context expires, including the last received picker error (if
- // any).
- func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error) error {
- lastErr := errors.New("received no picker")
- for {
- select {
- case <-ctx.Done():
- return fmt.Errorf("timeout when waiting for an error picker with %v; last picker error: %v", want, lastErr)
- case picker := <-tcc.NewPickerCh:
- if _, lastErr = picker.Pick(balancer.PickInfo{}); lastErr != nil && lastErr.Error() == want.Error() {
- return nil
- }
- }
- }
- }
- // WaitForConnectivityState waits until the state pushed to this ClientConn
- // matches the wanted state. Returns an error if the provided context expires,
- // including the last received state (if any).
- func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want connectivity.State) error {
- var lastState connectivity.State = -1
- for {
- select {
- case <-ctx.Done():
- return fmt.Errorf("timeout when waiting for state to be %s; last state: %s", want, lastState)
- case s := <-tcc.NewStateCh:
- if s == want {
- return nil
- }
- lastState = s
- }
- }
- }
- // WaitForRoundRobinPicker waits for a picker that passes IsRoundRobin. Also
- // drains the matching state channel and requires it to be READY (if an entry
- // is pending) to be considered. Returns an error if the provided context
- // expires, including the last received error from IsRoundRobin or the picker
- // (if any).
- func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...balancer.SubConn) error {
- lastErr := errors.New("received no picker")
- for {
- select {
- case <-ctx.Done():
- return fmt.Errorf("timeout when waiting for round robin picker with %v; last error: %v", want, lastErr)
- case p := <-tcc.NewPickerCh:
- s := connectivity.Ready
- select {
- case s = <-tcc.NewStateCh:
- default:
- }
- if s != connectivity.Ready {
- lastErr = fmt.Errorf("received state %v instead of ready", s)
- break
- }
- var pickerErr error
- if err := IsRoundRobin(want, func() balancer.SubConn {
- sc, err := p.Pick(balancer.PickInfo{})
- if err != nil {
- pickerErr = err
- } else if sc.Done != nil {
- sc.Done(balancer.DoneInfo{})
- }
- return sc.SubConn
- }); pickerErr != nil {
- lastErr = pickerErr
- continue
- } else if err != nil {
- lastErr = err
- continue
- }
- return nil
- }
- }
- }
- // WaitForPicker waits for a picker that results in f returning nil. If the
- // context expires, returns the last error returned by f (if any).
- func (tcc *TestClientConn) WaitForPicker(ctx context.Context, f func(balancer.Picker) error) error {
- lastErr := errors.New("received no picker")
- for {
- select {
- case <-ctx.Done():
- return fmt.Errorf("timeout when waiting for picker; last error: %v", lastErr)
- case p := <-tcc.NewPickerCh:
- if err := f(p); err != nil {
- lastErr = err
- continue
- }
- return nil
- }
- }
- }
- // IsRoundRobin checks whether f's return value is roundrobin of elements from
- // want. But it doesn't check for the order. Note that want can contain
- // duplicate items, which makes it weight-round-robin.
- //
- // Step 1. the return values of f should form a permutation of all elements in
- // want, but not necessary in the same order. E.g. if want is {a,a,b}, the check
- // fails if f returns:
- // - {a,a,a}: third a is returned before b
- // - {a,b,b}: second b is returned before the second a
- //
- // If error is found in this step, the returned error contains only the first
- // iteration until where it goes wrong.
- //
- // Step 2. the return values of f should be repetitions of the same permutation.
- // E.g. if want is {a,a,b}, the check failes if f returns:
- // - {a,b,a,b,a,a}: though it satisfies step 1, the second iteration is not
- // repeating the first iteration.
- //
- // If error is found in this step, the returned error contains the first
- // iteration + the second iteration until where it goes wrong.
- func IsRoundRobin(want []balancer.SubConn, f func() balancer.SubConn) error {
- wantSet := make(map[balancer.SubConn]int) // SubConn -> count, for weighted RR.
- for _, sc := range want {
- wantSet[sc]++
- }
- // The first iteration: makes sure f's return values form a permutation of
- // elements in want.
- //
- // Also keep the returns values in a slice, so we can compare the order in
- // the second iteration.
- gotSliceFirstIteration := make([]balancer.SubConn, 0, len(want))
- for range want {
- got := f()
- gotSliceFirstIteration = append(gotSliceFirstIteration, got)
- wantSet[got]--
- if wantSet[got] < 0 {
- return fmt.Errorf("non-roundrobin want: %v, result: %v", want, gotSliceFirstIteration)
- }
- }
- // The second iteration should repeat the first iteration.
- var gotSliceSecondIteration []balancer.SubConn
- for i := 0; i < 2; i++ {
- for _, w := range gotSliceFirstIteration {
- g := f()
- gotSliceSecondIteration = append(gotSliceSecondIteration, g)
- if w != g {
- return fmt.Errorf("non-roundrobin, first iter: %v, second iter: %v", gotSliceFirstIteration, gotSliceSecondIteration)
- }
- }
- }
- return nil
- }
- // SubConnFromPicker returns a function which returns a SubConn by calling the
- // Pick() method of the provided picker. There is no caching of SubConns here.
- // Every invocation of the returned function results in a new pick.
- func SubConnFromPicker(p balancer.Picker) func() balancer.SubConn {
- return func() balancer.SubConn {
- scst, _ := p.Pick(balancer.PickInfo{})
- return scst.SubConn
- }
- }
- // ErrTestConstPicker is error returned by test const picker.
- var ErrTestConstPicker = fmt.Errorf("const picker error")
- // TestConstPicker is a const picker for tests.
- type TestConstPicker struct {
- Err error
- SC balancer.SubConn
- }
- // Pick returns the const SubConn or the error.
- func (tcp *TestConstPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
- if tcp.Err != nil {
- return balancer.PickResult{}, tcp.Err
- }
- return balancer.PickResult{SubConn: tcp.SC}, nil
- }
|