filehandle.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package mount
  2. import (
  3. "golang.org/x/sync/semaphore"
  4. "math"
  5. "sync"
  6. "golang.org/x/exp/slices"
  7. "github.com/seaweedfs/seaweedfs/weed/filer"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  10. "github.com/seaweedfs/seaweedfs/weed/util"
  11. )
// FileHandleId identifies one open handle on a file.
type FileHandleId uint64

// FileHandle tracks the state of one open file: its cached filer entry,
// dirty pages pending flush, and the reader used for chunked reads.
type FileHandle struct {
	fh      FileHandleId // handle id assigned at open time
	counter int64        // open/reference count; starts at 1 in newFileHandle
	entry   *filer_pb.Entry
	entryLock sync.Mutex // guards entry and, presumably, entryViewCache — confirm
	inode   uint64
	wfs     *WFS

	// cache file has been written to
	dirtyMetadata  bool
	dirtyPages     *PageWriter
	entryViewCache []filer.VisibleInterval
	reader         *filer.ChunkReadAt // closed and nilled by CloseReader
	contentType    string
	handle         uint64
	// orderedMutex is a weighted semaphore with MaxInt64 capacity;
	// NOTE(review): acquisition sites are outside this view — confirm usage.
	orderedMutex *semaphore.Weighted

	isDeleted bool
}
  30. func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
  31. fh := &FileHandle{
  32. fh: handleId,
  33. counter: 1,
  34. inode: inode,
  35. wfs: wfs,
  36. orderedMutex: semaphore.NewWeighted(int64(math.MaxInt64)),
  37. }
  38. // dirtyPages: newContinuousDirtyPages(file, writeOnly),
  39. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  40. if entry != nil {
  41. entry.Attributes.FileSize = filer.FileSize(entry)
  42. }
  43. return fh
  44. }
  45. func (fh *FileHandle) FullPath() util.FullPath {
  46. fp, _ := fh.wfs.inodeToPath.GetPath(fh.inode)
  47. return fp
  48. }
  49. func (fh *FileHandle) GetEntry() *filer_pb.Entry {
  50. fh.entryLock.Lock()
  51. defer fh.entryLock.Unlock()
  52. return fh.entry
  53. }
  54. func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
  55. fh.entryLock.Lock()
  56. defer fh.entryLock.Unlock()
  57. fh.entry = entry
  58. }
  59. func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
  60. fh.entryLock.Lock()
  61. defer fh.entryLock.Unlock()
  62. fn(fh.entry)
  63. return fh.entry
  64. }
  65. func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
  66. fh.entryLock.Lock()
  67. defer fh.entryLock.Unlock()
  68. if fh.entry == nil {
  69. return
  70. }
  71. // find the earliest incoming chunk
  72. newChunks := chunks
  73. earliestChunk := newChunks[0]
  74. for i := 1; i < len(newChunks); i++ {
  75. if lessThan(earliestChunk, newChunks[i]) {
  76. earliestChunk = newChunks[i]
  77. }
  78. }
  79. // pick out-of-order chunks from existing chunks
  80. for _, chunk := range fh.entry.Chunks {
  81. if lessThan(earliestChunk, chunk) {
  82. chunks = append(chunks, chunk)
  83. }
  84. }
  85. // sort incoming chunks
  86. slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool {
  87. return lessThan(a, b)
  88. })
  89. glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.Chunks), len(chunks))
  90. fh.entry.Chunks = append(fh.entry.Chunks, newChunks...)
  91. fh.entryViewCache = nil
  92. }
  93. func (fh *FileHandle) CloseReader() {
  94. if fh.reader != nil {
  95. _ = fh.reader.Close()
  96. fh.reader = nil
  97. }
  98. }
// Release tears down the handle's resources: the dirty-page writer first,
// then the chunk reader. Call order is preserved as written.
func (fh *FileHandle) Release() {
	fh.dirtyPages.Destroy()
	fh.CloseReader()
}
  103. func lessThan(a, b *filer_pb.FileChunk) bool {
  104. if a.Mtime == b.Mtime {
  105. return a.Fid.FileKey < b.Fid.FileKey
  106. }
  107. return a.Mtime < b.Mtime
  108. }