package filesys

import (
	"context"
	"fmt"
	"math"
	"os"
	"path"
	"sync"
	"time"

	"google.golang.org/grpc"

	"github.com/seaweedfs/fuse"
	"github.com/seaweedfs/fuse/fs"

	"github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
	"github.com/chrislusf/seaweedfs/weed/util/grace"
	"github.com/chrislusf/seaweedfs/weed/util/log"
)
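
// Option carries the mount-time configuration for a SeaweedFS FUSE mount.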
type Option struct {
	FilerGrpcAddress   string
	GrpcDialOption     grpc.DialOption
	FilerMountRootPath string
	Collection         string
	Replication        string
	TtlSec             int32
	ChunkSizeLimit     int64
	ConcurrentWriters  int
	CacheDir           string
	CacheSizeMB        int64
	DataCenter         string
	EntryCacheTtl      time.Duration
	Umask              os.FileMode

	MountUid   uint32
	MountGid   uint32
	MountMode  os.FileMode
	MountCtime time.Time
	MountMtime time.Time

	OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
	Cipher                      bool // whether to encrypt data on the volume server
	UidGidMapper                *meta_cache.UidGidMapper
}

var _ = fs.FS(&WFS{})
var _ = fs.FSStatfser(&WFS{})
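
// WFS is the FUSE file system backed by a SeaweedFS filer. It tracks open
// file handles, caches chunks and metadata locally, and throttles concurrent
// writers when configured.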
type WFS struct {
	option *Option

	// contains all open handles, protected by handlesLock
	handlesLock sync.Mutex
	handles     map[uint64]*FileHandle

	bufPool sync.Pool

	stats statsCache

	root        fs.Node
	fsNodeCache *FsCache

	chunkCache *chunk_cache.TieredChunkCache
	metaCache  *meta_cache.MetaCache
	signature  int32

	// throttle writers
	concurrentWriters *util.LimitedConcurrentExecutor
}
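
// statsCache holds the most recent filer Statistics response together with
// the time it was fetched, so Statfs does not query the filer on every call.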
type statsCache struct {
	filer_pb.StatisticsResponse
	lastChecked int64 // unix time in seconds
}
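
// NewSeaweedFileSystem builds the file system state for one mount: a buffer
// pool sized to ChunkSizeLimit, an optional tiered chunk cache, a local meta
// cache subscribed to filer metadata events, and the root directory node.
//
// A minimal usage sketch (the field values and mount point below are
// illustrative only, not taken from this file; the mount command wires in
// many more options before serving the FUSE connection):
//
//	wfs := NewSeaweedFileSystem(&Option{
//		FilerGrpcAddress:   "localhost:18888",
//		FilerMountRootPath: "/",
//		ChunkSizeLimit:     2 * 1024 * 1024,
//	})
//	// conn, err := fuse.Mount("/mnt/weed")
//	// err = fs.Serve(conn, wfs)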
func NewSeaweedFileSystem(option *Option) *WFS {
	wfs := &WFS{
		option:  option,
		handles: make(map[uint64]*FileHandle),
		bufPool: sync.Pool{
			New: func() interface{} {
				return make([]byte, option.ChunkSizeLimit)
			},
		},
		signature: util.RandomInt32(),
	}

	cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
	cacheDir := path.Join(option.CacheDir, cacheUniqueId)
	if option.CacheSizeMB > 0 {
		os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
		wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
	}

	wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
		fsNode := wfs.fsNodeCache.GetFsNode(filePath)
		if fsNode != nil {
			if file, ok := fsNode.(*File); ok {
				file.clearEntry()
			}
		}
	})
	startTime := time.Now()
	go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
	grace.OnInterrupt(func() {
		wfs.metaCache.Shutdown()
	})

	entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
	wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
	wfs.fsNodeCache = newFsCache(wfs.root)

	if wfs.option.ConcurrentWriters > 0 {
		wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
	}

	return wfs
}
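
// Root returns the root directory node of the mounted file system; it implements fs.FS.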
func (wfs *WFS) Root() (fs.Node, error) {
	return wfs.root, nil
}
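
// AcquireHandle returns a FileHandle for the file, reusing the existing handle
// when the file is already open, and registers it under the file's inode id.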
func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {

	fullpath := file.fullpath()
	log.Tracef("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)

	wfs.handlesLock.Lock()
	defer wfs.handlesLock.Unlock()

	inodeId := file.fullpath().AsInode()
	if file.isOpen > 0 {
		existingHandle, found := wfs.handles[inodeId]
		if found && existingHandle != nil {
			file.isOpen++
			return existingHandle
		}
	}

	fileHandle = newFileHandle(file, uid, gid)
	file.maybeLoadEntry(context.Background())
	file.isOpen++

	wfs.handles[inodeId] = fileHandle
	fileHandle.handle = inodeId

	return
}
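
// ReleaseHandle removes the handle registered for the given path.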
func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {

	wfs.handlesLock.Lock()
	defer wfs.handlesLock.Unlock()

	log.Tracef("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))

	delete(wfs.handles, fullpath.AsInode())
}

// Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {

	log.Tracef("reading fs stats: %+v", req)

	if wfs.stats.lastChecked < time.Now().Unix()-20 {

		err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {

			request := &filer_pb.StatisticsRequest{
				Collection:  wfs.option.Collection,
				Replication: wfs.option.Replication,
				Ttl:         fmt.Sprintf("%ds", wfs.option.TtlSec),
			}

			log.Tracef("reading filer stats: %+v", request)
			resp, err := client.Statistics(context.Background(), request)
			if err != nil {
				log.Infof("reading filer stats %v: %v", request, err)
				return err
			}
			log.Tracef("read filer stats: %+v", resp)

			wfs.stats.TotalSize = resp.TotalSize
			wfs.stats.UsedSize = resp.UsedSize
			wfs.stats.FileCount = resp.FileCount
			wfs.stats.lastChecked = time.Now().Unix()

			return nil
		})
		if err != nil {
			log.Infof("filer Statistics: %v", err)
			return err
		}
	}

	totalDiskSize := wfs.stats.TotalSize
	usedDiskSize := wfs.stats.UsedSize
	actualFileCount := wfs.stats.FileCount

	// Compute the total number of available blocks
	resp.Blocks = totalDiskSize / blockSize

	// Compute the number of used blocks
	numBlocks := uint64(usedDiskSize / blockSize)

	// Report the number of free and available blocks for the block size
	resp.Bfree = resp.Blocks - numBlocks
	resp.Bavail = resp.Blocks - numBlocks
	resp.Bsize = uint32(blockSize)

	// Report the total number of possible files in the file system (and those free)
	resp.Files = math.MaxInt64
	resp.Ffree = math.MaxInt64 - actualFileCount

	// Report the maximum length of a name and the minimum fragment size
	resp.Namelen = 1024
	resp.Frsize = uint32(blockSize)

	return nil
}
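
// mapPbIdFromFilerToLocal rewrites the entry's uid/gid from filer-side values
// to local values using the configured UidGidMapper.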
func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
	if entry.Attributes == nil {
		return
	}
	entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
}
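
// mapPbIdFromLocalToFiler rewrites the entry's uid/gid from local values to
// filer-side values using the configured UidGidMapper.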
func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
	if entry.Attributes == nil {
		return
	}
	entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
}