weedfs.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package mount
  2. import (
  3. "context"
  4. "errors"
  5. "math/rand"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "sync/atomic"
  10. "time"
  11. "github.com/hanwen/go-fuse/v2/fuse"
  12. "google.golang.org/grpc"
  13. "github.com/seaweedfs/seaweedfs/weed/filer"
  14. "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
  15. "github.com/seaweedfs/seaweedfs/weed/pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb"
  18. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  19. "github.com/seaweedfs/seaweedfs/weed/util"
  20. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  21. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  22. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  23. "github.com/hanwen/go-fuse/v2/fs"
  24. )
  25. type Option struct {
  26. filerIndex int32 // align memory for atomic read/write
  27. FilerAddresses []pb.ServerAddress
  28. MountDirectory string
  29. GrpcDialOption grpc.DialOption
  30. FilerMountRootPath string
  31. Collection string
  32. Replication string
  33. TtlSec int32
  34. DiskType types.DiskType
  35. ChunkSizeLimit int64
  36. ConcurrentWriters int
  37. CacheDirForRead string
  38. CacheSizeMBForRead int64
  39. CacheDirForWrite string
  40. DataCenter string
  41. Umask os.FileMode
  42. Quota int64
  43. DisableXAttr bool
  44. IsMacOs bool
  45. MountUid uint32
  46. MountGid uint32
  47. MountMode os.FileMode
  48. MountCtime time.Time
  49. MountMtime time.Time
  50. MountParentInode uint64
  51. VolumeServerAccess string // how to access volume servers
  52. Cipher bool // whether encrypt data on volume server
  53. UidGidMapper *meta_cache.UidGidMapper
  54. uniqueCacheDirForRead string
  55. uniqueCacheDirForWrite string
  56. }
  57. type WFS struct {
  58. // https://dl.acm.org/doi/fullHtml/10.1145/3310148
  59. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  60. fuse.RawFileSystem
  61. mount_pb.UnimplementedSeaweedMountServer
  62. fs.Inode
  63. option *Option
  64. metaCache *meta_cache.MetaCache
  65. stats statsCache
  66. chunkCache *chunk_cache.TieredChunkCache
  67. signature int32
  68. concurrentWriters *util.LimitedConcurrentExecutor
  69. inodeToPath *InodeToPath
  70. fhMap *FileHandleToInode
  71. dhMap *DirectoryHandleToInode
  72. fuseServer *fuse.Server
  73. IsOverQuota bool
  74. fhLockTable *util.LockTable[FileHandleId]
  75. FilerConf *filer.FilerConf
  76. }
  77. func NewSeaweedFileSystem(option *Option) *WFS {
  78. wfs := &WFS{
  79. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  80. option: option,
  81. signature: util.RandomInt32(),
  82. inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath)),
  83. fhMap: NewFileHandleToInode(),
  84. dhMap: NewDirectoryHandleToInode(),
  85. fhLockTable: util.NewLockTable[FileHandleId](),
  86. }
  87. wfs.option.filerIndex = int32(rand.Intn(len(option.FilerAddresses)))
  88. wfs.option.setupUniqueCacheDirectory()
  89. if option.CacheSizeMBForRead > 0 {
  90. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDirForRead(), option.CacheSizeMBForRead, 1024*1024)
  91. }
  92. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDirForRead(), "meta"), option.UidGidMapper,
  93. util.FullPath(option.FilerMountRootPath),
  94. func(path util.FullPath) {
  95. wfs.inodeToPath.MarkChildrenCached(path)
  96. }, func(path util.FullPath) bool {
  97. return wfs.inodeToPath.IsChildrenCached(path)
  98. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  99. // Find inode if it is not a deleted path
  100. if inode, inodeFound := wfs.inodeToPath.GetInode(filePath); inodeFound {
  101. // Find open file handle
  102. if fh, fhFound := wfs.fhMap.FindFileHandle(inode); fhFound {
  103. fhActiveLock := fh.wfs.fhLockTable.AcquireLock("invalidateFunc", fh.fh, util.ExclusiveLock)
  104. defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
  105. // Recreate dirty pages
  106. fh.dirtyPages.Destroy()
  107. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  108. // Update handle entry
  109. newEntry, status := wfs.maybeLoadEntry(filePath)
  110. if status == fuse.OK {
  111. if fh.GetEntry().GetEntry() != newEntry {
  112. fh.SetEntry(newEntry)
  113. }
  114. }
  115. }
  116. }
  117. })
  118. grace.OnInterrupt(func() {
  119. wfs.metaCache.Shutdown()
  120. os.RemoveAll(option.getUniqueCacheDirForWrite())
  121. os.RemoveAll(option.getUniqueCacheDirForRead())
  122. })
  123. if wfs.option.ConcurrentWriters > 0 {
  124. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  125. }
  126. return wfs
  127. }
  128. func (wfs *WFS) StartBackgroundTasks() error {
  129. follower, err := wfs.subscribeFilerConfEvents()
  130. if err != nil {
  131. return err
  132. }
  133. startTime := time.Now()
  134. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano(), follower)
  135. go wfs.loopCheckQuota()
  136. return nil
  137. }
  138. func (wfs *WFS) String() string {
  139. return "seaweedfs"
  140. }
  141. func (wfs *WFS) Init(server *fuse.Server) {
  142. wfs.fuseServer = server
  143. }
  144. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  145. path, status = wfs.inodeToPath.GetPath(inode)
  146. if status != fuse.OK {
  147. return
  148. }
  149. var found bool
  150. if fh, found = wfs.fhMap.FindFileHandle(inode); found {
  151. entry = fh.UpdateEntry(func(entry *filer_pb.Entry) {
  152. if entry != nil && fh.entry.Attributes == nil {
  153. entry.Attributes = &filer_pb.FuseAttributes{}
  154. }
  155. })
  156. } else {
  157. entry, status = wfs.maybeLoadEntry(path)
  158. }
  159. return
  160. }
  161. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  162. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  163. dir, name := fullpath.DirAndName()
  164. // return a valid entry for the mount root
  165. if string(fullpath) == wfs.option.FilerMountRootPath {
  166. return &filer_pb.Entry{
  167. Name: name,
  168. IsDirectory: true,
  169. Attributes: &filer_pb.FuseAttributes{
  170. Mtime: wfs.option.MountMtime.Unix(),
  171. FileMode: uint32(wfs.option.MountMode),
  172. Uid: wfs.option.MountUid,
  173. Gid: wfs.option.MountGid,
  174. Crtime: wfs.option.MountCtime.Unix(),
  175. },
  176. }, fuse.OK
  177. }
  178. // read from async meta cache
  179. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
  180. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  181. if errors.Is(cacheErr, filer_pb.ErrNotFound) {
  182. return nil, fuse.ENOENT
  183. }
  184. return cachedEntry.ToProtoEntry(), fuse.OK
  185. }
  186. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  187. if wfs.option.VolumeServerAccess == "filerProxy" {
  188. return func(fileId string) (targetUrls []string, err error) {
  189. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  190. }
  191. }
  192. return filer.LookupFn(wfs)
  193. }
  194. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  195. i := atomic.LoadInt32(&wfs.option.filerIndex)
  196. return wfs.option.FilerAddresses[i]
  197. }
  198. func (option *Option) setupUniqueCacheDirectory() {
  199. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  200. option.uniqueCacheDirForRead = path.Join(option.CacheDirForRead, cacheUniqueId)
  201. os.MkdirAll(option.uniqueCacheDirForRead, os.FileMode(0777)&^option.Umask)
  202. option.uniqueCacheDirForWrite = filepath.Join(path.Join(option.CacheDirForWrite, cacheUniqueId), "swap")
  203. os.MkdirAll(option.uniqueCacheDirForWrite, os.FileMode(0777)&^option.Umask)
  204. }
  205. func (option *Option) getUniqueCacheDirForWrite() string {
  206. return option.uniqueCacheDirForWrite
  207. }
  208. func (option *Option) getUniqueCacheDirForRead() string {
  209. return option.uniqueCacheDirForRead
  210. }