123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- package mount
- import (
- "errors"
- "fmt"
- "path/filepath"
- "sync/atomic"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
- )
- func (wfs *WFS) subscribeFilerConfEvents() (func(), error) {
- now := time.Now()
- confDir := filer.DirectoryEtcSeaweedFS
- confName := filer.FilerConfName
- confFullName := filepath.Join(filer.DirectoryEtcSeaweedFS, filer.FilerConfName)
- // read current conf
- err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- content, err := filer.ReadInsideFiler(client, confDir, confName)
- if err != nil {
- return err
- }
- fc := filer.NewFilerConf()
- if len(content) > 0 {
- if err := fc.LoadFromBytes(content); err != nil {
- return fmt.Errorf("parse %s: %v", confFullName, err)
- }
- }
- wfs.FilerConf = fc
- return nil
- })
- if err != nil {
- if errors.Is(err, filer_pb.ErrNotFound) {
- glog.V(0).Infof("fuse filer conf %s not found", confFullName)
- } else {
- return nil, err
- }
- }
- processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
- message := resp.EventNotification
- if message.NewEntry == nil {
- return nil
- }
- dir := resp.Directory
- name := resp.EventNotification.NewEntry.Name
- if dir != confDir || name != confName {
- return nil
- }
- content := message.NewEntry.Content
- fc := filer.NewFilerConf()
- if len(content) > 0 {
- if err = fc.LoadFromBytes(content); err != nil {
- return fmt.Errorf("parse %s: %v", confFullName, err)
- }
- }
- wfs.FilerConf = fc
- return nil
- }
- metadataFollowOption := &pb.MetadataFollowOption{
- ClientName: "fuse",
- ClientId: wfs.signature,
- ClientEpoch: 1,
- SelfSignature: 0,
- PathPrefix: confFullName,
- AdditionalPathPrefixes: nil,
- StartTsNs: now.UnixNano(),
- StopTsNs: 0,
- EventErrorType: pb.FatalOnError,
- }
- return func() {
- // sync new conf changes
- util.RetryUntil("followFilerConfChanges", func() error {
- metadataFollowOption.ClientEpoch++
- i := atomic.LoadInt32(&wfs.option.filerIndex)
- n := len(wfs.option.FilerAddresses)
- err = pb.FollowMetadata(wfs.option.FilerAddresses[i], wfs.option.GrpcDialOption, metadataFollowOption, processEventFn)
- if err == nil {
- atomic.StoreInt32(&wfs.option.filerIndex, i)
- return nil
- }
- i++
- if i >= int32(n) {
- i = 0
- }
- return err
- }, func(err error) bool {
- glog.V(0).Infof("fuse follow filer conf changes: %v", err)
- return true
- })
- }, nil
- }
- func (wfs *WFS) wormEnabledForEntry(path util.FullPath, entry *filer_pb.Entry) bool {
- if entry == nil || entry.Attributes == nil {
- return false
- }
- if wfs.FilerConf == nil {
- return false
- }
- rule := wfs.FilerConf.MatchStorageRule(string(path))
- if !rule.Worm {
- return false
- }
- return entry.Attributes.FileSize > 0 || entry.Attributes.Crtime != entry.Attributes.Mtime
- }
|