filer_conf.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package mount
  2. import (
  3. "errors"
  4. "fmt"
  5. "path/filepath"
  6. "time"
  7. "github.com/seaweedfs/seaweedfs/weed/filer"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. )
  13. func (wfs *WFS) subscribeFilerConfEvents() (*meta_cache.MetadataFollower, error) {
  14. confDir := filer.DirectoryEtcSeaweedFS
  15. confName := filer.FilerConfName
  16. confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName)
  17. // read current conf
  18. err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  19. content, err := filer.ReadInsideFiler(client, confDir, confName)
  20. if err != nil {
  21. return err
  22. }
  23. fc := filer.NewFilerConf()
  24. if len(content) > 0 {
  25. if err := fc.LoadFromBytes(content); err != nil {
  26. return fmt.Errorf("parse %s: %v", confFullName, err)
  27. }
  28. }
  29. wfs.FilerConf = fc
  30. return nil
  31. })
  32. if err != nil {
  33. if errors.Is(err, filer_pb.ErrNotFound) {
  34. glog.V(0).Infof("fuse filer conf %s not found", confFullName)
  35. } else {
  36. return nil, err
  37. }
  38. }
  39. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  40. message := resp.EventNotification
  41. if message.NewEntry == nil {
  42. return nil
  43. }
  44. dir := resp.Directory
  45. name := resp.EventNotification.NewEntry.Name
  46. if dir != confDir || name != confName {
  47. return nil
  48. }
  49. content := message.NewEntry.Content
  50. fc := filer.NewFilerConf()
  51. if len(content) > 0 {
  52. if err = fc.LoadFromBytes(content); err != nil {
  53. return fmt.Errorf("parse %s: %v", confFullName, err)
  54. }
  55. }
  56. wfs.FilerConf = fc
  57. return nil
  58. }
  59. return &meta_cache.MetadataFollower{
  60. PathPrefixToWatch: confFullName,
  61. ProcessEventFn: processEventFn,
  62. }, nil
  63. }
  64. func (wfs *WFS) wormEnforcedForEntry(path util.FullPath, entry *filer_pb.Entry) (wormEnforced, wormEnabled bool) {
  65. if entry == nil || wfs.FilerConf == nil {
  66. return false, false
  67. }
  68. rule := wfs.FilerConf.MatchStorageRule(string(path))
  69. if !rule.Worm {
  70. return false, false
  71. }
  72. // worm is not enforced
  73. if entry.WormEnforcedAtTsNs == 0 {
  74. return false, true
  75. }
  76. // worm will never expire
  77. if rule.WormRetentionTimeSeconds == 0 {
  78. return true, true
  79. }
  80. enforcedAt := time.Unix(0, entry.WormEnforcedAtTsNs)
  81. // worm is expired
  82. if time.Now().Sub(enforcedAt).Seconds() >= float64(rule.WormRetentionTimeSeconds) {
  83. return false, true
  84. }
  85. return true, true
  86. }