wfs.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "os"
  7. "path"
  8. "sync"
  9. "time"
  10. "google.golang.org/grpc"
  11. "github.com/chrislusf/seaweedfs/weed/util/grace"
  12. "github.com/seaweedfs/fuse"
  13. "github.com/seaweedfs/fuse/fs"
  14. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  15. "github.com/chrislusf/seaweedfs/weed/util/log"
  16. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  17. "github.com/chrislusf/seaweedfs/weed/util"
  18. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  19. )
  20. type Option struct {
  21. FilerGrpcAddress string
  22. GrpcDialOption grpc.DialOption
  23. FilerMountRootPath string
  24. Collection string
  25. Replication string
  26. TtlSec int32
  27. ChunkSizeLimit int64
  28. ConcurrentWriters int
  29. CacheDir string
  30. CacheSizeMB int64
  31. DataCenter string
  32. EntryCacheTtl time.Duration
  33. Umask os.FileMode
  34. MountUid uint32
  35. MountGid uint32
  36. MountMode os.FileMode
  37. MountCtime time.Time
  38. MountMtime time.Time
  39. OutsideContainerClusterMode bool // whether the mount runs outside SeaweedFS containers
  40. Cipher bool // whether encrypt data on volume server
  41. UidGidMapper *meta_cache.UidGidMapper
  42. }
  43. var _ = fs.FS(&WFS{})
  44. var _ = fs.FSStatfser(&WFS{})
  45. type WFS struct {
  46. option *Option
  47. // contains all open handles, protected by handlesLock
  48. handlesLock sync.Mutex
  49. handles map[uint64]*FileHandle
  50. bufPool sync.Pool
  51. stats statsCache
  52. root fs.Node
  53. fsNodeCache *FsCache
  54. chunkCache *chunk_cache.TieredChunkCache
  55. metaCache *meta_cache.MetaCache
  56. signature int32
  57. // throttle writers
  58. concurrentWriters *util.LimitedConcurrentExecutor
  59. }
  60. type statsCache struct {
  61. filer_pb.StatisticsResponse
  62. lastChecked int64 // unix time in seconds
  63. }
  64. func NewSeaweedFileSystem(option *Option) *WFS {
  65. wfs := &WFS{
  66. option: option,
  67. handles: make(map[uint64]*FileHandle),
  68. bufPool: sync.Pool{
  69. New: func() interface{} {
  70. return make([]byte, option.ChunkSizeLimit)
  71. },
  72. },
  73. signature: util.RandomInt32(),
  74. }
  75. cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4]
  76. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  77. if option.CacheSizeMB > 0 {
  78. os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
  79. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  80. }
  81. wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
  82. fsNode := wfs.fsNodeCache.GetFsNode(filePath)
  83. if fsNode != nil {
  84. if file, ok := fsNode.(*File); ok {
  85. file.clearEntry()
  86. }
  87. }
  88. })
  89. startTime := time.Now()
  90. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  91. grace.OnInterrupt(func() {
  92. wfs.metaCache.Shutdown()
  93. })
  94. entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
  95. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
  96. wfs.fsNodeCache = newFsCache(wfs.root)
  97. if wfs.option.ConcurrentWriters > 0 {
  98. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  99. }
  100. return wfs
  101. }
  102. func (wfs *WFS) Root() (fs.Node, error) {
  103. return wfs.root, nil
  104. }
  105. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  106. fullpath := file.fullpath()
  107. log.Tracef("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  108. wfs.handlesLock.Lock()
  109. defer wfs.handlesLock.Unlock()
  110. inodeId := file.fullpath().AsInode()
  111. if file.isOpen > 0 {
  112. existingHandle, found := wfs.handles[inodeId]
  113. if found && existingHandle != nil {
  114. file.isOpen++
  115. return existingHandle
  116. }
  117. }
  118. fileHandle = newFileHandle(file, uid, gid)
  119. file.maybeLoadEntry(context.Background())
  120. file.isOpen++
  121. wfs.handles[inodeId] = fileHandle
  122. fileHandle.handle = inodeId
  123. return
  124. }
  125. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  126. wfs.handlesLock.Lock()
  127. defer wfs.handlesLock.Unlock()
  128. log.Tracef("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  129. delete(wfs.handles, fullpath.AsInode())
  130. return
  131. }
  132. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  133. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  134. log.Tracef("reading fs stats: %+v", req)
  135. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  136. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  137. request := &filer_pb.StatisticsRequest{
  138. Collection: wfs.option.Collection,
  139. Replication: wfs.option.Replication,
  140. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  141. }
  142. log.Tracef("reading filer stats: %+v", request)
  143. resp, err := client.Statistics(context.Background(), request)
  144. if err != nil {
  145. log.Infof("reading filer stats %v: %v", request, err)
  146. return err
  147. }
  148. log.Tracef("read filer stats: %+v", resp)
  149. wfs.stats.TotalSize = resp.TotalSize
  150. wfs.stats.UsedSize = resp.UsedSize
  151. wfs.stats.FileCount = resp.FileCount
  152. wfs.stats.lastChecked = time.Now().Unix()
  153. return nil
  154. })
  155. if err != nil {
  156. log.Infof("filer Statistics: %v", err)
  157. return err
  158. }
  159. }
  160. totalDiskSize := wfs.stats.TotalSize
  161. usedDiskSize := wfs.stats.UsedSize
  162. actualFileCount := wfs.stats.FileCount
  163. // Compute the total number of available blocks
  164. resp.Blocks = totalDiskSize / blockSize
  165. // Compute the number of used blocks
  166. numBlocks := uint64(usedDiskSize / blockSize)
  167. // Report the number of free and available blocks for the block size
  168. resp.Bfree = resp.Blocks - numBlocks
  169. resp.Bavail = resp.Blocks - numBlocks
  170. resp.Bsize = uint32(blockSize)
  171. // Report the total number of possible files in the file system (and those free)
  172. resp.Files = math.MaxInt64
  173. resp.Ffree = math.MaxInt64 - actualFileCount
  174. // Report the maximum length of a name and the minimum fragment size
  175. resp.Namelen = 1024
  176. resp.Frsize = uint32(blockSize)
  177. return nil
  178. }
  179. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  180. if entry.Attributes == nil {
  181. return
  182. }
  183. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  184. }
  185. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  186. if entry.Attributes == nil {
  187. return
  188. }
  189. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  190. }