Browse Source

mount: map uid/gid between local and filer

Chris Lu 4 years ago
parent
commit
7e1aad0b54

+ 4 - 0
weed/command/mount.go

@@ -20,6 +20,8 @@ type MountOptions struct {
 	umaskString                 *string
 	nonempty                    *bool
 	outsideContainerClusterMode *bool
+	uidMap                      *string
+	gidMap                      *string
 }
 
 var (
@@ -47,6 +49,8 @@ func init() {
 	mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file")
 	mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file")
 	mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system")
+	mountOptions.uidMap = cmdMount.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated <local_uid>:<filer_uid>")
+	mountOptions.gidMap = cmdMount.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated <local_gid>:<filer_gid>")
 }
 
 var cmdMount = &Command{

+ 9 - 0
weed/command/mount_std.go

@@ -5,6 +5,7 @@ package command
 import (
 	"context"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
 	"os"
 	"os/user"
 	"path"
@@ -115,6 +116,13 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
 		}
 	}
 
+	// mapping uid, gid
+	uidGidMapper, err := meta_cache.NewUidGidMapper(*option.uidMap, *option.gidMap)
+	if err != nil {
+		fmt.Printf("failed to parse %s %s: %v\n", *option.uidMap, *option.gidMap, err)
+		return false
+	}
+
 	// Ensure target mount point availability
 	if isValid := checkMountPointAvailable(dir); !isValid {
 		glog.Fatalf("Expected mount to still be active, target mount point: %s, please check!", dir)
@@ -174,6 +182,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
 		Umask:                       umask,
 		OutsideContainerClusterMode: *mountOptions.outsideContainerClusterMode,
 		Cipher:                      cipher,
+		UidGidMapper:                uidGidMapper,
 	})
 
 	// mount

+ 10 - 0
weed/filesys/dir.go

@@ -148,6 +148,10 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
 	glog.V(1).Infof("create %s/%s: %v", dir.FullPath(), req.Name, req.Flags)
 
 	if err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+		dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
