meta_cache_subscribe.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package meta_cache
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "strings"
  10. )
  11. func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error {
  12. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  13. message := resp.EventNotification
  14. for _, sig := range message.Signatures {
  15. if sig == selfSignature && selfSignature != 0 {
  16. return nil
  17. }
  18. }
  19. dir := resp.Directory
  20. var oldPath util.FullPath
  21. var newEntry *filer.Entry
  22. if message.OldEntry != nil {
  23. oldPath = util.NewFullPath(dir, message.OldEntry.Name)
  24. glog.V(4).Infof("deleting %v", oldPath)
  25. }
  26. if message.NewEntry != nil {
  27. if message.NewParentPath != "" {
  28. dir = message.NewParentPath
  29. }
  30. key := util.NewFullPath(dir, message.NewEntry.Name)
  31. glog.V(4).Infof("creating %v", key)
  32. newEntry = filer.FromPbEntry(dir, message.NewEntry)
  33. }
  34. err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
  35. if err == nil {
  36. if message.OldEntry != nil && message.NewEntry != nil {
  37. oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
  38. mc.invalidateFunc(oldKey, message.OldEntry)
  39. if message.OldEntry.Name != message.NewEntry.Name {
  40. newKey := util.NewFullPath(dir, message.NewEntry.Name)
  41. mc.invalidateFunc(newKey, message.NewEntry)
  42. }
  43. } else if filer_pb.IsCreate(resp) {
  44. // no need to invalidate
  45. } else if filer_pb.IsDelete(resp) {
  46. oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
  47. mc.invalidateFunc(oldKey, message.OldEntry)
  48. }
  49. }
  50. return err
  51. }
  52. prefix := dir
  53. if !strings.HasSuffix(prefix, "/") {
  54. prefix = prefix + "/"
  55. }
  56. metadataFollowOption := &pb.MetadataFollowOption{
  57. ClientName: "mount",
  58. ClientId: selfSignature,
  59. ClientEpoch: 1,
  60. SelfSignature: selfSignature,
  61. PathPrefix: prefix,
  62. AdditionalPathPrefixes: nil,
  63. DirectoriesToWatch: nil,
  64. StartTsNs: lastTsNs,
  65. StopTsNs: 0,
  66. EventErrorType: pb.FatalOnError,
  67. }
  68. util.RetryUntil("followMetaUpdates", func() error {
  69. metadataFollowOption.ClientEpoch++
  70. return pb.WithFilerClientFollowMetadata(client, metadataFollowOption, processEventFn)
  71. }, func(err error) bool {
  72. glog.Errorf("follow metadata updates: %v", err)
  73. return true
  74. })
  75. return nil
  76. }