weedfs.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package mount
  2. import (
  3. "context"
  4. "math/rand"
  5. "os"
  6. "path"
  7. "path/filepath"
  8. "sync/atomic"
  9. "time"
  10. "github.com/hanwen/go-fuse/v2/fuse"
  11. "google.golang.org/grpc"
  12. "github.com/seaweedfs/seaweedfs/weed/filer"
  13. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  14. "github.com/seaweedfs/seaweedfs/weed/pb"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  18. "github.com/seaweedfs/seaweedfs/weed/util"
  19. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  20. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  21. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  22. "github.com/hanwen/go-fuse/v2/fs"
  23. )
  24. type Option struct {
  25. filerIndex int32 // align memory for atomic read/write
  26. FilerAddresses []pb.ServerAddress
  27. MountDirectory string
  28. GrpcDialOption grpc.DialOption
  29. FilerMountRootPath string
  30. Collection string
  31. Replication string
  32. TtlSec int32
  33. DiskType types.DiskType
  34. ChunkSizeLimit int64
  35. ConcurrentWriters int
  36. CacheDirForRead string
  37. CacheSizeMBForRead int64
  38. CacheDirForWrite string
  39. DataCenter string
  40. Umask os.FileMode
  41. Quota int64
  42. DisableXAttr bool
  43. MountUid uint32
  44. MountGid uint32
  45. MountMode os.FileMode
  46. MountCtime time.Time
  47. MountMtime time.Time
  48. MountParentInode uint64
  49. VolumeServerAccess string // how to access volume servers
  50. Cipher bool // whether encrypt data on volume server
  51. UidGidMapper *meta_cache.UidGidMapper
  52. uniqueCacheDirForRead string
  53. uniqueCacheDirForWrite string
  54. }
  55. type WFS struct {
  56. // https://dl.acm.org/doi/fullHtml/10.1145/3310148
  57. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  58. fuse.RawFileSystem
  59. mount_pb.UnimplementedSeaweedMountServer
  60. fs.Inode
  61. option *Option
  62. metaCache *meta_cache.MetaCache
  63. stats statsCache
  64. chunkCache *chunk_cache.TieredChunkCache
  65. signature int32
  66. concurrentWriters *util.LimitedConcurrentExecutor
  67. inodeToPath *InodeToPath
  68. fhmap *FileHandleToInode
  69. dhmap *DirectoryHandleToInode
  70. fuseServer *fuse.Server
  71. IsOverQuota bool
  72. fhLockTable *util.LockTable[FileHandleId]
  73. }
  74. func NewSeaweedFileSystem(option *Option) *WFS {
  75. wfs := &WFS{
  76. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  77. option: option,
  78. signature: util.RandomInt32(),
  79. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
  80. fhmap: NewFileHandleToInode(),
  81. dhmap: NewDirectoryHandleToInode(),
  82. fhLockTable: util.NewLockTable[FileHandleId](),
  83. }
  84. wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
  85. wfs.option.setupUniqueCacheDirectory()
  86. if option.CacheSizeMBForRead > 0 {
  87. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDirForRead(), option.CacheSizeMBForRead, 1024*1024)
  88. }
  89. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDirForRead(), "meta"), option.UidGidMapper,
  90. util.FullPath(option.FilerMountRootPath),
  91. func(path util.FullPath) {
  92. wfs.inodeToPath.MarkChildrenCached(path)
  93. }, func(path util.FullPath) bool {
  94. return wfs.inodeToPath.IsChildrenCached(path)
  95. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  96. // Find inode if it is not a deleted path
  97. if inode, inode_found := wfs.inodeToPath.GetInode(filePath); inode_found {
  98. // Find open file handle
  99. if fh, fh_found := wfs.fhmap.FindFileHandle(inode); fh_found {
  100. fhActiveLock := fh.wfs.fhLockTable.AcquireLock("invalidateFunc", fh.fh, util.ExclusiveLock)
  101. defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
  102. // Recreate dirty pages
  103. fh.dirtyPages.Destroy()
  104. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  105. // Update handle entry
  106. newentry, status := wfs.maybeLoadEntry(filePath)
  107. if status == fuse.OK {
  108. if fh.GetEntry().GetEntry() != newentry {
  109. fh.SetEntry(newentry)
  110. }
  111. }
  112. }
  113. }
  114. })
  115. grace.OnInterrupt(func() {
  116. wfs.metaCache.Shutdown()
  117. os.RemoveAll(option.getUniqueCacheDirForWrite())
  118. os.RemoveAll(option.getUniqueCacheDirForRead())
  119. })
  120. if wfs.option.ConcurrentWriters > 0 {
  121. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  122. }
  123. return wfs
  124. }
  125. func (wfs *WFS) StartBackgroundTasks() {
  126. startTime := time.Now()
  127. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  128. go wfs.loopCheckQuota()
  129. }
  130. func (wfs *WFS) String() string {
  131. return "seaweedfs"
  132. }
  133. func (wfs *WFS) Init(server *fuse.Server) {
  134. wfs.fuseServer = server
  135. }
  136. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  137. path, status = wfs.inodeToPath.GetPath(inode)
  138. if status != fuse.OK {
  139. return
  140. }
  141. var found bool
  142. if fh, found = wfs.fhmap.FindFileHandle(inode); found {
  143. entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
  144. if entry != nil && fh.entry.Attributes == nil {
  145. entry.Attributes = &filer_pb.FuseAttributes{}
  146. }
  147. })
  148. } else {
  149. entry, status = wfs.maybeLoadEntry(path)
  150. }
  151. return
  152. }
  153. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  154. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  155. dir, name := fullpath.DirAndName()
  156. // return a valid entry for the mount root
  157. if string(fullpath) == wfs.option.FilerMountRootPath {
  158. return &filer_pb.Entry{
  159. Name: name,
  160. IsDirectory: true,
  161. Attributes: &filer_pb.FuseAttributes{
  162. Mtime: wfs.option.MountMtime.Unix(),
  163. FileMode: uint32(wfs.option.MountMode),
  164. Uid: wfs.option.MountUid,
  165. Gid: wfs.option.MountGid,
  166. Crtime: wfs.option.MountCtime.Unix(),
  167. },
  168. }, fuse.OK
  169. }
  170. // read from async meta cache
  171. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
  172. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  173. if cacheErr == filer_pb.ErrNotFound {
  174. return nil, fuse.ENOENT
  175. }
  176. return cachedEntry.ToProtoEntry(), fuse.OK
  177. }
  178. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  179. if wfs.option.VolumeServerAccess == "filerProxy" {
  180. return func(fileId string) (targetUrls []string, err error) {
  181. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  182. }
  183. }
  184. return filer.LookupFn(wfs)
  185. }
  186. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  187. i := atomic.LoadInt32(&wfs.option.filerIndex)
  188. return wfs.option.FilerAddresses[i]
  189. }
  190. func (option *Option) setupUniqueCacheDirectory() {
  191. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  192. option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
  193. os.MkdirAll(option.uniqueCacheDirForRead, os.FileMode(0777)&^option.Umask)
  194. option.uniqueCacheDirForWrite = filepath.Join(path.Join(option.CacheDirForWrite, cacheUniqueId), "swap")
  195. os.MkdirAll(option.uniqueCacheDirForWrite, os.FileMode(0777)&^option.Umask)
  196. }
  197. func (option *Option) getUniqueCacheDirForWrite() string {
  198. return option.uniqueCacheDirForWrite
  199. }
  200. func (option *Option) getUniqueCacheDirForRead() string {
  201. return option.uniqueCacheDirForRead
  202. }