meta_cache_subscribe.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package meta_cache
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "strings"
  10. )
  11. type MetadataFollower struct {
  12. PathPrefixToWatch string
  13. ProcessEventFn func(resp *filer_pb.SubscribeMetadataResponse) error
  14. }
  15. func mergeProceesors(mainProcessor func(resp *filer_pb.SubscribeMetadataResponse) error, followers ...*MetadataFollower) func(resp *filer_pb.SubscribeMetadataResponse) error {
  16. return func(resp *filer_pb.SubscribeMetadataResponse) error {
  17. // build the full path
  18. entry := resp.EventNotification.NewEntry
  19. if entry == nil {
  20. entry = resp.EventNotification.OldEntry
  21. }
  22. dir := resp.Directory
  23. if resp.EventNotification.NewParentPath != "" {
  24. dir = resp.EventNotification.NewParentPath
  25. }
  26. fp := util.NewFullPath(dir, entry.Name)
  27. for _, follower := range followers {
  28. if strings.HasPrefix(string(fp), follower.PathPrefixToWatch) {
  29. if err := follower.ProcessEventFn(resp); err != nil {
  30. return err
  31. }
  32. }
  33. }
  34. return mainProcessor(resp)
  35. }
  36. }
  37. func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64, followers ...*MetadataFollower) error {
  38. var prefixes []string
  39. for _, follower := range followers {
  40. prefixes = append(prefixes, follower.PathPrefixToWatch)
  41. }
  42. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  43. message := resp.EventNotification
  44. for _, sig := range message.Signatures {
  45. if sig == selfSignature && selfSignature != 0 {
  46. return nil
  47. }
  48. }
  49. dir := resp.Directory
  50. var oldPath util.FullPath
  51. var newEntry *filer.Entry
  52. if message.OldEntry != nil {
  53. oldPath = util.NewFullPath(dir, message.OldEntry.Name)
  54. glog.V(4).Infof("deleting %v", oldPath)
  55. }
  56. if message.NewEntry != nil {
  57. if message.NewParentPath != "" {
  58. dir = message.NewParentPath
  59. }
  60. key := util.NewFullPath(dir, message.NewEntry.Name)
  61. glog.V(4).Infof("creating %v", key)
  62. newEntry = filer.FromPbEntry(dir, message.NewEntry)
  63. }
  64. err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
  65. if err == nil {
  66. if message.OldEntry != nil && message.NewEntry != nil {
  67. oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
  68. mc.invalidateFunc(oldKey, message.OldEntry)
  69. if message.OldEntry.Name != message.NewEntry.Name {
  70. newKey := util.NewFullPath(dir, message.NewEntry.Name)
  71. mc.invalidateFunc(newKey, message.NewEntry)
  72. }
  73. } else if filer_pb.IsCreate(resp) {
  74. // no need to invalidate
  75. } else if filer_pb.IsDelete(resp) {
  76. oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
  77. mc.invalidateFunc(oldKey, message.OldEntry)
  78. }
  79. }
  80. return err
  81. }
  82. prefix := dir
  83. if !strings.HasSuffix(prefix, "/") {
  84. prefix = prefix + "/"
  85. }
  86. metadataFollowOption := &pb.MetadataFollowOption{
  87. ClientName: "mount",
  88. ClientId: selfSignature,
  89. ClientEpoch: 1,
  90. SelfSignature: selfSignature,
  91. PathPrefix: prefix,
  92. AdditionalPathPrefixes: prefixes,
  93. DirectoriesToWatch: nil,
  94. StartTsNs: lastTsNs,
  95. StopTsNs: 0,
  96. EventErrorType: pb.FatalOnError,
  97. }
  98. util.RetryUntil("followMetaUpdates", func() error {
  99. metadataFollowOption.ClientEpoch++
  100. return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, mergeProceesors(processEventFn, followers...))
  101. }, func(err error) bool {
  102. glog.Errorf("follow metadata updates: %v", err)
  103. return true
  104. })
  105. return nil
  106. }