|
- /*
- * Copyright 2022 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package orca
- import (
- "context"
- "sync"
- "time"
- "google.golang.org/grpc"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/internal/grpcsync"
- "google.golang.org/grpc/orca/internal"
- "google.golang.org/grpc/status"
- v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
- v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
- v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
- "google.golang.org/protobuf/types/known/durationpb"
- )
- type producerBuilder struct{}
- // Build constructs and returns a producer and its cleanup function
- func (*producerBuilder) Build(cci interface{}) (balancer.Producer, func()) {
- p := &producer{
- client: v3orcaservicegrpc.NewOpenRcaServiceClient(cci.(grpc.ClientConnInterface)),
- intervals: make(map[time.Duration]int),
- listeners: make(map[OOBListener]struct{}),
- backoff: internal.DefaultBackoffFunc,
- }
- return p, func() {
- <-p.stopped
- }
- }
- var producerBuilderSingleton = &producerBuilder{}
- // OOBListener is used to receive out-of-band load reports as they arrive.
- type OOBListener interface {
- // OnLoadReport is called when a load report is received.
- OnLoadReport(*v3orcapb.OrcaLoadReport)
- }
// OOBListenerOptions configures how an OOBListener receives load reports.
type OOBListenerOptions struct {
	// ReportInterval is the reporting period to request from the server.
	// Reports may arrive less often if the server enforces a longer
	// interval, or more often if another subscriber on the same subchannel
	// asks for a shorter one.
	ReportInterval time.Duration
}
- // RegisterOOBListener registers an out-of-band load report listener on sc.
- // Any OOBListener may only be registered once per subchannel at a time. The
- // returned stop function must be called when no longer needed. Do not
- // register a single OOBListener more than once per SubConn.
- func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOptions) (stop func()) {
- pr, close := sc.GetOrBuildProducer(producerBuilderSingleton)
- p := pr.(*producer)
- p.registerListener(l, opts.ReportInterval)
- // TODO: When we can register for SubConn state updates, automatically call
- // stop() on SHUTDOWN.
- // If stop is called multiple times, prevent it from having any effect on
- // subsequent calls.
- return grpcsync.OnceFunc(func() {
- p.unregisterListener(l, opts.ReportInterval)
- close()
- })
- }
- type producer struct {
- client v3orcaservicegrpc.OpenRcaServiceClient
- // backoff is called between stream attempts to determine how long to delay
- // to avoid overloading a server experiencing problems. The attempt count
- // is incremented when stream errors occur and is reset when the stream
- // reports a result.
- backoff func(int) time.Duration
- mu sync.Mutex
- intervals map[time.Duration]int // map from interval time to count of listeners requesting that time
- listeners map[OOBListener]struct{} // set of registered listeners
- minInterval time.Duration
- stop func() // stops the current run goroutine
- stopped chan struct{} // closed when the run goroutine exits
- }
- // registerListener adds the listener and its requested report interval to the
- // producer.
- func (p *producer) registerListener(l OOBListener, interval time.Duration) {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.listeners[l] = struct{}{}
- p.intervals[interval]++
- if len(p.listeners) == 1 || interval < p.minInterval {
- p.minInterval = interval
- p.updateRunLocked()
- }
- }
- // registerListener removes the listener and its requested report interval to
- // the producer.
- func (p *producer) unregisterListener(l OOBListener, interval time.Duration) {
- p.mu.Lock()
- defer p.mu.Unlock()
- delete(p.listeners, l)
- p.intervals[interval]--
- if p.intervals[interval] == 0 {
- delete(p.intervals, interval)
- if p.minInterval == interval {
- p.recomputeMinInterval()
- p.updateRunLocked()
- }
- }
- }
- // recomputeMinInterval sets p.minInterval to the minimum key's value in
- // p.intervals.
- func (p *producer) recomputeMinInterval() {
- first := true
- for interval := range p.intervals {
- if first || interval < p.minInterval {
- p.minInterval = interval
- first = false
- }
- }
- }
- // updateRunLocked is called whenever the run goroutine needs to be started /
- // stopped / restarted due to: 1. the initial listener being registered, 2. the
- // final listener being unregistered, or 3. the minimum registered interval
- // changing.
- func (p *producer) updateRunLocked() {
- if p.stop != nil {
- p.stop()
- p.stop = nil
- }
- if len(p.listeners) > 0 {
- var ctx context.Context
- ctx, p.stop = context.WithCancel(context.Background())
- p.stopped = make(chan struct{})
- go p.run(ctx, p.stopped, p.minInterval)
- }
- }
- // run manages the ORCA OOB stream on the subchannel.
- func (p *producer) run(ctx context.Context, done chan struct{}, interval time.Duration) {
- defer close(done)
- backoffAttempt := 0
- backoffTimer := time.NewTimer(0)
- for ctx.Err() == nil {
- select {
- case <-backoffTimer.C:
- case <-ctx.Done():
- return
- }
- resetBackoff, err := p.runStream(ctx, interval)
- if resetBackoff {
- backoffTimer.Reset(0)
- backoffAttempt = 0
- } else {
- backoffTimer.Reset(p.backoff(backoffAttempt))
- backoffAttempt++
- }
- switch {
- case err == nil:
- // No error was encountered; restart the stream.
- case ctx.Err() != nil:
- // Producer was stopped; exit immediately and without logging an
- // error.
- return
- case status.Code(err) == codes.Unimplemented:
- // Unimplemented; do not retry.
- logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.")
- return
- case status.Code(err) == codes.Unavailable, status.Code(err) == codes.Canceled:
- // TODO: these codes should ideally log an error, too, but for now
- // we receive them when shutting down the ClientConn (Unavailable
- // if the stream hasn't started yet, and Canceled if it happens
- // mid-stream). Once we can determine the state or ensure the
- // producer is stopped before the stream ends, we can log an error
- // when it's not a natural shutdown.
- default:
- // Log all other errors.
- logger.Error("Received unexpected stream error:", err)
- }
- }
- }
- // runStream runs a single stream on the subchannel and returns the resulting
- // error, if any, and whether or not the run loop should reset the backoff
- // timer to zero or advance it.
- func (p *producer) runStream(ctx context.Context, interval time.Duration) (resetBackoff bool, err error) {
- streamCtx, cancel := context.WithCancel(ctx)
- defer cancel()
- stream, err := p.client.StreamCoreMetrics(streamCtx, &v3orcaservicepb.OrcaLoadReportRequest{
- ReportInterval: durationpb.New(interval),
- })
- if err != nil {
- return false, err
- }
- for {
- report, err := stream.Recv()
- if err != nil {
- return resetBackoff, err
- }
- resetBackoff = true
- p.mu.Lock()
- for l := range p.listeners {
- l.OnLoadReport(report)
- }
- p.mu.Unlock()
- }
- }
|