wfs.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer"
  6. "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "github.com/chrislusf/seaweedfs/weed/wdclient"
  8. "math"
  9. "os"
  10. "path"
  11. "sync"
  12. "time"
  13. "google.golang.org/grpc"
  14. "github.com/chrislusf/seaweedfs/weed/util/grace"
  15. "github.com/seaweedfs/fuse"
  16. "github.com/seaweedfs/fuse/fs"
  17. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  18. "github.com/chrislusf/seaweedfs/weed/glog"
  19. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  20. "github.com/chrislusf/seaweedfs/weed/util"
  21. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  22. )
  23. type Option struct {
  24. MountDirectory string
  25. FilerAddress string
  26. FilerGrpcAddress string
  27. GrpcDialOption grpc.DialOption
  28. FilerMountRootPath string
  29. Collection string
  30. Replication string
  31. TtlSec int32
  32. DiskType types.DiskType
  33. ChunkSizeLimit int64
  34. ConcurrentWriters int
  35. CacheDir string
  36. CacheSizeMB int64
  37. DataCenter string
  38. EntryCacheTtl time.Duration
  39. Umask os.FileMode
  40. MountUid uint32
  41. MountGid uint32
  42. MountMode os.FileMode
  43. MountCtime time.Time
  44. MountMtime time.Time
  45. VolumeServerAccess string // how to access volume servers
  46. Cipher bool // whether encrypt data on volume server
  47. UidGidMapper *meta_cache.UidGidMapper
  48. }
  49. var _ = fs.FS(&WFS{})
  50. var _ = fs.FSStatfser(&WFS{})
  51. type WFS struct {
  52. option *Option
  53. // contains all open handles, protected by handlesLock
  54. handlesLock sync.Mutex
  55. handles map[uint64]*FileHandle
  56. bufPool sync.Pool
  57. stats statsCache
  58. root fs.Node
  59. fsNodeCache *FsCache
  60. chunkCache *chunk_cache.TieredChunkCache
  61. metaCache *meta_cache.MetaCache
  62. signature int32
  63. // throttle writers
  64. concurrentWriters *util.LimitedConcurrentExecutor
  65. Server *fs.Server
  66. }
  67. type statsCache struct {
  68. filer_pb.StatisticsResponse
  69. lastChecked int64 // unix time in seconds
  70. }
  71. func NewSeaweedFileSystem(option *Option) *WFS {
  72. wfs := &WFS{
  73. option: option,
  74. handles: make(map[uint64]*FileHandle),
  75. bufPool: sync.Pool{
  76. New: func() interface{} {
  77. return make([]byte, option.ChunkSizeLimit)
  78. },
  79. },
  80. signature: util.RandomInt32(),
  81. }
  82. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:8]
  83. cacheDir := path.Join(option.CacheDir, cacheUniqueId)
  84. if option.CacheSizeMB > 0 {
  85. os.MkdirAll(cacheDir, os.FileMode(0777)&^option.Umask)
  86. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
  87. }
  88. wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
  89. fsNode := wfs.fsNodeCache.GetFsNode(filePath)
  90. if fsNode != nil {
  91. if file, ok := fsNode.(*File); ok {
  92. if err := wfs.Server.InvalidateNodeData(file); err != nil {
  93. glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
  94. }
  95. file.clearEntry()
  96. }
  97. }
  98. dir, name := filePath.DirAndName()
  99. parent := wfs.root
  100. if dir != "/" {
  101. parent = wfs.fsNodeCache.GetFsNode(util.FullPath(dir))
  102. }
  103. if parent != nil {
  104. if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
  105. glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
  106. }
  107. }
  108. })
  109. startTime := time.Now()
  110. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  111. grace.OnInterrupt(func() {
  112. wfs.metaCache.Shutdown()
  113. })
  114. entry, _ := filer_pb.GetEntry(wfs, util.FullPath(wfs.option.FilerMountRootPath))
  115. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: filer.FromPbEntry(wfs.option.FilerMountRootPath,entry)}
  116. wfs.fsNodeCache = newFsCache(wfs.root)
  117. if wfs.option.ConcurrentWriters > 0 {
  118. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  119. }
  120. return wfs
  121. }
  122. func (wfs *WFS) Root() (fs.Node, error) {
  123. return wfs.root, nil
  124. }
  125. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  126. fullpath := file.fullpath()
  127. glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  128. wfs.handlesLock.Lock()
  129. defer wfs.handlesLock.Unlock()
  130. inodeId := file.fullpath().AsInode()
  131. if file.isOpen > 0 {
  132. existingHandle, found := wfs.handles[inodeId]
  133. if found && existingHandle != nil {
  134. file.isOpen++
  135. return existingHandle
  136. }
  137. }
  138. fileHandle = newFileHandle(file, uid, gid)
  139. file.maybeLoadEntry(context.Background())
  140. file.isOpen++
  141. wfs.handles[inodeId] = fileHandle
  142. fileHandle.handle = inodeId
  143. return
  144. }
  145. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  146. wfs.handlesLock.Lock()
  147. defer wfs.handlesLock.Unlock()
  148. glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  149. delete(wfs.handles, fullpath.AsInode())
  150. return
  151. }
  152. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  153. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  154. glog.V(4).Infof("reading fs stats: %+v", req)
  155. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  156. err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
  157. request := &filer_pb.StatisticsRequest{
  158. Collection: wfs.option.Collection,
  159. Replication: wfs.option.Replication,
  160. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  161. DiskType: string(wfs.option.DiskType),
  162. }
  163. glog.V(4).Infof("reading filer stats: %+v", request)
  164. resp, err := client.Statistics(context.Background(), request)
  165. if err != nil {
  166. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  167. return err
  168. }
  169. glog.V(4).Infof("read filer stats: %+v", resp)
  170. wfs.stats.TotalSize = resp.TotalSize
  171. wfs.stats.UsedSize = resp.UsedSize
  172. wfs.stats.FileCount = resp.FileCount
  173. wfs.stats.lastChecked = time.Now().Unix()
  174. return nil
  175. })
  176. if err != nil {
  177. glog.V(0).Infof("filer Statistics: %v", err)
  178. return err
  179. }
  180. }
  181. totalDiskSize := wfs.stats.TotalSize
  182. usedDiskSize := wfs.stats.UsedSize
  183. actualFileCount := wfs.stats.FileCount
  184. // Compute the total number of available blocks
  185. resp.Blocks = totalDiskSize / blockSize
  186. // Compute the number of used blocks
  187. numBlocks := uint64(usedDiskSize / blockSize)
  188. // Report the number of free and available blocks for the block size
  189. resp.Bfree = resp.Blocks - numBlocks
  190. resp.Bavail = resp.Blocks - numBlocks
  191. resp.Bsize = uint32(blockSize)
  192. // Report the total number of possible files in the file system (and those free)
  193. resp.Files = math.MaxInt64
  194. resp.Ffree = math.MaxInt64 - actualFileCount
  195. // Report the maximum length of a name and the minimum fragment size
  196. resp.Namelen = 1024
  197. resp.Frsize = uint32(blockSize)
  198. return nil
  199. }
  200. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  201. if entry.Attributes == nil {
  202. return
  203. }
  204. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  205. }
  206. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  207. if entry.Attributes == nil {
  208. return
  209. }
  210. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  211. }
  212. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  213. if wfs.option.VolumeServerAccess == "filerProxy" {
  214. return func(fileId string) (targetUrls []string, err error) {
  215. return []string{"http://" + wfs.option.FilerAddress + "/?proxyChunkId=" + fileId}, nil
  216. }
  217. }
  218. return filer.LookupFn(wfs)
  219. }