config.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. /*
  2. *
  3. * Copyright 2020 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package rls
  19. import (
  20. "bytes"
  21. "encoding/json"
  22. "fmt"
  23. "net/url"
  24. "time"
  25. "github.com/golang/protobuf/ptypes"
  26. durationpb "github.com/golang/protobuf/ptypes/duration"
  27. "google.golang.org/grpc/balancer"
  28. "google.golang.org/grpc/balancer/rls/internal/keys"
  29. "google.golang.org/grpc/internal"
  30. "google.golang.org/grpc/internal/pretty"
  31. rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
  32. "google.golang.org/grpc/resolver"
  33. "google.golang.org/grpc/serviceconfig"
  34. "google.golang.org/protobuf/encoding/protojson"
  35. )
  36. const (
  37. // Default max_age if not specified (or greater than this value) in the
  38. // service config.
  39. maxMaxAge = 5 * time.Minute
  40. // Upper limit for cache_size since we don't fully trust the service config.
  41. maxCacheSize = 5 * 1024 * 1024 * 8 // 5MB in bytes
  42. // Default lookup_service_timeout if not specified in the service config.
  43. defaultLookupServiceTimeout = 10 * time.Second
  44. // Default value for targetNameField in the child policy config during
  45. // service config validation.
  46. dummyChildPolicyTarget = "target_name_to_be_filled_in_later"
  47. )
// lbConfig is the internal representation of the RLS LB policy's config. It
// is produced by ParseConfig: parseRLSProto fills in the fields derived from
// the route lookup config, and ParseConfig fills in the control channel
// service config and the child policy fields.
type lbConfig struct {
	serviceconfig.LoadBalancingConfig

	// cacheSizeBytes is the value of `cache_size_bytes`, capped at
	// maxCacheSize.
	cacheSizeBytes int64 // Keep this field 64-bit aligned.
	// kbMap is the key builder map created by keys.MakeBuilderMap from the
	// route lookup config.
	kbMap keys.BuilderMap
	// lookupService is the `lookup_service` string exactly as it appears in
	// the route lookup config.
	lookupService string
	// lookupServiceTimeout is `lookup_service_timeout`, or
	// defaultLookupServiceTimeout when the field is unset or zero.
	lookupServiceTimeout time.Duration
	// maxAge is `max_age`, replaced by maxMaxAge when it is unset or exceeds
	// maxMaxAge.
	maxAge time.Duration
	// staleAge is `stale_age`, or zero when the configured value was ignored.
	staleAge time.Duration
	// defaultTarget is the `default_target` from the route lookup config.
	defaultTarget string
	// childPolicyName is the name of the first registered balancer found in
	// the `childPolicy` list.
	childPolicyName string
	// childPolicyConfig is the selected child policy's config, decoded into a
	// map of raw JSON values. It includes the entry for the target field,
	// which parseChildPolicyConfigs sets to dummyChildPolicyTarget.
	childPolicyConfig map[string]json.RawMessage
	// childPolicyTargetField is the value of
	// `childPolicyConfigTargetFieldName`.
	childPolicyTargetField string
	// controlChannelServiceConfig is the raw JSON of
	// `routeLookupChannelServiceConfig`, or empty if it was not specified.
	controlChannelServiceConfig string
}
  63. func (lbCfg *lbConfig) Equal(other *lbConfig) bool {
  64. return lbCfg.kbMap.Equal(other.kbMap) &&
  65. lbCfg.lookupService == other.lookupService &&
  66. lbCfg.lookupServiceTimeout == other.lookupServiceTimeout &&
  67. lbCfg.maxAge == other.maxAge &&
  68. lbCfg.staleAge == other.staleAge &&
  69. lbCfg.cacheSizeBytes == other.cacheSizeBytes &&
  70. lbCfg.defaultTarget == other.defaultTarget &&
  71. lbCfg.childPolicyName == other.childPolicyName &&
  72. lbCfg.childPolicyTargetField == other.childPolicyTargetField &&
  73. lbCfg.controlChannelServiceConfig == other.controlChannelServiceConfig &&
  74. childPolicyConfigEqual(lbCfg.childPolicyConfig, other.childPolicyConfig)
  75. }
  76. func childPolicyConfigEqual(a, b map[string]json.RawMessage) bool {
  77. if (b == nil) != (a == nil) {
  78. return false
  79. }
  80. if len(b) != len(a) {
  81. return false
  82. }
  83. for k, jsonA := range a {
  84. jsonB, ok := b[k]
  85. if !ok {
  86. return false
  87. }
  88. if !bytes.Equal(jsonA, jsonB) {
  89. return false
  90. }
  91. }
  92. return true
  93. }
// This struct resembles the JSON representation of the loadBalancing config
// and makes it easier to unmarshal.
//
// The fields carry no struct tags, so encoding/json matches them against the
// JSON keys by field name, case-insensitively (e.g. `routeLookupConfig`
// populates RouteLookupConfig).
type lbConfigJSON struct {
	// RouteLookupConfig is the raw JSON of the route lookup config. It is
	// decoded separately into the RouteLookupConfig proto using protojson.
	RouteLookupConfig json.RawMessage
	// RouteLookupChannelServiceConfig is the raw JSON of the optional service
	// config for the control channel.
	RouteLookupChannelServiceConfig json.RawMessage
	// ChildPolicy is the list of candidate child policies. Each entry is
	// expected to contain exactly one policy name mapped to its config.
	ChildPolicy []map[string]json.RawMessage
	// ChildPolicyConfigTargetFieldName is the name of the field in the child
	// policy config that holds the target. It must be non-empty.
	ChildPolicyConfigTargetFieldName string
}
  102. // ParseConfig parses the JSON load balancer config provided into an
  103. // internal form or returns an error if the config is invalid.
  104. //
  105. // When parsing a config update, the following validations are performed:
  106. // - routeLookupConfig:
  107. // - grpc_keybuilders field:
  108. // - must have at least one entry
  109. // - must not have two entries with the same `Name`
  110. // - within each entry:
  111. // - must have at least one `Name`
  112. // - must not have a `Name` with the `service` field unset or empty
  113. // - within each `headers` entry:
  114. // - must not have `required_match` set
  115. // - must not have `key` unset or empty
  116. // - across all `headers`, `constant_keys` and `extra_keys` fields:
  117. // - must not have the same `key` specified twice
  118. // - no `key` must be the empty string
  119. // - `lookup_service` field must be set and must parse as a target URI
  120. // - if `max_age` > 5m, it should be set to 5 minutes
  121. // - if `stale_age` > `max_age`, ignore it
  122. // - if `stale_age` is set, then `max_age` must also be set
  123. // - ignore `valid_targets` field
  124. // - `cache_size_bytes` field must have a value greater than 0, and if its
  125. // value is greater than 5M, we cap it at 5M
  126. //
  127. // - routeLookupChannelServiceConfig:
  128. // - if specified, must parse as valid service config
  129. //
  130. // - childPolicy:
  131. // - must find a valid child policy with a valid config
  132. //
  133. // - childPolicyConfigTargetFieldName:
  134. // - must be set and non-empty
  135. func (rlsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
  136. logger.Infof("Received JSON service config: %v", pretty.ToJSON(c))
  137. cfgJSON := &lbConfigJSON{}
  138. if err := json.Unmarshal(c, cfgJSON); err != nil {
  139. return nil, fmt.Errorf("rls: json unmarshal failed for service config %+v: %v", string(c), err)
  140. }
  141. m := protojson.UnmarshalOptions{DiscardUnknown: true}
  142. rlsProto := &rlspb.RouteLookupConfig{}
  143. if err := m.Unmarshal(cfgJSON.RouteLookupConfig, rlsProto); err != nil {
  144. return nil, fmt.Errorf("rls: bad RouteLookupConfig proto %+v: %v", string(cfgJSON.RouteLookupConfig), err)
  145. }
  146. lbCfg, err := parseRLSProto(rlsProto)
  147. if err != nil {
  148. return nil, err
  149. }
  150. if sc := string(cfgJSON.RouteLookupChannelServiceConfig); sc != "" {
  151. parsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(sc)
  152. if parsed.Err != nil {
  153. return nil, fmt.Errorf("rls: bad control channel service config %q: %v", sc, parsed.Err)
  154. }
  155. lbCfg.controlChannelServiceConfig = sc
  156. }
  157. if cfgJSON.ChildPolicyConfigTargetFieldName == "" {
  158. return nil, fmt.Errorf("rls: childPolicyConfigTargetFieldName field is not set in service config %+v", string(c))
  159. }
  160. name, config, err := parseChildPolicyConfigs(cfgJSON.ChildPolicy, cfgJSON.ChildPolicyConfigTargetFieldName)
  161. if err != nil {
  162. return nil, err
  163. }
  164. lbCfg.childPolicyName = name
  165. lbCfg.childPolicyConfig = config
  166. lbCfg.childPolicyTargetField = cfgJSON.ChildPolicyConfigTargetFieldName
  167. return lbCfg, nil
  168. }
  169. func parseRLSProto(rlsProto *rlspb.RouteLookupConfig) (*lbConfig, error) {
  170. // Validations specified on the `grpc_keybuilders` field are performed here.
  171. kbMap, err := keys.MakeBuilderMap(rlsProto)
  172. if err != nil {
  173. return nil, err
  174. }
  175. // `lookup_service` field must be set and must parse as a target URI.
  176. lookupService := rlsProto.GetLookupService()
  177. if lookupService == "" {
  178. return nil, fmt.Errorf("rls: empty lookup_service in route lookup config %+v", rlsProto)
  179. }
  180. parsedTarget, err := url.Parse(lookupService)
  181. if err != nil {
  182. // url.Parse() fails if scheme is missing. Retry with default scheme.
  183. parsedTarget, err = url.Parse(resolver.GetDefaultScheme() + ":///" + lookupService)
  184. if err != nil {
  185. return nil, fmt.Errorf("rls: invalid target URI in lookup_service %s", lookupService)
  186. }
  187. }
  188. if parsedTarget.Scheme == "" {
  189. parsedTarget.Scheme = resolver.GetDefaultScheme()
  190. }
  191. if resolver.Get(parsedTarget.Scheme) == nil {
  192. return nil, fmt.Errorf("rls: unregistered scheme in lookup_service %s", lookupService)
  193. }
  194. lookupServiceTimeout, err := convertDuration(rlsProto.GetLookupServiceTimeout())
  195. if err != nil {
  196. return nil, fmt.Errorf("rls: failed to parse lookup_service_timeout in route lookup config %+v: %v", rlsProto, err)
  197. }
  198. if lookupServiceTimeout == 0 {
  199. lookupServiceTimeout = defaultLookupServiceTimeout
  200. }
  201. // Validations performed here:
  202. // - if `max_age` > 5m, it should be set to 5 minutes
  203. // - if `stale_age` > `max_age`, ignore it
  204. // - if `stale_age` is set, then `max_age` must also be set
  205. maxAge, err := convertDuration(rlsProto.GetMaxAge())
  206. if err != nil {
  207. return nil, fmt.Errorf("rls: failed to parse max_age in route lookup config %+v: %v", rlsProto, err)
  208. }
  209. staleAge, err := convertDuration(rlsProto.GetStaleAge())
  210. if err != nil {
  211. return nil, fmt.Errorf("rls: failed to parse staleAge in route lookup config %+v: %v", rlsProto, err)
  212. }
  213. if staleAge != 0 && maxAge == 0 {
  214. return nil, fmt.Errorf("rls: stale_age is set, but max_age is not in route lookup config %+v", rlsProto)
  215. }
  216. if staleAge >= maxAge {
  217. logger.Infof("rls: stale_age %v is not less than max_age %v, ignoring it", staleAge, maxAge)
  218. staleAge = 0
  219. }
  220. if maxAge == 0 || maxAge > maxMaxAge {
  221. logger.Infof("rls: max_age in route lookup config is %v, using %v", maxAge, maxMaxAge)
  222. maxAge = maxMaxAge
  223. }
  224. // `cache_size_bytes` field must have a value greater than 0, and if its
  225. // value is greater than 5M, we cap it at 5M
  226. cacheSizeBytes := rlsProto.GetCacheSizeBytes()
  227. if cacheSizeBytes <= 0 {
  228. return nil, fmt.Errorf("rls: cache_size_bytes must be set to a non-zero value: %+v", rlsProto)
  229. }
  230. if cacheSizeBytes > maxCacheSize {
  231. logger.Info("rls: cache_size_bytes %v is too large, setting it to: %v", cacheSizeBytes, maxCacheSize)
  232. cacheSizeBytes = maxCacheSize
  233. }
  234. return &lbConfig{
  235. kbMap: kbMap,
  236. lookupService: lookupService,
  237. lookupServiceTimeout: lookupServiceTimeout,
  238. maxAge: maxAge,
  239. staleAge: staleAge,
  240. cacheSizeBytes: cacheSizeBytes,
  241. defaultTarget: rlsProto.GetDefaultTarget(),
  242. }, nil
  243. }
  244. // parseChildPolicyConfigs iterates through the list of child policies and picks
  245. // the first registered policy and validates its config.
  246. func parseChildPolicyConfigs(childPolicies []map[string]json.RawMessage, targetFieldName string) (string, map[string]json.RawMessage, error) {
  247. for i, config := range childPolicies {
  248. if len(config) != 1 {
  249. return "", nil, fmt.Errorf("rls: invalid childPolicy: entry %v does not contain exactly 1 policy/config pair: %q", i, config)
  250. }
  251. var name string
  252. var rawCfg json.RawMessage
  253. for name, rawCfg = range config {
  254. }
  255. builder := balancer.Get(name)
  256. if builder == nil {
  257. continue
  258. }
  259. parser, ok := builder.(balancer.ConfigParser)
  260. if !ok {
  261. return "", nil, fmt.Errorf("rls: childPolicy %q with config %q does not support config parsing", name, string(rawCfg))
  262. }
  263. // To validate child policy configs we do the following:
  264. // - unmarshal the raw JSON bytes of the child policy config into a map
  265. // - add an entry with key set to `target_field_name` and a dummy value
  266. // - marshal the map back to JSON and parse the config using the parser
  267. // retrieved previously
  268. var childConfig map[string]json.RawMessage
  269. if err := json.Unmarshal(rawCfg, &childConfig); err != nil {
  270. return "", nil, fmt.Errorf("rls: json unmarshal failed for child policy config %q: %v", string(rawCfg), err)
  271. }
  272. childConfig[targetFieldName], _ = json.Marshal(dummyChildPolicyTarget)
  273. jsonCfg, err := json.Marshal(childConfig)
  274. if err != nil {
  275. return "", nil, fmt.Errorf("rls: json marshal failed for child policy config {%+v}: %v", childConfig, err)
  276. }
  277. if _, err := parser.ParseConfig(jsonCfg); err != nil {
  278. return "", nil, fmt.Errorf("rls: childPolicy config validation failed: %v", err)
  279. }
  280. return name, childConfig, nil
  281. }
  282. return "", nil, fmt.Errorf("rls: invalid childPolicy config: no supported policies found in %+v", childPolicies)
  283. }
  284. func convertDuration(d *durationpb.Duration) (time.Duration, error) {
  285. if d == nil {
  286. return 0, nil
  287. }
  288. return ptypes.Duration(d)
  289. }