123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203 |
- /*
- *
- * Copyright 2020 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- package resolver
- import (
- "fmt"
- "sync"
- "time"
- "google.golang.org/grpc/internal/grpclog"
- "google.golang.org/grpc/internal/pretty"
- "google.golang.org/grpc/xds/internal/clusterspecifier"
- "google.golang.org/grpc/xds/internal/xdsclient"
- "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
- )
// serviceUpdate contains information received from the LDS/RDS responses which
// are of interest to the xds resolver. The RDS request is built by first
// making an LDS request to get the RouteConfig name.
//
// A serviceUpdate is the value passed to the callback registered through
// watchService. On error paths the callback receives the zero value.
type serviceUpdate struct {
	// virtualHost contains routes and other configuration to route RPCs. It
	// is the virtual host selected for the watched service name from the most
	// recently applied route configuration.
	virtualHost *xdsresource.VirtualHost
	// clusterSpecifierPlugins contains the configurations for any cluster
	// specifier plugins emitted by the xdsclient. It is taken from the same
	// route configuration update as virtualHost.
	clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig
	// ldsConfig contains configuration that applies to all routes. It is
	// rebuilt from every successful LDS response.
	ldsConfig ldsConfig
}
// ldsConfig contains information received from the LDS responses which are of
// interest to the xds resolver.
type ldsConfig struct {
	// maxStreamDuration is from the HTTP connection manager's
	// common_http_protocol_options field. It is copied from the
	// MaxStreamDuration field of the listener update.
	maxStreamDuration time.Duration
	// httpFilterConfig is the list of HTTP filters copied from the
	// HTTPFilters field of the listener update.
	httpFilterConfig []xdsresource.HTTPFilter
}
- // watchService uses LDS and RDS to discover information about the provided
- // serviceName.
- //
- // Note that during race (e.g. an xDS response is received while the user is
- // calling cancel()), there's a small window where the callback can be called
- // after the watcher is canceled. The caller needs to handle this case.
- //
- // TODO(easwars): Make this function a method on the xdsResolver type.
- // Currently, there is a single call site for this function, and all arguments
- // passed to it are fields of the xdsResolver type.
- func watchService(c xdsclient.XDSClient, serviceName string, cb func(serviceUpdate, error), logger *grpclog.PrefixLogger) (cancel func()) {
- w := &serviceUpdateWatcher{
- logger: logger,
- c: c,
- serviceName: serviceName,
- serviceCb: cb,
- }
- w.ldsCancel = c.WatchListener(serviceName, w.handleLDSResp)
- return w.close
- }
// serviceUpdateWatcher handles LDS and RDS response, and calls the service
// callback at the right time.
type serviceUpdateWatcher struct {
	// logger is used to log every LDS and RDS response received.
	logger *grpclog.PrefixLogger
	// c is the xDS client on which the listener and route configuration
	// watches are registered.
	c xdsclient.XDSClient
	// serviceName is the name of the watched listener resource, and the name
	// used to pick the matching virtual host from a route configuration.
	serviceName string
	// ldsCancel cancels the LDS watch. It is set once by watchService and
	// invoked from close.
	ldsCancel func()
	// serviceCb is the user callback that receives service updates and errors.
	serviceCb func(serviceUpdate, error)
	// lastUpdate accumulates the most recent LDS and RDS information. It is
	// only read and written by the response handlers, with mu held.
	lastUpdate serviceUpdate
	// mu is held by the response handlers and by close. Within this file the
	// fields declared below are only accessed with mu held.
	mu sync.Mutex
	// closed is set by close; responses that arrive afterwards are dropped.
	closed bool
	// rdsName is the route configuration name of the current RDS watch, or
	// empty when there is none (for example when routes were delivered inline
	// in the listener).
	rdsName string
	// rdsCancel cancels the current RDS watch. It is nil when no RDS watch is
	// active.
	rdsCancel func()
}
- func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate, err error) {
- w.logger.Infof("received LDS update: %+v, err: %v", pretty.ToJSON(update), err)
- w.mu.Lock()
- defer w.mu.Unlock()
- if w.closed {
- return
- }
- if err != nil {
- // We check the error type and do different things. For now, the only
- // type we check is ResourceNotFound, which indicates the LDS resource
- // was removed, and besides sending the error to callback, we also
- // cancel the RDS watch.
- if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound && w.rdsCancel != nil {
- w.rdsCancel()
- w.rdsName = ""
- w.rdsCancel = nil
- w.lastUpdate = serviceUpdate{}
- }
- // The other error cases still return early without canceling the
- // existing RDS watch.
- w.serviceCb(serviceUpdate{}, err)
- return
- }
- w.lastUpdate.ldsConfig = ldsConfig{
- maxStreamDuration: update.MaxStreamDuration,
- httpFilterConfig: update.HTTPFilters,
- }
- if update.InlineRouteConfig != nil {
- // If there was an RDS watch, cancel it.
- w.rdsName = ""
- if w.rdsCancel != nil {
- w.rdsCancel()
- w.rdsCancel = nil
- }
- // Handle the inline RDS update as if it's from an RDS watch.
- w.applyRouteConfigUpdate(*update.InlineRouteConfig)
- return
- }
- // RDS name from update is not an empty string, need RDS to fetch the
- // routes.
- if w.rdsName == update.RouteConfigName {
- // If the new RouteConfigName is same as the previous, don't cancel and
- // restart the RDS watch.
- //
- // If the route name did change, then we must wait until the first RDS
- // update before reporting this LDS config.
- if w.lastUpdate.virtualHost != nil {
- // We want to send an update with the new fields from the new LDS
- // (e.g. max stream duration), and old fields from the previous
- // RDS.
- //
- // But note that this should only happen when virtual host is set,
- // which means an RDS was received.
- w.serviceCb(w.lastUpdate, nil)
- }
- return
- }
- w.rdsName = update.RouteConfigName
- if w.rdsCancel != nil {
- w.rdsCancel()
- }
- w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp)
- }
- func (w *serviceUpdateWatcher) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) {
- matchVh := xdsresource.FindBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
- if matchVh == nil {
- // No matching virtual host found.
- w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
- return
- }
- w.lastUpdate.virtualHost = matchVh
- w.lastUpdate.clusterSpecifierPlugins = update.ClusterSpecifierPlugins
- w.serviceCb(w.lastUpdate, nil)
- }
- func (w *serviceUpdateWatcher) handleRDSResp(update xdsresource.RouteConfigUpdate, err error) {
- w.logger.Infof("received RDS update: %+v, err: %v", pretty.ToJSON(update), err)
- w.mu.Lock()
- defer w.mu.Unlock()
- if w.closed {
- return
- }
- if w.rdsCancel == nil {
- // This mean only the RDS watch is canceled, can happen if the LDS
- // resource is removed.
- return
- }
- if err != nil {
- w.serviceCb(serviceUpdate{}, err)
- return
- }
- w.applyRouteConfigUpdate(update)
- }
- func (w *serviceUpdateWatcher) close() {
- w.mu.Lock()
- defer w.mu.Unlock()
- w.closed = true
- w.ldsCancel()
- if w.rdsCancel != nil {
- w.rdsCancel()
- w.rdsCancel = nil
- }
- }
|