weedfs.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package mount
  2. import (
  3. "context"
  4. "errors"
  5. "math/rand"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "sync/atomic"
  10. "time"
  11. "github.com/hanwen/go-fuse/v2/fuse"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/filer"
  14. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  15. "github.com/seaweedfs/seaweedfs/weed/pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  19. "github.com/seaweedfs/seaweedfs/weed/util"
  20. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  21. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  22. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  23. "github.com/hanwen/go-fuse/v2/fs"
  24. )
  25. type Option struct {
  26. filerIndex int32 // align memory for atomic read/write
  27. FilerAddresses []pb.ServerAddress
  28. MountDirectory string
  29. GrpcDialOption grpc.DialOption
  30. FilerMountRootPath string
  31. Collection string
  32. Replication string
  33. TtlSec int32
  34. DiskType types.DiskType
  35. ChunkSizeLimit int64
  36. ConcurrentWriters int
  37. CacheDirForRead string
  38. CacheSizeMBForRead int64
  39. CacheDirForWrite string
  40. DataCenter string
  41. Umask os.FileMode
  42. Quota int64
  43. DisableXAttr bool
  44. MountUid uint32
  45. MountGid uint32
  46. MountMode os.FileMode
  47. MountCtime time.Time
  48. MountMtime time.Time
  49. MountParentInode uint64
  50. VolumeServerAccess string // how to access volume servers
  51. Cipher bool // whether encrypt data on volume server
  52. UidGidMapper *meta_cache.UidGidMapper
  53. uniqueCacheDirForRead string
  54. uniqueCacheDirForWrite string
  55. }
  56. type WFS struct {
  57. // https://dl.acm.org/doi/fullHtml/10.1145/3310148
  58. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  59. fuse.RawFileSystem
  60. mount_pb.UnimplementedSeaweedMountServer
  61. fs.Inode
  62. option *Option
  63. metaCache *meta_cache.MetaCache
  64. stats statsCache
  65. chunkCache *chunk_cache.TieredChunkCache
  66. signature int32
  67. concurrentWriters *util.LimitedConcurrentExecutor
  68. inodeToPath *InodeToPath
  69. fhMap *FileHandleToInode
  70. dhMap *DirectoryHandleToInode
  71. fuseServer *fuse.Server
  72. IsOverQuota bool
  73. fhLockTable *util.LockTable[FileHandleId]
  74. FilerConf *filer.FilerConf
  75. }
  76. func NewSeaweedFileSystem(option *Option) *WFS {
  77. wfs := &WFS{
  78. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  79. option: option,
  80. signature: util.RandomInt32(),
  81. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
  82. fhMap: NewFileHandleToInode(),
  83. dhMap: NewDirectoryHandleToInode(),
  84. fhLockTable: util.NewLockTable[FileHandleId](),
  85. }
  86. wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
  87. wfs.option.setupUniqueCacheDirectory()
  88. if option.CacheSizeMBForRead > 0 {
  89. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDirForRead(), option.CacheSizeMBForRead, 1024*1024)
  90. }
  91. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDirForRead(), "meta"), option.UidGidMapper,
  92. util.FullPath(option.FilerMountRootPath),
  93. func(path util.FullPath) {
  94. wfs.inodeToPath.MarkChildrenCached(path)
  95. }, func(path util.FullPath) bool {
  96. return wfs.inodeToPath.IsChildrenCached(path)
  97. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  98. // Find inode if it is not a deleted path
  99. if inode, inodeFound := wfs.inodeToPath.GetInode(filePath); inodeFound {
  100. // Find open file handle
  101. if fh, fhFound := wfs.fhMap.FindFileHandle(inode); fhFound {
  102. fhActiveLock := fh.wfs.fhLockTable.AcquireLock("invalidateFunc", fh.fh, util.ExclusiveLock)
  103. defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
  104. // Recreate dirty pages
  105. fh.dirtyPages.Destroy()
  106. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  107. // Update handle entry
  108. newEntry, status := wfs.maybeLoadEntry(filePath)
  109. if status == fuse.OK {
  110. if fh.GetEntry().GetEntry() != newEntry {
  111. fh.SetEntry(newEntry)
  112. }
  113. }
  114. }
  115. }
  116. })
  117. grace.OnInterrupt(func() {
  118. wfs.metaCache.Shutdown()
  119. os.RemoveAll(option.getUniqueCacheDirForWrite())
  120. os.RemoveAll(option.getUniqueCacheDirForRead())
  121. })
  122. if wfs.option.ConcurrentWriters > 0 {
  123. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  124. }
  125. return wfs
  126. }
  127. func (wfs *WFS) StartBackgroundTasks() error {
  128. fn, err := wfs.subscribeFilerConfEvents()
  129. if err != nil {
  130. return err
  131. }
  132. go fn()
  133. startTime := time.Now()
  134. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  135. go wfs.loopCheckQuota()
  136. return nil
  137. }
  138. func (wfs *WFS) String() string {
  139. return "seaweedfs"
  140. }
  141. func (wfs *WFS) Init(server *fuse.Server) {
  142. wfs.fuseServer = server
  143. }
  144. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  145. path, status = wfs.inodeToPath.GetPath(inode)
  146. if status != fuse.OK {
  147. return
  148. }
  149. var found bool
  150. if fh, found = wfs.fhMap.FindFileHandle(inode); found {
  151. entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
  152. if entry != nil && fh.entry.Attributes == nil {
  153. entry.Attributes = &filer_pb.FuseAttributes{}
  154. }
  155. })
  156. } else {
  157. entry, status = wfs.maybeLoadEntry(path)
  158. }
  159. return
  160. }
  161. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  162. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  163. dir, name := fullpath.DirAndName()
  164. // return a valid entry for the mount root
  165. if string(fullpath) == wfs.option.FilerMountRootPath {
  166. return &filer_pb.Entry{
  167. Name: name,
  168. IsDirectory: true,
  169. Attributes: &filer_pb.FuseAttributes{
  170. Mtime: wfs.option.MountMtime.Unix(),
  171. FileMode: uint32(wfs.option.MountMode),
  172. Uid: wfs.option.MountUid,
  173. Gid: wfs.option.MountGid,
  174. Crtime: wfs.option.MountCtime.Unix(),
  175. },
  176. }, fuse.OK
  177. }
  178. // read from async meta cache
  179. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
  180. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  181. if errors.Is(cacheErr, filer_pb.ErrNotFound) {
  182. return nil, fuse.ENOENT
  183. }
  184. return cachedEntry.ToProtoEntry(), fuse.OK
  185. }
  186. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  187. if wfs.option.VolumeServerAccess == "filerProxy" {
  188. return func(fileId string) (targetUrls []string, err error) {
  189. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  190. }
  191. }
  192. return filer.LookupFn(wfs)
  193. }
  194. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  195. i := atomic.LoadInt32(&wfs.option.filerIndex)
  196. return wfs.option.FilerAddresses[i]
  197. }
  198. func (option *Option) setupUniqueCacheDirectory() {
  199. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  200. option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
  201. os.MkdirAll(option.uniqueCacheDirForRead, os.FileMode(0777)&^option.Umask)
  202. option.uniqueCacheDirForWrite = filepath.Join(path.Join(option.CacheDirForWrite, cacheUniqueId), "swap")
  203. os.MkdirAll(option.uniqueCacheDirForWrite, os.FileMode(0777)&^option.Umask)
  204. }
  205. func (option *Option) getUniqueCacheDirForWrite() string {
  206. return option.uniqueCacheDirForWrite
  207. }
  208. func (option *Option) getUniqueCacheDirForRead() string {
  209. return option.uniqueCacheDirForRead
  210. }