configbuilder.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. /*
  2. *
  3. * Copyright 2021 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package clusterresolver
  19. import (
  20. "encoding/json"
  21. "fmt"
  22. "sort"
  23. "google.golang.org/grpc/balancer/weightedroundrobin"
  24. "google.golang.org/grpc/internal/envconfig"
  25. "google.golang.org/grpc/internal/hierarchy"
  26. internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
  27. "google.golang.org/grpc/resolver"
  28. "google.golang.org/grpc/xds/internal"
  29. "google.golang.org/grpc/xds/internal/balancer/clusterimpl"
  30. "google.golang.org/grpc/xds/internal/balancer/outlierdetection"
  31. "google.golang.org/grpc/xds/internal/balancer/priority"
  32. "google.golang.org/grpc/xds/internal/balancer/wrrlocality"
  33. "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
  34. )
  35. const million = 1000000
// priorityConfig is config for one priority. For example, if there is an EDS
// and a DNS, the priority list will be
// [priorityConfig{EDS}, priorityConfig{DNS}].
//
// Each priorityConfig corresponds to one discovery mechanism from the LBConfig
// generated by the CDS balancer. The CDS balancer resolves the cluster name to
// an ordered list of discovery mechanisms (if the top cluster is an aggregated
// cluster), one for each underlying cluster.
type priorityConfig struct {
	// mechanism is the discovery mechanism this entry was built from. Its Type
	// field decides whether edsResp or addresses is consulted when the
	// priority config is built.
	mechanism DiscoveryMechanism
	// edsResp is set only if type is EDS.
	edsResp xdsresource.EndpointsUpdate
	// addresses is set only if type is DNS.
	addresses []string
	// Each discovery mechanism has a name generator so that the child policies
	// can reuse names between updates (EDS updates for example).
	childNameGen *nameGenerator
}
  53. // buildPriorityConfigJSON builds balancer config for the passed in
  54. // priorities.
  55. //
  56. // The built tree of balancers (see test for the output struct).
  57. //
  58. // ┌────────┐
  59. // │priority│
  60. // └┬──────┬┘
  61. // │ │
  62. // ┌──────────▼─┐ ┌─▼──────────┐
  63. // │cluster_impl│ │cluster_impl│
  64. // └──────┬─────┘ └─────┬──────┘
  65. // │ │
  66. // ┌──────▼─────┐ ┌─────▼──────┐
  67. // │xDSLBPolicy │ │xDSLBPolicy │ (Locality and Endpoint picking layer)
  68. // └────────────┘ └────────────┘
  69. func buildPriorityConfigJSON(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]byte, []resolver.Address, error) {
  70. pc, addrs, err := buildPriorityConfig(priorities, xdsLBPolicy)
  71. if err != nil {
  72. return nil, nil, fmt.Errorf("failed to build priority config: %v", err)
  73. }
  74. ret, err := json.Marshal(pc)
  75. if err != nil {
  76. return nil, nil, fmt.Errorf("failed to marshal built priority config struct into json: %v", err)
  77. }
  78. return ret, addrs, nil
  79. }
  80. func buildPriorityConfig(priorities []priorityConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*priority.LBConfig, []resolver.Address, error) {
  81. var (
  82. retConfig = &priority.LBConfig{Children: make(map[string]*priority.Child)}
  83. retAddrs []resolver.Address
  84. )
  85. for _, p := range priorities {
  86. switch p.mechanism.Type {
  87. case DiscoveryMechanismTypeEDS:
  88. names, configs, addrs, err := buildClusterImplConfigForEDS(p.childNameGen, p.edsResp, p.mechanism, xdsLBPolicy)
  89. if err != nil {
  90. return nil, nil, err
  91. }
  92. retConfig.Priorities = append(retConfig.Priorities, names...)
  93. retAddrs = append(retAddrs, addrs...)
  94. var odCfgs map[string]*outlierdetection.LBConfig
  95. if envconfig.XDSOutlierDetection {
  96. odCfgs = convertClusterImplMapToOutlierDetection(configs, p.mechanism.outlierDetection)
  97. for n, c := range odCfgs {
  98. retConfig.Children[n] = &priority.Child{
  99. Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: c},
  100. // Ignore all re-resolution from EDS children.
  101. IgnoreReresolutionRequests: true,
  102. }
  103. }
  104. continue
  105. }
  106. for n, c := range configs {
  107. retConfig.Children[n] = &priority.Child{
  108. Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: c},
  109. // Ignore all re-resolution from EDS children.
  110. IgnoreReresolutionRequests: true,
  111. }
  112. }
  113. case DiscoveryMechanismTypeLogicalDNS:
  114. name, config, addrs := buildClusterImplConfigForDNS(p.childNameGen, p.addresses, p.mechanism)
  115. retConfig.Priorities = append(retConfig.Priorities, name)
  116. retAddrs = append(retAddrs, addrs...)
  117. var odCfg *outlierdetection.LBConfig
  118. if envconfig.XDSOutlierDetection {
  119. odCfg = makeClusterImplOutlierDetectionChild(config, p.mechanism.outlierDetection)
  120. retConfig.Children[name] = &priority.Child{
  121. Config: &internalserviceconfig.BalancerConfig{Name: outlierdetection.Name, Config: odCfg},
  122. // Not ignore re-resolution from DNS children, they will trigger
  123. // DNS to re-resolve.
  124. IgnoreReresolutionRequests: false,
  125. }
  126. continue
  127. }
  128. retConfig.Children[name] = &priority.Child{
  129. Config: &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: config},
  130. // Not ignore re-resolution from DNS children, they will trigger
  131. // DNS to re-resolve.
  132. IgnoreReresolutionRequests: false,
  133. }
  134. }
  135. }
  136. return retConfig, retAddrs, nil
  137. }
  138. func convertClusterImplMapToOutlierDetection(ciCfgs map[string]*clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) map[string]*outlierdetection.LBConfig {
  139. odCfgs := make(map[string]*outlierdetection.LBConfig, len(ciCfgs))
  140. for n, c := range ciCfgs {
  141. odCfgs[n] = makeClusterImplOutlierDetectionChild(c, odCfg)
  142. }
  143. return odCfgs
  144. }
  145. func makeClusterImplOutlierDetectionChild(ciCfg *clusterimpl.LBConfig, odCfg outlierdetection.LBConfig) *outlierdetection.LBConfig {
  146. odCfgRet := odCfg
  147. odCfgRet.ChildPolicy = &internalserviceconfig.BalancerConfig{Name: clusterimpl.Name, Config: ciCfg}
  148. return &odCfgRet
  149. }
  150. func buildClusterImplConfigForDNS(g *nameGenerator, addrStrs []string, mechanism DiscoveryMechanism) (string, *clusterimpl.LBConfig, []resolver.Address) {
  151. // Endpoint picking policy for DNS is hardcoded to pick_first.
  152. const childPolicy = "pick_first"
  153. retAddrs := make([]resolver.Address, 0, len(addrStrs))
  154. pName := fmt.Sprintf("priority-%v", g.prefix)
  155. for _, addrStr := range addrStrs {
  156. retAddrs = append(retAddrs, hierarchy.Set(resolver.Address{Addr: addrStr}, []string{pName}))
  157. }
  158. return pName, &clusterimpl.LBConfig{
  159. Cluster: mechanism.Cluster,
  160. ChildPolicy: &internalserviceconfig.BalancerConfig{Name: childPolicy},
  161. }, retAddrs
  162. }
  163. // buildClusterImplConfigForEDS returns a list of cluster_impl configs, one for
  164. // each priority, sorted by priority, and the addresses for each priority (with
  165. // hierarchy attributes set).
  166. //
  167. // For example, if there are two priorities, the returned values will be
  168. // - ["p0", "p1"]
  169. // - map{"p0":p0_config, "p1":p1_config}
  170. // - [p0_address_0, p0_address_1, p1_address_0, p1_address_1]
  171. // - p0 addresses' hierarchy attributes are set to p0
  172. func buildClusterImplConfigForEDS(g *nameGenerator, edsResp xdsresource.EndpointsUpdate, mechanism DiscoveryMechanism, xdsLBPolicy *internalserviceconfig.BalancerConfig) ([]string, map[string]*clusterimpl.LBConfig, []resolver.Address, error) {
  173. drops := make([]clusterimpl.DropConfig, 0, len(edsResp.Drops))
  174. for _, d := range edsResp.Drops {
  175. drops = append(drops, clusterimpl.DropConfig{
  176. Category: d.Category,
  177. RequestsPerMillion: d.Numerator * million / d.Denominator,
  178. })
  179. }
  180. priorities := groupLocalitiesByPriority(edsResp.Localities)
  181. retNames := g.generate(priorities)
  182. retConfigs := make(map[string]*clusterimpl.LBConfig, len(retNames))
  183. var retAddrs []resolver.Address
  184. for i, pName := range retNames {
  185. priorityLocalities := priorities[i]
  186. cfg, addrs, err := priorityLocalitiesToClusterImpl(priorityLocalities, pName, mechanism, drops, xdsLBPolicy)
  187. if err != nil {
  188. return nil, nil, nil, err
  189. }
  190. retConfigs[pName] = cfg
  191. retAddrs = append(retAddrs, addrs...)
  192. }
  193. return retNames, retConfigs, retAddrs, nil
  194. }
  195. // groupLocalitiesByPriority returns the localities grouped by priority.
  196. //
  197. // The returned list is sorted from higher priority to lower. Each item in the
  198. // list is a group of localities.
  199. //
  200. // For example, for L0-p0, L1-p0, L2-p1, results will be
  201. // - [[L0, L1], [L2]]
  202. func groupLocalitiesByPriority(localities []xdsresource.Locality) [][]xdsresource.Locality {
  203. var priorityIntSlice []int
  204. priorities := make(map[int][]xdsresource.Locality)
  205. for _, locality := range localities {
  206. priority := int(locality.Priority)
  207. priorities[priority] = append(priorities[priority], locality)
  208. priorityIntSlice = append(priorityIntSlice, priority)
  209. }
  210. // Sort the priorities based on the int value, deduplicate, and then turn
  211. // the sorted list into a string list. This will be child names, in priority
  212. // order.
  213. sort.Ints(priorityIntSlice)
  214. priorityIntSliceDeduped := dedupSortedIntSlice(priorityIntSlice)
  215. ret := make([][]xdsresource.Locality, 0, len(priorityIntSliceDeduped))
  216. for _, p := range priorityIntSliceDeduped {
  217. ret = append(ret, priorities[p])
  218. }
  219. return ret
  220. }
  221. func dedupSortedIntSlice(a []int) []int {
  222. if len(a) == 0 {
  223. return a
  224. }
  225. i, j := 0, 1
  226. for ; j < len(a); j++ {
  227. if a[i] == a[j] {
  228. continue
  229. }
  230. i++
  231. if i != j {
  232. a[i] = a[j]
  233. }
  234. }
  235. return a[:i+1]
  236. }
  237. // priorityLocalitiesToClusterImpl takes a list of localities (with the same
  238. // priority), and generates a cluster impl policy config, and a list of
  239. // addresses with their path hierarchy set to [priority-name, locality-name], so
  240. // priority and the xDS LB Policy know which child policy each address is for.
  241. func priorityLocalitiesToClusterImpl(localities []xdsresource.Locality, priorityName string, mechanism DiscoveryMechanism, drops []clusterimpl.DropConfig, xdsLBPolicy *internalserviceconfig.BalancerConfig) (*clusterimpl.LBConfig, []resolver.Address, error) {
  242. var addrs []resolver.Address
  243. for _, locality := range localities {
  244. var lw uint32 = 1
  245. if locality.Weight != 0 {
  246. lw = locality.Weight
  247. }
  248. localityStr, err := locality.ID.ToString()
  249. if err != nil {
  250. localityStr = fmt.Sprintf("%+v", locality.ID)
  251. }
  252. for _, endpoint := range locality.Endpoints {
  253. // Filter out all "unhealthy" endpoints (unknown and healthy are
  254. // both considered to be healthy:
  255. // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
  256. if endpoint.HealthStatus != xdsresource.EndpointHealthStatusHealthy && endpoint.HealthStatus != xdsresource.EndpointHealthStatusUnknown {
  257. continue
  258. }
  259. addr := resolver.Address{Addr: endpoint.Address}
  260. addr = hierarchy.Set(addr, []string{priorityName, localityStr})
  261. addr = internal.SetLocalityID(addr, locality.ID)
  262. // "To provide the xds_wrr_locality load balancer information about
  263. // locality weights received from EDS, the cluster resolver will
  264. // populate a new locality weight attribute for each address The
  265. // attribute will have the weight (as an integer) of the locality
  266. // the address is part of." - A52
  267. addr = wrrlocality.SetAddrInfo(addr, wrrlocality.AddrInfo{LocalityWeight: lw})
  268. var ew uint32 = 1
  269. if endpoint.Weight != 0 {
  270. ew = endpoint.Weight
  271. }
  272. addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: lw * ew})
  273. addrs = append(addrs, addr)
  274. }
  275. }
  276. return &clusterimpl.LBConfig{
  277. Cluster: mechanism.Cluster,
  278. EDSServiceName: mechanism.EDSServiceName,
  279. LoadReportingServer: mechanism.LoadReportingServer,
  280. MaxConcurrentRequests: mechanism.MaxConcurrentRequests,
  281. DropCategories: drops,
  282. ChildPolicy: xdsLBPolicy,
  283. }, addrs, nil
  284. }