123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- /*
- *
- * Copyright 2022 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package orca
- import (
- "context"
- "sync"
- "google.golang.org/grpc"
- grpcinternal "google.golang.org/grpc/internal"
- "google.golang.org/grpc/metadata"
- "google.golang.org/grpc/orca/internal"
- "google.golang.org/protobuf/proto"
- )
- // CallMetricsRecorder allows a service method handler to record per-RPC
- // metrics. It contains all utilization-based metrics from
- // ServerMetricsRecorder as well as additional request cost metrics.
- type CallMetricsRecorder interface {
- ServerMetricsRecorder
- // SetRequestCost sets the relevant server metric.
- SetRequestCost(name string, val float64)
- // DeleteRequestCost deletes the relevant server metric to prevent it
- // from being sent.
- DeleteRequestCost(name string)
- // SetNamedMetric sets the relevant server metric.
- SetNamedMetric(name string, val float64)
- // DeleteNamedMetric deletes the relevant server metric to prevent it
- // from being sent.
- DeleteNamedMetric(name string)
- }
- type callMetricsRecorderCtxKey struct{}
- // CallMetricsRecorderFromContext returns the RPC-specific custom metrics
- // recorder embedded in the provided RPC context.
- //
- // Returns nil if no custom metrics recorder is found in the provided context,
- // which will be the case when custom metrics reporting is not enabled.
- func CallMetricsRecorderFromContext(ctx context.Context) CallMetricsRecorder {
- rw, ok := ctx.Value(callMetricsRecorderCtxKey{}).(*recorderWrapper)
- if !ok {
- return nil
- }
- return rw.recorder()
- }
- // recorderWrapper is a wrapper around a CallMetricsRecorder to ensure that
- // concurrent calls to CallMetricsRecorderFromContext() results in only one
- // allocation of the underlying metrics recorder, while also allowing for lazy
- // initialization of the recorder itself.
- type recorderWrapper struct {
- once sync.Once
- r CallMetricsRecorder
- smp ServerMetricsProvider
- }
- func (rw *recorderWrapper) recorder() CallMetricsRecorder {
- rw.once.Do(func() {
- rw.r = newServerMetricsRecorder()
- })
- return rw.r
- }
- // setTrailerMetadata adds a trailer metadata entry with key being set to
- // `internal.TrailerMetadataKey` and value being set to the binary-encoded
- // orca.OrcaLoadReport protobuf message.
- //
- // This function is called from the unary and streaming interceptors defined
- // above. Any errors encountered here are not propagated to the caller because
- // they are ignored there. Hence we simply log any errors encountered here at
- // warning level, and return nothing.
- func (rw *recorderWrapper) setTrailerMetadata(ctx context.Context) {
- var sm *ServerMetrics
- if rw.smp != nil {
- sm = rw.smp.ServerMetrics()
- sm.merge(rw.r.ServerMetrics())
- } else {
- sm = rw.r.ServerMetrics()
- }
- b, err := proto.Marshal(sm.toLoadReportProto())
- if err != nil {
- logger.Warningf("Failed to marshal load report: %v", err)
- return
- }
- if err := grpc.SetTrailer(ctx, metadata.Pairs(internal.TrailerMetadataKey, string(b))); err != nil {
- logger.Warningf("Failed to set trailer metadata: %v", err)
- }
- }
- var joinServerOptions = grpcinternal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption)
- // CallMetricsServerOption returns a server option which enables the reporting
- // of per-RPC custom backend metrics for unary and streaming RPCs.
- //
- // Server applications interested in injecting custom backend metrics should
- // pass the server option returned from this function as the first argument to
- // grpc.NewServer().
- //
- // Subsequently, server RPC handlers can retrieve a reference to the RPC
- // specific custom metrics recorder [CallMetricsRecorder] to be used, via a call
- // to CallMetricsRecorderFromContext(), and inject custom metrics at any time
- // during the RPC lifecycle.
- //
- // The injected custom metrics will be sent as part of trailer metadata, as a
- // binary-encoded [ORCA LoadReport] protobuf message, with the metadata key
- // being set be "endpoint-load-metrics-bin".
- //
- // If a non-nil ServerMetricsProvider is provided, the gRPC server will
- // transmit the metrics it provides, overwritten by any per-RPC metrics given
- // to the CallMetricsRecorder. A ServerMetricsProvider is typically obtained
- // by calling NewServerMetricsRecorder.
- //
- // [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15
- func CallMetricsServerOption(smp ServerMetricsProvider) grpc.ServerOption {
- return joinServerOptions(grpc.ChainUnaryInterceptor(unaryInt(smp)), grpc.ChainStreamInterceptor(streamInt(smp)))
- }
- func unaryInt(smp ServerMetricsProvider) func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
- return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
- // We don't allocate the metric recorder here. It will be allocated the
- // first time the user calls CallMetricsRecorderFromContext().
- rw := &recorderWrapper{smp: smp}
- ctxWithRecorder := newContextWithRecorderWrapper(ctx, rw)
- resp, err := handler(ctxWithRecorder, req)
- // It is safe to access the underlying metric recorder inside the wrapper at
- // this point, as the user's RPC handler is done executing, and therefore
- // there will be no more calls to CallMetricsRecorderFromContext(), which is
- // where the metric recorder is lazy allocated.
- if rw.r != nil {
- rw.setTrailerMetadata(ctx)
- }
- return resp, err
- }
- }
- func streamInt(smp ServerMetricsProvider) func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
- // We don't allocate the metric recorder here. It will be allocated the
- // first time the user calls CallMetricsRecorderFromContext().
- rw := &recorderWrapper{smp: smp}
- ws := &wrappedStream{
- ServerStream: ss,
- ctx: newContextWithRecorderWrapper(ss.Context(), rw),
- }
- err := handler(srv, ws)
- // It is safe to access the underlying metric recorder inside the wrapper at
- // this point, as the user's RPC handler is done executing, and therefore
- // there will be no more calls to CallMetricsRecorderFromContext(), which is
- // where the metric recorder is lazy allocated.
- if rw.r != nil {
- rw.setTrailerMetadata(ss.Context())
- }
- return err
- }
- }
- func newContextWithRecorderWrapper(ctx context.Context, r *recorderWrapper) context.Context {
- return context.WithValue(ctx, callMetricsRecorderCtxKey{}, r)
- }
- // wrappedStream wraps the grpc.ServerStream received by the streaming
- // interceptor. Overrides only the Context() method to return a context which
- // contains a reference to the CallMetricsRecorder corresponding to this
- // stream.
- type wrappedStream struct {
- grpc.ServerStream
- ctx context.Context
- }
- func (w *wrappedStream) Context() context.Context {
- return w.ctx
- }
|