weedfs.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package mount
  2. import (
  3. "context"
  4. "errors"
  5. "math/rand"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "sync/atomic"
  10. "time"
  11. "github.com/hanwen/go-fuse/v2/fuse"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/filer"
  14. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  15. "github.com/seaweedfs/seaweedfs/weed/pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  19. "github.com/seaweedfs/seaweedfs/weed/util"
  20. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  21. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  22. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  23. "github.com/hanwen/go-fuse/v2/fs"
  24. )
  25. type Option struct {
  26. filerIndex int32 // align memory for atomic read/write
  27. FilerAddresses []pb.ServerAddress
  28. MountDirectory string
  29. GrpcDialOption grpc.DialOption
  30. FilerMountRootPath string
  31. Collection string
  32. Replication string
  33. TtlSec int32
  34. DiskType types.DiskType
  35. ChunkSizeLimit int64
  36. ConcurrentWriters int
  37. CacheDirForRead string
  38. CacheSizeMBForRead int64
  39. CacheDirForWrite string
  40. CacheMetaTTlSec int
  41. DataCenter string
  42. Umask os.FileMode
  43. Quota int64
  44. DisableXAttr bool
  45. IsMacOs bool
  46. MountUid uint32
  47. MountGid uint32
  48. MountMode os.FileMode
  49. MountCtime time.Time
  50. MountMtime time.Time
  51. MountParentInode uint64
  52. VolumeServerAccess string // how to access volume servers
  53. Cipher bool // whether encrypt data on volume server
  54. UidGidMapper *meta_cache.UidGidMapper
  55. uniqueCacheDirForRead string
  56. uniqueCacheDirForWrite string
  57. }
  58. type WFS struct {
  59. // https://dl.acm.org/doi/fullHtml/10.1145/3310148
  60. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  61. fuse.RawFileSystem
  62. mount_pb.UnimplementedSeaweedMountServer
  63. fs.Inode
  64. option *Option
  65. metaCache *meta_cache.MetaCache
  66. stats statsCache
  67. chunkCache *chunk_cache.TieredChunkCache
  68. signature int32
  69. concurrentWriters *util.LimitedConcurrentExecutor
  70. inodeToPath *InodeToPath
  71. fhMap *FileHandleToInode
  72. dhMap *DirectoryHandleToInode
  73. fuseServer *fuse.Server
  74. IsOverQuota bool
  75. fhLockTable *util.LockTable[FileHandleId]
  76. FilerConf *filer.FilerConf
  77. }
  78. func NewSeaweedFileSystem(option *Option) *WFS {
  79. wfs := &WFS{
  80. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  81. option: option,
  82. signature: util.RandomInt32(),
  83. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec),
  84. fhMap: NewFileHandleToInode(),
  85. dhMap: NewDirectoryHandleToInode(),
  86. fhLockTable: util.NewLockTable[FileHandleId](),
  87. }
  88. wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
  89. wfs.option.setupUniqueCacheDirectory()
  90. if option.CacheSizeMBForRead > 0 {
  91. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDirForRead(), option.CacheSizeMBForRead, 1024*1024)
  92. }
  93. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDirForRead(), "meta"), option.UidGidMapper,
  94. util.FullPath(option.FilerMountRootPath),
  95. func(path util.FullPath) {
  96. wfs.inodeToPath.MarkChildrenCached(path)
  97. }, func(path util.FullPath) bool {
  98. return wfs.inodeToPath.IsChildrenCached(path)
  99. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  100. // Find inode if it is not a deleted path
  101. if inode, inodeFound := wfs.inodeToPath.GetInode(filePath); inodeFound {
  102. // Find open file handle
  103. if fh, fhFound := wfs.fhMap.FindFileHandle(inode); fhFound {
  104. fhActiveLock := fh.wfs.fhLockTable.AcquireLock("invalidateFunc", fh.fh, util.ExclusiveLock)
  105. defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
  106. // Recreate dirty pages
  107. fh.dirtyPages.Destroy()
  108. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  109. // Update handle entry
  110. newEntry, status := wfs.maybeLoadEntry(filePath)
  111. if status == fuse.OK {
  112. if fh.GetEntry().GetEntry() != newEntry {
  113. fh.SetEntry(newEntry)
  114. }
  115. }
  116. }
  117. }
  118. })
  119. grace.OnInterrupt(func() {
  120. wfs.metaCache.Shutdown()
  121. os.RemoveAll(option.getUniqueCacheDirForWrite())
  122. os.RemoveAll(option.getUniqueCacheDirForRead())
  123. })
  124. if wfs.option.ConcurrentWriters > 0 {
  125. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  126. }
  127. return wfs
  128. }
  129. func (wfs *WFS) StartBackgroundTasks() error {
  130. follower, err := wfs.subscribeFilerConfEvents()
  131. if err != nil {
  132. return err
  133. }
  134. startTime := time.Now()
  135. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), follower)
  136. go wfs.loopCheckQuota()
  137. return nil
  138. }
  139. func (wfs *WFS) String() string {
  140. return "seaweedfs"
  141. }
  142. func (wfs *WFS) Init(server *fuse.Server) {
  143. wfs.fuseServer = server
  144. }
  145. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  146. path, status = wfs.inodeToPath.GetPath(inode)
  147. if status != fuse.OK {
  148. return
  149. }
  150. var found bool
  151. if fh, found = wfs.fhMap.FindFileHandle(inode); found {
  152. entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
  153. if entry != nil && fh.entry.Attributes == nil {
  154. entry.Attributes = &filer_pb.FuseAttributes{}
  155. }
  156. })
  157. } else {
  158. entry, status = wfs.maybeLoadEntry(path)
  159. }
  160. return
  161. }
  162. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  163. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  164. dir, name := fullpath.DirAndName()
  165. // return a valid entry for the mount root
  166. if string(fullpath) == wfs.option.FilerMountRootPath {
  167. return &filer_pb.Entry{
  168. Name: name,
  169. IsDirectory: true,
  170. Attributes: &filer_pb.FuseAttributes{
  171. Mtime: wfs.option.MountMtime.Unix(),
  172. FileMode: uint32(wfs.option.MountMode),
  173. Uid: wfs.option.MountUid,
  174. Gid: wfs.option.MountGid,
  175. Crtime: wfs.option.MountCtime.Unix(),
  176. },
  177. }, fuse.OK
  178. }
  179. // read from async meta cache
  180. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
  181. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  182. if errors.Is(cacheErr, filer_pb.ErrNotFound) {
  183. return nil, fuse.ENOENT
  184. }
  185. return cachedEntry.ToProtoEntry(), fuse.OK
  186. }
  187. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  188. if wfs.option.VolumeServerAccess == "filerProxy" {
  189. return func(fileId string) (targetUrls []string, err error) {
  190. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  191. }
  192. }
  193. return filer.LookupFn(wfs)
  194. }
  195. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  196. i := atomic.LoadInt32(&wfs.option.filerIndex)
  197. return wfs.option.FilerAddresses[i]
  198. }
  199. func (option *Option) setupUniqueCacheDirectory() {
  200. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  201. option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
  202. os.MkdirAll(option.uniqueCacheDirForRead, os.FileMode(0777)&^option.Umask)
  203. option.uniqueCacheDirForWrite = filepath.Join(path.Join(option.CacheDirForWrite, cacheUniqueId), "swap")
  204. os.MkdirAll(option.uniqueCacheDirForWrite, os.FileMode(0777)&^option.Umask)
  205. }
  206. func (option *Option) getUniqueCacheDirForWrite() string {
  207. return option.uniqueCacheDirForWrite
  208. }
  209. func (option *Option) getUniqueCacheDirForRead() string {
  210. return option.uniqueCacheDirForRead
  211. }