weedfs.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package mount
  2. import (
  3. "context"
  4. "github.com/chrislusf/seaweedfs/weed/filer"
  5. "github.com/chrislusf/seaweedfs/weed/mount/meta_cache"
  6. "github.com/chrislusf/seaweedfs/weed/pb"
  7. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage/types"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/chrislusf/seaweedfs/weed/wdclient"
  13. "github.com/hanwen/go-fuse/v2/fuse"
  14. "google.golang.org/grpc"
  15. "math/rand"
  16. "os"
  17. "path"
  18. "path/filepath"
  19. "time"
  20. "github.com/hanwen/go-fuse/v2/fs"
  21. )
  22. type Option struct {
  23. MountDirectory string
  24. FilerAddresses []pb.ServerAddress
  25. filerIndex int
  26. GrpcDialOption grpc.DialOption
  27. FilerMountRootPath string
  28. Collection string
  29. Replication string
  30. TtlSec int32
  31. DiskType types.DiskType
  32. ChunkSizeLimit int64
  33. ConcurrentWriters int
  34. CacheDir string
  35. CacheSizeMB int64
  36. DataCenter string
  37. Umask os.FileMode
  38. MountUid uint32
  39. MountGid uint32
  40. MountMode os.FileMode
  41. MountCtime time.Time
  42. MountMtime time.Time
  43. MountParentInode uint64
  44. VolumeServerAccess string // how to access volume servers
  45. Cipher bool // whether encrypt data on volume server
  46. UidGidMapper *meta_cache.UidGidMapper
  47. uniqueCacheDir string
  48. uniqueCacheTempPageDir string
  49. }
  50. type WFS struct {
  51. // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go
  52. fuse.RawFileSystem
  53. fs.Inode
  54. option *Option
  55. metaCache *meta_cache.MetaCache
  56. stats statsCache
  57. root Directory
  58. chunkCache *chunk_cache.TieredChunkCache
  59. signature int32
  60. concurrentWriters *util.LimitedConcurrentExecutor
  61. inodeToPath *InodeToPath
  62. fhmap *FileHandleToInode
  63. }
  64. func NewSeaweedFileSystem(option *Option) *WFS {
  65. wfs := &WFS{
  66. RawFileSystem: fuse.NewDefaultRawFileSystem(),
  67. option: option,
  68. signature: util.RandomInt32(),
  69. inodeToPath: NewInodeToPath(),
  70. fhmap: NewFileHandleToInode(),
  71. }
  72. wfs.root = Directory{
  73. name: "/",
  74. wfs: wfs,
  75. entry: nil,
  76. parent: nil,
  77. }
  78. wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
  79. wfs.option.setupUniqueCacheDirectory()
  80. if option.CacheSizeMB > 0 {
  81. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
  82. }
  83. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), option.UidGidMapper, func(path util.FullPath) {
  84. wfs.inodeToPath.MarkChildrenCached(path)
  85. }, func(path util.FullPath) bool {
  86. return wfs.inodeToPath.IsChildrenCached(path)
  87. }, func(filePath util.FullPath, entry *filer_pb.Entry) {
  88. })
  89. grace.OnInterrupt(func() {
  90. wfs.metaCache.Shutdown()
  91. })
  92. if wfs.option.ConcurrentWriters > 0 {
  93. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  94. }
  95. return wfs
  96. }
  97. func (wfs *WFS) StartBackgroundTasks() {
  98. startTime := time.Now()
  99. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  100. }
  101. func (wfs *WFS) Root() *Directory {
  102. return &wfs.root
  103. }
  104. func (wfs *WFS) String() string {
  105. return "seaweedfs"
  106. }
  107. func (wfs *WFS) maybeReadEntry(inode uint64) (path util.FullPath, fh *FileHandle, entry *filer_pb.Entry, status fuse.Status) {
  108. path = wfs.inodeToPath.GetPath(inode)
  109. var found bool
  110. if fh, found = wfs.fhmap.FindFileHandle(inode); found {
  111. return path, fh, fh.entry, fuse.OK
  112. }
  113. entry, status = wfs.maybeLoadEntry(path)
  114. return
  115. }
  116. func (wfs *WFS) maybeLoadEntry(fullpath util.FullPath) (*filer_pb.Entry, fuse.Status) {
  117. // glog.V(3).Infof("read entry cache miss %s", fullpath)
  118. dir, name := fullpath.DirAndName()
  119. // return a valid entry for the mount root
  120. if string(fullpath) == wfs.option.FilerMountRootPath {
  121. return &filer_pb.Entry{
  122. Name: name,
  123. IsDirectory: true,
  124. Attributes: &filer_pb.FuseAttributes{
  125. Mtime: wfs.option.MountMtime.Unix(),
  126. FileMode: uint32(wfs.option.MountMode),
  127. Uid: wfs.option.MountUid,
  128. Gid: wfs.option.MountGid,
  129. Crtime: wfs.option.MountCtime.Unix(),
  130. },
  131. }, fuse.OK
  132. }
  133. // read from async meta cache
  134. meta_cache.EnsureVisited(wfs.metaCache, wfs, util.FullPath(dir))
  135. cachedEntry, cacheErr := wfs.metaCache.FindEntry(context.Background(), fullpath)
  136. if cacheErr == filer_pb.ErrNotFound {
  137. return nil, fuse.ENOENT
  138. }
  139. return cachedEntry.ToProtoEntry(), fuse.OK
  140. }
  141. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  142. if wfs.option.VolumeServerAccess == "filerProxy" {
  143. return func(fileId string) (targetUrls []string, err error) {
  144. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  145. }
  146. }
  147. return filer.LookupFn(wfs)
  148. }
  149. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  150. return wfs.option.FilerAddresses[wfs.option.filerIndex]
  151. }
  152. func (option *Option) setupUniqueCacheDirectory() {
  153. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  154. option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
  155. option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw")
  156. os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
  157. }
  158. func (option *Option) getTempFilePageDir() string {
  159. return option.uniqueCacheTempPageDir
  160. }
  161. func (option *Option) getUniqueCacheDir() string {
  162. return option.uniqueCacheDir
  163. }