filehandle.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package mount
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/filer"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/util"
  7. "golang.org/x/exp/slices"
  8. "golang.org/x/sync/semaphore"
  9. "math"
  10. "sync"
  11. )
  12. type FileHandleId uint64
  13. type FileHandle struct {
  14. fh FileHandleId
  15. counter int64
  16. entry *LockedEntry
  17. entryLock sync.Mutex
  18. inode uint64
  19. wfs *WFS
  20. // cache file has been written to
  21. dirtyMetadata bool
  22. dirtyPages *PageWriter
  23. entryViewCache []filer.VisibleInterval
  24. reader *filer.ChunkReadAt
  25. contentType string
  26. handle uint64
  27. orderedMutex *semaphore.Weighted
  28. isDeleted bool
  29. }
  30. func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
  31. fh := &FileHandle{
  32. fh: handleId,
  33. counter: 1,
  34. inode: inode,
  35. wfs: wfs,
  36. orderedMutex: semaphore.NewWeighted(int64(math.MaxInt64)),
  37. }
  38. // dirtyPages: newContinuousDirtyPages(file, writeOnly),
  39. fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
  40. if entry != nil {
  41. entry.Attributes.FileSize = filer.FileSize(entry)
  42. }
  43. fh.entry = &LockedEntry{
  44. Entry: entry,
  45. }
  46. return fh
  47. }
  48. func (fh *FileHandle) FullPath() util.FullPath {
  49. fp, _ := fh.wfs.inodeToPath.GetPath(fh.inode)
  50. return fp
  51. }
  52. func (fh *FileHandle) GetEntry() *filer_pb.Entry {
  53. return fh.entry.GetEntry()
  54. }
  55. func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
  56. fh.entry.SetEntry(entry)
  57. }
  58. func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
  59. return fh.entry.UpdateEntry(fn)
  60. }
  61. func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
  62. //fh.entryLock.Lock()
  63. //defer fh.entryLock.Unlock()
  64. if fh.entry == nil {
  65. return
  66. }
  67. // find the earliest incoming chunk
  68. newChunks := chunks
  69. earliestChunk := newChunks[0]
  70. for i := 1; i < len(newChunks); i++ {
  71. if lessThan(earliestChunk, newChunks[i]) {
  72. earliestChunk = newChunks[i]
  73. }
  74. }
  75. // pick out-of-order chunks from existing chunks
  76. for _, chunk := range fh.entry.GetChunks() {
  77. if lessThan(earliestChunk, chunk) {
  78. chunks = append(chunks, chunk)
  79. }
  80. }
  81. // sort incoming chunks
  82. slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool {
  83. return lessThan(a, b)
  84. })
  85. glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.GetChunks()), len(chunks))
  86. fh.entry.AppendChunks(newChunks)
  87. fh.entryViewCache = nil
  88. }
  89. func (fh *FileHandle) CloseReader() {
  90. if fh.reader != nil {
  91. _ = fh.reader.Close()
  92. fh.reader = nil
  93. }
  94. }
  95. func (fh *FileHandle) Release() {
  96. fh.dirtyPages.Destroy()
  97. fh.CloseReader()
  98. }
  99. func lessThan(a, b *filer_pb.FileChunk) bool {
  100. if a.ModifiedTsNs == b.ModifiedTsNs {
  101. return a.Fid.FileKey < b.Fid.FileKey
  102. }
  103. return a.ModifiedTsNs < b.ModifiedTsNs
  104. }