weedfs.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package mount
  2. import (
  3. "context"
  4. "math/rand"
  5. "os"
  6. "path"
  7. "path/filepath"
  8. "sync/atomic"
  9. "time"
  10. "github.com/hanwen/go-fuse/v2/fuse"
  11. "google.golang.org/grpc"
  12. "github.com/seaweedfs/seaweedfs/weed/filer"
  13. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  14. "github.com/seaweedfs/seaweedfs/weed/pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  20. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  21. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  22. "github.com/hanwen/go-fuse/v2/fs"
  23. )
  24. type Option struct {
  25. filerIndex int32 // align memory for atomic read/write
  26. FilerAddresses []pb.ServerAddress
  27. MountDirectory string
  28. GrpcDialOption grpc.DialOption
  29. FilerMountRootPath string
  30. Collection string
  31. Replication string
  32. TtlSec int32
  33. DiskType types.DiskType
  34. ChunkSizeLimit int64
  35. ConcurrentWriters int
  36. CacheDir string
  37. CacheSizeMB int64
  38. DataCenter string
  39. Umask os.FileMode
  40. Quota int64
  41. DisableXAttr bool
  42. MountUid uint32
  43. MountGid uint32
  44. MountMode os.FileMode
  45. MountCtime time.Time
  46. MountMtime time.Time
  47. MountParentInode uint64
  48. VolumeServerAccess string // how to access volume servers
  49. Cipher bool // whether encrypt data on volume server
  50. UidGidMapper *meta_cache.UidGidMapper
  51. uniqueCacheDir string
  52. uniqueCacheTempPageDir string
  53. }
// WFS is the SeaweedFS FUSE filesystem. It embeds the default raw filesystem
// from go-fuse so that operations not overridden elsewhere fall back to the
// library defaults, and it also embeds the generated mount gRPC server stub.
type WFS struct {
	// https://dl.acm.org/doi/fullHtml/10.1145/3310148
	// follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
	fuse.RawFileSystem
	mount_pb.UnimplementedSeaweedMountServer
	fs.Inode
	// option is the configuration supplied to NewSeaweedFileSystem.
	option *Option
	// metaCache is the local metadata cache consulted by maybeLoadEntry.
	metaCache *meta_cache.MetaCache
	stats statsCache
	// chunkCache is nil unless option.CacheSizeMB > 0.
	chunkCache *chunk_cache.TieredChunkCache
	// signature is a random value generated at construction and passed to the
	// metadata event subscription.
	signature int32
	// concurrentWriters is nil unless option.ConcurrentWriters > 0.
	concurrentWriters *util.LimitedConcurrentExecutor
	// inodeToPath resolves inode numbers to filer paths and tracks which
	// directories have their children cached.
	inodeToPath *InodeToPath
	// fhmap looks up open file handles by inode.
	fhmap *FileHandleToInode
	dhmap *DirectoryHandleToInode
	// fuseServer is set by Init.
	fuseServer *fuse.Server
	// NOTE(review): presumably maintained by loopCheckQuota, which is not
	// visible in this file — confirm, including how concurrent access is
	// handled.
	IsOverQuota bool
}
  72. func NewSeaweedFileSystem(option *Option) *WFS {
  73. wfs := &WFS{
  74. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  75. option: option,
  76. signature: util.RandomInt32(),
  77. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
  78. fhmap: NewFileHandleToInode(),
  79. dhmap: NewDirectoryHandleToInode(),
  80. }
  81. wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
  82. wfs.option.setupUniqueCacheDirectory()
  83. if option.CacheSizeMB > 0 {
  84. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
  85. }
  86. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper,
  87. util.FullPath(option.FilerMountRootPath),
  88. func(path util.FullPath) {
  89. wfs.inodeToPath.MarkChildrenCached(path)
  90. }, func(path util.FullPath) bool {
  91. return wfs.inodeToPath.IsChildrenCached(path)
  92. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  93. })
  94. grace.OnInterrupt(func() {
  95. wfs.metaCache.Shutdown()
  96. os.RemoveAll(option.getUniqueCacheDir())
  97. })
  98. if wfs.option.ConcurrentWriters > 0 {
  99. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  100. }
  101. return wfs
  102. }
  103. func (wfs *WFS) StartBackgroundTasks() {
  104. startTime := time.Now()
  105. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  106. go wfs.loopCheckQuota()
  107. }
  108. func (wfs *WFS) String() string {
  109. return "seaweedfs"
  110. }
// Init records the FUSE server on the filesystem so it is available through
// wfs.fuseServer afterwards.
func (wfs *WFS) Init(server *fuse.Server) {
	wfs.fuseServer = server
}
  114. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  115. path, status = wfs.inodeToPath.GetPath(inode)
  116. if status != fuse.OK {
  117. return
  118. }
  119. var found bool
  120. if fh, found = wfs.fhmap.FindFileHandle(inode); found {
  121. entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
  122. if entry != nil && fh.entry.Attributes == nil {
  123. entry.Attributes = &filer_pb.FuseAttributes{}
  124. }
  125. })
  126. } else {
  127. entry, status = wfs.maybeLoadEntry(path)
  128. }
  129. return
  130. }
  131. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  132. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  133. dir, name := fullpath.DirAndName()
  134. // return a valid entry for the mount root
  135. if string(fullpath) == wfs.option.FilerMountRootPath {
  136. return &filer_pb.Entry{
  137. Name: name,
  138. IsDirectory: true,
  139. Attributes: &filer_pb.FuseAttributes{
  140. Mtime: wfs.option.MountMtime.Unix(),
  141. FileMode: uint32(wfs.option.MountMode),
  142. Uid: wfs.option.MountUid,
  143. Gid: wfs.option.MountGid,
  144. Crtime: wfs.option.MountCtime.Unix(),
  145. },
  146. }, fuse.OK
  147. }
  148. // read from async meta cache
  149. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
  150. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  151. if cacheErr == filer_pb.ErrNotFound {
  152. return nil, fuse.ENOENT
  153. }
  154. return cachedEntry.ToProtoEntry(), fuse.OK
  155. }
  156. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  157. if wfs.option.VolumeServerAccess == "filerProxy" {
  158. return func(fileId string) (targetUrls []string, err error) {
  159. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  160. }
  161. }
  162. return filer.LookupFn(wfs)
  163. }
  164. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  165. i := atomic.LoadInt32(&wfs.option.filerIndex)
  166. return wfs.option.FilerAddresses[i]
  167. }
  168. func (option *Option) setupUniqueCacheDirectory() {
  169. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  170. option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
  171. option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "swap")
  172. os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
  173. }
// getTempFilePageDir returns the "swap" directory inside the unique cache
// directory. The value is only set once setupUniqueCacheDirectory has run.
func (option *Option) getTempFilePageDir() string {
	return option.uniqueCacheTempPageDir
}
// getUniqueCacheDir returns the per-mount unique cache directory. The value is
// only set once setupUniqueCacheDirectory has run.
func (option *Option) getUniqueCacheDir() string {
	return option.uniqueCacheDir
}