helpers_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. /*
  2. *
  3. * Copyright 2021 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package rls
  19. import (
  20. "context"
  21. "strings"
  22. "testing"
  23. "time"
  24. "google.golang.org/grpc"
  25. "google.golang.org/grpc/balancer/rls/internal/test/e2e"
  26. "google.golang.org/grpc/codes"
  27. "google.golang.org/grpc/internal"
  28. "google.golang.org/grpc/internal/balancergroup"
  29. "google.golang.org/grpc/internal/grpcsync"
  30. "google.golang.org/grpc/internal/grpctest"
  31. rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
  32. internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
  33. "google.golang.org/grpc/internal/stubserver"
  34. testgrpc "google.golang.org/grpc/interop/grpc_testing"
  35. testpb "google.golang.org/grpc/interop/grpc_testing"
  36. "google.golang.org/grpc/resolver"
  37. "google.golang.org/grpc/resolver/manual"
  38. "google.golang.org/grpc/serviceconfig"
  39. "google.golang.org/grpc/status"
  40. "google.golang.org/protobuf/types/known/durationpb"
  41. )
  42. const (
  43. defaultTestTimeout = 5 * time.Second
  44. defaultTestShortTimeout = 100 * time.Millisecond
  45. )
  46. func init() {
  47. balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
  48. }
  49. type s struct {
  50. grpctest.Tester
  51. }
  52. func Test(t *testing.T) {
  53. grpctest.RunSubTests(t, s{})
  54. }
  55. // fakeBackoffStrategy is a fake implementation of the backoff.Strategy
  56. // interface, for tests to inject the backoff duration.
  57. type fakeBackoffStrategy struct {
  58. backoff time.Duration
  59. }
  60. func (f *fakeBackoffStrategy) Backoff(retries int) time.Duration {
  61. return f.backoff
  62. }
  63. // fakeThrottler is a fake implementation of the adaptiveThrottler interface.
  64. type fakeThrottler struct {
  65. throttleFunc func() bool // Fake throttler implementation.
  66. throttleCh chan struct{} // Invocation of ShouldThrottle signals here.
  67. }
  68. func (f *fakeThrottler) ShouldThrottle() bool {
  69. select {
  70. case <-f.throttleCh:
  71. default:
  72. }
  73. f.throttleCh <- struct{}{}
  74. return f.throttleFunc()
  75. }
  76. func (f *fakeThrottler) RegisterBackendResponse(bool) {}
  77. // alwaysThrottlingThrottler returns a fake throttler which always throttles.
  78. func alwaysThrottlingThrottler() *fakeThrottler {
  79. return &fakeThrottler{
  80. throttleFunc: func() bool { return true },
  81. throttleCh: make(chan struct{}, 1),
  82. }
  83. }
  84. // neverThrottlingThrottler returns a fake throttler which never throttles.
  85. func neverThrottlingThrottler() *fakeThrottler {
  86. return &fakeThrottler{
  87. throttleFunc: func() bool { return false },
  88. throttleCh: make(chan struct{}, 1),
  89. }
  90. }
  91. // oneTimeAllowingThrottler returns a fake throttler which does not throttle
  92. // requests until the client RPC succeeds, but throttles everything that comes
  93. // after. This is useful for tests which need to set up a valid cache entry
  94. // before testing other cases.
  95. func oneTimeAllowingThrottler(firstRPCDone *grpcsync.Event) *fakeThrottler {
  96. return &fakeThrottler{
  97. throttleFunc: firstRPCDone.HasFired,
  98. throttleCh: make(chan struct{}, 1),
  99. }
  100. }
  101. func overrideAdaptiveThrottler(t *testing.T, f *fakeThrottler) {
  102. origAdaptiveThrottler := newAdaptiveThrottler
  103. newAdaptiveThrottler = func() adaptiveThrottler { return f }
  104. t.Cleanup(func() { newAdaptiveThrottler = origAdaptiveThrottler })
  105. }
  106. // buildBasicRLSConfig constructs a basic service config for the RLS LB policy
  107. // with header matching rules. This expects the passed child policy name to
  108. // have been registered by the caller.
  109. func buildBasicRLSConfig(childPolicyName, rlsServerAddress string) *e2e.RLSConfig {
  110. return &e2e.RLSConfig{
  111. RouteLookupConfig: &rlspb.RouteLookupConfig{
  112. GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{
  113. {
  114. Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}},
  115. Headers: []*rlspb.NameMatcher{
  116. {Key: "k1", Names: []string{"n1"}},
  117. {Key: "k2", Names: []string{"n2"}},
  118. },
  119. },
  120. },
  121. LookupService: rlsServerAddress,
  122. LookupServiceTimeout: durationpb.New(defaultTestTimeout),
  123. CacheSizeBytes: 1024,
  124. },
  125. RouteLookupChannelServiceConfig: `{"loadBalancingConfig": [{"pick_first": {}}]}`,
  126. ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicyName},
  127. ChildPolicyConfigTargetFieldName: e2e.RLSChildPolicyTargetNameField,
  128. }
  129. }
  130. // buildBasicRLSConfigWithChildPolicy constructs a very basic service config for
  131. // the RLS LB policy. It also registers a test LB policy which is capable of
  132. // being a child of the RLS LB policy.
  133. func buildBasicRLSConfigWithChildPolicy(t *testing.T, childPolicyName, rlsServerAddress string) *e2e.RLSConfig {
  134. childPolicyName = "test-child-policy" + childPolicyName
  135. e2e.RegisterRLSChildPolicy(childPolicyName, nil)
  136. t.Logf("Registered child policy with name %q", childPolicyName)
  137. return &e2e.RLSConfig{
  138. RouteLookupConfig: &rlspb.RouteLookupConfig{
  139. GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{{Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}}},
  140. LookupService: rlsServerAddress,
  141. LookupServiceTimeout: durationpb.New(defaultTestTimeout),
  142. CacheSizeBytes: 1024,
  143. },
  144. RouteLookupChannelServiceConfig: `{"loadBalancingConfig": [{"pick_first": {}}]}`,
  145. ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicyName},
  146. ChildPolicyConfigTargetFieldName: e2e.RLSChildPolicyTargetNameField,
  147. }
  148. }
  149. // startBackend starts a backend implementing the TestService on a local port.
  150. // It returns a channel for tests to get notified whenever an RPC is invoked on
  151. // the backend. This allows tests to ensure that RPCs reach expected backends.
  152. // Also returns the address of the backend.
  153. func startBackend(t *testing.T, sopts ...grpc.ServerOption) (rpcCh chan struct{}, address string) {
  154. t.Helper()
  155. rpcCh = make(chan struct{}, 1)
  156. backend := &stubserver.StubServer{
  157. EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  158. select {
  159. case rpcCh <- struct{}{}:
  160. default:
  161. }
  162. return &testpb.Empty{}, nil
  163. },
  164. }
  165. if err := backend.StartServer(sopts...); err != nil {
  166. t.Fatalf("Failed to start backend: %v", err)
  167. }
  168. t.Logf("Started TestService backend at: %q", backend.Address)
  169. t.Cleanup(func() { backend.Stop() })
  170. return rpcCh, backend.Address
  171. }
  172. // startManualResolverWithConfig registers and returns a manual resolver which
  173. // pushes the RLS LB policy's service config on the channel.
  174. func startManualResolverWithConfig(t *testing.T, rlsConfig *e2e.RLSConfig) *manual.Resolver {
  175. t.Helper()
  176. scJSON, err := rlsConfig.ServiceConfigJSON()
  177. if err != nil {
  178. t.Fatal(err)
  179. }
  180. sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
  181. r := manual.NewBuilderWithScheme("rls-e2e")
  182. r.InitialState(resolver.State{ServiceConfig: sc})
  183. t.Cleanup(r.Close)
  184. return r
  185. }
  186. // makeTestRPCAndExpectItToReachBackend is a test helper function which makes
  187. // the EmptyCall RPC on the given ClientConn and verifies that it reaches a
  188. // backend. The latter is accomplished by listening on the provided channel
  189. // which gets pushed to whenever the backend in question gets an RPC.
  190. //
  191. // There are many instances where it can take a while before the attempted RPC
  192. // reaches the expected backend. Examples include, but are not limited to:
  193. // - control channel is changed in a config update. The RLS LB policy creates a
  194. // new control channel, and sends a new picker to gRPC. But it takes a while
  195. // before gRPC actually starts using the new picker.
  196. // - test is waiting for a cache entry to expire after which we expect a
  197. // different behavior because we have configured the fake RLS server to return
  198. // different backends.
  199. //
  200. // Therefore, we do not return an error when the RPC fails. Instead, we wait for
  201. // the context to expire before failing.
  202. func makeTestRPCAndExpectItToReachBackend(ctx context.Context, t *testing.T, cc *grpc.ClientConn, ch chan struct{}) {
  203. t.Helper()
  204. // Drain the backend channel before performing the RPC to remove any
  205. // notifications from previous RPCs.
  206. select {
  207. case <-ch:
  208. default:
  209. }
  210. for {
  211. if err := ctx.Err(); err != nil {
  212. t.Fatalf("Timeout when waiting for RPCs to be routed to the given target: %v", err)
  213. }
  214. sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
  215. client := testgrpc.NewTestServiceClient(cc)
  216. client.EmptyCall(sCtx, &testpb.Empty{})
  217. select {
  218. case <-sCtx.Done():
  219. case <-ch:
  220. sCancel()
  221. return
  222. }
  223. }
  224. }
  225. // makeTestRPCAndVerifyError is a test helper function which makes the EmptyCall
  226. // RPC on the given ClientConn and verifies that the RPC fails with the given
  227. // status code and error.
  228. //
  229. // Similar to makeTestRPCAndExpectItToReachBackend, retries until expected
  230. // outcome is reached or the provided context has expired.
  231. func makeTestRPCAndVerifyError(ctx context.Context, t *testing.T, cc *grpc.ClientConn, wantCode codes.Code, wantErr error) {
  232. t.Helper()
  233. for {
  234. if err := ctx.Err(); err != nil {
  235. t.Fatalf("Timeout when waiting for RPCs to fail with given error: %v", err)
  236. }
  237. sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
  238. client := testgrpc.NewTestServiceClient(cc)
  239. _, err := client.EmptyCall(sCtx, &testpb.Empty{})
  240. // If the RPC fails with the expected code and expected error message (if
  241. // one was provided), we return. Else we retry after blocking for a little
  242. // while to ensure that we don't keep blasting away with RPCs.
  243. if code := status.Code(err); code == wantCode {
  244. if wantErr == nil || strings.Contains(err.Error(), wantErr.Error()) {
  245. sCancel()
  246. return
  247. }
  248. }
  249. <-sCtx.Done()
  250. }
  251. }
  252. // verifyRLSRequest is a test helper which listens on a channel to see if an RLS
  253. // request was received by the fake RLS server. Based on whether the test
  254. // expects a request to be sent out or not, it uses a different timeout.
  255. func verifyRLSRequest(t *testing.T, ch chan struct{}, wantRequest bool) {
  256. t.Helper()
  257. if wantRequest {
  258. select {
  259. case <-time.After(defaultTestTimeout):
  260. t.Fatalf("Timeout when waiting for an RLS request to be sent out")
  261. case <-ch:
  262. }
  263. } else {
  264. select {
  265. case <-time.After(defaultTestShortTimeout):
  266. case <-ch:
  267. t.Fatalf("RLS request sent out when not expecting one")
  268. }
  269. }
  270. }