filehandle.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package mount
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/filer"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/util"
  7. "os"
  8. "sync"
  9. )
  10. type FileHandleId uint64
  11. var IsDebugFileReadWrite = false
  12. type FileHandle struct {
  13. fh FileHandleId
  14. counter int64
  15. entry *LockedEntry
  16. entryLock sync.RWMutex
  17. entryChunkGroup *filer.ChunkGroup
  18. inode uint64
  19. wfs *WFS
  20. // cache file has been written to
  21. dirtyMetadata bool
  22. dirtyPages *PageWriter
  23. reader *filer.ChunkReadAt
  24. contentType string
  25. isDeleted bool
  26. // for debugging
  27. mirrorFile *os.File
  28. }
  29. func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
  30. fh := &FileHandle{
  31. fh: handleId,
  32. counter: 1,
  33. inode: inode,
  34. wfs: wfs,
  35. }
  36. // dirtyPages: newContinuousDirtyPages(file, writeOnly),
  37. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  38. fh.entry = &LockedEntry{
  39. Entry: entry,
  40. }
  41. if entry != nil {
  42. fh.SetEntry(entry)
  43. }
  44. if IsDebugFileReadWrite {
  45. var err error
  46. fh.mirrorFile, err = os.OpenFile("/tmp/sw/"+entry.Name, os.O_RDWR|os.O_CREATE, 0600)
  47. if err != nil {
  48. println("failed to create mirror:", err.Error())
  49. }
  50. }
  51. return fh
  52. }
  53. func (fh *FileHandle) FullPath() util.FullPath {
  54. fp, _ := fh.wfs.inodeToPath.GetPath(fh.inode)
  55. return fp
  56. }
  57. func (fh *FileHandle) GetEntry() *filer_pb.Entry {
  58. return fh.entry.GetEntry()
  59. }
  60. func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
  61. if entry != nil {
  62. fileSize := filer.FileSize(entry)
  63. entry.Attributes.FileSize = fileSize
  64. var resolveManifestErr error
  65. fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks)
  66. if resolveManifestErr != nil {
  67. glog.Warningf("failed to resolve manifest chunks in %+v", entry)
  68. }
  69. } else {
  70. glog.Fatalf("setting file handle entry to nil")
  71. }
  72. fh.entry.SetEntry(entry)
  73. }
  74. func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
  75. return fh.entry.UpdateEntry(fn)
  76. }
  77. func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
  78. fh.entryLock.Lock()
  79. defer fh.entryLock.Unlock()
  80. if fh.entry == nil {
  81. return
  82. }
  83. fh.entry.AppendChunks(chunks)
  84. }
  85. func (fh *FileHandle) ReleaseHandle() {
  86. fhActiveLock := fh.wfs.fhLockTable.AcquireLock("ReleaseHandle", fh.fh, util.ExclusiveLock)
  87. defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)
  88. fh.entryLock.Lock()
  89. defer fh.entryLock.Unlock()
  90. fh.dirtyPages.Destroy()
  91. if IsDebugFileReadWrite {
  92. fh.mirrorFile.Close()
  93. }
  94. }
  95. func lessThan(a, b *filer_pb.FileChunk) bool {
  96. if a.ModifiedTsNs == b.ModifiedTsNs {
  97. return a.Fid.FileKey < b.Fid.FileKey
  98. }
  99. return a.ModifiedTsNs < b.ModifiedTsNs
  100. }