page_chunk_swapfile.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package page_writer
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/util"
  5. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  6. "os"
  7. "sync"
  8. )
  9. var (
  10. _ = PageChunk(&SwapFileChunk{})
  11. )
  12. type ActualChunkIndex int
  13. type SwapFile struct {
  14. dir string
  15. file *os.File
  16. logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
  17. logicToActualChunkIndexLock sync.Mutex
  18. chunkSize int64
  19. freeActualChunkList []ActualChunkIndex
  20. }
  21. type SwapFileChunk struct {
  22. sync.RWMutex
  23. swapfile *SwapFile
  24. usage *ChunkWrittenIntervalList
  25. logicChunkIndex LogicChunkIndex
  26. actualChunkIndex ActualChunkIndex
  27. }
  28. func NewSwapFile(dir string, chunkSize int64) *SwapFile {
  29. return &SwapFile{
  30. dir: dir,
  31. file: nil,
  32. logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
  33. chunkSize: chunkSize,
  34. }
  35. }
  36. func (sf *SwapFile) FreeResource() {
  37. if sf.file != nil {
  38. sf.file.Close()
  39. os.Remove(sf.file.Name())
  40. }
  41. }
  42. func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
  43. if sf.file == nil {
  44. var err error
  45. sf.file, err = os.CreateTemp(sf.dir, "")
  46. if err != nil {
  47. glog.Errorf("create swap file: %v", err)
  48. return nil
  49. }
  50. }
  51. sf.logicToActualChunkIndexLock.Lock()
  52. defer sf.logicToActualChunkIndexLock.Unlock()
  53. actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
  54. if !found {
  55. if len(sf.freeActualChunkList) > 0 {
  56. actualChunkIndex = sf.freeActualChunkList[0]
  57. sf.freeActualChunkList = sf.freeActualChunkList[1:]
  58. } else {
  59. actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
  60. }
  61. sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
  62. }
  63. return &SwapFileChunk{
  64. swapfile: sf,
  65. usage: newChunkWrittenIntervalList(),
  66. logicChunkIndex: logicChunkIndex,
  67. actualChunkIndex: actualChunkIndex,
  68. }
  69. }
  70. func (sc *SwapFileChunk) FreeResource() {
  71. sc.swapfile.logicToActualChunkIndexLock.Lock()
  72. defer sc.swapfile.logicToActualChunkIndexLock.Unlock()
  73. sc.Lock()
  74. defer sc.Unlock()
  75. sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
  76. delete(sc.swapfile.logicToActualChunkIndex, sc.logicChunkIndex)
  77. }
  78. func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
  79. sc.Lock()
  80. defer sc.Unlock()
  81. innerOffset := offset % sc.swapfile.chunkSize
  82. var err error
  83. n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
  84. if err == nil {
  85. sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
  86. } else {
  87. glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
  88. }
  89. return
  90. }
  91. func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
  92. sc.RLock()
  93. defer sc.RUnlock()
  94. chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
  95. for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
  96. logicStart := max(off, chunkStartOffset+t.StartOffset)
  97. logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
  98. if logicStart < logicStop {
  99. actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
  100. if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
  101. glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
  102. break
  103. }
  104. maxStop = max(maxStop, logicStop)
  105. }
  106. }
  107. return
  108. }
  109. func (sc *SwapFileChunk) IsComplete() bool {
  110. sc.RLock()
  111. defer sc.RUnlock()
  112. return sc.usage.IsComplete(sc.swapfile.chunkSize)
  113. }
  114. func (sc *SwapFileChunk) WrittenSize() int64 {
  115. sc.RLock()
  116. defer sc.RUnlock()
  117. return sc.usage.WrittenSize()
  118. }
  119. func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
  120. if saveFn == nil {
  121. return
  122. }
  123. sc.Lock()
  124. defer sc.Unlock()
  125. for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
  126. data := mem.Allocate(int(t.Size()))
  127. sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
  128. reader := util.NewBytesReader(data)
  129. saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
  130. })
  131. mem.Free(data)
  132. }
  133. sc.usage = newChunkWrittenIntervalList()
  134. }