producer_test.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. /*
  2. * Copyright 2022 gRPC authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package orca_test
  17. import (
  18. "context"
  19. "fmt"
  20. "sync"
  21. "testing"
  22. "time"
  23. "github.com/golang/protobuf/proto"
  24. "google.golang.org/grpc"
  25. "google.golang.org/grpc/balancer"
  26. "google.golang.org/grpc/balancer/roundrobin"
  27. "google.golang.org/grpc/codes"
  28. "google.golang.org/grpc/credentials/insecure"
  29. "google.golang.org/grpc/internal/grpctest"
  30. "google.golang.org/grpc/internal/testutils"
  31. "google.golang.org/grpc/orca"
  32. "google.golang.org/grpc/orca/internal"
  33. "google.golang.org/grpc/resolver"
  34. "google.golang.org/grpc/resolver/manual"
  35. "google.golang.org/grpc/status"
  36. v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
  37. v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
  38. v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
  39. )
  40. // customLBB wraps a round robin LB policy but provides a ClientConn wrapper to
  41. // add an ORCA OOB report producer for all created SubConns.
  42. type customLBB struct{}
  43. func (customLBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
  44. return balancer.Get(roundrobin.Name).Build(&ccWrapper{ClientConn: cc}, opts)
  45. }
  46. func (customLBB) Name() string { return "customLB" }
  47. func init() {
  48. balancer.Register(customLBB{})
  49. }
  50. type ccWrapper struct {
  51. balancer.ClientConn
  52. }
  53. func (w *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
  54. if len(addrs) != 1 {
  55. panic(fmt.Sprintf("got addrs=%v; want len(addrs) == 1", addrs))
  56. }
  57. sc, err := w.ClientConn.NewSubConn(addrs, opts)
  58. if err != nil {
  59. return sc, err
  60. }
  61. l := getListenerInfo(addrs[0])
  62. l.listener.cleanup = orca.RegisterOOBListener(sc, l.listener, l.opts)
  63. l.sc = sc
  64. return sc, nil
  65. }
  66. // listenerInfo is stored in an address's attributes to allow ORCA
  67. // listeners to be registered on subconns created for that address.
  68. type listenerInfo struct {
  69. listener *testOOBListener
  70. opts orca.OOBListenerOptions
  71. sc balancer.SubConn // Set by the LB policy
  72. }
  73. type listenerInfoKey struct{}
  74. func setListenerInfo(addr resolver.Address, l *listenerInfo) resolver.Address {
  75. addr.Attributes = addr.Attributes.WithValue(listenerInfoKey{}, l)
  76. return addr
  77. }
  78. func getListenerInfo(addr resolver.Address) *listenerInfo {
  79. return addr.Attributes.Value(listenerInfoKey{}).(*listenerInfo)
  80. }
  81. // testOOBListener is a simple listener that pushes load reports to a channel.
  82. type testOOBListener struct {
  83. cleanup func()
  84. loadReportCh chan *v3orcapb.OrcaLoadReport
  85. }
  86. func newTestOOBListener() *testOOBListener {
  87. return &testOOBListener{cleanup: func() {}, loadReportCh: make(chan *v3orcapb.OrcaLoadReport)}
  88. }
  89. func (t *testOOBListener) Stop() { t.cleanup() }
  90. func (t *testOOBListener) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
  91. t.loadReportCh <- r
  92. }
  93. // TestProducer is a basic, end-to-end style test of an LB policy with an
  94. // OOBListener communicating with a server with an ORCA service.
  95. func (s) TestProducer(t *testing.T) {
  96. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  97. defer cancel()
  98. // Use a fixed backoff for stream recreation.
  99. oldBackoff := internal.DefaultBackoffFunc
  100. internal.DefaultBackoffFunc = func(int) time.Duration { return 10 * time.Millisecond }
  101. defer func() { internal.DefaultBackoffFunc = oldBackoff }()
  102. // Initialize listener for our ORCA server.
  103. lis, err := testutils.LocalTCPListener()
  104. if err != nil {
  105. t.Fatal(err)
  106. }
  107. // Register the OpenRCAService with a very short metrics reporting interval.
  108. const shortReportingInterval = 50 * time.Millisecond
  109. smr := orca.NewServerMetricsRecorder()
  110. opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval, ServerMetricsProvider: smr}
  111. internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts)
  112. s := grpc.NewServer()
  113. if err := orca.Register(s, opts); err != nil {
  114. t.Fatalf("orca.Register failed: %v", err)
  115. }
  116. go s.Serve(lis)
  117. defer s.Stop()
  118. // Create our client with an OOB listener in the LB policy it selects.
  119. r := manual.NewBuilderWithScheme("whatever")
  120. oobLis := newTestOOBListener()
  121. lisOpts := orca.OOBListenerOptions{ReportInterval: 50 * time.Millisecond}
  122. li := &listenerInfo{listener: oobLis, opts: lisOpts}
  123. addr := setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)
  124. r.InitialState(resolver.State{Addresses: []resolver.Address{addr}})
  125. cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  126. if err != nil {
  127. t.Fatalf("grpc.Dial failed: %v", err)
  128. }
  129. defer cc.Close()
  130. // Ensure the OOB listener is stopped before the client is closed to avoid
  131. // a potential irrelevant error in the logs.
  132. defer oobLis.Stop()
  133. // Set a few metrics and wait for them on the client side.
  134. smr.SetCPUUtilization(10)
  135. smr.SetMemoryUtilization(0.1)
  136. smr.SetNamedUtilization("bob", 0.555)
  137. loadReportWant := &v3orcapb.OrcaLoadReport{
  138. CpuUtilization: 10,
  139. MemUtilization: 0.1,
  140. Utilization: map[string]float64{"bob": 0.555},
  141. }
  142. testReport:
  143. for {
  144. select {
  145. case r := <-oobLis.loadReportCh:
  146. t.Log("Load report received: ", r)
  147. if proto.Equal(r, loadReportWant) {
  148. // Success!
  149. break testReport
  150. }
  151. case <-ctx.Done():
  152. t.Fatalf("timed out waiting for load report: %v", loadReportWant)
  153. }
  154. }
  155. // Change and add metrics and wait for them on the client side.
  156. smr.SetCPUUtilization(0.5)
  157. smr.SetMemoryUtilization(0.2)
  158. smr.SetNamedUtilization("mary", 0.321)
  159. loadReportWant = &v3orcapb.OrcaLoadReport{
  160. CpuUtilization: 0.5,
  161. MemUtilization: 0.2,
  162. Utilization: map[string]float64{"bob": 0.555, "mary": 0.321},
  163. }
  164. for {
  165. select {
  166. case r := <-oobLis.loadReportCh:
  167. t.Log("Load report received: ", r)
  168. if proto.Equal(r, loadReportWant) {
  169. // Success!
  170. return
  171. }
  172. case <-ctx.Done():
  173. t.Fatalf("timed out waiting for load report: %v", loadReportWant)
  174. }
  175. }
  176. }
  177. // fakeORCAService is a simple implementation of an ORCA service that pushes
  178. // requests it receives from clients to a channel and sends responses from a
  179. // channel back. This allows tests to verify the client is sending requests
  180. // and processing responses properly.
  181. type fakeORCAService struct {
  182. v3orcaservicegrpc.UnimplementedOpenRcaServiceServer
  183. reqCh chan *v3orcaservicepb.OrcaLoadReportRequest
  184. respCh chan interface{} // either *v3orcapb.OrcaLoadReport or error
  185. }
  186. func newFakeORCAService() *fakeORCAService {
  187. return &fakeORCAService{
  188. reqCh: make(chan *v3orcaservicepb.OrcaLoadReportRequest),
  189. respCh: make(chan interface{}),
  190. }
  191. }
  192. func (f *fakeORCAService) close() {
  193. close(f.respCh)
  194. }
  195. func (f *fakeORCAService) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error {
  196. f.reqCh <- req
  197. for resp := range f.respCh {
  198. if err, ok := resp.(error); ok {
  199. return err
  200. }
  201. if err := stream.Send(resp.(*v3orcapb.OrcaLoadReport)); err != nil {
  202. // In the event that a stream error occurs, a new stream will have
  203. // been created that was waiting for this response message. Push
  204. // it back onto the channel and return.
  205. //
  206. // This happens because we range over respCh. If we changed to
  207. // instead select on respCh + stream.Context(), the same situation
  208. // could still occur due to a race between noticing the two events,
  209. // so such a workaround would still be needed to prevent flakiness.
  210. f.respCh <- resp
  211. return err
  212. }
  213. }
  214. return nil
  215. }
  216. // TestProducerBackoff verifies that the ORCA producer applies the proper
  217. // backoff after stream failures.
  218. func (s) TestProducerBackoff(t *testing.T) {
  219. grpctest.TLogger.ExpectErrorN("injected error", 4)
  220. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  221. defer cancel()
  222. // Provide a convenient way to expect backoff calls and return a minimal
  223. // value.
  224. const backoffShouldNotBeCalled = 9999 // Use to assert backoff function is not called.
  225. const backoffAllowAny = -1 // Use to ignore any backoff calls.
  226. expectedBackoff := backoffAllowAny
  227. oldBackoff := internal.DefaultBackoffFunc
  228. internal.DefaultBackoffFunc = func(got int) time.Duration {
  229. if expectedBackoff == backoffShouldNotBeCalled {
  230. t.Errorf("Unexpected backoff call; parameter = %v", got)
  231. } else if expectedBackoff != backoffAllowAny {
  232. if got != expectedBackoff {
  233. t.Errorf("Unexpected backoff received; got %v want %v", got, expectedBackoff)
  234. }
  235. }
  236. return time.Millisecond
  237. }
  238. defer func() { internal.DefaultBackoffFunc = oldBackoff }()
  239. // Initialize listener for our ORCA server.
  240. lis, err := testutils.LocalTCPListener()
  241. if err != nil {
  242. t.Fatal(err)
  243. }
  244. // Register our fake ORCA service.
  245. s := grpc.NewServer()
  246. fake := newFakeORCAService()
  247. defer fake.close()
  248. v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, fake)
  249. go s.Serve(lis)
  250. defer s.Stop()
  251. // Define the report interval and a function to wait for it to be sent to
  252. // the server.
  253. const reportInterval = 123 * time.Second
  254. awaitRequest := func(interval time.Duration) {
  255. select {
  256. case req := <-fake.reqCh:
  257. if got := req.GetReportInterval().AsDuration(); got != interval {
  258. t.Errorf("Unexpected report interval; got %v want %v", got, interval)
  259. }
  260. case <-ctx.Done():
  261. t.Fatalf("Did not receive client request")
  262. }
  263. }
  264. // Create our client with an OOB listener in the LB policy it selects.
  265. r := manual.NewBuilderWithScheme("whatever")
  266. oobLis := newTestOOBListener()
  267. lisOpts := orca.OOBListenerOptions{ReportInterval: reportInterval}
  268. li := &listenerInfo{listener: oobLis, opts: lisOpts}
  269. r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}})
  270. cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  271. if err != nil {
  272. t.Fatalf("grpc.Dial failed: %v", err)
  273. }
  274. defer cc.Close()
  275. // Ensure the OOB listener is stopped before the client is closed to avoid
  276. // a potential irrelevant error in the logs.
  277. defer oobLis.Stop()
  278. // Define a load report to send and expect the client to see.
  279. loadReportWant := &v3orcapb.OrcaLoadReport{
  280. CpuUtilization: 10,
  281. MemUtilization: 0.1,
  282. Utilization: map[string]float64{"bob": 0.555},
  283. }
  284. // Unblock the fake.
  285. awaitRequest(reportInterval)
  286. fake.respCh <- loadReportWant
  287. select {
  288. case r := <-oobLis.loadReportCh:
  289. t.Log("Load report received: ", r)
  290. if proto.Equal(r, loadReportWant) {
  291. // Success!
  292. break
  293. }
  294. case <-ctx.Done():
  295. t.Fatalf("timed out waiting for load report: %v", loadReportWant)
  296. }
  297. // The next request should be immediate, since there was a message
  298. // received.
  299. expectedBackoff = backoffShouldNotBeCalled
  300. fake.respCh <- status.Errorf(codes.Internal, "injected error")
  301. awaitRequest(reportInterval)
  302. // The next requests will need to backoff.
  303. expectedBackoff = 0
  304. fake.respCh <- status.Errorf(codes.Internal, "injected error")
  305. awaitRequest(reportInterval)
  306. expectedBackoff = 1
  307. fake.respCh <- status.Errorf(codes.Internal, "injected error")
  308. awaitRequest(reportInterval)
  309. expectedBackoff = 2
  310. fake.respCh <- status.Errorf(codes.Internal, "injected error")
  311. awaitRequest(reportInterval)
  312. // The next request should be immediate, since there was a message
  313. // received.
  314. expectedBackoff = backoffShouldNotBeCalled
  315. // Send another valid response and wait for it on the client.
  316. fake.respCh <- loadReportWant
  317. select {
  318. case r := <-oobLis.loadReportCh:
  319. t.Log("Load report received: ", r)
  320. if proto.Equal(r, loadReportWant) {
  321. // Success!
  322. break
  323. }
  324. case <-ctx.Done():
  325. t.Fatalf("timed out waiting for load report: %v", loadReportWant)
  326. }
  327. }
  328. // TestProducerMultipleListeners tests that multiple listeners works as
  329. // expected in a producer: requesting the proper interval and delivering the
  330. // update to all listeners.
  331. func (s) TestProducerMultipleListeners(t *testing.T) {
  332. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  333. defer cancel()
  334. // Provide a convenient way to expect backoff calls and return a minimal
  335. // value.
  336. oldBackoff := internal.DefaultBackoffFunc
  337. internal.DefaultBackoffFunc = func(got int) time.Duration {
  338. return time.Millisecond
  339. }
  340. defer func() { internal.DefaultBackoffFunc = oldBackoff }()
  341. // Initialize listener for our ORCA server.
  342. lis, err := testutils.LocalTCPListener()
  343. if err != nil {
  344. t.Fatal(err)
  345. }
  346. // Register our fake ORCA service.
  347. s := grpc.NewServer()
  348. fake := newFakeORCAService()
  349. defer fake.close()
  350. v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, fake)
  351. go s.Serve(lis)
  352. defer s.Stop()
  353. // Define the report interval and a function to wait for it to be sent to
  354. // the server.
  355. const reportInterval1 = 123 * time.Second
  356. const reportInterval2 = 234 * time.Second
  357. const reportInterval3 = 56 * time.Second
  358. awaitRequest := func(interval time.Duration) {
  359. select {
  360. case req := <-fake.reqCh:
  361. if got := req.GetReportInterval().AsDuration(); got != interval {
  362. t.Errorf("Unexpected report interval; got %v want %v", got, interval)
  363. }
  364. case <-ctx.Done():
  365. t.Fatalf("Did not receive client request")
  366. }
  367. }
  368. // Create our client with an OOB listener in the LB policy it selects.
  369. r := manual.NewBuilderWithScheme("whatever")
  370. oobLis1 := newTestOOBListener()
  371. lisOpts1 := orca.OOBListenerOptions{ReportInterval: reportInterval1}
  372. li := &listenerInfo{listener: oobLis1, opts: lisOpts1}
  373. r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}})
  374. cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
  375. if err != nil {
  376. t.Fatalf("grpc.Dial failed: %v", err)
  377. }
  378. defer cc.Close()
  379. // Ensure the OOB listener is stopped before the client is closed to avoid
  380. // a potential irrelevant error in the logs.
  381. defer oobLis1.Stop()
  382. oobLis2 := newTestOOBListener()
  383. lisOpts2 := orca.OOBListenerOptions{ReportInterval: reportInterval2}
  384. oobLis3 := newTestOOBListener()
  385. lisOpts3 := orca.OOBListenerOptions{ReportInterval: reportInterval3}
  386. // Define a load report to send and expect the client to see.
  387. loadReportWant := &v3orcapb.OrcaLoadReport{
  388. CpuUtilization: 10,
  389. MemUtilization: 0.1,
  390. Utilization: map[string]float64{"bob": 0.555},
  391. }
  392. // Receive reports and update counts for the three listeners.
  393. var reportsMu sync.Mutex
  394. var reportsReceived1, reportsReceived2, reportsReceived3 int
  395. go func() {
  396. for {
  397. select {
  398. case r := <-oobLis1.loadReportCh:
  399. t.Log("Load report 1 received: ", r)
  400. if !proto.Equal(r, loadReportWant) {
  401. t.Errorf("Unexpected report received: %+v", r)
  402. }
  403. reportsMu.Lock()
  404. reportsReceived1++
  405. reportsMu.Unlock()
  406. case r := <-oobLis2.loadReportCh:
  407. t.Log("Load report 2 received: ", r)
  408. if !proto.Equal(r, loadReportWant) {
  409. t.Errorf("Unexpected report received: %+v", r)
  410. }
  411. reportsMu.Lock()
  412. reportsReceived2++
  413. reportsMu.Unlock()
  414. case r := <-oobLis3.loadReportCh:
  415. t.Log("Load report 3 received: ", r)
  416. if !proto.Equal(r, loadReportWant) {
  417. t.Errorf("Unexpected report received: %+v", r)
  418. }
  419. reportsMu.Lock()
  420. reportsReceived3++
  421. reportsMu.Unlock()
  422. case <-ctx.Done():
  423. // Test has ended; exit
  424. return
  425. }
  426. }
  427. }()
  428. // checkReports is a helper function to check the report counts for the three listeners.
  429. checkReports := func(r1, r2, r3 int) {
  430. t.Helper()
  431. for ctx.Err() == nil {
  432. reportsMu.Lock()
  433. if r1 == reportsReceived1 && r2 == reportsReceived2 && r3 == reportsReceived3 {
  434. // Success!
  435. reportsMu.Unlock()
  436. return
  437. }
  438. if reportsReceived1 > r1 || reportsReceived2 > r2 || reportsReceived3 > r3 {
  439. reportsMu.Unlock()
  440. t.Fatalf("received excess reports. got %v %v %v; want %v %v %v", reportsReceived1, reportsReceived2, reportsReceived3, r1, r2, r3)
  441. return
  442. }
  443. reportsMu.Unlock()
  444. time.Sleep(10 * time.Millisecond)
  445. }
  446. t.Fatalf("timed out waiting for reports received. got %v %v %v; want %v %v %v", reportsReceived1, reportsReceived2, reportsReceived3, r1, r2, r3)
  447. }
  448. // Only 1 listener; expect reportInterval1 to be used and expect the report
  449. // to be sent to the listener.
  450. awaitRequest(reportInterval1)
  451. fake.respCh <- loadReportWant
  452. checkReports(1, 0, 0)
  453. // Register listener 2 with a less frequent interval; no need to recreate
  454. // stream. Report should go to both listeners.
  455. oobLis2.cleanup = orca.RegisterOOBListener(li.sc, oobLis2, lisOpts2)
  456. fake.respCh <- loadReportWant
  457. checkReports(2, 1, 0)
  458. // Register listener 3 with a more frequent interval; stream is recreated
  459. // with this interval. The next report will go to all three listeners.
  460. oobLis3.cleanup = orca.RegisterOOBListener(li.sc, oobLis3, lisOpts3)
  461. awaitRequest(reportInterval3)
  462. fake.respCh <- loadReportWant
  463. checkReports(3, 2, 1)
  464. // Another report without a change in listeners should go to all three listeners.
  465. fake.respCh <- loadReportWant
  466. checkReports(4, 3, 2)
  467. // Stop listener 2. This does not affect the interval as listener 3 is
  468. // still the shortest. The next update goes to listeners 1 and 3.
  469. oobLis2.Stop()
  470. fake.respCh <- loadReportWant
  471. checkReports(5, 3, 3)
  472. // Stop listener 3. This makes the interval longer. Reports should only
  473. // go to listener 1 now.
  474. oobLis3.Stop()
  475. awaitRequest(reportInterval1)
  476. fake.respCh <- loadReportWant
  477. checkReports(6, 3, 3)
  478. // Another report without a change in listeners should go to the first listener.
  479. fake.respCh <- loadReportWant
  480. checkReports(7, 3, 3)
  481. }