123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- /*
- *
- * Copyright 2021 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package clusterresolver
- import (
- "encoding/json"
- "fmt"
- "sort"
- "google.golang.org/grpc/balancer/weightedroundrobin"
- "google.golang.org/grpc/internal/envconfig"
- "google.golang.org/grpc/internal/hierarchy"
- internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
- "google.golang.org/grpc/resolver"
- "google.golang.org/grpc/xds/internal"
- "google.golang.org/grpc/xds/internal/balancer/clusterimpl"
- "google.golang.org/grpc/xds/internal/balancer/outlierdetection"
- "google.golang.org/grpc/xds/internal/balancer/priority"
- "google.golang.org/grpc/xds/internal/balancer/wrrlocality"
- "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
- )
// million is the scale drop rates are normalized to: each EDS drop fraction is
// converted into a "requests per million" value.
const million = 1_000_000
- // priorityConfig is config for one priority. For example, if there an EDS and a
- // DNS, the priority list will be [priorityConfig{EDS}, priorityConfig{DNS}].
- //
- // Each priorityConfig corresponds to one discovery mechanism from the LBConfig
- // generated by the CDS balancer. The CDS balancer resolves the cluster name to
- // an ordered list of discovery mechanisms (if the top cluster is an aggregated
- // cluster), one for each underlying cluster.
- type priorityConfig struct {
- mechanism DiscoveryMechanism
- // edsResp is set only if type is EDS.
- edsResp xdsresource.EndpointsUpdate
- // addresses is set only if type is DNS.
- addresses []string
- // Each discovery mechanism has a name generator so that the child policies
- // can reuse names between updates (EDS updates for example).
- childNameGen *nameGenerator
- }
- // buildPriorityConfigJSON builds balancer config for the passed in
- // priorities.
- //
- // The built tree of balancers (see test for the output struct).
- //
- // ┌────────┐
- // │priority│
- // └┬──────┬┘
- // │ │
- // ┌──────────▼─┐ ┌─▼──────────┐
- // │cluster_impl│ │cluster_impl│
- // └──────┬─────┘ └─────┬──────┘
- // │ │
- // ┌──────▼─────┐ ┌─────▼──────┐
- // │xDSLBPolicy │ │xDSLBPolicy │ (Locality and Endpoint picking layer)
- // └────────────┘ └────────────┘
- func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) {
- pc, addrs, err := buildPriorityConfig(priorities, xdsLBPolicy)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to build priority config: %v", err)
- }
- ret, err := json.Marshal(pc)
- if err != nil {
- return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err)
- }
- return ret, addrs, nil
- }
- func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address, error) {
- var (
- retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)}
- retAddrs []resolver.Address
- )
- for _, p := range priorities {
- switch p.mechanism.Type {
- case DiscoveryMechanismTypeEDS:
- names, configs, addrs, err := buildClusterImplConfigForEDS(p.childNameGen, p.edsResp, p.mechanism, xdsLBPolicy)
- if err != nil {
- return nil, nil, err
- }
- retConfig.Priorities = append(retConfig.Priorities, names...)
- retAddrs = append(retAddrs, addrs...)
- var odCfgs map[string]*outlierdetection.LBConfig
- if envconfig.XDSOutlierDetection {
- odCfgs = convertClusterImplMapToOutlierDetection(configs, p.mechanism.outlierDetection)
- for n, c := range odCfgs {
- retConfig.Children[n] = &priority.Child{
- Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: c},
- // Ignore all re-resolution from EDS children.
- IgnoreReresolutionRequests: true,
- }
- }
- continue
- }
- for n, c := range configs {
- retConfig.Children[n] = &priority.Child{
- Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: c},
- // Ignore all re-resolution from EDS children.
- IgnoreReresolutionRequests: true,
- }
- }
- case DiscoveryMechanismTypeLogicalDNS:
- name, config, addrs := buildClusterImplConfigForDNS(p.childNameGen, p.addresses, p.mechanism)
- retConfig.Priorities = append(retConfig.Priorities, name)
- retAddrs = append(retAddrs, addrs...)
- var odCfg *outlierdetection.LBConfig
- if envconfig.XDSOutlierDetection {
- odCfg = makeClusterImplOutlierDetectionChild(config, p.mechanism.outlierDetection)
- retConfig.Children[name] = &priority.Child{
- Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: odCfg},
- // Not ignore re-resolution from DNS children, they will trigger
- // DNS to re-resolve.
- IgnoreReresolutionRequests: false,
- }
- continue
- }
- retConfig.Children[name] = &priority.Child{
- Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: config},
- // Not ignore re-resolution from DNS children, they will trigger
- // DNS to re-resolve.
- IgnoreReresolutionRequests: false,
- }
- }
- }
- return retConfig, retAddrs, nil
- }
- func convertClusterImplMapToOutlierDetection(ciCfgs map[string]*clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) map[string]*outlierdetection.LBConfig {
- odCfgs := make(map[string]*outlierdetection.LBConfig, len(ciCfgs))
- for n, c := range ciCfgs {
- odCfgs[n] = makeClusterImplOutlierDetectionChild(c, odCfg)
- }
- return odCfgs
- }
- func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) *outlierdetection.LBConfig {
- odCfgRet := odCfg
- odCfgRet.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: ciCfg}
- return &odCfgRet
- }
- func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Address) {
- // Endpoint picking policy for DNS is hardcoded to pick_first.
- const childPolicy = "pick_first"
- retAddrs := make([]resolver.Address, 0, len(addrStrs))
- pName := fmt.Sprintf("priority-%v", g.prefix)
- for _, addrStr := range addrStrs {
- retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
- }
- return pName, &clusterimpl.LBConfig{
- Cluster: mechanism.Cluster,
- ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
- }, retAddrs
- }
- // buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for
- // each priority, sorted by priority, and the addresses for each priority (with
- // hierarchy attributes set).
- //
- // For example, if there are two priorities, the returned values will be
- // - ["p0", "p1"]
- // - map{"p0":p0_config, "p1":p1_config}
- // - [p0_address_0, p0_address_1, p1_address_0, p1_address_1]
- // - p0 addresses' hierarchy attributes are set to p0
- func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) {
- drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops))
- for _, d := range edsResp.Drops {
- drops = append(drops, clusterimpl.DropConfig{
- Category: d.Category,
- RequestsPerMillion: d.Numerator * million / d.Denominator,
- })
- }
- priorities := groupLocalitiesByPriority(edsResp.Localities)
- retNames := g.generate(priorities)
- retConfigs := make(map[string]*clusterimpl.LBConfig, len(retNames))
- var retAddrs []resolver.Address
- for i, pName := range retNames {
- priorityLocalities := priorities[i]
- cfg, addrs, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy)
- if err != nil {
- return nil, nil, nil, err
- }
- retConfigs[pName] = cfg
- retAddrs = append(retAddrs, addrs...)
- }
- return retNames, retConfigs, retAddrs, nil
- }
- // groupLocalitiesByPriority returns the localities grouped by priority.
- //
- // The returned list is sorted from higher priority to lower. Each item in the
- // list is a group of localities.
- //
- // For example, for L0-p0, L1-p0, L2-p1, results will be
- // - [[L0, L1], [L2]]
- func groupLocalitiesByPriority(localities []xdsresource.Locality) [][]xdsresource.Locality {
- var priorityIntSlice []int
- priorities := make(map[int][]xdsresource.Locality)
- for _, locality := range localities {
- priority := int(locality.Priority)
- priorities[priority] = append(priorities[priority], locality)
- priorityIntSlice = append(priorityIntSlice, priority)
- }
- // Sort the priorities based on the int value, deduplicate, and then turn
- // the sorted list into a string list. This will be child names, in priority
- // order.
- sort.Ints(priorityIntSlice)
- priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice)
- ret := make([][]xdsresource.Locality, 0, len(priorityIntSliceDeduped))
- for _, p := range priorityIntSliceDeduped {
- ret = append(ret, priorities[p])
- }
- return ret
- }
// dedupSortedIntSlice removes consecutive duplicates from a, which is expected
// to be sorted. The compaction happens in place: the returned slice shares
// a's backing array, and the leading elements of a are overwritten with the
// unique values. An empty or nil input is returned unchanged.
func dedupSortedIntSlice(a []int) []int {
	if len(a) == 0 {
		return a
	}
	// uniq grows inside a's own storage; it can never overtake the element
	// currently being read, so no value is clobbered before it is visited.
	uniq := a[:1]
	for _, v := range a[1:] {
		if v != uniq[len(uniq)-1] {
			uniq = append(uniq, v)
		}
	}
	return uniq
}
- // priorityLocalitiesToClusterImpl takes a list of localities (with the same
- // priority), and generates a cluster impl policy config, and a list of
- // addresses with their path hierarchy set to [priority-name, locality-name], so
- // priority and the xDS LB Policy know which child policy each address is for.
- func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) {
- var addrs []resolver.Address
- for _, locality := range localities {
- var lw uint32 = 1
- if locality.Weight != 0 {
- lw = locality.Weight
- }
- localityStr, err := locality.ID.ToString()
- if err != nil {
- localityStr = fmt.Sprintf("%+v", locality.ID)
- }
- for _, endpoint := range locality.Endpoints {
- // Filter out all "unhealthy" endpoints (unknown and healthy are
- // both considered to be healthy:
- // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
- if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown {
- continue
- }
- addr := resolver.Address{Addr: endpoint.Address}
- addr = hierarchy.Set(addr, []string{priorityName, localityStr})
- addr = internal.SetLocalityID(addr, locality.ID)
- // "To provide the xds_wrr_locality load balancer information about
- // locality weights received from EDS, the cluster resolver will
- // populate a new locality weight attribute for each address The
- // attribute will have the weight (as an integer) of the locality
- // the address is part of." - A52
- addr = wrrlocality.SetAddrInfo(addr, wrrlocality.AddrInfo{LocalityWeight: lw})
- var ew uint32 = 1
- if endpoint.Weight != 0 {
- ew = endpoint.Weight
- }
- addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: lw * ew})
- addrs = append(addrs, addr)
- }
- }
- return &clusterimpl.LBConfig{
- Cluster: mechanism.Cluster,
- EDSServiceName: mechanism.EDSServiceName,
- LoadReportingServer: mechanism.LoadReportingServer,
- MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
- DropCategories: drops,
- ChildPolicy: xdsLBPolicy,
- }, addrs, nil
- }
|