filer_conf.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package mount
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/filer"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "path/filepath"
  11. )
  12. func (wfs *WFS) subscribeFilerConfEvents() (*meta_cache.MetadataFollower, error) {
  13. confDir := filer.DirectoryEtcSeaweedFS
  14. confName := filer.FilerConfName
  15. confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName)
  16. // read current conf
  17. err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  18. content, err := filer.ReadInsideFiler(client, confDir, confName)
  19. if err != nil {
  20. return err
  21. }
  22. fc := filer.NewFilerConf()
  23. if len(content) > 0 {
  24. if err := fc.LoadFromBytes(content); err != nil {
  25. return fmt.Errorf("parse %s: %v", confFullName, err)
  26. }
  27. }
  28. wfs.FilerConf = fc
  29. return nil
  30. })
  31. if err != nil {
  32. if errors.Is(err, filer_pb.ErrNotFound) {
  33. glog.V(0).Infof("fuse filer conf %s not found", confFullName)
  34. } else {
  35. return nil, err
  36. }
  37. }
  38. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  39. message := resp.EventNotification
  40. if message.NewEntry == nil {
  41. return nil
  42. }
  43. dir := resp.Directory
  44. name := resp.EventNotification.NewEntry.Name
  45. if dir != confDir || name != confName {
  46. return nil
  47. }
  48. content := message.NewEntry.Content
  49. fc := filer.NewFilerConf()
  50. if len(content) > 0 {
  51. if err = fc.LoadFromBytes(content); err != nil {
  52. return fmt.Errorf("parse %s: %v", confFullName, err)
  53. }
  54. }
  55. wfs.FilerConf = fc
  56. return nil
  57. }
  58. return &meta_cache.MetadataFollower{
  59. PathPrefixToWatch: confFullName,
  60. ProcessEventFn: processEventFn,
  61. }, nil
  62. }
  63. func (wfs *WFS) wormEnabledForEntry(path util.FullPath, entry *filer_pb.Entry) bool {
  64. if entry == nil || entry.Attributes == nil {
  65. return false
  66. }
  67. if wfs.FilerConf == nil {
  68. return false
  69. }
  70. rule := wfs.FilerConf.MatchStorageRule(string(path))
  71. if !rule.Worm {
  72. return false
  73. }
  74. return entry.Attributes.FileSize > 0 || entry.Attributes.Crtime != entry.Attributes.Mtime
  75. }