service_test.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. /*
  2. *
  3. * Copyright 2022 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package orca_test
  19. import (
  20. "context"
  21. "fmt"
  22. "sync"
  23. "testing"
  24. "time"
  25. "github.com/golang/protobuf/proto"
  26. "github.com/google/go-cmp/cmp"
  27. "google.golang.org/grpc"
  28. "google.golang.org/grpc/credentials/insecure"
  29. "google.golang.org/grpc/internal/pretty"
  30. "google.golang.org/grpc/internal/testutils"
  31. "google.golang.org/grpc/orca"
  32. "google.golang.org/grpc/orca/internal"
  33. "google.golang.org/protobuf/types/known/durationpb"
  34. v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
  35. v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
  36. v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
  37. testgrpc "google.golang.org/grpc/interop/grpc_testing"
  38. testpb "google.golang.org/grpc/interop/grpc_testing"
  39. )
  // requestsMetricKey is the name under which testServiceImpl records its named
  // utilization metric, and the key the test looks for in the Utilization map
  // of the load reports it receives.
  const (
  	requestsMetricKey = "test-service-requests"
  )
  41. // An implementation of grpc_testing.TestService for the purpose of this test.
  42. // We cannot use the StubServer approach here because we need to register the
  43. // OpenRCAService as well on the same gRPC server.
  44. type testServiceImpl struct {
  45. mu sync.Mutex
  46. requests int64
  47. testgrpc.TestServiceServer
  48. smr orca.ServerMetricsRecorder
  49. }
  50. func (t *testServiceImpl) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  51. t.mu.Lock()
  52. t.requests++
  53. t.mu.Unlock()
  54. t.smr.SetNamedUtilization(requestsMetricKey, float64(t.requests)*0.01)
  55. t.smr.SetCPUUtilization(50.0)
  56. t.smr.SetMemoryUtilization(0.9)
  57. t.smr.SetApplicationUtilization(1.2)
  58. return &testpb.SimpleResponse{}, nil
  59. }
  60. func (t *testServiceImpl) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
  61. t.smr.DeleteNamedUtilization(requestsMetricKey)
  62. t.smr.SetCPUUtilization(0)
  63. t.smr.SetMemoryUtilization(0)
  64. t.smr.DeleteApplicationUtilization()
  65. return &testpb.Empty{}, nil
  66. }
  67. // TestE2E_CustomBackendMetrics_OutOfBand tests the injection of out-of-band
  68. // custom backend metrics from the server application, and verifies that
  69. // expected load reports are received at the client.
  70. //
  71. // TODO: Change this test to use the client API, when ready, to read the
  72. // out-of-band metrics pushed by the server.
  73. func (s) TestE2E_CustomBackendMetrics_OutOfBand(t *testing.T) {
  74. lis, err := testutils.LocalTCPListener()
  75. if err != nil {
  76. t.Fatal(err)
  77. }
  78. // Override the min reporting interval in the internal package.
  79. const shortReportingInterval = 10 * time.Millisecond
  80. smr := orca.NewServerMetricsRecorder()
  81. opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval, ServerMetricsProvider: smr}
  82. internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts)
  83. // Register the OpenRCAService with a very short metrics reporting interval.
  84. s := grpc.NewServer()
  85. if err := orca.Register(s, opts); err != nil {
  86. t.Fatalf("orca.EnableOutOfBandMetricsReportingForTesting() failed: %v", err)
  87. }
  88. // Register the test service implementation on the same grpc server, and start serving.
  89. testgrpc.RegisterTestServiceServer(s, &testServiceImpl{smr: smr})
  90. go s.Serve(lis)
  91. defer s.Stop()
  92. t.Logf("Started gRPC server at %s...", lis.Addr().String())
  93. // Dial the test server.
  94. cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
  95. if err != nil {
  96. t.Fatalf("grpc.Dial(%s) failed: %v", lis.Addr().String(), err)
  97. }
  98. defer cc.Close()
  99. // Spawn a goroutine which sends 20 unary RPCs to the test server. This
  100. // will trigger the injection of custom backend metrics from the
  101. // testServiceImpl.
  102. const numRequests = 20
  103. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  104. defer cancel()
  105. testStub := testgrpc.NewTestServiceClient(cc)
  106. errCh := make(chan error, 1)
  107. go func() {
  108. for i := 0; i < numRequests; i++ {
  109. if _, err := testStub.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
  110. errCh <- fmt.Errorf("UnaryCall failed: %v", err)
  111. return
  112. }
  113. time.Sleep(time.Millisecond)
  114. }
  115. errCh <- nil
  116. }()
  117. // Start the server streaming RPC to receive custom backend metrics.
  118. oobStub := v3orcaservicegrpc.NewOpenRcaServiceClient(cc)
  119. stream, err := oobStub.StreamCoreMetrics(ctx, &v3orcaservicepb.OrcaLoadReportRequest{ReportInterval: durationpb.New(shortReportingInterval)})
  120. if err != nil {
  121. t.Fatalf("Failed to create a stream for out-of-band metrics")
  122. }
  123. // Wait for the server to push metrics which indicate the completion of all
  124. // the unary RPCs made from the above goroutine.
  125. for {
  126. select {
  127. case <-ctx.Done():
  128. t.Fatal("Timeout when waiting for out-of-band custom backend metrics to match expected values")
  129. case err := <-errCh:
  130. if err != nil {
  131. t.Fatal(err)
  132. }
  133. default:
  134. }
  135. wantProto := &v3orcapb.OrcaLoadReport{
  136. CpuUtilization: 50.0,
  137. MemUtilization: 0.9,
  138. ApplicationUtilization: 1.2,
  139. Utilization: map[string]float64{requestsMetricKey: numRequests * 0.01},
  140. }
  141. gotProto, err := stream.Recv()
  142. if err != nil {
  143. t.Fatalf("Recv() failed: %v", err)
  144. }
  145. if !cmp.Equal(gotProto, wantProto, cmp.Comparer(proto.Equal)) {
  146. t.Logf("Received load report from stream: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(wantProto))
  147. continue
  148. }
  149. // This means that we received the metrics which we expected.
  150. break
  151. }
  152. // The EmptyCall RPC is expected to delete earlier injected metrics.
  153. if _, err := testStub.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  154. t.Fatalf("EmptyCall failed: %v", err)
  155. }
  156. // Wait for the server to push empty metrics which indicate the processing
  157. // of the above EmptyCall RPC.
  158. for {
  159. select {
  160. case <-ctx.Done():
  161. t.Fatal("Timeout when waiting for out-of-band custom backend metrics to match expected values")
  162. default:
  163. }
  164. wantProto := &v3orcapb.OrcaLoadReport{}
  165. gotProto, err := stream.Recv()
  166. if err != nil {
  167. t.Fatalf("Recv() failed: %v", err)
  168. }
  169. if !cmp.Equal(gotProto, wantProto, cmp.Comparer(proto.Equal)) {
  170. t.Logf("Received load report from stream: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(wantProto))
  171. continue
  172. }
  173. // This means that we received the metrics which we expected.
  174. break
  175. }
  176. }