wfs.go 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "math"
  7. "math/rand"
  8. "os"
  9. "path"
  10. "sync"
  11. "time"
  12. "github.com/chrislusf/seaweedfs/weed/filer"
  13. "github.com/chrislusf/seaweedfs/weed/storage/types"
  14. "github.com/chrislusf/seaweedfs/weed/wdclient"
  15. "google.golang.org/grpc"
  16. "github.com/chrislusf/seaweedfs/weed/util/grace"
  17. "github.com/seaweedfs/fuse"
  18. "github.com/seaweedfs/fuse/fs"
  19. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  20. "github.com/chrislusf/seaweedfs/weed/glog"
  21. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  22. "github.com/chrislusf/seaweedfs/weed/util"
  23. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  24. )
  25. type Option struct {
  26. MountDirectory string
  27. FilerAddresses []pb.ServerAddress
  28. filerIndex int
  29. GrpcDialOption grpc.DialOption
  30. FilerMountRootPath string
  31. Collection string
  32. Replication string
  33. TtlSec int32
  34. DiskType types.DiskType
  35. ChunkSizeLimit int64
  36. ConcurrentWriters int
  37. CacheDir string
  38. CacheSizeMB int64
  39. DataCenter string
  40. Umask os.FileMode
  41. MountUid uint32
  42. MountGid uint32
  43. MountMode os.FileMode
  44. MountCtime time.Time
  45. MountMtime time.Time
  46. MountParentInode uint64
  47. VolumeServerAccess string // how to access volume servers
  48. Cipher bool // whether encrypt data on volume server
  49. UidGidMapper *meta_cache.UidGidMapper
  50. uniqueCacheDir string
  51. }
  52. var _ = fs.FS(&WFS{})
  53. var _ = fs.FSStatfser(&WFS{})
  54. type WFS struct {
  55. option *Option
  56. // contains all open handles, protected by handlesLock
  57. handlesLock sync.Mutex
  58. handles map[uint64]*FileHandle
  59. bufPool sync.Pool
  60. stats statsCache
  61. root fs.Node
  62. fsNodeCache *FsCache
  63. chunkCache *chunk_cache.TieredChunkCache
  64. metaCache *meta_cache.MetaCache
  65. signature int32
  66. // throttle writers
  67. concurrentWriters *util.LimitedConcurrentExecutor
  68. Server *fs.Server
  69. }
  70. type statsCache struct {
  71. filer_pb.StatisticsResponse
  72. lastChecked int64 // unix time in seconds
  73. }
  74. func NewSeaweedFileSystem(option *Option) *WFS {
  75. wfs := &WFS{
  76. option: option,
  77. handles: make(map[uint64]*FileHandle),
  78. bufPool: sync.Pool{
  79. New: func() interface{} {
  80. return make([]byte, option.ChunkSizeLimit)
  81. },
  82. },
  83. signature: util.RandomInt32(),
  84. }
  85. wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
  86. wfs.option.setupUniqueCacheDirectory()
  87. if option.CacheSizeMB > 0 {
  88. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
  89. }
  90. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) {
  91. fsNode := NodeWithId(filePath.AsInode(entry.FileMode()))
  92. if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
  93. glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
  94. }
  95. dir, name := filePath.DirAndName()
  96. parent := NodeWithId(util.FullPath(dir).AsInode(os.ModeDir))
  97. if dir == option.FilerMountRootPath {
  98. parent = NodeWithId(1)
  99. }
  100. if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
  101. glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
  102. }
  103. })
  104. grace.OnInterrupt(func() {
  105. wfs.metaCache.Shutdown()
  106. os.RemoveAll(option.getUniqueCacheDir())
  107. })
  108. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, id: 1}
  109. wfs.fsNodeCache = newFsCache(wfs.root)
  110. if wfs.option.ConcurrentWriters > 0 {
  111. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  112. }
  113. return wfs
  114. }
  115. func (wfs *WFS) StartBackgroundTasks() {
  116. startTime := time.Now()
  117. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  118. }
  119. func (wfs *WFS) Root() (fs.Node, error) {
  120. return wfs.root, nil
  121. }
  122. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  123. fullpath := file.fullpath()
  124. glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  125. inodeId := file.Id()
  126. wfs.handlesLock.Lock()
  127. existingHandle, found := wfs.handles[inodeId]
  128. if found && existingHandle != nil && existingHandle.f.isOpen > 0 {
  129. existingHandle.f.isOpen++
  130. wfs.handlesLock.Unlock()
  131. glog.V(4).Infof("Reuse AcquiredHandle %s open %d", fullpath, existingHandle.f.isOpen)
  132. return existingHandle
  133. }
  134. wfs.handlesLock.Unlock()
  135. entry, _ := file.maybeLoadEntry(context.Background())
  136. file.entry = entry
  137. fileHandle = newFileHandle(file, uid, gid)
  138. wfs.handlesLock.Lock()
  139. file.isOpen++
  140. wfs.handles[inodeId] = fileHandle
  141. wfs.handlesLock.Unlock()
  142. fileHandle.handle = inodeId
  143. glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen)
  144. return
  145. }
  146. func (wfs *WFS) Fsync(file *File, header fuse.Header) error {
  147. inodeId := file.Id()
  148. wfs.handlesLock.Lock()
  149. existingHandle, found := wfs.handles[inodeId]
  150. wfs.handlesLock.Unlock()
  151. if found && existingHandle != nil {
  152. existingHandle.Lock()
  153. defer existingHandle.Unlock()
  154. if existingHandle.f.isOpen > 0 {
  155. return existingHandle.doFlush(context.Background(), header)
  156. }
  157. }
  158. return nil
  159. }
  160. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  161. wfs.handlesLock.Lock()
  162. defer wfs.handlesLock.Unlock()
  163. glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  164. delete(wfs.handles, uint64(handleId))
  165. return
  166. }
  167. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  168. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  169. glog.V(4).Infof("reading fs stats: %+v", req)
  170. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  171. err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  172. request := &filer_pb.StatisticsRequest{
  173. Collection: wfs.option.Collection,
  174. Replication: wfs.option.Replication,
  175. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  176. DiskType: string(wfs.option.DiskType),
  177. }
  178. glog.V(4).Infof("reading filer stats: %+v", request)
  179. resp, err := client.Statistics(context.Background(), request)
  180. if err != nil {
  181. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  182. return err
  183. }
  184. glog.V(4).Infof("read filer stats: %+v", resp)
  185. wfs.stats.TotalSize = resp.TotalSize
  186. wfs.stats.UsedSize = resp.UsedSize
  187. wfs.stats.FileCount = resp.FileCount
  188. wfs.stats.lastChecked = time.Now().Unix()
  189. return nil
  190. })
  191. if err != nil {
  192. glog.V(0).Infof("filer Statistics: %v", err)
  193. return err
  194. }
  195. }
  196. totalDiskSize := wfs.stats.TotalSize
  197. usedDiskSize := wfs.stats.UsedSize
  198. actualFileCount := wfs.stats.FileCount
  199. // Compute the total number of available blocks
  200. resp.Blocks = totalDiskSize / blockSize
  201. // Compute the number of used blocks
  202. numBlocks := uint64(usedDiskSize / blockSize)
  203. // Report the number of free and available blocks for the block size
  204. resp.Bfree = resp.Blocks - numBlocks
  205. resp.Bavail = resp.Blocks - numBlocks
  206. resp.Bsize = uint32(blockSize)
  207. // Report the total number of possible files in the file system (and those free)
  208. resp.Files = math.MaxInt64
  209. resp.Ffree = math.MaxInt64 - actualFileCount
  210. // Report the maximum length of a name and the minimum fragment size
  211. resp.Namelen = 1024
  212. resp.Frsize = uint32(blockSize)
  213. return nil
  214. }
  215. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  216. if entry.Attributes == nil {
  217. return
  218. }
  219. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  220. }
  221. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  222. if entry.Attributes == nil {
  223. return
  224. }
  225. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  226. }
  227. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  228. if wfs.option.VolumeServerAccess == "filerProxy" {
  229. return func(fileId string) (targetUrls []string, err error) {
  230. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  231. }
  232. }
  233. return filer.LookupFn(wfs)
  234. }
  235. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  236. return wfs.option.FilerAddresses[wfs.option.filerIndex]
  237. }
  238. func (option *Option) setupUniqueCacheDirectory() {
  239. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  240. option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
  241. os.MkdirAll(option.uniqueCacheDir, os.FileMode(0777)&^option.Umask)
  242. }
  243. func (option *Option) getUniqueCacheDir() string {
  244. return option.uniqueCacheDir
  245. }
  246. type NodeWithId uint64
  247. func (n NodeWithId) Id() uint64 {
  248. return uint64(n)
  249. }
  250. func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error {
  251. return nil
  252. }