call_metrics.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. /*
  2. *
  3. * Copyright 2022 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package orca
  19. import (
  20. "context"
  21. "sync"
  22. "google.golang.org/grpc"
  23. grpcinternal "google.golang.org/grpc/internal"
  24. "google.golang.org/grpc/metadata"
  25. "google.golang.org/grpc/orca/internal"
  26. "google.golang.org/protobuf/proto"
  27. )
  28. // CallMetricsRecorder allows a service method handler to record per-RPC
  29. // metrics. It contains all utilization-based metrics from
  30. // ServerMetricsRecorder as well as additional request cost metrics.
  31. type CallMetricsRecorder interface {
  32. ServerMetricsRecorder
  33. // SetRequestCost sets the relevant server metric.
  34. SetRequestCost(name string, val float64)
  35. // DeleteRequestCost deletes the relevant server metric to prevent it
  36. // from being sent.
  37. DeleteRequestCost(name string)
  38. // SetNamedMetric sets the relevant server metric.
  39. SetNamedMetric(name string, val float64)
  40. // DeleteNamedMetric deletes the relevant server metric to prevent it
  41. // from being sent.
  42. DeleteNamedMetric(name string)
  43. }
  44. type callMetricsRecorderCtxKey struct{}
  45. // CallMetricsRecorderFromContext returns the RPC-specific custom metrics
  46. // recorder embedded in the provided RPC context.
  47. //
  48. // Returns nil if no custom metrics recorder is found in the provided context,
  49. // which will be the case when custom metrics reporting is not enabled.
  50. func CallMetricsRecorderFromContext(ctx context.Context) CallMetricsRecorder {
  51. rw, ok := ctx.Value(callMetricsRecorderCtxKey{}).(*recorderWrapper)
  52. if !ok {
  53. return nil
  54. }
  55. return rw.recorder()
  56. }
  57. // recorderWrapper is a wrapper around a CallMetricsRecorder to ensure that
  58. // concurrent calls to CallMetricsRecorderFromContext() results in only one
  59. // allocation of the underlying metrics recorder, while also allowing for lazy
  60. // initialization of the recorder itself.
  61. type recorderWrapper struct {
  62. once sync.Once
  63. r CallMetricsRecorder
  64. smp ServerMetricsProvider
  65. }
  66. func (rw *recorderWrapper) recorder() CallMetricsRecorder {
  67. rw.once.Do(func() {
  68. rw.r = newServerMetricsRecorder()
  69. })
  70. return rw.r
  71. }
  72. // setTrailerMetadata adds a trailer metadata entry with key being set to
  73. // `internal.TrailerMetadataKey` and value being set to the binary-encoded
  74. // orca.OrcaLoadReport protobuf message.
  75. //
  76. // This function is called from the unary and streaming interceptors defined
  77. // above. Any errors encountered here are not propagated to the caller because
  78. // they are ignored there. Hence we simply log any errors encountered here at
  79. // warning level, and return nothing.
  80. func (rw *recorderWrapper) setTrailerMetadata(ctx context.Context) {
  81. var sm *ServerMetrics
  82. if rw.smp != nil {
  83. sm = rw.smp.ServerMetrics()
  84. sm.merge(rw.r.ServerMetrics())
  85. } else {
  86. sm = rw.r.ServerMetrics()
  87. }
  88. b, err := proto.Marshal(sm.toLoadReportProto())
  89. if err != nil {
  90. logger.Warningf("Failed to marshal load report: %v", err)
  91. return
  92. }
  93. if err := grpc.SetTrailer(ctx, metadata.Pairs(internal.TrailerMetadataKey, string(b))); err != nil {
  94. logger.Warningf("Failed to set trailer metadata: %v", err)
  95. }
  96. }
  97. var joinServerOptions = grpcinternal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)
  98. // CallMetricsServerOption returns a server option which enables the reporting
  99. // of per-RPC custom backend metrics for unary and streaming RPCs.
  100. //
  101. // Server applications interested in injecting custom backend metrics should
  102. // pass the server option returned from this function as the first argument to
  103. // grpc.NewServer().
  104. //
  105. // Subsequently, server RPC handlers can retrieve a reference to the RPC
  106. // specific custom metrics recorder [CallMetricsRecorder] to be used, via a call
  107. // to CallMetricsRecorderFromContext(), and inject custom metrics at any time
  108. // during the RPC lifecycle.
  109. //
  110. // The injected custom metrics will be sent as part of trailer metadata, as a
  111. // binary-encoded [ORCA LoadReport] protobuf message, with the metadata key
  112. // being set be "endpoint-load-metrics-bin".
  113. //
  114. // If a non-nil ServerMetricsProvider is provided, the gRPC server will
  115. // transmit the metrics it provides, overwritten by any per-RPC metrics given
  116. // to the CallMetricsRecorder. A ServerMetricsProvider is typically obtained
  117. // by calling NewServerMetricsRecorder.
  118. //
  119. // [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15
  120. func CallMetricsServerOption(smp ServerMetricsProvider) grpc.ServerOption {
  121. return joinServerOptions(grpc.ChainUnaryInterceptor(unaryInt(smp)), grpc.ChainStreamInterceptor(streamInt(smp)))
  122. }
  123. func unaryInt(smp ServerMetricsProvider) func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  124. return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
  125. // We don't allocate the metric recorder here. It will be allocated the
  126. // first time the user calls CallMetricsRecorderFromContext().
  127. rw := &recorderWrapper{smp: smp}
  128. ctxWithRecorder := newContextWithRecorderWrapper(ctx, rw)
  129. resp, err := handler(ctxWithRecorder, req)
  130. // It is safe to access the underlying metric recorder inside the wrapper at
  131. // this point, as the user's RPC handler is done executing, and therefore
  132. // there will be no more calls to CallMetricsRecorderFromContext(), which is
  133. // where the metric recorder is lazy allocated.
  134. if rw.r != nil {
  135. rw.setTrailerMetadata(ctx)
  136. }
  137. return resp, err
  138. }
  139. }
  140. func streamInt(smp ServerMetricsProvider) func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  141. return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  142. // We don't allocate the metric recorder here. It will be allocated the
  143. // first time the user calls CallMetricsRecorderFromContext().
  144. rw := &recorderWrapper{smp: smp}
  145. ws := &wrappedStream{
  146. ServerStream: ss,
  147. ctx: newContextWithRecorderWrapper(ss.Context(), rw),
  148. }
  149. err := handler(srv, ws)
  150. // It is safe to access the underlying metric recorder inside the wrapper at
  151. // this point, as the user's RPC handler is done executing, and therefore
  152. // there will be no more calls to CallMetricsRecorderFromContext(), which is
  153. // where the metric recorder is lazy allocated.
  154. if rw.r != nil {
  155. rw.setTrailerMetadata(ss.Context())
  156. }
  157. return err
  158. }
  159. }
  160. func newContextWithRecorderWrapper(ctx context.Context, r *recorderWrapper) context.Context {
  161. return context.WithValue(ctx, callMetricsRecorderCtxKey{}, r)
  162. }
  163. // wrappedStream wraps the grpc.ServerStream received by the streaming
  164. // interceptor. Overrides only the Context() method to return a context which
  165. // contains a reference to the CallMetricsRecorder corresponding to this
  166. // stream.
  167. type wrappedStream struct {
  168. grpc.ServerStream
  169. ctx context.Context
  170. }
  171. func (w *wrappedStream) Context() context.Context {
  172. return w.ctx
  173. }