wfs.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. package filesys
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "math"
  7. "math/rand"
  8. "os"
  9. "path"
  10. "path/filepath"
  11. "sync"
  12. "time"
  13. "github.com/chrislusf/seaweedfs/weed/filer"
  14. "github.com/chrislusf/seaweedfs/weed/storage/types"
  15. "github.com/chrislusf/seaweedfs/weed/wdclient"
  16. "google.golang.org/grpc"
  17. "github.com/chrislusf/seaweedfs/weed/util/grace"
  18. "github.com/seaweedfs/fuse"
  19. "github.com/seaweedfs/fuse/fs"
  20. "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
  21. "github.com/chrislusf/seaweedfs/weed/glog"
  22. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  23. "github.com/chrislusf/seaweedfs/weed/util"
  24. "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
  25. )
  26. type Option struct {
  27. MountDirectory string
  28. FilerAddresses []pb.ServerAddress
  29. filerIndex int
  30. GrpcDialOption grpc.DialOption
  31. FilerMountRootPath string
  32. Collection string
  33. Replication string
  34. TtlSec int32
  35. DiskType types.DiskType
  36. ChunkSizeLimit int64
  37. ConcurrentWriters int
  38. CacheDir string
  39. CacheSizeMB int64
  40. DataCenter string
  41. Umask os.FileMode
  42. MountUid uint32
  43. MountGid uint32
  44. MountMode os.FileMode
  45. MountCtime time.Time
  46. MountMtime time.Time
  47. MountParentInode uint64
  48. VolumeServerAccess string // how to access volume servers
  49. Cipher bool // whether encrypt data on volume server
  50. UidGidMapper *meta_cache.UidGidMapper
  51. uniqueCacheDir string
  52. uniqueCacheTempPageDir string
  53. }
  54. var _ = fs.FS(&WFS{})
  55. var _ = fs.FSStatfser(&WFS{})
  56. type WFS struct {
  57. option *Option
  58. // contains all open handles, protected by handlesLock
  59. handlesLock sync.Mutex
  60. handles map[uint64]*FileHandle
  61. bufPool sync.Pool
  62. stats statsCache
  63. root fs.Node
  64. fsNodeCache *FsCache
  65. chunkCache *chunk_cache.TieredChunkCache
  66. metaCache *meta_cache.MetaCache
  67. signature int32
  68. // throttle writers
  69. concurrentWriters *util.LimitedConcurrentExecutor
  70. Server *fs.Server
  71. }
  72. type statsCache struct {
  73. filer_pb.StatisticsResponse
  74. lastChecked int64 // unix time in seconds
  75. }
  76. func NewSeaweedFileSystem(option *Option) *WFS {
  77. wfs := &WFS{
  78. option: option,
  79. handles: make(map[uint64]*FileHandle),
  80. bufPool: sync.Pool{
  81. New: func() interface{} {
  82. return make([]byte, option.ChunkSizeLimit)
  83. },
  84. },
  85. signature: util.RandomInt32(),
  86. }
  87. wfs.option.filerIndex = rand.Intn(len(option.FilerAddresses))
  88. wfs.option.setupUniqueCacheDirectory()
  89. if option.CacheSizeMB > 0 {
  90. wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, option.getUniqueCacheDir(), option.CacheSizeMB, 1024*1024)
  91. }
  92. wfs.metaCache = meta_cache.NewMetaCache(path.Join(option.getUniqueCacheDir(), "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath, entry *filer_pb.Entry) {
  93. fsNode := NodeWithId(filePath.AsInode(entry.FileMode()))
  94. if err := wfs.Server.InvalidateNodeData(fsNode); err != nil {
  95. glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
  96. }
  97. dir, name := filePath.DirAndName()
  98. parent := NodeWithId(util.FullPath(dir).AsInode(os.ModeDir))
  99. if dir == option.FilerMountRootPath {
  100. parent = NodeWithId(1)
  101. }
  102. if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
  103. glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
  104. }
  105. })
  106. grace.OnInterrupt(func() {
  107. wfs.metaCache.Shutdown()
  108. })
  109. wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, id: 1}
  110. wfs.fsNodeCache = newFsCache(wfs.root)
  111. if wfs.option.ConcurrentWriters > 0 {
  112. wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
  113. }
  114. return wfs
  115. }
  116. func (wfs *WFS) StartBackgroundTasks() {
  117. startTime := time.Now()
  118. go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
  119. }
  120. func (wfs *WFS) Root() (fs.Node, error) {
  121. return wfs.root, nil
  122. }
  123. func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
  124. fullpath := file.fullpath()
  125. glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
  126. inodeId := file.Id()
  127. wfs.handlesLock.Lock()
  128. existingHandle, found := wfs.handles[inodeId]
  129. if found && existingHandle != nil && existingHandle.f.isOpen > 0 {
  130. existingHandle.f.isOpen++
  131. wfs.handlesLock.Unlock()
  132. glog.V(4).Infof("Reuse AcquiredHandle %s open %d", fullpath, existingHandle.f.isOpen)
  133. return existingHandle
  134. }
  135. wfs.handlesLock.Unlock()
  136. entry, _ := file.maybeLoadEntry(context.Background())
  137. file.entry = entry
  138. fileHandle = newFileHandle(file, uid, gid)
  139. wfs.handlesLock.Lock()
  140. file.isOpen++
  141. wfs.handles[inodeId] = fileHandle
  142. wfs.handlesLock.Unlock()
  143. fileHandle.handle = inodeId
  144. glog.V(4).Infof("Acquired new Handle %s open %d", fullpath, file.isOpen)
  145. return
  146. }
  147. func (wfs *WFS) Fsync(file *File, header fuse.Header) error {
  148. inodeId := file.Id()
  149. wfs.handlesLock.Lock()
  150. existingHandle, found := wfs.handles[inodeId]
  151. wfs.handlesLock.Unlock()
  152. if found && existingHandle != nil {
  153. existingHandle.Lock()
  154. defer existingHandle.Unlock()
  155. if existingHandle.f.isOpen > 0 {
  156. return existingHandle.doFlush(context.Background(), header)
  157. }
  158. }
  159. return nil
  160. }
  161. func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) {
  162. wfs.handlesLock.Lock()
  163. defer wfs.handlesLock.Unlock()
  164. glog.V(4).Infof("ReleaseHandle %s id %d current handles length %d", fullpath, handleId, len(wfs.handles))
  165. delete(wfs.handles, uint64(handleId))
  166. return
  167. }
  168. // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser
  169. func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error {
  170. glog.V(4).Infof("reading fs stats: %+v", req)
  171. if wfs.stats.lastChecked < time.Now().Unix()-20 {
  172. err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  173. request := &filer_pb.StatisticsRequest{
  174. Collection: wfs.option.Collection,
  175. Replication: wfs.option.Replication,
  176. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec),
  177. DiskType: string(wfs.option.DiskType),
  178. }
  179. glog.V(4).Infof("reading filer stats: %+v", request)
  180. resp, err := client.Statistics(context.Background(), request)
  181. if err != nil {
  182. glog.V(0).Infof("reading filer stats %v: %v", request, err)
  183. return err
  184. }
  185. glog.V(4).Infof("read filer stats: %+v", resp)
  186. wfs.stats.TotalSize = resp.TotalSize
  187. wfs.stats.UsedSize = resp.UsedSize
  188. wfs.stats.FileCount = resp.FileCount
  189. wfs.stats.lastChecked = time.Now().Unix()
  190. return nil
  191. })
  192. if err != nil {
  193. glog.V(0).Infof("filer Statistics: %v", err)
  194. return err
  195. }
  196. }
  197. totalDiskSize := wfs.stats.TotalSize
  198. usedDiskSize := wfs.stats.UsedSize
  199. actualFileCount := wfs.stats.FileCount
  200. // Compute the total number of available blocks
  201. resp.Blocks = totalDiskSize / blockSize
  202. // Compute the number of used blocks
  203. numBlocks := uint64(usedDiskSize / blockSize)
  204. // Report the number of free and available blocks for the block size
  205. resp.Bfree = resp.Blocks - numBlocks
  206. resp.Bavail = resp.Blocks - numBlocks
  207. resp.Bsize = uint32(blockSize)
  208. // Report the total number of possible files in the file system (and those free)
  209. resp.Files = math.MaxInt64
  210. resp.Ffree = math.MaxInt64 - actualFileCount
  211. // Report the maximum length of a name and the minimum fragment size
  212. resp.Namelen = 1024
  213. resp.Frsize = uint32(blockSize)
  214. return nil
  215. }
  216. func (wfs *WFS) mapPbIdFromFilerToLocal(entry *filer_pb.Entry) {
  217. if entry.Attributes == nil {
  218. return
  219. }
  220. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.FilerToLocal(entry.Attributes.Uid, entry.Attributes.Gid)
  221. }
  222. func (wfs *WFS) mapPbIdFromLocalToFiler(entry *filer_pb.Entry) {
  223. if entry.Attributes == nil {
  224. return
  225. }
  226. entry.Attributes.Uid, entry.Attributes.Gid = wfs.option.UidGidMapper.LocalToFiler(entry.Attributes.Uid, entry.Attributes.Gid)
  227. }
  228. func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
  229. if wfs.option.VolumeServerAccess == "filerProxy" {
  230. return func(fileId string) (targetUrls []string, err error) {
  231. return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
  232. }
  233. }
  234. return filer.LookupFn(wfs)
  235. }
  236. func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
  237. return wfs.option.FilerAddresses[wfs.option.filerIndex]
  238. }
  239. func (option *Option) setupUniqueCacheDirectory() {
  240. cacheUniqueId := util.Md5String([]byte(option.MountDirectory + string(option.FilerAddresses[0]) + option.FilerMountRootPath + util.Version()))[0:8]
  241. option.uniqueCacheDir = path.Join(option.CacheDir, cacheUniqueId)
  242. option.uniqueCacheTempPageDir = filepath.Join(option.uniqueCacheDir, "sw")
  243. os.MkdirAll(option.uniqueCacheTempPageDir, os.FileMode(0777)&^option.Umask)
  244. }
  245. func (option *Option) getTempFilePageDir() string {
  246. return option.uniqueCacheTempPageDir
  247. }
  248. func (option *Option) getUniqueCacheDir() string {
  249. return option.uniqueCacheDir
  250. }
  // NodeWithId is a node that consists of nothing but its numeric id.
  type NodeWithId uint64

  // Id returns the node id as a plain uint64.
  func (n NodeWithId) Id() uint64 {
  	id := uint64(n)
  	return id
  }
  255. func (n NodeWithId) Attr(ctx context.Context, attr *fuse.Attr) error {
  256. return nil
  257. }