picker_test.go 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848
  1. /*
  2. *
  3. * Copyright 2021 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package rls
  19. import (
  20. "context"
  21. "fmt"
  22. "testing"
  23. "time"
  24. "google.golang.org/grpc"
  25. "google.golang.org/grpc/codes"
  26. "google.golang.org/grpc/credentials/insecure"
  27. "google.golang.org/grpc/internal/grpcsync"
  28. "google.golang.org/grpc/internal/stubserver"
  29. rlstest "google.golang.org/grpc/internal/testutils/rls"
  30. "google.golang.org/grpc/metadata"
  31. "google.golang.org/grpc/status"
  32. "google.golang.org/protobuf/types/known/durationpb"
  33. rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
  34. testgrpc "google.golang.org/grpc/interop/grpc_testing"
  35. testpb "google.golang.org/grpc/interop/grpc_testing"
  36. )
  37. // Test verifies the scenario where there is no matching entry in the data cache
  38. // and no pending request either, and the ensuing RLS request is throttled.
  39. func (s) TestPick_DataCacheMiss_NoPendingEntry_ThrottledWithDefaultTarget(t *testing.T) {
  40. // Start an RLS server and set the throttler to always throttle requests.
  41. rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
  42. overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
  43. // Build RLS service config with a default target.
  44. rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
  45. defBackendCh, defBackendAddress := startBackend(t)
  46. rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
  47. // Register a manual resolver and push the RLS service config through it.
  48. r := startManualResolverWithConfig(t, rlsConfig)
  49. cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  50. if err != nil {
  51. t.Fatalf("grpc.Dial() failed: %v", err)
  52. }
  53. defer cc.Close()
  54. // Make an RPC and ensure it gets routed to the default target.
  55. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  56. defer cancel()
  57. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
  58. // Make sure no RLS request is sent out.
  59. verifyRLSRequest(t, rlsReqCh, false)
  60. }
  61. // Test verifies the scenario where there is no matching entry in the data cache
  62. // and no pending request either, and the ensuing RLS request is throttled.
  63. // There is no default target configured in the service config, so the RPC is
  64. // expected to fail with an RLS throttled error.
  65. func (s) TestPick_DataCacheMiss_NoPendingEntry_ThrottledWithoutDefaultTarget(t *testing.T) {
  66. // Start an RLS server and set the throttler to always throttle requests.
  67. rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
  68. overrideAdaptiveThrottler(t, alwaysThrottlingThrottler())
  69. // Build an RLS config without a default target.
  70. rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
  71. // Register a manual resolver and push the RLS service config through it.
  72. r := startManualResolverWithConfig(t, rlsConfig)
  73. // Dial the backend.
  74. cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  75. if err != nil {
  76. t.Fatalf("grpc.Dial() failed: %v", err)
  77. }
  78. defer cc.Close()
  79. // Make an RPC and expect it to fail with RLS throttled error.
  80. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  81. defer cancel()
  82. makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errRLSThrottled)
  83. // Make sure no RLS request is sent out.
  84. verifyRLSRequest(t, rlsReqCh, false)
  85. }
  86. // Test verifies the scenario where there is no matching entry in the data cache
  87. // and no pending request either, and the ensuing RLS request is not throttled.
  88. // The RLS response does not contain any backends, so the RPC fails with a
  89. // deadline exceeded error.
  90. func (s) TestPick_DataCacheMiss_NoPendingEntry_NotThrottled(t *testing.T) {
  91. // Start an RLS server and set the throttler to never throttle requests.
  92. rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
  93. overrideAdaptiveThrottler(t, neverThrottlingThrottler())
  94. // Build an RLS config without a default target.
  95. rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
  96. // Register a manual resolver and push the RLS service config through it.
  97. r := startManualResolverWithConfig(t, rlsConfig)
  98. // Dial the backend.
  99. cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  100. if err != nil {
  101. t.Fatalf("grpc.Dial() failed: %v", err)
  102. }
  103. defer cc.Close()
  104. // Make an RPC and expect it to fail with deadline exceeded error. We use a
  105. // smaller timeout to ensure that the test doesn't run very long.
  106. ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
  107. defer cancel()
  108. makeTestRPCAndVerifyError(ctx, t, cc, codes.DeadlineExceeded, context.DeadlineExceeded)
  109. // Make sure an RLS request is sent out.
  110. verifyRLSRequest(t, rlsReqCh, true)
  111. }
  112. // Test verifies the scenario where there is no matching entry in the data
  113. // cache, but there is a pending request. So, we expect no RLS request to be
  114. // sent out. The pick should be queued and not delegated to the default target.
  115. func (s) TestPick_DataCacheMiss_PendingEntryExists(t *testing.T) {
  116. tests := []struct {
  117. name string
  118. withDefaultTarget bool
  119. }{
  120. {
  121. name: "withDefaultTarget",
  122. withDefaultTarget: true,
  123. },
  124. {
  125. name: "withoutDefaultTarget",
  126. withDefaultTarget: false,
  127. },
  128. }
  129. for _, test := range tests {
  130. t.Run(test.name, func(t *testing.T) {
  131. // A unary interceptor which blocks the RouteLookup RPC on the fake
  132. // RLS server until the test is done. The first RPC by the client
  133. // will cause the LB policy to send out an RLS request. This will
  134. // also lead to creation of a pending entry, and further RPCs by the
  135. // client should not result in RLS requests being sent out.
  136. rlsReqCh := make(chan struct{}, 1)
  137. interceptor := func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
  138. rlsReqCh <- struct{}{}
  139. <-ctx.Done()
  140. return nil, ctx.Err()
  141. }
  142. // Start an RLS server and set the throttler to never throttle.
  143. rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
  144. overrideAdaptiveThrottler(t, neverThrottlingThrottler())
  145. // Build RLS service config with an optional default target.
  146. rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
  147. if test.withDefaultTarget {
  148. _, defBackendAddress := startBackend(t)
  149. rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
  150. }
  151. // Register a manual resolver and push the RLS service config
  152. // through it.
  153. r := startManualResolverWithConfig(t, rlsConfig)
  154. // Dial the backend.
  155. cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  156. if err != nil {
  157. t.Fatalf("grpc.Dial() failed: %v", err)
  158. }
  159. defer cc.Close()
  160. // Make an RPC that results in the RLS request being sent out. And
  161. // since the RLS server is configured to block on the first request,
  162. // this RPC will block until its context expires. This ensures that
  163. // we have a pending cache entry for the duration of the test.
  164. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  165. defer cancel()
  166. go func() {
  167. client := testgrpc.NewTestServiceClient(cc)
  168. client.EmptyCall(ctx, &testpb.Empty{})
  169. }()
  170. // Make sure an RLS request is sent out.
  171. verifyRLSRequest(t, rlsReqCh, true)
  172. // Make another RPC and expect it to fail the same way.
  173. ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
  174. defer cancel()
  175. makeTestRPCAndVerifyError(ctx, t, cc, codes.DeadlineExceeded, context.DeadlineExceeded)
  176. // Make sure no RLS request is sent out this time around.
  177. verifyRLSRequest(t, rlsReqCh, false)
  178. })
  179. }
  180. }
  181. // Test verifies the scenario where there is a matching entry in the data cache
  182. // which is valid and there is no pending request. The pick is expected to be
  183. // delegated to the child policy.
  184. func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry(t *testing.T) {
  185. // Start an RLS server and set the throttler to never throttle requests.
  186. rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
  187. overrideAdaptiveThrottler(t, neverThrottlingThrottler())
  188. // Build the RLS config without a default target.
  189. rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
  190. // Start a test backend, and setup the fake RLS server to return this as a
  191. // target in the RLS response.
  192. testBackendCh, testBackendAddress := startBackend(t)
  193. rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
  194. return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
  195. })
  196. // Register a manual resolver and push the RLS service config through it.
  197. r := startManualResolverWithConfig(t, rlsConfig)
  198. // Dial the backend.
  199. cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  200. if err != nil {
  201. t.Fatalf("grpc.Dial() failed: %v", err)
  202. }
  203. defer cc.Close()
  204. // Make an RPC and ensure it gets routed to the test backend.
  205. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  206. defer cancel()
  207. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
  208. // Make sure an RLS request is sent out.
  209. verifyRLSRequest(t, rlsReqCh, true)
  210. // Make another RPC and expect it to find the target in the data cache.
  211. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
  212. // Make sure no RLS request is sent out this time around.
  213. verifyRLSRequest(t, rlsReqCh, false)
  214. }
  215. // Test verifies the scenario where there is a matching entry in the data cache
  216. // which is valid and there is no pending request. The pick is expected to be
  217. // delegated to the child policy.
  218. func (s) TestPick_DataCacheHit_NoPendingEntry_ValidEntry_WithHeaderData(t *testing.T) {
  219. // Start an RLS server and set the throttler to never throttle requests.
  220. rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil)
  221. overrideAdaptiveThrottler(t, neverThrottlingThrottler())
  222. // Build the RLS config without a default target.
  223. rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
  224. // Start a test backend which expects the header data contents sent from the
  225. // RLS server to be part of RPC metadata as X-Google-RLS-Data header.
  226. const headerDataContents = "foo,bar,baz"
  227. backend := &stubserver.StubServer{
  228. EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  229. gotHeaderData := metadata.ValueFromIncomingContext(ctx, "x-google-rls-data")
  230. if len(gotHeaderData) != 1 || gotHeaderData[0] != headerDataContents {
  231. return nil, fmt.Errorf("got metadata in `X-Google-RLS-Data` is %v, want %s", gotHeaderData, headerDataContents)
  232. }
  233. return &testpb.Empty{}, nil
  234. },
  235. }
  236. if err := backend.StartServer(); err != nil {
  237. t.Fatalf("Failed to start backend: %v", err)
  238. }
  239. t.Logf("Started TestService backend at: %q", backend.Address)
  240. defer backend.Stop()
  241. // Setup the fake RLS server to return the above backend as a target in the
  242. // RLS response. Also, populate the header data field in the response.
  243. rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
  244. return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{
  245. Targets: []string{backend.Address},
  246. HeaderData: headerDataContents,
  247. }}
  248. })
  249. // Register a manual resolver and push the RLS service config through it.
  250. r := startManualResolverWithConfig(t, rlsConfig)
  251. // Dial the backend.
  252. cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  253. if err != nil {
  254. t.Fatalf("grpc.Dial() failed: %v", err)
  255. }
  256. defer cc.Close()
  257. // Make an RPC and ensure it gets routed to the test backend with the header
  258. // data sent by the RLS server.
  259. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  260. defer cancel()
  261. if _, err := testgrpc.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{}); err != nil {
  262. t.Fatalf("EmptyCall() RPC: %v", err)
  263. }
  264. }
  265. // Test verifies the scenario where there is a matching entry in the data cache
  266. // which is stale and there is no pending request. The pick is expected to be
  267. // delegated to the child policy with a proactive cache refresh.
  268. func (s) TestPick_DataCacheHit_NoPendingEntry_StaleEntry(t *testing.T) {
  269. // We expect the same pick behavior (i.e delegated to the child policy) for
  270. // a proactive refresh whether the control channel is throttled or not.
  271. tests := []struct {
  272. name string
  273. throttled bool
  274. }{
  275. {
  276. name: "throttled",
  277. throttled: true,
  278. },
  279. {
  280. name: "notThrottled",
  281. throttled: false,
  282. },
  283. }
  284. for _, test := range tests {
  285. t.Run(test.name, func(t *testing.T) {
  286. // Start an RLS server and setup the throttler appropriately.
  287. rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
  288. var throttler *fakeThrottler
  289. firstRPCDone := grpcsync.NewEvent()
  290. if test.throttled {
  291. throttler = oneTimeAllowingThrottler(firstRPCDone)
  292. overrideAdaptiveThrottler(t, throttler)
  293. } else {
  294. throttler = neverThrottlingThrottler()
  295. overrideAdaptiveThrottler(t, throttler)
  296. }
  297. // Build the RLS config without a default target. Set the stale age
  298. // to a very low value to force entries to become stale quickly.
  299. rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
  300. rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(time.Minute)
  301. rlsConfig.RouteLookupConfig.StaleAge = durationpb.New(defaultTestShortTimeout)
  302. // Start a test backend, and setup the fake RLS server to return
  303. // this as a target in the RLS response.
  304. testBackendCh, testBackendAddress := startBackend(t)
  305. rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
  306. return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
  307. })
  308. // Register a manual resolver and push the RLS service config
  309. // through it.
  310. r := startManualResolverWithConfig(t, rlsConfig)
  311. // Dial the backend.
  312. cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  313. if err != nil {
  314. t.Fatalf("grpc.Dial() failed: %v", err)
  315. }
  316. defer cc.Close()
  317. // Make an RPC and ensure it gets routed to the test backend.
  318. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  319. defer cancel()
  320. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
  321. // Make sure an RLS request is sent out.
  322. verifyRLSRequest(t, rlsReqCh, true)
  323. firstRPCDone.Fire()
  324. // The cache entry has a large maxAge, but a small stateAge. We keep
  325. // retrying until the cache entry becomes stale, in which case we expect a
  326. // proactive cache refresh.
  327. //
  328. // If the control channel is not throttled, then we expect an RLS request
  329. // to be sent out. If the control channel is throttled, we expect the fake
  330. // throttler's channel to be signalled.
  331. for {
  332. // Make another RPC and expect it to find the target in the data cache.
  333. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
  334. if !test.throttled {
  335. select {
  336. case <-time.After(defaultTestShortTimeout):
  337. // Go back and retry the RPC.
  338. case <-rlsReqCh:
  339. return
  340. }
  341. } else {
  342. select {
  343. case <-time.After(defaultTestShortTimeout):
  344. // Go back and retry the RPC.
  345. case <-throttler.throttleCh:
  346. return
  347. }
  348. }
  349. }
  350. })
  351. }
  352. }
  353. // Test verifies scenarios where there is a matching entry in the data cache
  354. // which has expired and there is no pending request.
  355. func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntry(t *testing.T) {
  356. tests := []struct {
  357. name string
  358. throttled bool
  359. withDefaultTarget bool
  360. }{
  361. {
  362. name: "throttledWithDefaultTarget",
  363. throttled: true,
  364. withDefaultTarget: true,
  365. },
  366. {
  367. name: "throttledWithoutDefaultTarget",
  368. throttled: true,
  369. withDefaultTarget: false,
  370. },
  371. {
  372. name: "notThrottled",
  373. throttled: false,
  374. },
  375. }
  376. for _, test := range tests {
  377. t.Run(test.name, func(t *testing.T) {
  378. // Start an RLS server and setup the throttler appropriately.
  379. rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
  380. var throttler *fakeThrottler
  381. firstRPCDone := grpcsync.NewEvent()
  382. if test.throttled {
  383. throttler = oneTimeAllowingThrottler(firstRPCDone)
  384. overrideAdaptiveThrottler(t, throttler)
  385. } else {
  386. throttler = neverThrottlingThrottler()
  387. overrideAdaptiveThrottler(t, throttler)
  388. }
  389. // Build the RLS config with a very low value for maxAge. This will
  390. // ensure that cache entries become invalid very soon.
  391. rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
  392. rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
  393. // Start a default backend if needed.
  394. var defBackendCh chan struct{}
  395. if test.withDefaultTarget {
  396. var defBackendAddress string
  397. defBackendCh, defBackendAddress = startBackend(t)
  398. rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
  399. }
  400. // Start a test backend, and setup the fake RLS server to return
  401. // this as a target in the RLS response.
  402. testBackendCh, testBackendAddress := startBackend(t)
  403. rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
  404. return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
  405. })
  406. // Register a manual resolver and push the RLS service config
  407. // through it.
  408. r := startManualResolverWithConfig(t, rlsConfig)
  409. // Dial the backend.
  410. cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  411. if err != nil {
  412. t.Fatalf("grpc.Dial() failed: %v", err)
  413. }
  414. defer cc.Close()
  415. // Make an RPC and ensure it gets routed to the test backend.
  416. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  417. defer cancel()
  418. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
  419. // Make sure an RLS request is sent out.
  420. verifyRLSRequest(t, rlsReqCh, true)
  421. firstRPCDone.Fire()
  422. // Keep retrying the RPC until the cache entry expires. Expected behavior
  423. // is dependent on the scenario being tested.
  424. switch {
  425. case test.throttled && test.withDefaultTarget:
  426. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
  427. <-throttler.throttleCh
  428. case test.throttled && !test.withDefaultTarget:
  429. makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, errRLSThrottled)
  430. <-throttler.throttleCh
  431. case !test.throttled:
  432. for {
  433. // The backend to which the RPC is routed does not change after the
  434. // cache entry expires because the control channel is not throttled.
  435. // So, we need to keep retrying until the cache entry expires, at
  436. // which point we expect an RLS request to be sent out and the RPC to
  437. // get routed to the same testBackend.
  438. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
  439. select {
  440. case <-time.After(defaultTestShortTimeout):
  441. // Go back and retry the RPC.
  442. case <-rlsReqCh:
  443. return
  444. }
  445. }
  446. }
  447. })
  448. }
  449. }
  450. // Test verifies scenarios where there is a matching entry in the data cache
  451. // which has expired and is in backoff and there is no pending request.
  452. func (s) TestPick_DataCacheHit_NoPendingEntry_ExpiredEntryInBackoff(t *testing.T) {
  453. tests := []struct {
  454. name string
  455. withDefaultTarget bool
  456. }{
  457. {
  458. name: "withDefaultTarget",
  459. withDefaultTarget: true,
  460. },
  461. {
  462. name: "withoutDefaultTarget",
  463. withDefaultTarget: false,
  464. },
  465. }
  466. for _, test := range tests {
  467. t.Run(test.name, func(t *testing.T) {
  468. // Start an RLS server and set the throttler to never throttle requests.
  469. rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
  470. overrideAdaptiveThrottler(t, neverThrottlingThrottler())
  471. // Override the backoff strategy to return a large backoff which
  472. // will make sure the date cache entry remains in backoff for the
  473. // duration of the test.
  474. origBackoffStrategy := defaultBackoffStrategy
  475. defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
  476. defer func() { defaultBackoffStrategy = origBackoffStrategy }()
  477. // Build the RLS config with a very low value for maxAge. This will
  478. // ensure that cache entries become invalid very soon.
  479. rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
  480. rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
  481. // Start a default backend if needed.
  482. var defBackendCh chan struct{}
  483. if test.withDefaultTarget {
  484. var defBackendAddress string
  485. defBackendCh, defBackendAddress = startBackend(t)
  486. rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
  487. }
  488. // Start a test backend, and set up the fake RLS server to return this as
  489. // a target in the RLS response.
  490. testBackendCh, testBackendAddress := startBackend(t)
  491. rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
  492. return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
  493. })
  494. // Register a manual resolver and push the RLS service config through it.
  495. r := startManualResolverWithConfig(t, rlsConfig)
  496. // Dial the backend.
  497. cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  498. if err != nil {
  499. t.Fatalf("grpc.Dial() failed: %v", err)
  500. }
  501. defer cc.Close()
  502. // Make an RPC and ensure it gets routed to the test backend.
  503. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  504. defer cancel()
  505. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
  506. // Make sure an RLS request is sent out.
  507. verifyRLSRequest(t, rlsReqCh, true)
  508. // Set up the fake RLS server to return errors. This will push the cache
  509. // entry into backoff.
  510. var rlsLastErr = status.Error(codes.DeadlineExceeded, "last RLS request failed")
  511. rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
  512. return &rlstest.RouteLookupResponse{Err: rlsLastErr}
  513. })
  514. // Since the RLS server is now configured to return errors, this will push
  515. // the cache entry into backoff. The pick will be delegated to the default
  516. // backend if one exits, and will fail with the error returned by the RLS
  517. // server otherwise.
  518. if test.withDefaultTarget {
  519. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, defBackendCh)
  520. } else {
  521. makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, rlsLastErr)
  522. }
  523. })
  524. }
  525. }
  526. // Test verifies scenarios where there is a matching entry in the data cache
  527. // which is stale and there is a pending request.
  528. func (s) TestPick_DataCacheHit_PendingEntryExists_StaleEntry(t *testing.T) {
  529. tests := []struct {
  530. name string
  531. withDefaultTarget bool
  532. }{
  533. {
  534. name: "withDefaultTarget",
  535. withDefaultTarget: true,
  536. },
  537. {
  538. name: "withoutDefaultTarget",
  539. withDefaultTarget: false,
  540. },
  541. }
  542. for _, test := range tests {
  543. t.Run(test.name, func(t *testing.T) {
  544. // A unary interceptor which simply calls the underlying handler
  545. // until the first client RPC is done. We want one client RPC to
  546. // succeed to ensure that a data cache entry is created. For
  547. // subsequent client RPCs which result in RLS requests, this
  548. // interceptor blocks until the test's context expires. And since we
  549. // configure the RLS LB policy with a really low value for max age,
  550. // this allows us to simulate the condition where the it has an
  551. // expired entry and a pending entry in the cache.
  552. rlsReqCh := make(chan struct{}, 1)
  553. firstRPCDone := grpcsync.NewEvent()
  554. interceptor := func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
  555. select {
  556. case rlsReqCh <- struct{}{}:
  557. default:
  558. }
  559. if firstRPCDone.HasFired() {
  560. <-ctx.Done()
  561. return nil, ctx.Err()
  562. }
  563. return handler(ctx, req)
  564. }
  565. // Start an RLS server and set the throttler to never throttle.
  566. rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
  567. overrideAdaptiveThrottler(t, neverThrottlingThrottler())
  568. // Build RLS service config with an optional default target.
  569. rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
  570. if test.withDefaultTarget {
  571. _, defBackendAddress := startBackend(t)
  572. rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
  573. }
  574. // Low value for stale age to force entries to become stale quickly.
  575. rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(time.Minute)
  576. rlsConfig.RouteLookupConfig.StaleAge = durationpb.New(defaultTestShortTimeout)
  577. // Start a test backend, and setup the fake RLS server to return
  578. // this as a target in the RLS response.
  579. testBackendCh, testBackendAddress := startBackend(t)
  580. rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
  581. return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
  582. })
  583. // Register a manual resolver and push the RLS service config
  584. // through it.
  585. r := startManualResolverWithConfig(t, rlsConfig)
  586. // Dial the backend.
  587. cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  588. if err != nil {
  589. t.Fatalf("grpc.Dial() failed: %v", err)
  590. }
  591. defer cc.Close()
  592. // Make an RPC and ensure it gets routed to the test backend.
  593. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  594. defer cancel()
  595. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
  596. // Make sure an RLS request is sent out.
  597. verifyRLSRequest(t, rlsReqCh, true)
  598. firstRPCDone.Fire()
  599. // The cache entry has a large maxAge, but a small stateAge. We keep
  600. // retrying until the cache entry becomes stale, in which case we expect a
  601. // proactive cache refresh.
  602. for {
  603. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
  604. select {
  605. case <-time.After(defaultTestShortTimeout):
  606. // Go back and retry the RPC.
  607. case <-rlsReqCh:
  608. return
  609. }
  610. }
  611. })
  612. }
  613. }
  614. // Test verifies scenarios where there is a matching entry in the data cache
  615. // which is expired and there is a pending request.
  616. func (s) TestPick_DataCacheHit_PendingEntryExists_ExpiredEntry(t *testing.T) {
  617. tests := []struct {
  618. name string
  619. withDefaultTarget bool
  620. }{
  621. {
  622. name: "withDefaultTarget",
  623. withDefaultTarget: true,
  624. },
  625. {
  626. name: "withoutDefaultTarget",
  627. withDefaultTarget: false,
  628. },
  629. }
  630. for _, test := range tests {
  631. t.Run(test.name, func(t *testing.T) {
  632. // A unary interceptor which simply calls the underlying handler
  633. // until the first client RPC is done. We want one client RPC to
  634. // succeed to ensure that a data cache entry is created. For
  635. // subsequent client RPCs which result in RLS requests, this
  636. // interceptor blocks until the test's context expires. And since we
  637. // configure the RLS LB policy with a really low value for max age,
  638. // this allows us to simulate the condition where the it has an
  639. // expired entry and a pending entry in the cache.
  640. rlsReqCh := make(chan struct{}, 1)
  641. firstRPCDone := grpcsync.NewEvent()
  642. interceptor := func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
  643. select {
  644. case rlsReqCh <- struct{}{}:
  645. default:
  646. }
  647. if firstRPCDone.HasFired() {
  648. <-ctx.Done()
  649. return nil, ctx.Err()
  650. }
  651. return handler(ctx, req)
  652. }
  653. // Start an RLS server and set the throttler to never throttle.
  654. rlsServer, _ := rlstest.SetupFakeRLSServer(t, nil, grpc.UnaryInterceptor(interceptor))
  655. overrideAdaptiveThrottler(t, neverThrottlingThrottler())
  656. // Build RLS service config with an optional default target.
  657. rlsConfig := buildBasicRLSConfigWithChildPolicy(t, t.Name(), rlsServer.Address)
  658. if test.withDefaultTarget {
  659. _, defBackendAddress := startBackend(t)
  660. rlsConfig.RouteLookupConfig.DefaultTarget = defBackendAddress
  661. }
  662. // Set a low value for maxAge to ensure cache entries expire soon.
  663. rlsConfig.RouteLookupConfig.MaxAge = durationpb.New(defaultTestShortTimeout)
  664. // Start a test backend, and setup the fake RLS server to return
  665. // this as a target in the RLS response.
  666. testBackendCh, testBackendAddress := startBackend(t)
  667. rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
  668. return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
  669. })
  670. // Register a manual resolver and push the RLS service config
  671. // through it.
  672. r := startManualResolverWithConfig(t, rlsConfig)
  673. // Dial the backend.
  674. cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  675. if err != nil {
  676. t.Fatalf("grpc.Dial() failed: %v", err)
  677. }
  678. defer cc.Close()
  679. // Make an RPC and ensure it gets routed to the test backend.
  680. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  681. defer cancel()
  682. makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
  683. // Make sure an RLS request is sent out.
  684. verifyRLSRequest(t, rlsReqCh, true)
  685. firstRPCDone.Fire()
  686. // At this point, we have a cache entry with a small maxAge, and the
  687. // RLS server is configured to block on further RLS requests. As we
  688. // retry the RPC, at some point the cache entry would expire and
  689. // force us to send an RLS request which would block on the server,
  690. // giving us a pending cache entry for the duration of the test.
  691. go func() {
  692. for client := testgrpc.NewTestServiceClient(cc); ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
  693. client.EmptyCall(ctx, &testpb.Empty{})
  694. }
  695. }()
  696. verifyRLSRequest(t, rlsReqCh, true)
  697. // Another RPC at this point should find the pending entry and be queued.
  698. // But since we pass a small deadline, this RPC should fail with a
  699. // deadline exceeded error since the pending request does not return until
  700. // the test is done. And since we have a pending entry, we expect no RLS
  701. // request to be sent out.
  702. sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
  703. defer sCancel()
  704. makeTestRPCAndVerifyError(sCtx, t, cc, codes.DeadlineExceeded, context.DeadlineExceeded)
  705. verifyRLSRequest(t, rlsReqCh, false)
  706. })
  707. }
  708. }
  709. func TestIsFullMethodNameValid(t *testing.T) {
  710. tests := []struct {
  711. desc string
  712. methodName string
  713. want bool
  714. }{
  715. {
  716. desc: "does not start with a slash",
  717. methodName: "service/method",
  718. want: false,
  719. },
  720. {
  721. desc: "does not contain a method",
  722. methodName: "/service",
  723. want: false,
  724. },
  725. {
  726. desc: "path has more elements",
  727. methodName: "/service/path/to/method",
  728. want: false,
  729. },
  730. {
  731. desc: "valid",
  732. methodName: "/service/method",
  733. want: true,
  734. },
  735. }
  736. for _, test := range tests {
  737. t.Run(test.desc, func(t *testing.T) {
  738. if got := isFullMethodNameValid(test.methodName); got != test.want {
  739. t.Fatalf("isFullMethodNameValid(%q) = %v, want %v", test.methodName, got, test.want)
  740. }
  741. })
  742. }
  743. }