xds_resolver.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. /*
  2. * Copyright 2019 gRPC authors.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *
  16. */
  17. // Package resolver implements the xds resolver, that does LDS and RDS to find
  18. // the cluster to use.
  19. package resolver
  20. import (
  21. "errors"
  22. "fmt"
  23. "strings"
  24. "google.golang.org/grpc/credentials"
  25. "google.golang.org/grpc/internal"
  26. "google.golang.org/grpc/internal/grpclog"
  27. "google.golang.org/grpc/internal/grpcrand"
  28. "google.golang.org/grpc/internal/grpcsync"
  29. "google.golang.org/grpc/internal/pretty"
  30. iresolver "google.golang.org/grpc/internal/resolver"
  31. "google.golang.org/grpc/resolver"
  32. "google.golang.org/grpc/xds/internal/xdsclient"
  33. "google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
  34. "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
  35. )
  36. const xdsScheme = "xds"
  37. // newBuilderForTesting creates a new xds resolver builder using a specific xds
  38. // bootstrap config, so tests can use multiple xds clients in different
  39. // ClientConns at the same time.
  40. func newBuilderForTesting(config []byte) (resolver.Builder, error) {
  41. return &xdsResolverBuilder{
  42. newXDSClient: func() (xdsclient.XDSClient, func(), error) {
  43. return xdsclient.NewWithBootstrapContentsForTesting(config)
  44. },
  45. }, nil
  46. }
  47. // For overriding in unittests.
  48. var newXDSClient = func() (xdsclient.XDSClient, func(), error) { return xdsclient.New() }
  49. func init() {
  50. resolver.Register(&xdsResolverBuilder{})
  51. internal.NewXDSResolverWithConfigForTesting = newBuilderForTesting
  52. }
  53. type xdsResolverBuilder struct {
  54. newXDSClient func() (xdsclient.XDSClient, func(), error)
  55. }
  56. // Build helps implement the resolver.Builder interface.
  57. //
  58. // The xds bootstrap process is performed (and a new xds client is built) every
  59. // time an xds resolver is built.
  60. func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (_ resolver.Resolver, retErr error) {
  61. r := &xdsResolver{
  62. cc: cc,
  63. closed: grpcsync.NewEvent(),
  64. updateCh: make(chan suWithError, 1),
  65. activeClusters: make(map[string]*clusterInfo),
  66. channelID: grpcrand.Uint64(),
  67. }
  68. defer func() {
  69. if retErr != nil {
  70. r.Close()
  71. }
  72. }()
  73. r.logger = prefixLogger(r)
  74. r.logger.Infof("Creating resolver for target: %+v", target)
  75. newXDSClient := newXDSClient
  76. if b.newXDSClient != nil {
  77. newXDSClient = b.newXDSClient
  78. }
  79. client, close, err := newXDSClient()
  80. if err != nil {
  81. return nil, fmt.Errorf("xds: failed to create xds-client: %v", err)
  82. }
  83. r.xdsClient = client
  84. r.xdsClientClose = close
  85. bootstrapConfig := client.BootstrapConfig()
  86. if bootstrapConfig == nil {
  87. return nil, errors.New("bootstrap configuration is empty")
  88. }
  89. // If xds credentials were specified by the user, but bootstrap configs do
  90. // not contain any certificate provider configuration, it is better to fail
  91. // right now rather than failing when attempting to create certificate
  92. // providers after receiving an CDS response with security configuration.
  93. var creds credentials.TransportCredentials
  94. switch {
  95. case opts.DialCreds != nil:
  96. creds = opts.DialCreds
  97. case opts.CredsBundle != nil:
  98. creds = opts.CredsBundle.TransportCredentials()
  99. }
  100. if xc, ok := creds.(interface{ UsesXDS() bool }); ok && xc.UsesXDS() {
  101. if len(bootstrapConfig.CertProviderConfigs) == 0 {
  102. return nil, errors.New("xds: xdsCreds specified but certificate_providers config missing in bootstrap file")
  103. }
  104. }
  105. // Find the client listener template to use from the bootstrap config:
  106. // - If authority is not set in the target, use the top level template
  107. // - If authority is set, use the template from the authority map.
  108. template := bootstrapConfig.ClientDefaultListenerResourceNameTemplate
  109. if authority := target.URL.Host; authority != "" {
  110. a := bootstrapConfig.Authorities[authority]
  111. if a == nil {
  112. return nil, fmt.Errorf("xds: authority %q is not found in the bootstrap file", authority)
  113. }
  114. if a.ClientListenerResourceNameTemplate != "" {
  115. // This check will never be false, because
  116. // ClientListenerResourceNameTemplate is required to start with
  117. // xdstp://, and has a default value (not an empty string) if unset.
  118. template = a.ClientListenerResourceNameTemplate
  119. }
  120. }
  121. endpoint := target.URL.Path
  122. if endpoint == "" {
  123. endpoint = target.URL.Opaque
  124. }
  125. endpoint = strings.TrimPrefix(endpoint, "/")
  126. r.ldsResourceName = bootstrap.PopulateResourceTemplate(template, endpoint)
  127. // Register a watch on the xdsClient for the resource name determined above.
  128. cancelWatch := watchService(r.xdsClient, r.ldsResourceName, r.handleServiceUpdate, r.logger)
  129. r.logger.Infof("Watch started on resource name %v with xds-client %p", r.ldsResourceName, r.xdsClient)
  130. r.cancelWatch = func() {
  131. cancelWatch()
  132. r.logger.Infof("Watch cancel on resource name %v with xds-client %p", r.ldsResourceName, r.xdsClient)
  133. }
  134. go r.run()
  135. return r, nil
  136. }
  137. // Name helps implement the resolver.Builder interface.
  138. func (*xdsResolverBuilder) Scheme() string {
  139. return xdsScheme
  140. }
  141. // suWithError wraps the ServiceUpdate and error received through a watch API
  142. // callback, so that it can pushed onto the update channel as a single entity.
  143. type suWithError struct {
  144. su serviceUpdate
  145. emptyUpdate bool
  146. err error
  147. }
  148. // xdsResolver implements the resolver.Resolver interface.
  149. //
  150. // It registers a watcher for ServiceConfig updates with the xdsClient object
  151. // (which performs LDS/RDS queries for the same), and passes the received
  152. // updates to the ClientConn.
  153. type xdsResolver struct {
  154. cc resolver.ClientConn
  155. closed *grpcsync.Event
  156. logger *grpclog.PrefixLogger
  157. ldsResourceName string
  158. // The underlying xdsClient which performs all xDS requests and responses.
  159. xdsClient xdsclient.XDSClient
  160. xdsClientClose func()
  161. // A channel for the watch API callback to write service updates on to. The
  162. // updates are read by the run goroutine and passed on to the ClientConn.
  163. updateCh chan suWithError
  164. // cancelWatch is the function to cancel the watcher.
  165. cancelWatch func()
  166. // activeClusters is a map from cluster name to a ref count. Only read or
  167. // written during a service update (synchronous).
  168. activeClusters map[string]*clusterInfo
  169. curConfigSelector *configSelector
  170. // A random number which uniquely identifies the channel which owns this
  171. // resolver.
  172. channelID uint64
  173. }
  174. // sendNewServiceConfig prunes active clusters, generates a new service config
  175. // based on the current set of active clusters, and sends an update to the
  176. // channel with that service config and the provided config selector. Returns
  177. // false if an error occurs while generating the service config and the update
  178. // cannot be sent.
  179. func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
  180. // Delete entries from r.activeClusters with zero references;
  181. // otherwise serviceConfigJSON will generate a config including
  182. // them.
  183. r.pruneActiveClusters()
  184. if cs == nil && len(r.activeClusters) == 0 {
  185. // There are no clusters and we are sending a failing configSelector.
  186. // Send an empty config, which picks pick-first, with no address, and
  187. // puts the ClientConn into transient failure.
  188. r.cc.UpdateState(resolver.State{ServiceConfig: r.cc.ParseServiceConfig("{}")})
  189. return true
  190. }
  191. sc, err := serviceConfigJSON(r.activeClusters)
  192. if err != nil {
  193. // JSON marshal error; should never happen.
  194. r.logger.Errorf("%v", err)
  195. r.cc.ReportError(err)
  196. return false
  197. }
  198. r.logger.Infof("Received update on resource %v from xds-client %p, generated service config: %v", r.ldsResourceName, r.xdsClient, pretty.FormatJSON(sc))
  199. // Send the update to the ClientConn.
  200. state := iresolver.SetConfigSelector(resolver.State{
  201. ServiceConfig: r.cc.ParseServiceConfig(string(sc)),
  202. }, cs)
  203. r.cc.UpdateState(xdsclient.SetClient(state, r.xdsClient))
  204. return true
  205. }
  206. // run is a long running goroutine which blocks on receiving service updates
  207. // and passes it on the ClientConn.
  208. func (r *xdsResolver) run() {
  209. for {
  210. select {
  211. case <-r.closed.Done():
  212. return
  213. case update := <-r.updateCh:
  214. if update.err != nil {
  215. r.logger.Warningf("Watch error on resource %v from xds-client %p, %v", r.ldsResourceName, r.xdsClient, update.err)
  216. if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound {
  217. // If error is resource-not-found, it means the LDS
  218. // resource was removed. Ultimately send an empty service
  219. // config, which picks pick-first, with no address, and
  220. // puts the ClientConn into transient failure. Before we
  221. // can do that, we may need to send a normal service config
  222. // along with an erroring (nil) config selector.
  223. r.sendNewServiceConfig(nil)
  224. // Stop and dereference the active config selector, if one exists.
  225. r.curConfigSelector.stop()
  226. r.curConfigSelector = nil
  227. continue
  228. }
  229. // Send error to ClientConn, and balancers, if error is not
  230. // resource not found. No need to update resolver state if we
  231. // can keep using the old config.
  232. r.cc.ReportError(update.err)
  233. continue
  234. }
  235. if update.emptyUpdate {
  236. r.sendNewServiceConfig(r.curConfigSelector)
  237. continue
  238. }
  239. // Create the config selector for this update.
  240. cs, err := r.newConfigSelector(update.su)
  241. if err != nil {
  242. r.logger.Warningf("Error parsing update on resource %v from xds-client %p: %v", r.ldsResourceName, r.xdsClient, err)
  243. r.cc.ReportError(err)
  244. continue
  245. }
  246. if !r.sendNewServiceConfig(cs) {
  247. // JSON error creating the service config (unexpected); erase
  248. // this config selector and ignore this update, continuing with
  249. // the previous config selector.
  250. cs.stop()
  251. continue
  252. }
  253. // Decrement references to the old config selector and assign the
  254. // new one as the current one.
  255. r.curConfigSelector.stop()
  256. r.curConfigSelector = cs
  257. }
  258. }
  259. }
  260. // handleServiceUpdate is the callback which handles service updates. It writes
  261. // the received update to the update channel, which is picked by the run
  262. // goroutine.
  263. func (r *xdsResolver) handleServiceUpdate(su serviceUpdate, err error) {
  264. if r.closed.HasFired() {
  265. // Do not pass updates to the ClientConn once the resolver is closed.
  266. return
  267. }
  268. // Remove any existing entry in updateCh and replace with the new one.
  269. select {
  270. case <-r.updateCh:
  271. default:
  272. }
  273. r.updateCh <- suWithError{su: su, err: err}
  274. }
  275. // ResolveNow is a no-op at this point.
  276. func (*xdsResolver) ResolveNow(o resolver.ResolveNowOptions) {}
  277. // Close closes the resolver, and also closes the underlying xdsClient.
  278. func (r *xdsResolver) Close() {
  279. // Note that Close needs to check for nils even if some of them are always
  280. // set in the constructor. This is because the constructor defers Close() in
  281. // error cases, and the fields might not be set when the error happens.
  282. if r.cancelWatch != nil {
  283. r.cancelWatch()
  284. }
  285. if r.xdsClientClose != nil {
  286. r.xdsClientClose()
  287. }
  288. r.closed.Fire()
  289. r.logger.Infof("Shutdown")
  290. }