page_chunk_swapfile.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package page_writer
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/util"
  5. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  6. "io"
  7. "os"
  8. "sync"
  9. )
  10. var (
  11. _ = PageChunk(&SwapFileChunk{})
  12. )
  // ActualChunkIndex identifies a chunk-sized slot inside the swap file. The
  // slot's byte offset in the file is the index multiplied by the chunk size.
  type ActualChunkIndex int
  14. type SwapFile struct {
  15. dir string
  16. file *os.File
  17. chunkSize int64
  18. chunkTrackingLock sync.Mutex
  19. activeChunkCount int
  20. freeActualChunkList []ActualChunkIndex
  21. }
  22. type SwapFileChunk struct {
  23. sync.RWMutex
  24. swapfile *SwapFile
  25. usage *ChunkWrittenIntervalList
  26. logicChunkIndex LogicChunkIndex
  27. actualChunkIndex ActualChunkIndex
  28. activityScore *ActivityScore
  29. //memChunk *MemChunk
  30. }
  31. func NewSwapFile(dir string, chunkSize int64) *SwapFile {
  32. return &SwapFile{
  33. dir: dir,
  34. file: nil,
  35. chunkSize: chunkSize,
  36. }
  37. }
  38. func (sf *SwapFile) FreeResource() {
  39. if sf.file != nil {
  40. sf.file.Close()
  41. os.Remove(sf.file.Name())
  42. }
  43. }
  44. func (sf *SwapFile) NewSwapFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
  45. if sf.file == nil {
  46. var err error
  47. sf.file, err = os.CreateTemp(sf.dir, "")
  48. if err != nil {
  49. glog.Errorf("create swap file: %v", err)
  50. return nil
  51. }
  52. }
  53. sf.chunkTrackingLock.Lock()
  54. defer sf.chunkTrackingLock.Unlock()
  55. sf.activeChunkCount++
  56. // assign a new physical chunk
  57. var actualChunkIndex ActualChunkIndex
  58. if len(sf.freeActualChunkList) > 0 {
  59. actualChunkIndex = sf.freeActualChunkList[0]
  60. sf.freeActualChunkList = sf.freeActualChunkList[1:]
  61. } else {
  62. actualChunkIndex = ActualChunkIndex(sf.activeChunkCount)
  63. }
  64. swapFileChunk := &SwapFileChunk{
  65. swapfile: sf,
  66. usage: newChunkWrittenIntervalList(),
  67. logicChunkIndex: logicChunkIndex,
  68. actualChunkIndex: actualChunkIndex,
  69. activityScore: NewActivityScore(),
  70. // memChunk: NewMemChunk(logicChunkIndex, sf.chunkSize),
  71. }
  72. // println(logicChunkIndex, "|", "++++", swapFileChunk.actualChunkIndex, swapFileChunk, sf)
  73. return swapFileChunk
  74. }
  75. func (sc *SwapFileChunk) FreeResource() {
  76. sc.Lock()
  77. defer sc.Unlock()
  78. sc.swapfile.chunkTrackingLock.Lock()
  79. defer sc.swapfile.chunkTrackingLock.Unlock()
  80. sc.swapfile.freeActualChunkList = append(sc.swapfile.freeActualChunkList, sc.actualChunkIndex)
  81. sc.swapfile.activeChunkCount--
  82. // println(sc.logicChunkIndex, "|", "----", sc.actualChunkIndex, sc, sc.swapfile)
  83. }
  84. func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
  85. sc.Lock()
  86. defer sc.Unlock()
  87. // println(sc.logicChunkIndex, "|", tsNs, "write at", offset, len(src), sc.actualChunkIndex)
  88. innerOffset := offset % sc.swapfile.chunkSize
  89. var err error
  90. n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
  91. sc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
  92. if err != nil {
  93. glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
  94. }
  95. //sc.memChunk.WriteDataAt(src, offset, tsNs)
  96. sc.activityScore.MarkWrite()
  97. return
  98. }
  99. func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
  100. sc.RLock()
  101. defer sc.RUnlock()
  102. // println(sc.logicChunkIndex, "|", tsNs, "read at", off, len(p), sc.actualChunkIndex)
  103. //memCopy := make([]byte, len(p))
  104. //copy(memCopy, p)
  105. chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
  106. for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
  107. logicStart := max(off, chunkStartOffset+t.StartOffset)
  108. logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
  109. if logicStart < logicStop {
  110. actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
  111. if n, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
  112. if err == io.EOF && n == int(logicStop-logicStart) {
  113. err = nil
  114. }
  115. glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
  116. break
  117. }
  118. maxStop = max(maxStop, logicStop)
  119. if t.TsNs >= tsNs {
  120. println("read new data2", t.TsNs-tsNs, "ns")
  121. }
  122. }
  123. }
  124. //sc.memChunk.ReadDataAt(memCopy, off, tsNs)
  125. //if bytes.Compare(memCopy, p) != 0 {
  126. // println("read wrong data from swap file", off, sc.logicChunkIndex)
  127. //}
  128. sc.activityScore.MarkRead()
  129. return
  130. }
  131. func (sc *SwapFileChunk) IsComplete() bool {
  132. sc.RLock()
  133. defer sc.RUnlock()
  134. return sc.usage.IsComplete(sc.swapfile.chunkSize)
  135. }
  136. func (sc *SwapFileChunk) ActivityScore() int64 {
  137. return sc.activityScore.ActivityScore()
  138. }
  139. func (sc *SwapFileChunk) WrittenSize() int64 {
  140. sc.RLock()
  141. defer sc.RUnlock()
  142. return sc.usage.WrittenSize()
  143. }
  144. func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
  145. sc.RLock()
  146. defer sc.RUnlock()
  147. if saveFn == nil {
  148. return
  149. }
  150. // println(sc.logicChunkIndex, "|", "save")
  151. for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
  152. startOffset := t.StartOffset
  153. stopOffset := t.stopOffset
  154. tsNs := t.TsNs
  155. for t != sc.usage.tail && t.next.StartOffset == stopOffset {
  156. stopOffset = t.next.stopOffset
  157. t = t.next
  158. tsNs = max(tsNs, t.TsNs)
  159. }
  160. data := mem.Allocate(int(stopOffset - startOffset))
  161. n, _ := sc.swapfile.file.ReadAt(data, startOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
  162. if n > 0 {
  163. reader := util.NewBytesReader(data[:n])
  164. saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+startOffset, int64(n), tsNs, func() {
  165. })
  166. }
  167. mem.Free(data)
  168. }
  169. }