123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- /*
- *
- * Copyright 2023 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package interop
- import (
- "context"
- "fmt"
- "sync"
- "time"
- v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/balancer/base"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/orca"
- )
- func init() {
- balancer.Register(orcabb{})
- }
- type orcabb struct{}
- func (orcabb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
- return &orcab{cc: cc}
- }
- func (orcabb) Name() string {
- return "test_backend_metrics_load_balancer"
- }
- type orcab struct {
- cc balancer.ClientConn
- sc balancer.SubConn
- cancelWatch func()
- reportMu sync.Mutex
- report *v3orcapb.OrcaLoadReport
- }
- func (o *orcab) UpdateClientConnState(s balancer.ClientConnState) error {
- if o.sc != nil {
- o.sc.UpdateAddresses(s.ResolverState.Addresses)
- return nil
- }
- if len(s.ResolverState.Addresses) == 0 {
- o.ResolverError(fmt.Errorf("produced no addresses"))
- return fmt.Errorf("resolver produced no addresses")
- }
- var err error
- o.sc, err = o.cc.NewSubConn(s.ResolverState.Addresses, balancer.NewSubConnOptions{})
- if err != nil {
- o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("error creating subconn: %v", err))})
- return nil
- }
- o.cancelWatch = orca.RegisterOOBListener(o.sc, o, orca.OOBListenerOptions{ReportInterval: time.Second})
- o.sc.Connect()
- o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
- return nil
- }
- func (o *orcab) ResolverError(err error) {
- if o.sc == nil {
- o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("resolver error: %v", err))})
- }
- }
- func (o *orcab) UpdateSubConnState(sc balancer.SubConn, scState balancer.SubConnState) {
- if o.sc != sc {
- logger.Errorf("received subconn update for unknown subconn: %v vs %v", o.sc, sc)
- return
- }
- switch scState.ConnectivityState {
- case connectivity.Ready:
- o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &scPicker{sc: sc, o: o}})
- case connectivity.TransientFailure:
- o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(fmt.Errorf("all subchannels in transient failure: %v", scState.ConnectionError))})
- case connectivity.Connecting:
- // Ignore; picker already set to "connecting".
- case connectivity.Idle:
- sc.Connect()
- o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
- case connectivity.Shutdown:
- // Ignore; we are closing but handle that in Close instead.
- }
- }
- func (o *orcab) Close() {
- o.cancelWatch()
- }
- func (o *orcab) OnLoadReport(r *v3orcapb.OrcaLoadReport) {
- o.reportMu.Lock()
- defer o.reportMu.Unlock()
- logger.Infof("received OOB load report: %v", r)
- o.report = r
- }
- type scPicker struct {
- sc balancer.SubConn
- o *orcab
- }
- func (p *scPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
- doneCB := func(di balancer.DoneInfo) {
- if lr, _ := di.ServerLoad.(*v3orcapb.OrcaLoadReport); lr != nil &&
- (lr.CpuUtilization != 0 || lr.MemUtilization != 0 || len(lr.Utilization) > 0 || len(lr.RequestCost) > 0) {
- // Since all RPCs will respond with a load report due to the
- // presence of the DialOption, we need to inspect every field and
- // use the out-of-band report instead if all are unset/zero.
- setContextCMR(info.Ctx, lr)
- } else {
- p.o.reportMu.Lock()
- defer p.o.reportMu.Unlock()
- if lr := p.o.report; lr != nil {
- setContextCMR(info.Ctx, lr)
- }
- }
- }
- return balancer.PickResult{SubConn: p.sc, Done: doneCB}, nil
- }
- func setContextCMR(ctx context.Context, lr *v3orcapb.OrcaLoadReport) {
- if r := orcaResultFromContext(ctx); r != nil {
- *r = lr
- }
- }
- type orcaKey string
- var orcaCtxKey = orcaKey("orcaResult")
- // contextWithORCAResult sets a key in ctx with a pointer to an ORCA load
- // report that is to be filled in by the "test_backend_metrics_load_balancer"
- // LB policy's Picker's Done callback.
- //
- // If a per-call load report is provided from the server for the call, result
- // will be filled with that, otherwise the most recent OOB load report is used.
- // If no OOB report has been received, result is not modified.
- func contextWithORCAResult(ctx context.Context, result **v3orcapb.OrcaLoadReport) context.Context {
- return context.WithValue(ctx, orcaCtxKey, result)
- }
- // orcaResultFromContext returns the ORCA load report stored in the context.
- // The LB policy uses this to communicate the load report back to the interop
- // client application.
- func orcaResultFromContext(ctx context.Context) **v3orcapb.OrcaLoadReport {
- v := ctx.Value(orcaCtxKey)
- if v == nil {
- return nil
- }
- return v.(**v3orcapb.OrcaLoadReport)
- }
|