client.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. /*
  2. *
  3. * Copyright 2019 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package fakeclient provides a fake implementation of an xDS client.
  19. package fakeclient
  20. import (
  21. "context"
  22. "google.golang.org/grpc/internal/testutils"
  23. "google.golang.org/grpc/xds/internal/xdsclient"
  24. "google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
  25. "google.golang.org/grpc/xds/internal/xdsclient/load"
  26. "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
  27. )
  28. // Client is a fake implementation of an xds client. It exposes a bunch of
  29. // channels to signal the occurrence of various events.
  30. type Client struct {
  31. // Embed XDSClient so this fake client implements the interface, but it's
  32. // never set (it's always nil). This may cause nil panic since not all the
  33. // methods are implemented.
  34. xdsclient.XDSClient
  35. name string
  36. ldsWatchCh *testutils.Channel
  37. rdsWatchCh *testutils.Channel
  38. cdsWatchCh *testutils.Channel
  39. edsWatchCh *testutils.Channel
  40. ldsCancelCh *testutils.Channel
  41. rdsCancelCh *testutils.Channel
  42. cdsCancelCh *testutils.Channel
  43. edsCancelCh *testutils.Channel
  44. loadReportCh *testutils.Channel
  45. lrsCancelCh *testutils.Channel
  46. loadStore *load.Store
  47. bootstrapCfg *bootstrap.Config
  48. ldsCb func(xdsresource.ListenerUpdate, error)
  49. rdsCbs map[string]func(xdsresource.RouteConfigUpdate, error)
  50. cdsCbs map[string]func(xdsresource.ClusterUpdate, error)
  51. edsCbs map[string]func(xdsresource.EndpointsUpdate, error)
  52. }
  53. // WatchListener registers a LDS watch.
  54. func (xdsC *Client) WatchListener(serviceName string, callback func(xdsresource.ListenerUpdate, error)) func() {
  55. xdsC.ldsCb = callback
  56. xdsC.ldsWatchCh.Send(serviceName)
  57. return func() {
  58. xdsC.ldsCancelCh.Send(nil)
  59. }
  60. }
  61. // WaitForWatchListener waits for WatchCluster to be invoked on this client and
  62. // returns the serviceName being watched.
  63. func (xdsC *Client) WaitForWatchListener(ctx context.Context) (string, error) {
  64. val, err := xdsC.ldsWatchCh.Receive(ctx)
  65. if err != nil {
  66. return "", err
  67. }
  68. return val.(string), err
  69. }
  70. // InvokeWatchListenerCallback invokes the registered ldsWatch callback.
  71. //
  72. // Not thread safe with WatchListener. Only call this after
  73. // WaitForWatchListener.
  74. func (xdsC *Client) InvokeWatchListenerCallback(update xdsresource.ListenerUpdate, err error) {
  75. xdsC.ldsCb(update, err)
  76. }
  77. // WaitForCancelListenerWatch waits for a LDS watch to be cancelled and returns
  78. // context.DeadlineExceeded otherwise.
  79. func (xdsC *Client) WaitForCancelListenerWatch(ctx context.Context) error {
  80. _, err := xdsC.ldsCancelCh.Receive(ctx)
  81. return err
  82. }
  83. // WatchRouteConfig registers a RDS watch.
  84. func (xdsC *Client) WatchRouteConfig(routeName string, callback func(xdsresource.RouteConfigUpdate, error)) func() {
  85. xdsC.rdsCbs[routeName] = callback
  86. xdsC.rdsWatchCh.Send(routeName)
  87. return func() {
  88. xdsC.rdsCancelCh.Send(routeName)
  89. }
  90. }
  91. // WaitForWatchRouteConfig waits for WatchCluster to be invoked on this client and
  92. // returns the routeName being watched.
  93. func (xdsC *Client) WaitForWatchRouteConfig(ctx context.Context) (string, error) {
  94. val, err := xdsC.rdsWatchCh.Receive(ctx)
  95. if err != nil {
  96. return "", err
  97. }
  98. return val.(string), err
  99. }
  100. // InvokeWatchRouteConfigCallback invokes the registered rdsWatch callback.
  101. //
  102. // Not thread safe with WatchRouteConfig. Only call this after
  103. // WaitForWatchRouteConfig.
  104. func (xdsC *Client) InvokeWatchRouteConfigCallback(name string, update xdsresource.RouteConfigUpdate, err error) {
  105. if len(xdsC.rdsCbs) != 1 {
  106. xdsC.rdsCbs[name](update, err)
  107. return
  108. }
  109. // Keeps functionality with previous usage of this on client side, if single
  110. // callback call that callback.
  111. var routeName string
  112. for route := range xdsC.rdsCbs {
  113. routeName = route
  114. }
  115. xdsC.rdsCbs[routeName](update, err)
  116. }
  117. // WaitForCancelRouteConfigWatch waits for a RDS watch to be cancelled and returns
  118. // context.DeadlineExceeded otherwise.
  119. func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) (string, error) {
  120. val, err := xdsC.rdsCancelCh.Receive(ctx)
  121. if err != nil {
  122. return "", err
  123. }
  124. return val.(string), err
  125. }
  126. // WatchCluster registers a CDS watch.
  127. func (xdsC *Client) WatchCluster(clusterName string, callback func(xdsresource.ClusterUpdate, error)) func() {
  128. // Due to the tree like structure of aggregate clusters, there can be multiple callbacks persisted for each cluster
  129. // node. However, the client doesn't care about the parent child relationship between the nodes, only that it invokes
  130. // the right callback for a particular cluster.
  131. xdsC.cdsCbs[clusterName] = callback
  132. xdsC.cdsWatchCh.Send(clusterName)
  133. return func() {
  134. xdsC.cdsCancelCh.Send(clusterName)
  135. }
  136. }
  137. // WaitForWatchCluster waits for WatchCluster to be invoked on this client and
  138. // returns the clusterName being watched.
  139. func (xdsC *Client) WaitForWatchCluster(ctx context.Context) (string, error) {
  140. val, err := xdsC.cdsWatchCh.Receive(ctx)
  141. if err != nil {
  142. return "", err
  143. }
  144. return val.(string), err
  145. }
  146. // InvokeWatchClusterCallback invokes the registered cdsWatch callback.
  147. //
  148. // Not thread safe with WatchCluster. Only call this after
  149. // WaitForWatchCluster.
  150. func (xdsC *Client) InvokeWatchClusterCallback(update xdsresource.ClusterUpdate, err error) {
  151. // Keeps functionality with previous usage of this, if single callback call that callback.
  152. if len(xdsC.cdsCbs) == 1 {
  153. var clusterName string
  154. for cluster := range xdsC.cdsCbs {
  155. clusterName = cluster
  156. }
  157. xdsC.cdsCbs[clusterName](update, err)
  158. } else {
  159. // Have what callback you call with the update determined by the service name in the ClusterUpdate. Left up to the
  160. // caller to make sure the cluster update matches with a persisted callback.
  161. xdsC.cdsCbs[update.ClusterName](update, err)
  162. }
  163. }
  164. // WaitForCancelClusterWatch waits for a CDS watch to be cancelled and returns
  165. // context.DeadlineExceeded otherwise.
  166. func (xdsC *Client) WaitForCancelClusterWatch(ctx context.Context) (string, error) {
  167. clusterNameReceived, err := xdsC.cdsCancelCh.Receive(ctx)
  168. if err != nil {
  169. return "", err
  170. }
  171. return clusterNameReceived.(string), err
  172. }
  173. // WatchEndpoints registers an EDS watch for provided clusterName.
  174. func (xdsC *Client) WatchEndpoints(clusterName string, callback func(xdsresource.EndpointsUpdate, error)) (cancel func()) {
  175. xdsC.edsCbs[clusterName] = callback
  176. xdsC.edsWatchCh.Send(clusterName)
  177. return func() {
  178. xdsC.edsCancelCh.Send(clusterName)
  179. }
  180. }
  181. // WaitForWatchEDS waits for WatchEndpoints to be invoked on this client and
  182. // returns the clusterName being watched.
  183. func (xdsC *Client) WaitForWatchEDS(ctx context.Context) (string, error) {
  184. val, err := xdsC.edsWatchCh.Receive(ctx)
  185. if err != nil {
  186. return "", err
  187. }
  188. return val.(string), err
  189. }
  190. // InvokeWatchEDSCallback invokes the registered edsWatch callback.
  191. //
  192. // Not thread safe with WatchEndpoints. Only call this after
  193. // WaitForWatchEDS.
  194. func (xdsC *Client) InvokeWatchEDSCallback(name string, update xdsresource.EndpointsUpdate, err error) {
  195. if len(xdsC.edsCbs) != 1 {
  196. // This may panic if name isn't found. But it's fine for tests.
  197. xdsC.edsCbs[name](update, err)
  198. return
  199. }
  200. // Keeps functionality with previous usage of this, if single callback call
  201. // that callback.
  202. for n := range xdsC.edsCbs {
  203. name = n
  204. }
  205. xdsC.edsCbs[name](update, err)
  206. }
  207. // WaitForCancelEDSWatch waits for a EDS watch to be cancelled and returns
  208. // context.DeadlineExceeded otherwise.
  209. func (xdsC *Client) WaitForCancelEDSWatch(ctx context.Context) (string, error) {
  210. edsNameReceived, err := xdsC.edsCancelCh.Receive(ctx)
  211. if err != nil {
  212. return "", err
  213. }
  214. return edsNameReceived.(string), err
  215. }
  216. // ReportLoadArgs wraps the arguments passed to ReportLoad.
  217. type ReportLoadArgs struct {
  218. // Server is the name of the server to which the load is reported.
  219. Server *bootstrap.ServerConfig
  220. }
  221. // ReportLoad starts reporting load about clusterName to server.
  222. func (xdsC *Client) ReportLoad(server *bootstrap.ServerConfig) (loadStore *load.Store, cancel func()) {
  223. xdsC.loadReportCh.Send(ReportLoadArgs{Server: server})
  224. return xdsC.loadStore, func() {
  225. xdsC.lrsCancelCh.Send(nil)
  226. }
  227. }
  228. // WaitForCancelReportLoad waits for a load report to be cancelled and returns
  229. // context.DeadlineExceeded otherwise.
  230. func (xdsC *Client) WaitForCancelReportLoad(ctx context.Context) error {
  231. _, err := xdsC.lrsCancelCh.Receive(ctx)
  232. return err
  233. }
  234. // LoadStore returns the underlying load data store.
  235. func (xdsC *Client) LoadStore() *load.Store {
  236. return xdsC.loadStore
  237. }
  238. // WaitForReportLoad waits for ReportLoad to be invoked on this client and
  239. // returns the arguments passed to it.
  240. func (xdsC *Client) WaitForReportLoad(ctx context.Context) (ReportLoadArgs, error) {
  241. val, err := xdsC.loadReportCh.Receive(ctx)
  242. if err != nil {
  243. return ReportLoadArgs{}, err
  244. }
  245. return val.(ReportLoadArgs), nil
  246. }
  247. // BootstrapConfig returns the bootstrap config.
  248. func (xdsC *Client) BootstrapConfig() *bootstrap.Config {
  249. return xdsC.bootstrapCfg
  250. }
  251. // SetBootstrapConfig updates the bootstrap config.
  252. func (xdsC *Client) SetBootstrapConfig(cfg *bootstrap.Config) {
  253. xdsC.bootstrapCfg = cfg
  254. }
  255. // Name returns the name of the xds client.
  256. func (xdsC *Client) Name() string {
  257. return xdsC.name
  258. }
  259. // NewClient returns a new fake xds client.
  260. func NewClient() *Client {
  261. return NewClientWithName("")
  262. }
  263. // NewClientWithName returns a new fake xds client with the provided name. This
  264. // is used in cases where multiple clients are created in the tests and we need
  265. // to make sure the client is created for the expected balancer name.
  266. func NewClientWithName(name string) *Client {
  267. return &Client{
  268. name: name,
  269. ldsWatchCh: testutils.NewChannelWithSize(10),
  270. rdsWatchCh: testutils.NewChannelWithSize(10),
  271. cdsWatchCh: testutils.NewChannelWithSize(10),
  272. edsWatchCh: testutils.NewChannelWithSize(10),
  273. ldsCancelCh: testutils.NewChannelWithSize(10),
  274. rdsCancelCh: testutils.NewChannelWithSize(10),
  275. cdsCancelCh: testutils.NewChannelWithSize(10),
  276. edsCancelCh: testutils.NewChannelWithSize(10),
  277. loadReportCh: testutils.NewChannel(),
  278. lrsCancelCh: testutils.NewChannel(),
  279. loadStore: load.NewStore(),
  280. bootstrapCfg: &bootstrap.Config{ClientDefaultListenerResourceNameTemplate: "%s"},
  281. rdsCbs: make(map[string]func(xdsresource.RouteConfigUpdate, error)),
  282. cdsCbs: make(map[string]func(xdsresource.ClusterUpdate, error)),
  283. edsCbs: make(map[string]func(xdsresource.EndpointsUpdate, error)),
  284. }
  285. }