watch_service.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. /*
  2. *
  3. * Copyright 2020 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package resolver
  19. import (
  20. "fmt"
  21. "sync"
  22. "time"
  23. "google.golang.org/grpc/internal/grpclog"
  24. "google.golang.org/grpc/internal/pretty"
  25. "google.golang.org/grpc/xds/internal/clusterspecifier"
  26. "google.golang.org/grpc/xds/internal/xdsclient"
  27. "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
  28. )
  29. // serviceUpdate contains information received from the LDS/RDS responses which
  30. // are of interest to the xds resolver. The RDS request is built by first
  31. // making a LDS to get the RouteConfig name.
  32. type serviceUpdate struct {
  33. // virtualHost contains routes and other configuration to route RPCs.
  34. virtualHost *xdsresource.VirtualHost
  35. // clusterSpecifierPlugins contains the configurations for any cluster
  36. // specifier plugins emitted by the xdsclient.
  37. clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig
  38. // ldsConfig contains configuration that applies to all routes.
  39. ldsConfig ldsConfig
  40. }
  41. // ldsConfig contains information received from the LDS responses which are of
  42. // interest to the xds resolver.
  43. type ldsConfig struct {
  44. // maxStreamDuration is from the HTTP connection manager's
  45. // common_http_protocol_options field.
  46. maxStreamDuration time.Duration
  47. httpFilterConfig []xdsresource.HTTPFilter
  48. }
  49. // watchService uses LDS and RDS to discover information about the provided
  50. // serviceName.
  51. //
  52. // Note that during race (e.g. an xDS response is received while the user is
  53. // calling cancel()), there's a small window where the callback can be called
  54. // after the watcher is canceled. The caller needs to handle this case.
  55. //
  56. // TODO(easwars): Make this function a method on the xdsResolver type.
  57. // Currently, there is a single call site for this function, and all arguments
  58. // passed to it are fields of the xdsResolver type.
  59. func watchService(c xdsclient.XDSClient, serviceName string, cb func(serviceUpdate, error), logger *grpclog.PrefixLogger) (cancel func()) {
  60. w := &serviceUpdateWatcher{
  61. logger: logger,
  62. c: c,
  63. serviceName: serviceName,
  64. serviceCb: cb,
  65. }
  66. w.ldsCancel = c.WatchListener(serviceName, w.handleLDSResp)
  67. return w.close
  68. }
  69. // serviceUpdateWatcher handles LDS and RDS response, and calls the service
  70. // callback at the right time.
  71. type serviceUpdateWatcher struct {
  72. logger *grpclog.PrefixLogger
  73. c xdsclient.XDSClient
  74. serviceName string
  75. ldsCancel func()
  76. serviceCb func(serviceUpdate, error)
  77. lastUpdate serviceUpdate
  78. mu sync.Mutex
  79. closed bool
  80. rdsName string
  81. rdsCancel func()
  82. }
  83. func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate, err error) {
  84. w.logger.Infof("received LDS update: %+v, err: %v", pretty.ToJSON(update), err)
  85. w.mu.Lock()
  86. defer w.mu.Unlock()
  87. if w.closed {
  88. return
  89. }
  90. if err != nil {
  91. // We check the error type and do different things. For now, the only
  92. // type we check is ResourceNotFound, which indicates the LDS resource
  93. // was removed, and besides sending the error to callback, we also
  94. // cancel the RDS watch.
  95. if xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound && w.rdsCancel != nil {
  96. w.rdsCancel()
  97. w.rdsName = ""
  98. w.rdsCancel = nil
  99. w.lastUpdate = serviceUpdate{}
  100. }
  101. // The other error cases still return early without canceling the
  102. // existing RDS watch.
  103. w.serviceCb(serviceUpdate{}, err)
  104. return
  105. }
  106. w.lastUpdate.ldsConfig = ldsConfig{
  107. maxStreamDuration: update.MaxStreamDuration,
  108. httpFilterConfig: update.HTTPFilters,
  109. }
  110. if update.InlineRouteConfig != nil {
  111. // If there was an RDS watch, cancel it.
  112. w.rdsName = ""
  113. if w.rdsCancel != nil {
  114. w.rdsCancel()
  115. w.rdsCancel = nil
  116. }
  117. // Handle the inline RDS update as if it's from an RDS watch.
  118. w.applyRouteConfigUpdate(*update.InlineRouteConfig)
  119. return
  120. }
  121. // RDS name from update is not an empty string, need RDS to fetch the
  122. // routes.
  123. if w.rdsName == update.RouteConfigName {
  124. // If the new RouteConfigName is same as the previous, don't cancel and
  125. // restart the RDS watch.
  126. //
  127. // If the route name did change, then we must wait until the first RDS
  128. // update before reporting this LDS config.
  129. if w.lastUpdate.virtualHost != nil {
  130. // We want to send an update with the new fields from the new LDS
  131. // (e.g. max stream duration), and old fields from the previous
  132. // RDS.
  133. //
  134. // But note that this should only happen when virtual host is set,
  135. // which means an RDS was received.
  136. w.serviceCb(w.lastUpdate, nil)
  137. }
  138. return
  139. }
  140. w.rdsName = update.RouteConfigName
  141. if w.rdsCancel != nil {
  142. w.rdsCancel()
  143. }
  144. w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp)
  145. }
  146. func (w *serviceUpdateWatcher) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) {
  147. matchVh := xdsresource.FindBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
  148. if matchVh == nil {
  149. // No matching virtual host found.
  150. w.serviceCb(serviceUpdate{}, fmt.Errorf("no matching virtual host found for %q", w.serviceName))
  151. return
  152. }
  153. w.lastUpdate.virtualHost = matchVh
  154. w.lastUpdate.clusterSpecifierPlugins = update.ClusterSpecifierPlugins
  155. w.serviceCb(w.lastUpdate, nil)
  156. }
  157. func (w *serviceUpdateWatcher) handleRDSResp(update xdsresource.RouteConfigUpdate, err error) {
  158. w.logger.Infof("received RDS update: %+v, err: %v", pretty.ToJSON(update), err)
  159. w.mu.Lock()
  160. defer w.mu.Unlock()
  161. if w.closed {
  162. return
  163. }
  164. if w.rdsCancel == nil {
  165. // This mean only the RDS watch is canceled, can happen if the LDS
  166. // resource is removed.
  167. return
  168. }
  169. if err != nil {
  170. w.serviceCb(serviceUpdate{}, err)
  171. return
  172. }
  173. w.applyRouteConfigUpdate(update)
  174. }
  175. func (w *serviceUpdateWatcher) close() {
  176. w.mu.Lock()
  177. defer w.mu.Unlock()
  178. w.closed = true
  179. w.ldsCancel()
  180. if w.rdsCancel != nil {
  181. w.rdsCancel()
  182. w.rdsCancel = nil
  183. }
  184. }