123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
- /*
- *
- * Copyright 2021 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package rls
- import (
- "context"
- "strings"
- "testing"
- "time"
- "google.golang.org/grpc"
- "google.golang.org/grpc/balancer/rls/internal/test/e2e"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/internal"
- "google.golang.org/grpc/internal/balancergroup"
- "google.golang.org/grpc/internal/grpcsync"
- "google.golang.org/grpc/internal/grpctest"
- rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
- internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
- "google.golang.org/grpc/internal/stubserver"
- testgrpc "google.golang.org/grpc/interop/grpc_testing"
- testpb "google.golang.org/grpc/interop/grpc_testing"
- "google.golang.org/grpc/resolver"
- "google.golang.org/grpc/resolver/manual"
- "google.golang.org/grpc/serviceconfig"
- "google.golang.org/grpc/status"
- "google.golang.org/protobuf/types/known/durationpb"
- )
- const (
- defaultTestTimeout = 5 * time.Second
- defaultTestShortTimeout = 100 * time.Millisecond
- )
- func init() {
- balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
- }
- type s struct {
- grpctest.Tester
- }
- func Test(t *testing.T) {
- grpctest.RunSubTests(t, s{})
- }
- // fakeBackoffStrategy is a fake implementation of the backoff.Strategy
- // interface, for tests to inject the backoff duration.
- type fakeBackoffStrategy struct {
- backoff time.Duration
- }
- func (f *fakeBackoffStrategy) Backoff(retries int) time.Duration {
- return f.backoff
- }
- // fakeThrottler is a fake implementation of the adaptiveThrottler interface.
- type fakeThrottler struct {
- throttleFunc func() bool // Fake throttler implementation.
- throttleCh chan struct{} // Invocation of ShouldThrottle signals here.
- }
- func (f *fakeThrottler) ShouldThrottle() bool {
- select {
- case <-f.throttleCh:
- default:
- }
- f.throttleCh <- struct{}{}
- return f.throttleFunc()
- }
- func (f *fakeThrottler) RegisterBackendResponse(bool) {}
- // alwaysThrottlingThrottler returns a fake throttler which always throttles.
- func alwaysThrottlingThrottler() *fakeThrottler {
- return &fakeThrottler{
- throttleFunc: func() bool { return true },
- throttleCh: make(chan struct{}, 1),
- }
- }
- // neverThrottlingThrottler returns a fake throttler which never throttles.
- func neverThrottlingThrottler() *fakeThrottler {
- return &fakeThrottler{
- throttleFunc: func() bool { return false },
- throttleCh: make(chan struct{}, 1),
- }
- }
- // oneTimeAllowingThrottler returns a fake throttler which does not throttle
- // requests until the client RPC succeeds, but throttles everything that comes
- // after. This is useful for tests which need to set up a valid cache entry
- // before testing other cases.
- func oneTimeAllowingThrottler(firstRPCDone *grpcsync.Event) *fakeThrottler {
- return &fakeThrottler{
- throttleFunc: firstRPCDone.HasFired,
- throttleCh: make(chan struct{}, 1),
- }
- }
- func overrideAdaptiveThrottler(t *testing.T, f *fakeThrottler) {
- origAdaptiveThrottler := newAdaptiveThrottler
- newAdaptiveThrottler = func() adaptiveThrottler { return f }
- t.Cleanup(func() { newAdaptiveThrottler = origAdaptiveThrottler })
- }
- // buildBasicRLSConfig constructs a basic service config for the RLS LB policy
- // with header matching rules. This expects the passed child policy name to
- // have been registered by the caller.
- func buildBasicRLSConfig(childPolicyName, rlsServerAddress string) *e2e.RLSConfig {
- return &e2e.RLSConfig{
- RouteLookupConfig: &rlspb.RouteLookupConfig{
- GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{
- {
- Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}},
- Headers: []*rlspb.NameMatcher{
- {Key: "k1", Names: []string{"n1"}},
- {Key: "k2", Names: []string{"n2"}},
- },
- },
- },
- LookupService: rlsServerAddress,
- LookupServiceTimeout: durationpb.New(defaultTestTimeout),
- CacheSizeBytes: 1024,
- },
- RouteLookupChannelServiceConfig: `{"loadBalancingConfig": [{"pick_first": {}}]}`,
- ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicyName},
- ChildPolicyConfigTargetFieldName: e2e.RLSChildPolicyTargetNameField,
- }
- }
- // buildBasicRLSConfigWithChildPolicy constructs a very basic service config for
- // the RLS LB policy. It also registers a test LB policy which is capable of
- // being a child of the RLS LB policy.
- func buildBasicRLSConfigWithChildPolicy(t *testing.T, childPolicyName, rlsServerAddress string) *e2e.RLSConfig {
- childPolicyName = "test-child-policy" + childPolicyName
- e2e.RegisterRLSChildPolicy(childPolicyName, nil)
- t.Logf("Registered child policy with name %q", childPolicyName)
- return &e2e.RLSConfig{
- RouteLookupConfig: &rlspb.RouteLookupConfig{
- GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{{Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}}},
- LookupService: rlsServerAddress,
- LookupServiceTimeout: durationpb.New(defaultTestTimeout),
- CacheSizeBytes: 1024,
- },
- RouteLookupChannelServiceConfig: `{"loadBalancingConfig": [{"pick_first": {}}]}`,
- ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicyName},
- ChildPolicyConfigTargetFieldName: e2e.RLSChildPolicyTargetNameField,
- }
- }
- // startBackend starts a backend implementing the TestService on a local port.
- // It returns a channel for tests to get notified whenever an RPC is invoked on
- // the backend. This allows tests to ensure that RPCs reach expected backends.
- // Also returns the address of the backend.
- func startBackend(t *testing.T, sopts ...grpc.ServerOption) (rpcCh chan struct{}, address string) {
- t.Helper()
- rpcCh = make(chan struct{}, 1)
- backend := &stubserver.StubServer{
- EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
- select {
- case rpcCh <- struct{}{}:
- default:
- }
- return &testpb.Empty{}, nil
- },
- }
- if err := backend.StartServer(sopts...); err != nil {
- t.Fatalf("Failed to start backend: %v", err)
- }
- t.Logf("Started TestService backend at: %q", backend.Address)
- t.Cleanup(func() { backend.Stop() })
- return rpcCh, backend.Address
- }
- // startManualResolverWithConfig registers and returns a manual resolver which
- // pushes the RLS LB policy's service config on the channel.
- func startManualResolverWithConfig(t *testing.T, rlsConfig *e2e.RLSConfig) *manual.Resolver {
- t.Helper()
- scJSON, err := rlsConfig.ServiceConfigJSON()
- if err != nil {
- t.Fatal(err)
- }
- sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
- r := manual.NewBuilderWithScheme("rls-e2e")
- r.InitialState(resolver.State{ServiceConfig: sc})
- t.Cleanup(r.Close)
- return r
- }
- // makeTestRPCAndExpectItToReachBackend is a test helper function which makes
- // the EmptyCall RPC on the given ClientConn and verifies that it reaches a
- // backend. The latter is accomplished by listening on the provided channel
- // which gets pushed to whenever the backend in question gets an RPC.
- //
- // There are many instances where it can take a while before the attempted RPC
- // reaches the expected backend. Examples include, but are not limited to:
- // - control channel is changed in a config update. The RLS LB policy creates a
- // new control channel, and sends a new picker to gRPC. But it takes a while
- // before gRPC actually starts using the new picker.
- // - test is waiting for a cache entry to expire after which we expect a
- // different behavior because we have configured the fake RLS server to return
- // different backends.
- //
- // Therefore, we do not return an error when the RPC fails. Instead, we wait for
- // the context to expire before failing.
- func makeTestRPCAndExpectItToReachBackend(ctx context.Context, t *testing.T, cc *grpc.ClientConn, ch chan struct{}) {
- t.Helper()
- // Drain the backend channel before performing the RPC to remove any
- // notifications from previous RPCs.
- select {
- case <-ch:
- default:
- }
- for {
- if err := ctx.Err(); err != nil {
- t.Fatalf("Timeout when waiting for RPCs to be routed to the given target: %v", err)
- }
- sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
- client := testgrpc.NewTestServiceClient(cc)
- client.EmptyCall(sCtx, &testpb.Empty{})
- select {
- case <-sCtx.Done():
- case <-ch:
- sCancel()
- return
- }
- }
- }
- // makeTestRPCAndVerifyError is a test helper function which makes the EmptyCall
- // RPC on the given ClientConn and verifies that the RPC fails with the given
- // status code and error.
- //
- // Similar to makeTestRPCAndExpectItToReachBackend, retries until expected
- // outcome is reached or the provided context has expired.
- func makeTestRPCAndVerifyError(ctx context.Context, t *testing.T, cc *grpc.ClientConn, wantCode codes.Code, wantErr error) {
- t.Helper()
- for {
- if err := ctx.Err(); err != nil {
- t.Fatalf("Timeout when waiting for RPCs to fail with given error: %v", err)
- }
- sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
- client := testgrpc.NewTestServiceClient(cc)
- _, err := client.EmptyCall(sCtx, &testpb.Empty{})
- // If the RPC fails with the expected code and expected error message (if
- // one was provided), we return. Else we retry after blocking for a little
- // while to ensure that we don't keep blasting away with RPCs.
- if code := status.Code(err); code == wantCode {
- if wantErr == nil || strings.Contains(err.Error(), wantErr.Error()) {
- sCancel()
- return
- }
- }
- <-sCtx.Done()
- }
- }
- // verifyRLSRequest is a test helper which listens on a channel to see if an RLS
- // request was received by the fake RLS server. Based on whether the test
- // expects a request to be sent out or not, it uses a different timeout.
- func verifyRLSRequest(t *testing.T, ch chan struct{}, wantRequest bool) {
- t.Helper()
- if wantRequest {
- select {
- case <-time.After(defaultTestTimeout):
- t.Fatalf("Timeout when waiting for an RLS request to be sent out")
- case <-ch:
- }
- } else {
- select {
- case <-time.After(defaultTestShortTimeout):
- case <-ch:
- t.Fatalf("RLS request sent out when not expecting one")
- }
- }
- }
|