123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- /*
- *
- * Copyright 2022 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package orca_test
- import (
- "context"
- "fmt"
- "sync"
- "testing"
- "time"
- "github.com/golang/protobuf/proto"
- "github.com/google/go-cmp/cmp"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/internal/pretty"
- "google.golang.org/grpc/internal/testutils"
- "google.golang.org/grpc/orca"
- "google.golang.org/grpc/orca/internal"
- "google.golang.org/protobuf/types/known/durationpb"
- v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
- v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
- v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
- testgrpc "google.golang.org/grpc/interop/grpc_testing"
- testpb "google.golang.org/grpc/interop/grpc_testing"
- )
- const requestsMetricKey = "test-service-requests"
- // An implementation of grpc_testing.TestService for the purpose of this test.
- // We cannot use the StubServer approach here because we need to register the
- // OpenRCAService as well on the same gRPC server.
- type testServiceImpl struct {
- mu sync.Mutex
- requests int64
- testgrpc.TestServiceServer
- smr orca.ServerMetricsRecorder
- }
- func (t *testServiceImpl) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
- t.mu.Lock()
- t.requests++
- t.mu.Unlock()
- t.smr.SetNamedUtilization(requestsMetricKey, float64(t.requests)*0.01)
- t.smr.SetCPUUtilization(50.0)
- t.smr.SetMemoryUtilization(0.9)
- t.smr.SetApplicationUtilization(1.2)
- return &testpb.SimpleResponse{}, nil
- }
- func (t *testServiceImpl) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
- t.smr.DeleteNamedUtilization(requestsMetricKey)
- t.smr.SetCPUUtilization(0)
- t.smr.SetMemoryUtilization(0)
- t.smr.DeleteApplicationUtilization()
- return &testpb.Empty{}, nil
- }
- // TestE2E_CustomBackendMetrics_OutOfBand tests the injection of out-of-band
- // custom backend metrics from the server application, and verifies that
- // expected load reports are received at the client.
- //
- // TODO: Change this test to use the client API, when ready, to read the
- // out-of-band metrics pushed by the server.
- func (s) TestE2E_CustomBackendMetrics_OutOfBand(t *testing.T) {
- lis, err := testutils.LocalTCPListener()
- if err != nil {
- t.Fatal(err)
- }
- // Override the min reporting interval in the internal package.
- const shortReportingInterval = 10 * time.Millisecond
- smr := orca.NewServerMetricsRecorder()
- opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval, ServerMetricsProvider: smr}
- internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts)
- // Register the OpenRCAService with a very short metrics reporting interval.
- s := grpc.NewServer()
- if err := orca.Register(s, opts); err != nil {
- t.Fatalf("orca.EnableOutOfBandMetricsReportingForTesting() failed: %v", err)
- }
- // Register the test service implementation on the same grpc server, and start serving.
- testgrpc.RegisterTestServiceServer(s, &testServiceImpl{smr: smr})
- go s.Serve(lis)
- defer s.Stop()
- t.Logf("Started gRPC server at %s...", lis.Addr().String())
- // Dial the test server.
- cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
- if err != nil {
- t.Fatalf("grpc.Dial(%s) failed: %v", lis.Addr().String(), err)
- }
- defer cc.Close()
- // Spawn a goroutine which sends 20 unary RPCs to the test server. This
- // will trigger the injection of custom backend metrics from the
- // testServiceImpl.
- const numRequests = 20
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
- defer cancel()
- testStub := testgrpc.NewTestServiceClient(cc)
- errCh := make(chan error, 1)
- go func() {
- for i := 0; i < numRequests; i++ {
- if _, err := testStub.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
- errCh <- fmt.Errorf("UnaryCall failed: %v", err)
- return
- }
- time.Sleep(time.Millisecond)
- }
- errCh <- nil
- }()
- // Start the server streaming RPC to receive custom backend metrics.
- oobStub := v3orcaservicegrpc.NewOpenRcaServiceClient(cc)
- stream, err := oobStub.StreamCoreMetrics(ctx, &v3orcaservicepb.OrcaLoadReportRequest{ReportInterval: durationpb.New(shortReportingInterval)})
- if err != nil {
- t.Fatalf("Failed to create a stream for out-of-band metrics")
- }
- // Wait for the server to push metrics which indicate the completion of all
- // the unary RPCs made from the above goroutine.
- for {
- select {
- case <-ctx.Done():
- t.Fatal("Timeout when waiting for out-of-band custom backend metrics to match expected values")
- case err := <-errCh:
- if err != nil {
- t.Fatal(err)
- }
- default:
- }
- wantProto := &v3orcapb.OrcaLoadReport{
- CpuUtilization: 50.0,
- MemUtilization: 0.9,
- ApplicationUtilization: 1.2,
- Utilization: map[string]float64{requestsMetricKey: numRequests * 0.01},
- }
- gotProto, err := stream.Recv()
- if err != nil {
- t.Fatalf("Recv() failed: %v", err)
- }
- if !cmp.Equal(gotProto, wantProto, cmp.Comparer(proto.Equal)) {
- t.Logf("Received load report from stream: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(wantProto))
- continue
- }
- // This means that we received the metrics which we expected.
- break
- }
- // The EmptyCall RPC is expected to delete earlier injected metrics.
- if _, err := testStub.EmptyCall(ctx, &testpb.Empty{}); err != nil {
- t.Fatalf("EmptyCall failed: %v", err)
- }
- // Wait for the server to push empty metrics which indicate the processing
- // of the above EmptyCall RPC.
- for {
- select {
- case <-ctx.Done():
- t.Fatal("Timeout when waiting for out-of-band custom backend metrics to match expected values")
- default:
- }
- wantProto := &v3orcapb.OrcaLoadReport{}
- gotProto, err := stream.Recv()
- if err != nil {
- t.Fatalf("Recv() failed: %v", err)
- }
- if !cmp.Equal(gotProto, wantProto, cmp.Comparer(proto.Equal)) {
- t.Logf("Received load report from stream: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(wantProto))
- continue
- }
- // This means that we received the metrics which we expected.
- break
- }
- }
|