123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- /*
- *
- * Copyright 2020 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package clusterimpl
- import (
- "google.golang.org/grpc/balancer"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/internal/wrr"
- "google.golang.org/grpc/status"
- "google.golang.org/grpc/xds/internal/xdsclient"
- "google.golang.org/grpc/xds/internal/xdsclient/load"
- v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
- )
- // NewRandomWRR is used when calculating drops. It's exported so that tests can
- // override it.
- var NewRandomWRR = wrr.NewRandom
// million is the denominator of a drop rate expressed in requests per
// million.
const million = 1_000_000
- type dropper struct {
- category string
- w wrr.WRR
- }
// gcd returns the greatest common divisor of a and b using the iterative
// Euclidean algorithm. When one argument is zero the other is returned.
func gcd(a, b uint32) uint32 {
	for b != 0 {
		a, b = b, a%b
	}
	return a
}
- func newDropper(c DropConfig) *dropper {
- w := NewRandomWRR()
- gcdv := gcd(c.RequestsPerMillion, million)
- // Return true for RequestPerMillion, false for the rest.
- w.Add(true, int64(c.RequestsPerMillion/gcdv))
- w.Add(false, int64((million-c.RequestsPerMillion)/gcdv))
- return &dropper{
- category: c.Category,
- w: w,
- }
- }
- func (d *dropper) drop() (ret bool) {
- return d.w.Next().(bool)
- }
// Metric names under which the CPU and memory utilization fields of a
// server load report are forwarded to the load reporter.
const (
	serverLoadCPUName    = "cpu_utilization"
	serverLoadMemoryName = "mem_utilization"
)
// loadReporter is the subset of the load store's methods that the picker
// calls.
type loadReporter interface {
	// CallStarted records that an RPC was started for the given locality.
	CallStarted(locality string)
	// CallFinished records that an RPC for the given locality finished with
	// err.
	CallFinished(locality string, err error)
	// CallServerLoad records one named server load value for the given
	// locality.
	CallServerLoad(locality, name string, val float64)
	// CallDropped records a dropped RPC. The picker passes the drop category
	// here, or an empty string for circuit breaking drops.
	CallDropped(locality string)
}
- // Picker implements RPC drop, circuit breaking drop and load reporting.
- type picker struct {
- drops []*dropper
- s balancer.State
- loadStore loadReporter
- counter *xdsclient.ClusterRequestsCounter
- countMax uint32
- }
- func newPicker(s balancer.State, config *dropConfigs, loadStore load.PerClusterReporter) *picker {
- return &picker{
- drops: config.drops,
- s: s,
- loadStore: loadStore,
- counter: config.requestCounter,
- countMax: config.requestCountMax,
- }
- }
- func (d *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
- // Don't drop unless the inner picker is READY. Similar to
- // https://github.com/grpc/grpc-go/issues/2622.
- if d.s.ConnectivityState == connectivity.Ready {
- // Check if this RPC should be dropped by category.
- for _, dp := range d.drops {
- if dp.drop() {
- if d.loadStore != nil {
- d.loadStore.CallDropped(dp.category)
- }
- return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped")
- }
- }
- }
- // Check if this RPC should be dropped by circuit breaking.
- if d.counter != nil {
- if err := d.counter.StartRequest(d.countMax); err != nil {
- // Drops by circuit breaking are reported with empty category. They
- // will be reported only in total drops, but not in per category.
- if d.loadStore != nil {
- d.loadStore.CallDropped("")
- }
- return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error())
- }
- }
- var lIDStr string
- pr, err := d.s.Picker.Pick(info)
- if scw, ok := pr.SubConn.(*scWrapper); ok {
- // This OK check also covers the case err!=nil, because SubConn will be
- // nil.
- pr.SubConn = scw.SubConn
- var e error
- // If locality ID isn't found in the wrapper, an empty locality ID will
- // be used.
- lIDStr, e = scw.localityID().ToString()
- if e != nil {
- logger.Infof("failed to marshal LocalityID: %#v, loads won't be reported", scw.localityID())
- }
- }
- if err != nil {
- if d.counter != nil {
- // Release one request count if this pick fails.
- d.counter.EndRequest()
- }
- return pr, err
- }
- if d.loadStore != nil {
- d.loadStore.CallStarted(lIDStr)
- oldDone := pr.Done
- pr.Done = func(info balancer.DoneInfo) {
- if oldDone != nil {
- oldDone(info)
- }
- d.loadStore.CallFinished(lIDStr, info.Err)
- load, ok := info.ServerLoad.(*v3orcapb.OrcaLoadReport)
- if !ok || load == nil {
- return
- }
- d.loadStore.CallServerLoad(lIDStr, serverLoadCPUName, load.CpuUtilization)
- d.loadStore.CallServerLoad(lIDStr, serverLoadMemoryName, load.MemUtilization)
- for n, c := range load.RequestCost {
- d.loadStore.CallServerLoad(lIDStr, n, c)
- }
- for n, c := range load.Utilization {
- d.loadStore.CallServerLoad(lIDStr, n, c)
- }
- }
- }
- if d.counter != nil {
- // Update Done() so that when the RPC finishes, the request count will
- // be released.
- oldDone := pr.Done
- pr.Done = func(doneInfo balancer.DoneInfo) {
- d.counter.EndRequest()
- if oldDone != nil {
- oldDone(doneInfo)
- }
- }
- }
- return pr, err
- }
|