watches.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package sotw
  2. import (
  3. "context"
  4. "reflect"
  5. discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
  6. "github.com/envoyproxy/go-control-plane/pkg/cache/types"
  7. "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
  8. )
  9. // watches for all xDS resource types
  10. type watches struct {
  11. responders map[string]*watch
  12. // cases is a dynamic select case for the watched channels.
  13. cases []reflect.SelectCase
  14. }
  15. // newWatches creates and initializes watches.
  16. func newWatches() watches {
  17. return watches{
  18. responders: make(map[string]*watch, int(types.UnknownType)),
  19. cases: make([]reflect.SelectCase, 0),
  20. }
  21. }
  22. // addWatch creates a new watch entry in the watches map.
  23. // Watches are sorted by typeURL.
  24. func (w *watches) addWatch(typeURL string, watch *watch) {
  25. w.responders[typeURL] = watch
  26. }
  27. // close all open watches
  28. func (w *watches) close() {
  29. for _, watch := range w.responders {
  30. watch.close()
  31. }
  32. }
  33. // recomputeWatches rebuilds the known list of dynamic channels if needed
  34. func (w *watches) recompute(ctx context.Context, req <-chan *discovery.DiscoveryRequest) {
  35. w.cases = w.cases[:0] // Clear the existing cases while retaining capacity.
  36. w.cases = append(w.cases,
  37. reflect.SelectCase{
  38. Dir: reflect.SelectRecv,
  39. Chan: reflect.ValueOf(ctx.Done()),
  40. }, reflect.SelectCase{
  41. Dir: reflect.SelectRecv,
  42. Chan: reflect.ValueOf(req),
  43. },
  44. )
  45. for _, watch := range w.responders {
  46. w.cases = append(w.cases, reflect.SelectCase{
  47. Dir: reflect.SelectRecv,
  48. Chan: reflect.ValueOf(watch.response),
  49. })
  50. }
  51. }
  52. // watch contains the necessary modifiable data for receiving resource responses
  53. type watch struct {
  54. cancel func()
  55. nonce string
  56. response chan cache.Response
  57. }
  58. // close cancels an open watch
  59. func (w *watch) close() {
  60. if w.cancel != nil {
  61. w.cancel()
  62. }
  63. }