service.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. /*
  2. *
  3. * Copyright 2022 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package orca
  19. import (
  20. "fmt"
  21. "time"
  22. "google.golang.org/grpc"
  23. "google.golang.org/grpc/codes"
  24. "google.golang.org/grpc/internal"
  25. ointernal "google.golang.org/grpc/orca/internal"
  26. "google.golang.org/grpc/status"
  27. v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
  28. v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
  29. )
  30. func init() {
  31. ointernal.AllowAnyMinReportingInterval = func(so *ServiceOptions) {
  32. so.allowAnyMinReportingInterval = true
  33. }
  34. internal.ORCAAllowAnyMinReportingInterval = ointernal.AllowAnyMinReportingInterval
  35. }
  36. // minReportingInterval is the absolute minimum value supported for
  37. // out-of-band metrics reporting from the ORCA service implementation
  38. // provided by the orca package.
  39. const minReportingInterval = 30 * time.Second
  40. // Service provides an implementation of the OpenRcaService as defined in the
  41. // [ORCA] service protos. Instances of this type must be created via calls to
  42. // Register() or NewService().
  43. //
  44. // Server applications can use the SetXxx() and DeleteXxx() methods to record
  45. // measurements corresponding to backend metrics, which eventually get pushed to
  46. // clients who have initiated the SteamCoreMetrics streaming RPC.
  47. //
  48. // [ORCA]: https://github.com/cncf/xds/blob/main/xds/service/orca/v3/orca.proto
  49. type Service struct {
  50. v3orcaservicegrpc.UnimplementedOpenRcaServiceServer
  51. // Minimum reporting interval, as configured by the user, or the default.
  52. minReportingInterval time.Duration
  53. smProvider ServerMetricsProvider
  54. }
  55. // ServiceOptions contains options to configure the ORCA service implementation.
  56. type ServiceOptions struct {
  57. // ServerMetricsProvider is the provider to be used by the service for
  58. // reporting OOB server metrics to clients. Typically obtained via
  59. // NewServerMetricsRecorder. This field is required.
  60. ServerMetricsProvider ServerMetricsProvider
  61. // MinReportingInterval sets the lower bound for how often out-of-band
  62. // metrics are reported on the streaming RPC initiated by the client. If
  63. // unspecified, negative or less than the default value of 30s, the default
  64. // is used. Clients may request a higher value as part of the
  65. // StreamCoreMetrics streaming RPC.
  66. MinReportingInterval time.Duration
  67. // Allow a minReportingInterval which is less than the default of 30s.
  68. // Used for testing purposes only.
  69. allowAnyMinReportingInterval bool
  70. }
  71. // A ServerMetricsProvider provides ServerMetrics upon request.
  72. type ServerMetricsProvider interface {
  73. // ServerMetrics returns the current set of server metrics. It should
  74. // return a read-only, immutable copy of the data that is active at the
  75. // time of the call.
  76. ServerMetrics() *ServerMetrics
  77. }
  78. // NewService creates a new ORCA service implementation configured using the
  79. // provided options.
  80. func NewService(opts ServiceOptions) (*Service, error) {
  81. // The default minimum supported reporting interval value can be overridden
  82. // for testing purposes through the orca internal package.
  83. if opts.ServerMetricsProvider == nil {
  84. return nil, fmt.Errorf("ServerMetricsProvider not specified")
  85. }
  86. if !opts.allowAnyMinReportingInterval {
  87. if opts.MinReportingInterval < 0 || opts.MinReportingInterval < minReportingInterval {
  88. opts.MinReportingInterval = minReportingInterval
  89. }
  90. }
  91. service := &Service{
  92. minReportingInterval: opts.MinReportingInterval,
  93. smProvider: opts.ServerMetricsProvider,
  94. }
  95. return service, nil
  96. }
  97. // Register creates a new ORCA service implementation configured using the
  98. // provided options and registers the same on the provided grpc Server.
  99. func Register(s *grpc.Server, opts ServiceOptions) error {
  100. // TODO(https://github.com/cncf/xds/issues/41): replace *grpc.Server with
  101. // grpc.ServiceRegistrar when possible.
  102. service, err := NewService(opts)
  103. if err != nil {
  104. return err
  105. }
  106. v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, service)
  107. return nil
  108. }
  109. // determineReportingInterval determines the reporting interval for out-of-band
  110. // metrics. If the reporting interval is not specified in the request, or is
  111. // negative or is less than the configured minimum (via
  112. // ServiceOptions.MinReportingInterval), the latter is used. Else the value from
  113. // the incoming request is used.
  114. func (s *Service) determineReportingInterval(req *v3orcaservicepb.OrcaLoadReportRequest) time.Duration {
  115. if req.GetReportInterval() == nil {
  116. return s.minReportingInterval
  117. }
  118. dur := req.GetReportInterval().AsDuration()
  119. if dur < s.minReportingInterval {
  120. logger.Warningf("Received reporting interval %q is less than configured minimum: %v. Using minimum", dur, s.minReportingInterval)
  121. return s.minReportingInterval
  122. }
  123. return dur
  124. }
  125. func (s *Service) sendMetricsResponse(stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error {
  126. return stream.Send(s.smProvider.ServerMetrics().toLoadReportProto())
  127. }
  128. // StreamCoreMetrics streams custom backend metrics injected by the server
  129. // application.
  130. func (s *Service) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error {
  131. ticker := time.NewTicker(s.determineReportingInterval(req))
  132. defer ticker.Stop()
  133. for {
  134. if err := s.sendMetricsResponse(stream); err != nil {
  135. return err
  136. }
  137. // Send a response containing the currently recorded metrics
  138. select {
  139. case <-stream.Context().Done():
  140. return status.Error(codes.Canceled, "Stream has ended.")
  141. case <-ticker.C:
  142. }
  143. }
  144. }