producer.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. /*
  2. * Copyright 2022 gRPC authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package orca
  17. import (
  18. "context"
  19. "sync"
  20. "time"
  21. "google.golang.org/grpc"
  22. "google.golang.org/grpc/balancer"
  23. "google.golang.org/grpc/codes"
  24. "google.golang.org/grpc/internal/grpcsync"
  25. "google.golang.org/grpc/orca/internal"
  26. "google.golang.org/grpc/status"
  27. v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
  28. v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
  29. v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
  30. "google.golang.org/protobuf/types/known/durationpb"
  31. )
  32. type producerBuilder struct{}
  33. // Build constructs and returns a producer and its cleanup function
  34. func (*producerBuilder) Build(cci interface{}) (balancer.Producer, func()) {
  35. p := &producer{
  36. client: v3orcaservicegrpc.NewOpenRcaServiceClient(cci.(grpc.ClientConnInterface)),
  37. intervals: make(map[time.Duration]int),
  38. listeners: make(map[OOBListener]struct{}),
  39. backoff: internal.DefaultBackoffFunc,
  40. }
  41. return p, func() {
  42. <-p.stopped
  43. }
  44. }
  45. var producerBuilderSingleton = &producerBuilder{}
  46. // OOBListener is used to receive out-of-band load reports as they arrive.
  47. type OOBListener interface {
  48. // OnLoadReport is called when a load report is received.
  49. OnLoadReport(*v3orcapb.OrcaLoadReport)
  50. }
  // OOBListenerOptions holds the settings that control how an OOBListener is
  // called.
  type OOBListenerOptions struct {
  	// ReportInterval is the requested period between load reports from the
  	// server. Reports may arrive less often if the server requires a longer
  	// interval, or more often if another subscriber requests a shorter
  	// interval.
  	ReportInterval time.Duration
  }
  59. // RegisterOOBListener registers an out-of-band load report listener on sc.
  60. // Any OOBListener may only be registered once per subchannel at a time. The
  61. // returned stop function must be called when no longer needed. Do not
  62. // register a single OOBListener more than once per SubConn.
  63. func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOptions) (stop func()) {
  64. pr, close := sc.GetOrBuildProducer(producerBuilderSingleton)
  65. p := pr.(*producer)
  66. p.registerListener(l, opts.ReportInterval)
  67. // TODO: When we can register for SubConn state updates, automatically call
  68. // stop() on SHUTDOWN.
  69. // If stop is called multiple times, prevent it from having any effect on
  70. // subsequent calls.
  71. return grpcsync.OnceFunc(func() {
  72. p.unregisterListener(l, opts.ReportInterval)
  73. close()
  74. })
  75. }
  76. type producer struct {
  77. client v3orcaservicegrpc.OpenRcaServiceClient
  78. // backoff is called between stream attempts to determine how long to delay
  79. // to avoid overloading a server experiencing problems. The attempt count
  80. // is incremented when stream errors occur and is reset when the stream
  81. // reports a result.
  82. backoff func(int) time.Duration
  83. mu sync.Mutex
  84. intervals map[time.Duration]int // map from interval time to count of listeners requesting that time
  85. listeners map[OOBListener]struct{} // set of registered listeners
  86. minInterval time.Duration
  87. stop func() // stops the current run goroutine
  88. stopped chan struct{} // closed when the run goroutine exits
  89. }
  90. // registerListener adds the listener and its requested report interval to the
  91. // producer.
  92. func (p *producer) registerListener(l OOBListener, interval time.Duration) {
  93. p.mu.Lock()
  94. defer p.mu.Unlock()
  95. p.listeners[l] = struct{}{}
  96. p.intervals[interval]++
  97. if len(p.listeners) == 1 || interval < p.minInterval {
  98. p.minInterval = interval
  99. p.updateRunLocked()
  100. }
  101. }
  102. // registerListener removes the listener and its requested report interval to
  103. // the producer.
  104. func (p *producer) unregisterListener(l OOBListener, interval time.Duration) {
  105. p.mu.Lock()
  106. defer p.mu.Unlock()
  107. delete(p.listeners, l)
  108. p.intervals[interval]--
  109. if p.intervals[interval] == 0 {
  110. delete(p.intervals, interval)
  111. if p.minInterval == interval {
  112. p.recomputeMinInterval()
  113. p.updateRunLocked()
  114. }
  115. }
  116. }
  117. // recomputeMinInterval sets p.minInterval to the minimum key's value in
  118. // p.intervals.
  119. func (p *producer) recomputeMinInterval() {
  120. first := true
  121. for interval := range p.intervals {
  122. if first || interval < p.minInterval {
  123. p.minInterval = interval
  124. first = false
  125. }
  126. }
  127. }
  128. // updateRunLocked is called whenever the run goroutine needs to be started /
  129. // stopped / restarted due to: 1. the initial listener being registered, 2. the
  130. // final listener being unregistered, or 3. the minimum registered interval
  131. // changing.
  132. func (p *producer) updateRunLocked() {
  133. if p.stop != nil {
  134. p.stop()
  135. p.stop = nil
  136. }
  137. if len(p.listeners) > 0 {
  138. var ctx context.Context
  139. ctx, p.stop = context.WithCancel(context.Background())
  140. p.stopped = make(chan struct{})
  141. go p.run(ctx, p.stopped, p.minInterval)
  142. }
  143. }
  144. // run manages the ORCA OOB stream on the subchannel.
  145. func (p *producer) run(ctx context.Context, done chan struct{}, interval time.Duration) {
  146. defer close(done)
  147. backoffAttempt := 0
  148. backoffTimer := time.NewTimer(0)
  149. for ctx.Err() == nil {
  150. select {
  151. case <-backoffTimer.C:
  152. case <-ctx.Done():
  153. return
  154. }
  155. resetBackoff, err := p.runStream(ctx, interval)
  156. if resetBackoff {
  157. backoffTimer.Reset(0)
  158. backoffAttempt = 0
  159. } else {
  160. backoffTimer.Reset(p.backoff(backoffAttempt))
  161. backoffAttempt++
  162. }
  163. switch {
  164. case err == nil:
  165. // No error was encountered; restart the stream.
  166. case ctx.Err() != nil:
  167. // Producer was stopped; exit immediately and without logging an
  168. // error.
  169. return
  170. case status.Code(err) == codes.Unimplemented:
  171. // Unimplemented; do not retry.
  172. logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.")
  173. return
  174. case status.Code(err) == codes.Unavailable, status.Code(err) == codes.Canceled:
  175. // TODO: these codes should ideally log an error, too, but for now
  176. // we receive them when shutting down the ClientConn (Unavailable
  177. // if the stream hasn't started yet, and Canceled if it happens
  178. // mid-stream). Once we can determine the state or ensure the
  179. // producer is stopped before the stream ends, we can log an error
  180. // when it's not a natural shutdown.
  181. default:
  182. // Log all other errors.
  183. logger.Error("Received unexpected stream error:", err)
  184. }
  185. }
  186. }
  187. // runStream runs a single stream on the subchannel and returns the resulting
  188. // error, if any, and whether or not the run loop should reset the backoff
  189. // timer to zero or advance it.
  190. func (p *producer) runStream(ctx context.Context, interval time.Duration) (resetBackoff bool, err error) {
  191. streamCtx, cancel := context.WithCancel(ctx)
  192. defer cancel()
  193. stream, err := p.client.StreamCoreMetrics(streamCtx, &v3orcaservicepb.OrcaLoadReportRequest{
  194. ReportInterval: durationpb.New(interval),
  195. })
  196. if err != nil {
  197. return false, err
  198. }
  199. for {
  200. report, err := stream.Recv()
  201. if err != nil {
  202. return resetBackoff, err
  203. }
  204. resetBackoff = true
  205. p.mu.Lock()
  206. for l := range p.listeners {
  207. l.OnLoadReport(report)
  208. }
  209. p.mu.Unlock()
  210. }
  211. }