Browse Source

FUSE mount: lazy loading meta cache

Chris Lu 4 years ago
parent
commit
f7a45d448f

+ 7 - 2
weed/filesys/dir.go

@@ -8,6 +8,7 @@ import (
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
@@ -204,8 +205,10 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
 
 	glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String())
 
-	fullFilePath := util.NewFullPath(dir.FullPath(), req.Name)
+	dirPath := dir.FullPath()
+	fullFilePath := util.NewFullPath(dirPath, req.Name)
 
+	meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath))
 	cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath)
 	if cacheErr == filer_pb.ErrNotFound {
 		return nil, fuse.ENOENT
@@ -263,7 +266,9 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
 		return nil
 	}
 
-	listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(dir.wfs.option.DirListCacheLimit))
+	dirPath := util.FullPath(dir.FullPath())
+	meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
+	listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, "", false, int(dir.wfs.option.DirListCacheLimit))
 	if listErr != nil {
 		glog.Errorf("list meta cache: %v", listErr)
 		return nil, fuse.EIO

+ 19 - 6
weed/filesys/meta_cache/meta_cache.go

@@ -9,16 +9,19 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/util"
+	"github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
 )
 
 type MetaCache struct {
 	actualStore filer2.FilerStore
 	sync.RWMutex
+	visitedBoundary *bounded_tree.BoundedTree
 }
 
 func NewMetaCache(dbFolder string) *MetaCache {
 	return &MetaCache{
-		actualStore: openMetaStore(dbFolder),
+		actualStore:     openMetaStore(dbFolder),
+		visitedBoundary: bounded_tree.NewBoundedTree(),
 	}
 }
 
@@ -49,14 +52,24 @@ func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error
 func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPath, newEntry *filer2.Entry) error {
 	mc.Lock()
 	defer mc.Unlock()
-	if oldPath != "" {
-		if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil {
-			return err
+
+	oldDir, _ := oldPath.DirAndName()
+	if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) {
+		if oldPath != "" {
+			if err := mc.actualStore.DeleteEntry(ctx, oldPath); err != nil {
+				return err
+			}
 		}
+	}else{
+		// println("unknown old directory:", oldDir)
 	}
+
 	if newEntry != nil {
-		if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil {
-			return err
+		newDir, _ := newEntry.DirAndName()
+		if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) {
+			if err := mc.actualStore.InsertEntry(ctx, newEntry); err != nil {
+				return err
+			}
 		}
 	}
 	return nil

+ 26 - 0
weed/filesys/meta_cache/meta_cache_init.go

@@ -2,6 +2,7 @@ package meta_cache
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/chrislusf/seaweedfs/weed/filer2"
 	"github.com/chrislusf/seaweedfs/weed/glog"
@@ -10,6 +11,7 @@ import (
 )
 
 func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) error {
+	return nil
 	glog.V(0).Infof("synchronizing meta data ...")
 	filer_pb.TraverseBfs(client, util.FullPath(path), func(parentPath util.FullPath, pbEntry *filer_pb.Entry) {
 		entry := filer2.FromPbEntry(string(parentPath), pbEntry)
@@ -19,3 +21,27 @@ func InitMetaCache(mc *MetaCache, client filer_pb.FilerClient, path string) erro
 	})
 	return nil
 }
+
+func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) {
+
+	mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) {
+
+		glog.V(2).Infof("ReadDirAllEntries %s ...", path)
+
+		err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+			entry := filer2.FromPbEntry(string(dirPath), pbEntry)
+			if err := mc.InsertEntry(context.Background(), entry); err != nil {
+				glog.V(0).Infof("read %s: %v", entry.FullPath, err)
+				return err
+			}
+			if entry.IsDirectory() {
+				childDirectories = append(childDirectories, entry.Name())
+			}
+			return nil
+		})
+		if err != nil {
+			err = fmt.Errorf("list %s: %v", dirPath, err)
+		}
+		return
+	})
+}

+ 2 - 0
weed/filesys/xattr.go

@@ -5,6 +5,7 @@ import (
 
 	"github.com/seaweedfs/fuse"
 
+	"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
@@ -113,6 +114,7 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err err
 	// glog.V(3).Infof("read entry cache miss %s", fullpath)
 
 	// read from async meta cache
+	meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
 	cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
 	if cacheErr == filer_pb.ErrNotFound {
 		return nil, fuse.ENOENT