weedfs.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package mount
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage/types"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/wdclient"
  13. "github.com/hanwen/go-fuse/v2/fuse"
  14. "google.golang.org/grpc"
  15. "math/rand"
  16. "os"
  17. "path"
  18. "time"
  19. "github.com/hanwen/go-fuse/v2/fs"
  20. )
  21. type Option struct {
  22. MountDirectory string
  23. FilerAddresses []pb.ServerAddress
  24. filerIndex int
  25. GrpcDialOption grpc.DialOption
  26. FilerMountRootPath string
  27. Collection string
  28. Replication string
  29. TtlSec int32
  30. DiskType types.DiskType
  31. ChunkSizeLimit int64
  32. ConcurrentWriters int
  33. CacheDir string
  34. CacheSizeMB int64
  35. DataCenter string
  36. Umask os.FileMode
  37. MountUid uint32
  38. MountGid uint32
  39. MountMode os.FileMode
  40. MountCtime time.Time
  41. MountMtime time.Time
  42. MountParentInode uint64
  43. VolumeServerAccess string // how to access volume servers
  44. Cipher bool // whether encrypt data on volume server
  45. UidGidMapper *meta_cache.UidGidMapper
  46. uniqueCacheDir string
  47. }
  48. type WFS struct {
  49. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  50. fuse.RawFileSystem
  51. fs.Inode
  52. option *Option
  53. metaCache *meta_cache.MetaCache
  54. stats statsCache
  55. chunkCache *chunk_cache.TieredChunkCache
  56. signature int32
  57. concurrentWriters *util.LimitedConcurrentExecutor
  58. inodeToPath *InodeToPath
  59. fhmap *FileHandleToInode
  60. dhmap *DirectoryHandleToInode
  61. fuseServer *fuse.Server
  62. }
  63. func NewSeaweedFileSystem(option *Option) *WFS {
  64. wfs := &WFS{
  65. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  66. option: option,
  67. signature: util.RandomInt32(),
  68. inodeToPath: NewInodeToPath(),
  69. fhmap: NewFileHandleToInode(),
  70. dhmap: NewDirectoryHandleToInode(),
  71. }
  72. wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
  73. wfs.option.setupUniqueCacheDirectory()
  74. if option.CacheSizeMB > 0 {
  75. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
  76. }
  77. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper, func(path util.FullPath) {
  78. wfs.inodeToPath.MarkChildrenCached(path)
  79. }, func(path util.FullPath) bool {
  80. return wfs.inodeToPath.IsChildrenCached(path)
  81. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  82. })
  83. grace.OnInterrupt(func() {
  84. wfs.metaCache.Shutdown()
  85. os.RemoveAll(option.getUniqueCacheDir())
  86. })
  87. if wfs.option.ConcurrentWriters > 0 {
  88. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  89. }
  90. return wfs
  91. }
  92. func (wfs *WFS) StartBackgroundTasks() {
  93. startTime := time.Now()
  94. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  95. }
  96. func (wfs *WFS) String() string {
  97. return "seaweedfs"
  98. }
  99. func (wfs *WFS) Init(server *fuse.Server) {
  100. wfs.fuseServer = server
  101. }
  102. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  103. path, status = wfs.inodeToPath.GetPath(inode)
  104. if status != fuse.OK {
  105. return
  106. }
  107. var found bool
  108. if fh, found = wfs.fhmap.FindFileHandle(inode); found {
  109. return path, fh, fh.entry, fuse.OK
  110. }
  111. entry, status = wfs.maybeLoadEntry(path)
  112. return
  113. }
  114. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  115. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  116. dir, name := fullpath.DirAndName()
  117. // return a valid entry for the mount root
  118. if string(fullpath) == wfs.option.FilerMountRootPath {
  119. return &filer_pb.Entry{
  120. Name: name,
  121. IsDirectory: true,
  122. Attributes: &filer_pb.FuseAttributes{
  123. Mtime: wfs.option.MountMtime.Unix(),
  124. FileMode: uint32(wfs.option.MountMode),
  125. Uid: wfs.option.MountUid,
  126. Gid: wfs.option.MountGid,
  127. Crtime: wfs.option.MountCtime.Unix(),
  128. },
  129. }, fuse.OK
  130. }
  131. // read from async meta cache
  132. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir), nil)
  133. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  134. if cacheErr == filer_pb.ErrNotFound {
  135. return nil, fuse.ENOENT
  136. }
  137. return cachedEntry.ToProtoEntry(), fuse.OK
  138. }
  139. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  140. if wfs.option.VolumeServerAccess == "filerProxy" {
  141. return func(fileId string) (targetUrls []string, err error) {
  142. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  143. }
  144. }
  145. return filer.LookupFn(wfs)
  146. }
  147. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  148. return wfs.option.FilerAddresses[wfs.option.filerIndex]
  149. }
  150. func (option *Option) setupUniqueCacheDirectory() {
  151. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  152. option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
  153. os.MkdirAll(option.uniqueCacheDir, os.FileMode(0777)&^option.Umask)
  154. }
  155. func (option *Option) getUniqueCacheDir() string {
  156. return option.uniqueCacheDir
  157. }