call_metrics_test.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. /*
  2. *
  3. * Copyright 2022 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package orca_test
  19. import (
  20. "context"
  21. "errors"
  22. "io"
  23. "testing"
  24. "github.com/golang/protobuf/proto"
  25. "github.com/google/go-cmp/cmp"
  26. "google.golang.org/grpc"
  27. "google.golang.org/grpc/credentials/insecure"
  28. "google.golang.org/grpc/internal/pretty"
  29. "google.golang.org/grpc/internal/stubserver"
  30. "google.golang.org/grpc/metadata"
  31. "google.golang.org/grpc/orca"
  32. "google.golang.org/grpc/orca/internal"
  33. v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
  34. testgrpc "google.golang.org/grpc/interop/grpc_testing"
  35. testpb "google.golang.org/grpc/interop/grpc_testing"
  36. )
  37. // TestE2ECallMetricsUnary tests the injection of custom backend metrics from
  38. // the server application for a unary RPC, and verifies that expected load
  39. // reports are received at the client.
  40. func (s) TestE2ECallMetricsUnary(t *testing.T) {
  41. tests := []struct {
  42. desc string
  43. injectMetrics bool
  44. wantProto *v3orcapb.OrcaLoadReport
  45. }{
  46. {
  47. desc: "with custom backend metrics",
  48. injectMetrics: true,
  49. wantProto: &v3orcapb.OrcaLoadReport{
  50. CpuUtilization: 1.0,
  51. MemUtilization: 0.9,
  52. RequestCost: map[string]float64{"queryCost": 25.0},
  53. Utilization: map[string]float64{"queueSize": 0.75},
  54. },
  55. },
  56. {
  57. desc: "with no custom backend metrics",
  58. injectMetrics: false,
  59. },
  60. }
  61. for _, test := range tests {
  62. t.Run(test.desc, func(t *testing.T) {
  63. // A server option to enable reporting of per-call backend metrics.
  64. smr := orca.NewServerMetricsRecorder()
  65. callMetricsServerOption := orca.CallMetricsServerOption(smr)
  66. smr.SetCPUUtilization(1.0)
  67. // An interceptor to injects custom backend metrics, added only when
  68. // the injectMetrics field in the test is set.
  69. injectingInterceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
  70. recorder := orca.CallMetricsRecorderFromContext(ctx)
  71. if recorder == nil {
  72. err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context")
  73. t.Error(err)
  74. return nil, err
  75. }
  76. recorder.SetMemoryUtilization(0.9)
  77. // This value will be overwritten by a write to the same metric
  78. // from the server handler.
  79. recorder.SetNamedUtilization("queueSize", 1.0)
  80. return handler(ctx, req)
  81. }
  82. // A stub server whose unary handler injects custom metrics, if the
  83. // injectMetrics field in the test is set. It overwrites one of the
  84. // values injected above, by the interceptor.
  85. srv := stubserver.StubServer{
  86. EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  87. if !test.injectMetrics {
  88. return &testpb.Empty{}, nil
  89. }
  90. recorder := orca.CallMetricsRecorderFromContext(ctx)
  91. if recorder == nil {
  92. err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context")
  93. t.Error(err)
  94. return nil, err
  95. }
  96. recorder.SetRequestCost("queryCost", 25.0)
  97. recorder.SetNamedUtilization("queueSize", 0.75)
  98. return &testpb.Empty{}, nil
  99. },
  100. }
  101. // Start the stub server with the appropriate server options.
  102. sopts := []grpc.ServerOption{callMetricsServerOption}
  103. if test.injectMetrics {
  104. sopts = append(sopts, grpc.ChainUnaryInterceptor(injectingInterceptor))
  105. }
  106. if err := srv.StartServer(sopts...); err != nil {
  107. t.Fatalf("Failed to start server: %v", err)
  108. }
  109. defer srv.Stop()
  110. // Dial the stub server.
  111. cc, err := grpc.Dial(srv.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
  112. if err != nil {
  113. t.Fatalf("grpc.Dial(%s) failed: %v", srv.Address, err)
  114. }
  115. defer cc.Close()
  116. // Make a unary RPC and expect the trailer metadata to contain the custom
  117. // backend metrics as an ORCA LoadReport protobuf message.
  118. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  119. defer cancel()
  120. client := testgrpc.NewTestServiceClient(cc)
  121. trailer := metadata.MD{}
  122. if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Trailer(&trailer)); err != nil {
  123. t.Fatalf("EmptyCall failed: %v", err)
  124. }
  125. gotProto, err := internal.ToLoadReport(trailer)
  126. if err != nil {
  127. t.Fatalf("When retrieving load report, got error: %v, want: <nil>", err)
  128. }
  129. if test.wantProto != nil && !cmp.Equal(gotProto, test.wantProto, cmp.Comparer(proto.Equal)) {
  130. t.Fatalf("Received load report in trailer: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(test.wantProto))
  131. }
  132. })
  133. }
  134. }
  135. // TestE2ECallMetricsStreaming tests the injection of custom backend metrics
  136. // from the server application for a streaming RPC, and verifies that expected
  137. // load reports are received at the client.
  138. func (s) TestE2ECallMetricsStreaming(t *testing.T) {
  139. tests := []struct {
  140. desc string
  141. injectMetrics bool
  142. wantProto *v3orcapb.OrcaLoadReport
  143. }{
  144. {
  145. desc: "with custom backend metrics",
  146. injectMetrics: true,
  147. wantProto: &v3orcapb.OrcaLoadReport{
  148. CpuUtilization: 1.0,
  149. MemUtilization: 0.5,
  150. RequestCost: map[string]float64{"queryCost": 0.25},
  151. Utilization: map[string]float64{"queueSize": 0.75},
  152. },
  153. },
  154. {
  155. desc: "with no custom backend metrics",
  156. injectMetrics: false,
  157. },
  158. }
  159. for _, test := range tests {
  160. t.Run(test.desc, func(t *testing.T) {
  161. // A server option to enable reporting of per-call backend metrics.
  162. smr := orca.NewServerMetricsRecorder()
  163. callMetricsServerOption := orca.CallMetricsServerOption(smr)
  164. smr.SetCPUUtilization(1.0)
  165. // An interceptor which injects custom backend metrics, added only
  166. // when the injectMetrics field in the test is set.
  167. injectingInterceptor := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  168. recorder := orca.CallMetricsRecorderFromContext(ss.Context())
  169. if recorder == nil {
  170. err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context")
  171. t.Error(err)
  172. return err
  173. }
  174. recorder.SetMemoryUtilization(0.5)
  175. // This value will be overwritten by a write to the same metric
  176. // from the server handler.
  177. recorder.SetNamedUtilization("queueSize", 1.0)
  178. return handler(srv, ss)
  179. }
  180. // A stub server whose streaming handler injects custom metrics, if
  181. // the injectMetrics field in the test is set. It overwrites one of
  182. // the values injected above, by the interceptor.
  183. srv := stubserver.StubServer{
  184. FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
  185. if test.injectMetrics {
  186. recorder := orca.CallMetricsRecorderFromContext(stream.Context())
  187. if recorder == nil {
  188. err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context")
  189. t.Error(err)
  190. return err
  191. }
  192. recorder.SetRequestCost("queryCost", 0.25)
  193. recorder.SetNamedUtilization("queueSize", 0.75)
  194. }
  195. // Streaming implementation replies with a dummy response until the
  196. // client closes the stream (in which case it will see an io.EOF),
  197. // or an error occurs while reading/writing messages.
  198. for {
  199. _, err := stream.Recv()
  200. if err == io.EOF {
  201. return nil
  202. }
  203. if err != nil {
  204. return err
  205. }
  206. payload := &testpb.Payload{Body: make([]byte, 32)}
  207. if err := stream.Send(&testpb.StreamingOutputCallResponse{Payload: payload}); err != nil {
  208. return err
  209. }
  210. }
  211. },
  212. }
  213. // Start the stub server with the appropriate server options.
  214. sopts := []grpc.ServerOption{callMetricsServerOption}
  215. if test.injectMetrics {
  216. sopts = append(sopts, grpc.ChainStreamInterceptor(injectingInterceptor))
  217. }
  218. if err := srv.StartServer(sopts...); err != nil {
  219. t.Fatalf("Failed to start server: %v", err)
  220. }
  221. defer srv.Stop()
  222. // Dial the stub server.
  223. cc, err := grpc.Dial(srv.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
  224. if err != nil {
  225. t.Fatalf("grpc.Dial(%s) failed: %v", srv.Address, err)
  226. }
  227. defer cc.Close()
  228. // Start the full duplex streaming RPC.
  229. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  230. defer cancel()
  231. tc := testgrpc.NewTestServiceClient(cc)
  232. stream, err := tc.FullDuplexCall(ctx)
  233. if err != nil {
  234. t.Fatalf("FullDuplexCall failed: %v", err)
  235. }
  236. // Send one request to the server.
  237. payload := &testpb.Payload{Body: make([]byte, 32)}
  238. req := &testpb.StreamingOutputCallRequest{Payload: payload}
  239. if err := stream.Send(req); err != nil {
  240. t.Fatalf("stream.Send() failed: %v", err)
  241. }
  242. // Read one reply from the server.
  243. if _, err := stream.Recv(); err != nil {
  244. t.Fatalf("stream.Recv() failed: %v", err)
  245. }
  246. // Close the sending side.
  247. if err := stream.CloseSend(); err != nil {
  248. t.Fatalf("stream.CloseSend() failed: %v", err)
  249. }
  250. // Make sure it is safe to read the trailer.
  251. for {
  252. if _, err := stream.Recv(); err != nil {
  253. break
  254. }
  255. }
  256. gotProto, err := internal.ToLoadReport(stream.Trailer())
  257. if err != nil {
  258. t.Fatalf("When retrieving load report, got error: %v, want: <nil>", err)
  259. }
  260. if test.wantProto != nil && !cmp.Equal(gotProto, test.wantProto, cmp.Comparer(proto.Equal)) {
  261. t.Fatalf("Received load report in trailer: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(test.wantProto))
  262. }
  263. })
  264. }
  265. }