+		defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
+
 		if err := filer_pb.CreateEntry(client, request); err != nil {
 			if strings.Contains(err.Error(), "EEXIST") {
 				return fuse.EEXIST
@@ -193,6 +197,9 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
 
 	err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
+		dir.wfs.mapPbIdFromLocalToFiler(newEntry)
+		defer dir.wfs.mapPbIdFromFilerToLocal(newEntry)
+
 		request := &filer_pb.CreateEntryRequest{
 			Directory:  dir.FullPath(),
 			Entry:      newEntry,
@@ -458,6 +465,9 @@ func (dir *Dir) saveEntry() error {
 
 	return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
+		dir.wfs.mapPbIdFromLocalToFiler(dir.entry)
+		defer dir.wfs.mapPbIdFromFilerToLocal(dir.entry)
+
 		request := &filer_pb.UpdateEntryRequest{
 			Directory:  parentDir,
 			Entry:      dir.entry,

+ 4 - 0
weed/filesys/dir_link.go

@@ -38,6 +38,10 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
 	}
 
 	err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+		dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
+		defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
+
 		if err := filer_pb.CreateEntry(client, request); err != nil {
 			glog.V(0).Infof("symlink %s/%s: %v", dir.FullPath(), req.NewName, err)
 			return fuse.EIO

+ 3 - 0
weed/filesys/file.go

@@ -292,6 +292,9 @@ func (file *File) setEntry(entry *filer_pb.Entry) {
 func (file *File) saveEntry() error {
 	return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
 
+		file.wfs.mapPbIdFromLocalToFiler(file.entry)
+		defer file.wfs.mapPbIdFromFilerToLocal(file.entry)
+
 		request := &filer_pb.UpdateEntryRequest{
 			Directory:  file.dir.FullPath(),
 			Entry:      file.entry,

+ 3 - 0
weed/filesys/filehandle.go

@@ -265,6 +265,9 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
 		fh.f.entry.Chunks = append(chunks, manifestChunks...)
 		fh.f.entryViewCache = nil
 
+		fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
+		defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)
+
 		if err := filer_pb.CreateEntry(client, request); err != nil {
 			glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)
 			return fmt.Errorf("fh flush create %s: %v", fh.f.fullpath(), err)

+ 101 - 0
weed/filesys/meta_cache/id_mapper.go

@@ -0,0 +1,101 @@
+package meta_cache
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+)
+
+type UidGidMapper struct {
+	uidMapper *IdMapper
+	gidMapper *IdMapper
+}
+
+type IdMapper struct {
+	localToFiler map[uint32]uint32
+	filerToLocal map[uint32]uint32
+}
+
+// UidGidMapper translates local uid/gid to filer uid/gid
+// The local storage always persists the same as the filer.
+// The local->filer translation happens when updating the filer first and later saving to meta_cache.
+// And filer->local happens when reading from the meta_cache.
+func NewUidGidMapper(uidPairsStr, gidPairStr string) (*UidGidMapper, error) {
+	uidMapper, err := newIdMapper(uidPairsStr)
+	if err != nil {
+		return nil, err
+	}
+	gidMapper, err := newIdMapper(gidPairStr)
+	if err != nil {
+		return nil, err
+	}
+
+	return &UidGidMapper{
+		uidMapper: uidMapper,
+		gidMapper: gidMapper,
+	}, nil
+}
+
+func (m *UidGidMapper) LocalToFiler(uid, gid uint32) (uint32,uint32) {
+	return m.uidMapper.LocalToFiler(uid), m.gidMapper.LocalToFiler(gid)
+}
+func (m *UidGidMapper) FilerToLocal(uid, gid uint32) (uint32,uint32) {
+	return m.uidMapper.FilerToLocal(uid), m.gidMapper.FilerToLocal(gid)
+}
+
+func (m *IdMapper) LocalToFiler(id uint32) (uint32) {
+	value, found := m.localToFiler[id]
+	if found {
+		return value
+	}
+	return id
+}
+func (m *IdMapper) FilerToLocal(id uint32) (uint32) {
+	value, found := m.filerToLocal[id]
+	if found {
+		return value
+	}
+	return id
+}
+
+func newIdMapper(pairsStr string) (*IdMapper, error) {
+
+	localToFiler, filerToLocal, err := parseUint32Pairs(pairsStr)
+	if err != nil {
+		return nil, err
+	}
+
+	return &IdMapper{
+		localToFiler: localToFiler,
+		filerToLocal: filerToLocal,
+	}, nil
+
+}
+
+func parseUint32Pairs(pairsStr string) (localToFiler, filerToLocal map[uint32]uint32, err error) {
+
+	if pairsStr == "" {
+		return
+	}
+
+	localToFiler = make(map[uint32]uint32)
+	filerToLocal = make(map[uint32]uint32)
+	for _, pairStr := range strings.Split(pairsStr, ",") {
+		pair := strings.Split(pairStr, ":")
+		localUidStr, filerUidStr := pair[0], pair[1]
+		localUid, localUidErr := strconv.Atoi(localUidStr)
+		if localUidErr != nil {
+			err = fmt.Errorf("failed to parse local %d: %v", localUidStr, localUidErr)
+			return
+		}
+		filerUid, filerUidErr := strconv.Atoi(filerUidStr)
+		if filerUidErr != nil {
+			err = fmt.Errorf("failed to parse remote %s: %v", filerUidStr, filerUidErr)
+			return
+		}
+		localToFiler[uint32(localUid)] = uint32(filerUid)
+		filerToLocal[uint32(filerUid)] = uint32(localUid)
+	}
+
+	return
+}

+ 10 - 2
weed/filesys/meta_cache/meta_cache.go

@@ -20,12 +20,14 @@ type MetaCache struct {
 	actualStore filer.FilerStore
 	sync.RWMutex
 	visitedBoundary *bounded_tree.BoundedTree
+	uidGidMapper    *UidGidMapper
 }
 
-func NewMetaCache(dbFolder string) *MetaCache {
+func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper) *MetaCache {
 	return &MetaCache{
 		actualStore:     openMetaStore(dbFolder),
 		visitedBoundary: bounded_tree.NewBoundedTree(),
+		uidGidMapper:    uidGidMapper,
 	}
 }
 
@@ -58,7 +60,7 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro
 	return mc.actualStore.InsertEntry(ctx, entry)
 }
 
-func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
+func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
 	mc.Lock()
 	defer mc.Unlock()
 
@@ -103,6 +105,7 @@ func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *fi
 	if err != nil {
 		return nil, err
 	}
+	mc.mapIdFromFilerToLocal(entry)
 	filer_pb.AfterEntryDeserialization(entry.Chunks)
 	return
 }
@@ -122,6 +125,7 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
 		return nil, err
 	}
 	for _, entry := range entries {
+		mc.mapIdFromFilerToLocal(entry)
 		filer_pb.AfterEntryDeserialization(entry.Chunks)
 	}
 	return entries, err
@@ -132,3 +136,7 @@ func (mc *MetaCache) Shutdown() {
 	defer mc.Unlock()
 	mc.actualStore.Shutdown()
 }
+
+func (mc *MetaCache) mapIdFromFilerToLocal(entry *filer.Entry) {
+	entry.Attr.Uid, entry.Attr.Gid = mc.uidGidMapper.FilerToLocal(entry.Attr.Uid, entry.Attr.Gid)
+}

+ 1 - 1
weed/filesys/meta_cache/meta_cache_subscribe.go

@@ -39,7 +39,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
 			glog.V(4).Infof("creating %v", key)
 			newEntry = filer.FromPbEntry(dir, message.NewEntry)
 		}
-		return mc.AtomicUpdateEntry(context.Background(), oldPath, newEntry)
+		return mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
 	}
 
 	for {

+ 9 - 2
weed/filesys/wfs.go

@@ -45,7 +45,7 @@ type Option struct {
 
 	OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
 	Cipher                      bool // whether encrypt data on volume server
-
+	UidGidMapper                *meta_cache.UidGidMapper
 }
 
 var _ = fs.FS(&WFS{})
@@ -92,7 +92,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
 		wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB)
 	}
 
-	wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"))
+	wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), option.UidGidMapper)
 	startTime := time.Now()
 	go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
 	grace.OnInterrupt(func() {
@@ -206,3 +206,10 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.
 
 	return nil
 }
+
+func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
+	entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
+}
+func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
+	entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
+}