filer_conf.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package mount
  2. import (
  3. "errors"
  4. "fmt"
  5. "path/filepath"
  6. "sync/atomic"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/filer"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/pb"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. )
  14. func (wfs *WFS) subscribeFilerConfEvents() (func(), error) {
  15. now := time.Now()
  16. confDir := filer.DirectoryEtcSeaweedFS
  17. confName := filer.FilerConfName
  18. confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName)
  19. // read current conf
  20. err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  21. content, err := filer.ReadInsideFiler(client, confDir, confName)
  22. if err != nil {
  23. return err
  24. }
  25. fc := filer.NewFilerConf()
  26. if len(content) > 0 {
  27. if err := fc.LoadFromBytes(content); err != nil {
  28. return fmt.Errorf("parse %s: %v", confFullName, err)
  29. }
  30. }
  31. wfs.FilerConf = fc
  32. return nil
  33. })
  34. if err != nil {
  35. if errors.Is(err, filer_pb.ErrNotFound) {
  36. glog.V(0).Infof("fuse filer conf %s not found", confFullName)
  37. } else {
  38. return nil, err
  39. }
  40. }
  41. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  42. message := resp.EventNotification
  43. if message.NewEntry == nil {
  44. return nil
  45. }
  46. dir := resp.Directory
  47. name := resp.EventNotification.NewEntry.Name
  48. if dir != confDir || name != confName {
  49. return nil
  50. }
  51. content := message.NewEntry.Content
  52. fc := filer.NewFilerConf()
  53. if len(content) > 0 {
  54. if err = fc.LoadFromBytes(content); err != nil {
  55. return fmt.Errorf("parse %s: %v", confFullName, err)
  56. }
  57. }
  58. wfs.FilerConf = fc
  59. return nil
  60. }
  61. metadataFollowOption := &pb.MetadataFollowOption{
  62. ClientName: "fuse",
  63. ClientId: wfs.signature,
  64. ClientEpoch: 1,
  65. SelfSignature: 0,
  66. PathPrefix: confFullName,
  67. AdditionalPathPrefixes: nil,
  68. StartTsNs: now.UnixNano(),
  69. StopTsNs: 0,
  70. EventErrorType: pb.FatalOnError,
  71. }
  72. return func() {
  73. // sync new conf changes
  74. util.RetryUntil("followFilerConfChanges", func() error {
  75. metadataFollowOption.ClientEpoch++
  76. i := atomic.LoadInt32(&wfs.option.filerIndex)
  77. n := len(wfs.option.FilerAddresses)
  78. err = pb.FollowMetadata(wfs.option.FilerAddresses[i], wfs.option.GrpcDialOption, metadataFollowOption, processEventFn)
  79. if err == nil {
  80. atomic.StoreInt32(&wfs.option.filerIndex, i)
  81. return nil
  82. }
  83. i++
  84. if i >= int32(n) {
  85. i = 0
  86. }
  87. return err
  88. }, func(err error) bool {
  89. glog.V(0).Infof("fuse follow filer conf changes: %v", err)
  90. return true
  91. })
  92. }, nil
  93. }
  94. func (wfs *WFS) wormEnabledForEntry(path util.FullPath, entry *filer_pb.Entry) bool {
  95. if entry == nil || entry.Attributes == nil {
  96. return false
  97. }
  98. if wfs.FilerConf == nil {
  99. return false
  100. }
  101. rule := wfs.FilerConf.MatchStorageRule(string(path))
  102. if !rule.Worm {
  103. return false
  104. }
  105. return entry.Attributes.FileSize > 0 || entry.Attributes.Crtime != entry.Attributes.Mtime
  106. }