123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848 |
- /*
- *
- * Copyright 2021 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package rls
- import (
- "context"
- "fmt"
- "testing"
- "time"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/internal/grpcsync"
- "google.golang.org/grpc/internal/stubserver"
- rlstest "google.golang.org/grpc/internal/testutils/rls"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/status"
- "google.golang.org/protobuf/types/known/durationpb"
- rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
- testgrpc "google.golang.org/grpc/interop/grpc_testing"
- testpb "google.golang.org/grpc/interop/grpc_testing"
- )
- // Test verifies the scenario where there is no matching entry in the data cache
- // and no pending request either, and the ensuing RLS request is throttled.
- func (s) TestPick_DataCacheMiss_NoPendingEntry_ThrottledWithDefaultTarget(t *testing.T) {
- // Start an RLS server and set the throttler to always throttle requests.
- rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
- overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
- // Build RLS service config with a default target.
- rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
- defBackendCh, defBackendAddress := startBackend(t)
- rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
- // Register a manual resolver and push the RLS service config through it.
- r := startManualResolverWithConfig(t, rlsConfig)
- cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- t.Fatalf("grpc.Dial() failed: %v", err)
- }
- defer cc.Close()
- // Make an RPC and ensure it gets routed to the default target.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
- // Make sure no RLS request is sent out.
- verifyRLSRequest(t, rlsReqCh, false)
- }
- // Test verifies the scenario where there is no matching entry in the data cache
- // and no pending request either, and the ensuing RLS request is throttled.
- // There is no default target configured in the service config, so the RPC is
- // expected to fail with an RLS throttled error.
- func (s) TestPick_DataCacheMiss_NoPendingEntry_ThrottledWithoutDefaultTarget(t *testing.T) {
- // Start an RLS server and set the throttler to always throttle requests.
- rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
- overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
- // Build an RLS config without a default target.
- rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
- // Register a manual resolver and push the RLS service config through it.
- r := startManualResolverWithConfig(t, rlsConfig)
- // Dial the backend.
- cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- t.Fatalf("grpc.Dial() failed: %v", err)
- }
- defer cc.Close()
- // Make an RPC and expect it to fail with RLS throttled error.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errRLSThrottled)
- // Make sure no RLS request is sent out.
- verifyRLSRequest(t, rlsReqCh, false)
- }
- // Test verifies the scenario where there is no matching entry in the data cache
- // and no pending request either, and the ensuing RLS request is not throttled.
- // The RLS response does not contain any backends, so the RPC fails with a
- // deadline exceeded error.
- func (s) TestPick_DataCacheMiss_NoPendingEntry_NotThrottled(t *testing.T) {
- // Start an RLS server and set the throttler to never throttle requests.
- rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
- overrideAdaptiveThrottler(t, neverThrottlingThrottler())
- // Build an RLS config without a default target.
- rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
- // Register a manual resolver and push the RLS service config through it.
- r := startManualResolverWithConfig(t, rlsConfig)
- // Dial the backend.
- cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- t.Fatalf("grpc.Dial() failed: %v", err)
- }
- defer cc.Close()
- // Make an RPC and expect it to fail with deadline exceeded error. We use a
- // smaller timeout to ensure that the test doesn't run very long.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
- defer cancel()
- makeTestRPCAndVerifyError(ctx, t, cc, codes.DeadlineExceeded, context.DeadlineExceeded)
- // Make sure an RLS request is sent out.
- verifyRLSRequest(t, rlsReqCh, true)
- }
- // Test verifies the scenario where there is no matching entry in the data
- // cache, but there is a pending request. So, we expect no RLS request to be
- // sent out. The pick should be queued and not delegated to the default target.
- func (s) TestPick_DataCacheMiss_PendingEntryExists(t *testing.T) {
- tests := []struct {
- name string
- withDefaultTarget bool
- }{
- {
- name: "withDefaultTarget",
- withDefaultTarget: true,
- },
- {
- name: "withoutDefaultTarget",
- withDefaultTarget: false,
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- // A unary interceptor which blocks the RouteLookup RPC on the fake
- // RLS server until the test is done. The first RPC by the client
- // will cause the LB policy to send out an RLS request. This will
- // also lead to creation of a pending entry, and further RPCs by the
- // client should not result in RLS requests being sent out.
- rlsReqCh := make(chan struct{}, 1)
- interceptor := func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
- rlsReqCh <- struct{}{}
- <-ctx.Done()
- return nil, ctx.Err()
- }
- // Start an RLS server and set the throttler to never throttle.
- rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
- overrideAdaptiveThrottler(t, neverThrottlingThrottler())
- // Build RLS service config with an optional default target.
- rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
- if test.withDefaultTarget {
- _, defBackendAddress := startBackend(t)
- rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
- }
- // Register a manual resolver and push the RLS service config
- // through it.
- r := startManualResolverWithConfig(t, rlsConfig)
- // Dial the backend.
- cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- t.Fatalf("grpc.Dial() failed: %v", err)
- }
- defer cc.Close()
- // Make an RPC that results in the RLS request being sent out. And
- // since the RLS server is configured to block on the first request,
- // this RPC will block until its context expires. This ensures that
- // we have a pending cache entry for the duration of the test.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- go func() {
- client := testgrpc.NewTestServiceClient(cc)
- client.EmptyCall(ctx, &testpb.Empty{})
- }()
- // Make sure an RLS request is sent out.
- verifyRLSRequest(t, rlsReqCh, true)
- // Make another RPC and expect it to fail the same way.
- ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
- defer cancel()
- makeTestRPCAndVerifyError(ctx, t, cc, codes.DeadlineExceeded, context.DeadlineExceeded)
- // Make sure no RLS request is sent out this time around.
- verifyRLSRequest(t, rlsReqCh, false)
- })
- }
- }
- // Test verifies the scenario where there is a matching entry in the data cache
- // which is valid and there is no pending request. The pick is expected to be
- // delegated to the child policy.
- func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry(t *testing.T) {
- // Start an RLS server and set the throttler to never throttle requests.
- rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
- overrideAdaptiveThrottler(t, neverThrottlingThrottler())
- // Build the RLS config without a default target.
- rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
- // Start a test backend, and setup the fake RLS server to return this as a
- // target in the RLS response.
- testBackendCh, testBackendAddress := startBackend(t)
- rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
- return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
- })
- // Register a manual resolver and push the RLS service config through it.
- r := startManualResolverWithConfig(t, rlsConfig)
- // Dial the backend.
- cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- t.Fatalf("grpc.Dial() failed: %v", err)
- }
- defer cc.Close()
- // Make an RPC and ensure it gets routed to the test backend.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
- // Make sure an RLS request is sent out.
- verifyRLSRequest(t, rlsReqCh, true)
- // Make another RPC and expect it to find the target in the data cache.
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
- // Make sure no RLS request is sent out this time around.
- verifyRLSRequest(t, rlsReqCh, false)
- }
- // Test verifies the scenario where there is a matching entry in the data cache
- // which is valid and there is no pending request. The pick is expected to be
- // delegated to the child policy.
- func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry_WithHeaderData(t *testing.T) {
- // Start an RLS server and set the throttler to never throttle requests.
- rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
- overrideAdaptiveThrottler(t, neverThrottlingThrottler())
- // Build the RLS config without a default target.
- rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
- // Start a test backend which expects the header data contents sent from the
- // RLS server to be part of RPC metadata as X-Google-RLS-Data header.
- const headerDataContents = "foo,bar,baz"
- backend := &stubserver.StubServer{
- EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
- gotHeaderData := metadata.ValueFromIncomingContext(ctx, "x-google-rls-data")
- if len(gotHeaderData) != 1 || gotHeaderData[0] != headerDataContents {
- return nil, fmt.Errorf("got metadata in `X-Google-RLS-Data` is %v, want %s", gotHeaderData, headerDataContents)
- }
- return &testpb.Empty{}, nil
- },
- }
- if err := backend.StartServer(); err != nil {
- t.Fatalf("Failed to start backend: %v", err)
- }
- t.Logf("Started TestService backend at: %q", backend.Address)
- defer backend.Stop()
- // Setup the fake RLS server to return the above backend as a target in the
- // RLS response. Also, populate the header data field in the response.
- rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
- return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{
- Targets: []string{backend.Address},
- HeaderData: headerDataContents,
- }}
- })
- // Register a manual resolver and push the RLS service config through it.
- r := startManualResolverWithConfig(t, rlsConfig)
- // Dial the backend.
- cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- t.Fatalf("grpc.Dial() failed: %v", err)
- }
- defer cc.Close()
- // Make an RPC and ensure it gets routed to the test backend with the header
- // data sent by the RLS server.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- if _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{}); err != nil {
- t.Fatalf("EmptyCall() RPC: %v", err)
- }
- }
- // Test verifies the scenario where there is a matching entry in the data cache
- // which is stale and there is no pending request. The pick is expected to be
- // delegated to the child policy with a proactive cache refresh.
- func (s) TestPick_DataCacheHit_NoPendingEntry_StaleEntry(t *testing.T) {
- // We expect the same pick behavior (i.e delegated to the child policy) for
- // a proactive refresh whether the control channel is throttled or not.
- tests := []struct {
- name string
- throttled bool
- }{
- {
- name: "throttled",
- throttled: true,
- },
- {
- name: "notThrottled",
- throttled: false,
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- // Start an RLS server and setup the throttler appropriately.
- rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
- var throttler *fakeThrottler
- firstRPCDone := grpcsync.NewEvent()
- if test.throttled {
- throttler = oneTimeAllowingThrottler(firstRPCDone)
- overrideAdaptiveThrottler(t, throttler)
- } else {
- throttler = neverThrottlingThrottler()
- overrideAdaptiveThrottler(t, throttler)
- }
- // Build the RLS config without a default target. Set the stale age
- // to a very low value to force entries to become stale quickly.
- rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
- rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(time.Minute)
- rlsConfig.RouteLookupConfig.StaleAge = durationpb.New(defaultTestShortTimeout)
- // Start a test backend, and setup the fake RLS server to return
- // this as a target in the RLS response.
- testBackendCh, testBackendAddress := startBackend(t)
- rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
- return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
- })
- // Register a manual resolver and push the RLS service config
- // through it.
- r := startManualResolverWithConfig(t, rlsConfig)
- // Dial the backend.
- cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- t.Fatalf("grpc.Dial() failed: %v", err)
- }
- defer cc.Close()
- // Make an RPC and ensure it gets routed to the test backend.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
- // Make sure an RLS request is sent out.
- verifyRLSRequest(t, rlsReqCh, true)
- firstRPCDone.Fire()
- // The cache entry has a large maxAge, but a small stateAge. We keep
- // retrying until the cache entry becomes stale, in which case we expect a
- // proactive cache refresh.
- //
- // If the control channel is not throttled, then we expect an RLS request
- // to be sent out. If the control channel is throttled, we expect the fake
- // throttler's channel to be signalled.
- for {
- // Make another RPC and expect it to find the target in the data cache.
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
- if !test.throttled {
- select {
- case <-time.After(defaultTestShortTimeout):
- // Go back and retry the RPC.
- case <-rlsReqCh:
- return
- }
- } else {
- select {
- case <-time.After(defaultTestShortTimeout):
- // Go back and retry the RPC.
- case <-throttler.throttleCh:
- return
- }
- }
- }
- })
- }
- }
- // Test verifies scenarios where there is a matching entry in the data cache
- // which has expired and there is no pending request.
- func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntry(t *testing.T) {
- tests := []struct {
- name string
- throttled bool
- withDefaultTarget bool
- }{
- {
- name: "throttledWithDefaultTarget",
- throttled: true,
- withDefaultTarget: true,
- },
- {
- name: "throttledWithoutDefaultTarget",
- throttled: true,
- withDefaultTarget: false,
- },
- {
- name: "notThrottled",
- throttled: false,
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- // Start an RLS server and setup the throttler appropriately.
- rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
- var throttler *fakeThrottler
- firstRPCDone := grpcsync.NewEvent()
- if test.throttled {
- throttler = oneTimeAllowingThrottler(firstRPCDone)
- overrideAdaptiveThrottler(t, throttler)
- } else {
- throttler = neverThrottlingThrottler()
- overrideAdaptiveThrottler(t, throttler)
- }
- // Build the RLS config with a very low value for maxAge. This will
- // ensure that cache entries become invalid very soon.
- rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
- rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
- // Start a default backend if needed.
- var defBackendCh chan struct{}
- if test.withDefaultTarget {
- var defBackendAddress string
- defBackendCh, defBackendAddress = startBackend(t)
- rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
- }
- // Start a test backend, and setup the fake RLS server to return
- // this as a target in the RLS response.
- testBackendCh, testBackendAddress := startBackend(t)
- rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
- return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
- })
- // Register a manual resolver and push the RLS service config
- // through it.
- r := startManualResolverWithConfig(t, rlsConfig)
- // Dial the backend.
- cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- t.Fatalf("grpc.Dial() failed: %v", err)
- }
- defer cc.Close()
- // Make an RPC and ensure it gets routed to the test backend.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
- // Make sure an RLS request is sent out.
- verifyRLSRequest(t, rlsReqCh, true)
- firstRPCDone.Fire()
- // Keep retrying the RPC until the cache entry expires. Expected behavior
- // is dependent on the scenario being tested.
- switch {
- case test.throttled && test.withDefaultTarget:
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
- <-throttler.throttleCh
- case test.throttled && !test.withDefaultTarget:
- makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errRLSThrottled)
- <-throttler.throttleCh
- case !test.throttled:
- for {
- // The backend to which the RPC is routed does not change after the
- // cache entry expires because the control channel is not throttled.
- // So, we need to keep retrying until the cache entry expires, at
- // which point we expect an RLS request to be sent out and the RPC to
- // get routed to the same testBackend.
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
- select {
- case <-time.After(defaultTestShortTimeout):
- // Go back and retry the RPC.
- case <-rlsReqCh:
- return
- }
- }
- }
- })
- }
- }
- // Test verifies scenarios where there is a matching entry in the data cache
- // which has expired and is in backoff and there is no pending request.
- func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntryInBackoff(t *testing.T) {
- tests := []struct {
- name string
- withDefaultTarget bool
- }{
- {
- name: "withDefaultTarget",
- withDefaultTarget: true,
- },
- {
- name: "withoutDefaultTarget",
- withDefaultTarget: false,
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- // Start an RLS server and set the throttler to never throttle requests.
- rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
- overrideAdaptiveThrottler(t, neverThrottlingThrottler())
- // Override the backoff strategy to return a large backoff which
- // will make sure the date cache entry remains in backoff for the
- // duration of the test.
- origBackoffStrategy := defaultBackoffStrategy
- defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
- defer func() { defaultBackoffStrategy = origBackoffStrategy }()
- // Build the RLS config with a very low value for maxAge. This will
- // ensure that cache entries become invalid very soon.
- rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
- rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
- // Start a default backend if needed.
- var defBackendCh chan struct{}
- if test.withDefaultTarget {
- var defBackendAddress string
- defBackendCh, defBackendAddress = startBackend(t)
- rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
- }
- // Start a test backend, and set up the fake RLS server to return this as
- // a target in the RLS response.
- testBackendCh, testBackendAddress := startBackend(t)
- rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
- return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
- })
- // Register a manual resolver and push the RLS service config through it.
- r := startManualResolverWithConfig(t, rlsConfig)
- // Dial the backend.
- cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- t.Fatalf("grpc.Dial() failed: %v", err)
- }
- defer cc.Close()
- // Make an RPC and ensure it gets routed to the test backend.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
- // Make sure an RLS request is sent out.
- verifyRLSRequest(t, rlsReqCh, true)
- // Set up the fake RLS server to return errors. This will push the cache
- // entry into backoff.
- var rlsLastErr = status.Error(codes.DeadlineExceeded, "last RLS request failed")
- rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
- return &rlstest.RouteLookupResponse{Err: rlsLastErr}
- })
- // Since the RLS server is now configured to return errors, this will push
- // the cache entry into backoff. The pick will be delegated to the default
- // backend if one exits, and will fail with the error returned by the RLS
- // server otherwise.
- if test.withDefaultTarget {
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
- } else {
- makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, rlsLastErr)
- }
- })
- }
- }
- // Test verifies scenarios where there is a matching entry in the data cache
- // which is stale and there is a pending request.
- func (s) TestPick_DataCacheHit_PendingEntryExists_StaleEntry(t *testing.T) {
- tests := []struct {
- name string
- withDefaultTarget bool
- }{
- {
- name: "withDefaultTarget",
- withDefaultTarget: true,
- },
- {
- name: "withoutDefaultTarget",
- withDefaultTarget: false,
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- // A unary interceptor which simply calls the underlying handler
- // until the first client RPC is done. We want one client RPC to
- // succeed to ensure that a data cache entry is created. For
- // subsequent client RPCs which result in RLS requests, this
- // interceptor blocks until the test's context expires. And since we
- // configure the RLS LB policy with a really low value for max age,
- // this allows us to simulate the condition where the it has an
- // expired entry and a pending entry in the cache.
- rlsReqCh := make(chan struct{}, 1)
- firstRPCDone := grpcsync.NewEvent()
- interceptor := func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
- select {
- case rlsReqCh <- struct{}{}:
- default:
- }
- if firstRPCDone.HasFired() {
- <-ctx.Done()
- return nil, ctx.Err()
- }
- return handler(ctx, req)
- }
- // Start an RLS server and set the throttler to never throttle.
- rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
- overrideAdaptiveThrottler(t, neverThrottlingThrottler())
- // Build RLS service config with an optional default target.
- rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
- if test.withDefaultTarget {
- _, defBackendAddress := startBackend(t)
- rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
- }
- // Low value for stale age to force entries to become stale quickly.
- rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(time.Minute)
- rlsConfig.RouteLookupConfig.StaleAge = durationpb.New(defaultTestShortTimeout)
- // Start a test backend, and setup the fake RLS server to return
- // this as a target in the RLS response.
- testBackendCh, testBackendAddress := startBackend(t)
- rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
- return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
- })
- // Register a manual resolver and push the RLS service config
- // through it.
- r := startManualResolverWithConfig(t, rlsConfig)
- // Dial the backend.
- cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- t.Fatalf("grpc.Dial() failed: %v", err)
- }
- defer cc.Close()
- // Make an RPC and ensure it gets routed to the test backend.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
- // Make sure an RLS request is sent out.
- verifyRLSRequest(t, rlsReqCh, true)
- firstRPCDone.Fire()
- // The cache entry has a large maxAge, but a small stateAge. We keep
- // retrying until the cache entry becomes stale, in which case we expect a
- // proactive cache refresh.
- for {
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
- select {
- case <-time.After(defaultTestShortTimeout):
- // Go back and retry the RPC.
- case <-rlsReqCh:
- return
- }
- }
- })
- }
- }
- // Test verifies scenarios where there is a matching entry in the data cache
- // which is expired and there is a pending request.
- func (s) TestPick_DataCacheHit_PendingEntryExists_ExpiredEntry(t *testing.T) {
- tests := []struct {
- name string
- withDefaultTarget bool
- }{
- {
- name: "withDefaultTarget",
- withDefaultTarget: true,
- },
- {
- name: "withoutDefaultTarget",
- withDefaultTarget: false,
- },
- }
- for _, test := range tests {
- t.Run(test.name, func(t *testing.T) {
- // A unary interceptor which simply calls the underlying handler
- // until the first client RPC is done. We want one client RPC to
- // succeed to ensure that a data cache entry is created. For
- // subsequent client RPCs which result in RLS requests, this
- // interceptor blocks until the test's context expires. And since we
- // configure the RLS LB policy with a really low value for max age,
- // this allows us to simulate the condition where the it has an
- // expired entry and a pending entry in the cache.
- rlsReqCh := make(chan struct{}, 1)
- firstRPCDone := grpcsync.NewEvent()
- interceptor := func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
- select {
- case rlsReqCh <- struct{}{}:
- default:
- }
- if firstRPCDone.HasFired() {
- <-ctx.Done()
- return nil, ctx.Err()
- }
- return handler(ctx, req)
- }
- // Start an RLS server and set the throttler to never throttle.
- rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
- overrideAdaptiveThrottler(t, neverThrottlingThrottler())
- // Build RLS service config with an optional default target.
- rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
- if test.withDefaultTarget {
- _, defBackendAddress := startBackend(t)
- rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
- }
- // Set a low value for maxAge to ensure cache entries expire soon.
- rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
- // Start a test backend, and setup the fake RLS server to return
- // this as a target in the RLS response.
- testBackendCh, testBackendAddress := startBackend(t)
- rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
- return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
- })
- // Register a manual resolver and push the RLS service config
- // through it.
- r := startManualResolverWithConfig(t, rlsConfig)
- // Dial the backend.
- cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- t.Fatalf("grpc.Dial() failed: %v", err)
- }
- defer cc.Close()
- // Make an RPC and ensure it gets routed to the test backend.
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
- // Make sure an RLS request is sent out.
- verifyRLSRequest(t, rlsReqCh, true)
- firstRPCDone.Fire()
- // At this point, we have a cache entry with a small maxAge, and the
- // RLS server is configured to block on further RLS requests. As we
- // retry the RPC, at some point the cache entry would expire and
- // force us to send an RLS request which would block on the server,
- // giving us a pending cache entry for the duration of the test.
- go func() {
- for client := testgrpc.NewTestServiceClient(cc); ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
- client.EmptyCall(ctx, &testpb.Empty{})
- }
- }()
- verifyRLSRequest(t, rlsReqCh, true)
- // Another RPC at this point should find the pending entry and be queued.
- // But since we pass a small deadline, this RPC should fail with a
- // deadline exceeded error since the pending request does not return until
- // the test is done. And since we have a pending entry, we expect no RLS
- // request to be sent out.
- sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
- defer sCancel()
- makeTestRPCAndVerifyError(sCtx, t, cc, codes.DeadlineExceeded, context.DeadlineExceeded)
- verifyRLSRequest(t, rlsReqCh, false)
- })
- }
- }
- func TestIsFullMethodNameValid(t *testing.T) {
- tests := []struct {
- desc string
- methodName string
- want bool
- }{
- {
- desc: "does not start with a slash",
- methodName: "service/method",
- want: false,
- },
- {
- desc: "does not contain a method",
- methodName: "/service",
- want: false,
- },
- {
- desc: "path has more elements",
- methodName: "/service/path/to/method",
- want: false,
- },
- {
- desc: "valid",
- methodName: "/service/method",
- want: true,
- },
- }
- for _, test := range tests {
- t.Run(test.desc, func(t *testing.T) {
- if got := isFullMethodNameValid(test.methodName); got != test.want {
- t.Fatalf("isFullMethodNameValid(%q) = %v, want %v", test.methodName, got, test.want)
- }
- })
- }
- }
|