meta_cache_subscribe.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package meta_cache
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "strings"
  10. )
  11. type MetadataFollower struct {
  12. PathPrefixToWatch string
  13. ProcessEventFn func(resp *filer_pb.SubscribeMetadataResponse) error
  14. }
  15. func mergeProcessors(mainProcessor func(resp *filer_pb.SubscribeMetadataResponse) error, followers ...*MetadataFollower) func(resp *filer_pb.SubscribeMetadataResponse) error {
  16. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  17. // build the full path
  18. entry := resp.EventNotification.NewEntry
  19. if entry == nil {
  20. entry = resp.EventNotification.OldEntry
  21. }
  22. if entry != nil {
  23. dir := resp.Directory
  24. if resp.EventNotification.NewParentPath != "" {
  25. dir = resp.EventNotification.NewParentPath
  26. }
  27. fp := util.NewFullPath(dir, entry.Name)
  28. for _, follower := range followers {
  29. if strings.HasPrefix(string(fp), follower.PathPrefixToWatch) {
  30. if err := follower.ProcessEventFn(resp); err != nil {
  31. return err
  32. }
  33. }
  34. }
  35. }
  36. return mainProcessor(resp)
  37. }
  38. }
  39. func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, followers ...*MetadataFollower) error {
  40. var prefixes []string
  41. for _, follower := range followers {
  42. prefixes = append(prefixes, follower.PathPrefixToWatch)
  43. }
  44. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  45. message := resp.EventNotification
  46. for _, sig := range message.Signatures {
  47. if sig == selfSignature && selfSignature != 0 {
  48. return nil
  49. }
  50. }
  51. dir := resp.Directory
  52. var oldPath util.FullPath
  53. var newEntry *filer.Entry
  54. if message.OldEntry != nil {
  55. oldPath = util.NewFullPath(dir, message.OldEntry.Name)
  56. glog.V(4).Infof("deleting %v", oldPath)
  57. }
  58. if message.NewEntry != nil {
  59. if message.NewParentPath != "" {
  60. dir = message.NewParentPath
  61. }
  62. key := util.NewFullPath(dir, message.NewEntry.Name)
  63. glog.V(4).Infof("creating %v", key)
  64. newEntry = filer.FromPbEntry(dir, message.NewEntry)
  65. }
  66. err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
  67. if err == nil {
  68. if message.OldEntry != nil && message.NewEntry != nil {
  69. oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
  70. mc.invalidateFunc(oldKey, message.OldEntry)
  71. if message.OldEntry.Name != message.NewEntry.Name {
  72. newKey := util.NewFullPath(dir, message.NewEntry.Name)
  73. mc.invalidateFunc(newKey, message.NewEntry)
  74. }
  75. } else if filer_pb.IsCreate(resp) {
  76. // no need to invalidate
  77. } else if filer_pb.IsDelete(resp) {
  78. oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
  79. mc.invalidateFunc(oldKey, message.OldEntry)
  80. }
  81. }
  82. return err
  83. }
  84. prefix := dir
  85. if !strings.HasSuffix(prefix, "/") {
  86. prefix = prefix + "/"
  87. }
  88. metadataFollowOption := &pb.MetadataFollowOption{
  89. ClientName: "mount",
  90. ClientId: selfSignature,
  91. ClientEpoch: 1,
  92. SelfSignature: selfSignature,
  93. PathPrefix: prefix,
  94. AdditionalPathPrefixes: prefixes,
  95. DirectoriesToWatch: nil,
  96. StartTsNs: lastTsNs,
  97. StopTsNs: 0,
  98. EventErrorType: pb.FatalOnError,
  99. }
  100. util.RetryUntil("followMetaUpdates", func() error {
  101. metadataFollowOption.ClientEpoch++
  102. return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, mergeProcessors(processEventFn, followers...))
  103. }, func(err error) bool {
  104. glog.Errorf("follow metadata updates: %v", err)
  105. return true
  106. })
  107. return nil
  108. }