filer_conf.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package mount
  2. import (
  3. "fmt"
  4. "path/filepath"
  5. "sync/atomic"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/filer"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. )
  13. func (wfs *WFS) subscribeFilerConfEvents() (func(), error) {
  14. now := time.Now()
  15. confDir := filer.DirectoryEtcSeaweedFS
  16. confName := filer.FilerConfName
  17. confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName)
  18. // read current conf
  19. err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  20. content, err := filer.ReadInsideFiler(client, confDir, confName)
  21. if err != nil {
  22. return err
  23. }
  24. fc := filer.NewFilerConf()
  25. if len(content) > 0 {
  26. if err := fc.LoadFromBytes(content); err != nil {
  27. return fmt.Errorf("parse %s: %v", confFullName, err)
  28. }
  29. }
  30. wfs.FilerConf = fc
  31. return nil
  32. })
  33. if err != nil {
  34. return nil, err
  35. }
  36. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  37. message := resp.EventNotification
  38. if message.NewEntry == nil {
  39. return nil
  40. }
  41. dir := resp.Directory
  42. name := resp.EventNotification.NewEntry.Name
  43. if dir != confDir || name != confName {
  44. return nil
  45. }
  46. content := message.NewEntry.Content
  47. fc := filer.NewFilerConf()
  48. if len(content) > 0 {
  49. if err = fc.LoadFromBytes(content); err != nil {
  50. return fmt.Errorf("parse %s: %v", confFullName, err)
  51. }
  52. }
  53. wfs.FilerConf = fc
  54. return nil
  55. }
  56. metadataFollowOption := &pb.MetadataFollowOption{
  57. ClientName: "fuse",
  58. ClientId: wfs.signature,
  59. ClientEpoch: 1,
  60. SelfSignature: 0,
  61. PathPrefix: confFullName,
  62. AdditionalPathPrefixes: nil,
  63. StartTsNs: now.UnixNano(),
  64. StopTsNs: 0,
  65. EventErrorType: pb.FatalOnError,
  66. }
  67. return func() {
  68. // sync new conf changes
  69. util.RetryUntil("followFilerConfChanges", func() error {
  70. metadataFollowOption.ClientEpoch++
  71. i := atomic.LoadInt32(&wfs.option.filerIndex)
  72. n := len(wfs.option.FilerAddresses)
  73. err = pb.FollowMetadata(wfs.option.FilerAddresses[i], wfs.option.GrpcDialOption, metadataFollowOption, processEventFn)
  74. if err == nil {
  75. atomic.StoreInt32(&wfs.option.filerIndex, i)
  76. return nil
  77. }
  78. i++
  79. if i >= int32(n) {
  80. i = 0
  81. }
  82. return err
  83. }, func(err error) bool {
  84. glog.V(0).Infof("fuse follow filer conf changes: %v", err)
  85. return true
  86. })
  87. }, nil
  88. }
  89. func (wfs *WFS) wormEnabledForEntry(path util.FullPath, entry *filer_pb.Entry) bool {
  90. if entry == nil || entry.Attributes == nil {
  91. return false
  92. }
  93. if wfs.FilerConf == nil {
  94. return false
  95. }
  96. rule := wfs.FilerConf.MatchStorageRule(string(path))
  97. if !rule.Worm {
  98. return false
  99. }
  100. return entry.Attributes.FileSize > 0 || entry.Attributes.Crtime != entry.Attributes.Mtime
  101. }