page_chunk_swapfile.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package page_writer
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/glog"
  4. "github.com/chrislusf/seaweedfs/weed/util"
  5. "github.com/chrislusf/seaweedfs/weed/util/mem"
  6. "os"
  7. )
  8. var (
  9. _ = PageChunk(&SwapFileChunk{})
  10. )
  11. type ActualChunkIndex int
  12. type SwapFile struct {
  13. dir string
  14. file *os.File
  15. logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex
  16. chunkSize int64
  17. }
  18. type SwapFileChunk struct {
  19. swapfile *SwapFile
  20. usage *ChunkWrittenIntervalList
  21. logicChunkIndex LogicChunkIndex
  22. actualChunkIndex ActualChunkIndex
  23. }
  24. func NewSwapFile(dir string, chunkSize int64) *SwapFile {
  25. return &SwapFile{
  26. dir: dir,
  27. file: nil,
  28. logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
  29. chunkSize: chunkSize,
  30. }
  31. }
  32. func (sf *SwapFile) FreeResource() {
  33. if sf.file != nil {
  34. sf.file.Close()
  35. os.Remove(sf.file.Name())
  36. }
  37. }
  38. func (sf *SwapFile) NewTempFileChunk(logicChunkIndex LogicChunkIndex) (tc *SwapFileChunk) {
  39. if sf.file == nil {
  40. var err error
  41. sf.file, err = os.CreateTemp(sf.dir, "")
  42. if err != nil {
  43. glog.Errorf("create swap file: %v", err)
  44. return nil
  45. }
  46. }
  47. actualChunkIndex, found := sf.logicToActualChunkIndex[logicChunkIndex]
  48. if !found {
  49. actualChunkIndex = ActualChunkIndex(len(sf.logicToActualChunkIndex))
  50. sf.logicToActualChunkIndex[logicChunkIndex] = actualChunkIndex
  51. }
  52. return &SwapFileChunk{
  53. swapfile: sf,
  54. usage: newChunkWrittenIntervalList(),
  55. logicChunkIndex: logicChunkIndex,
  56. actualChunkIndex: actualChunkIndex,
  57. }
  58. }
  59. func (sc *SwapFileChunk) FreeResource() {
  60. }
  61. func (sc *SwapFileChunk) WriteDataAt(src []byte, offset int64) (n int) {
  62. innerOffset := offset % sc.swapfile.chunkSize
  63. var err error
  64. n, err = sc.swapfile.file.WriteAt(src, int64(sc.actualChunkIndex)*sc.swapfile.chunkSize+innerOffset)
  65. if err == nil {
  66. sc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
  67. } else {
  68. glog.Errorf("failed to write swap file %s: %v", sc.swapfile.file.Name(), err)
  69. }
  70. return
  71. }
  72. func (sc *SwapFileChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
  73. chunkStartOffset := int64(sc.logicChunkIndex) * sc.swapfile.chunkSize
  74. for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
  75. logicStart := max(off, chunkStartOffset+t.StartOffset)
  76. logicStop := min(off+int64(len(p)), chunkStartOffset+t.stopOffset)
  77. if logicStart < logicStop {
  78. actualStart := logicStart - chunkStartOffset + int64(sc.actualChunkIndex)*sc.swapfile.chunkSize
  79. if _, err := sc.swapfile.file.ReadAt(p[logicStart-off:logicStop-off], actualStart); err != nil {
  80. glog.Errorf("failed to reading swap file %s: %v", sc.swapfile.file.Name(), err)
  81. break
  82. }
  83. maxStop = max(maxStop, logicStop)
  84. }
  85. }
  86. return
  87. }
  88. func (sc *SwapFileChunk) IsComplete() bool {
  89. return sc.usage.IsComplete(sc.swapfile.chunkSize)
  90. }
  91. func (sc *SwapFileChunk) WrittenSize() int64 {
  92. return sc.usage.WrittenSize()
  93. }
  94. func (sc *SwapFileChunk) SaveContent(saveFn SaveToStorageFunc) {
  95. if saveFn == nil {
  96. return
  97. }
  98. for t := sc.usage.head.next; t != sc.usage.tail; t = t.next {
  99. data := mem.Allocate(int(t.Size()))
  100. sc.swapfile.file.ReadAt(data, t.StartOffset+int64(sc.actualChunkIndex)*sc.swapfile.chunkSize)
  101. reader := util.NewBytesReader(data)
  102. saveFn(reader, int64(sc.logicChunkIndex)*sc.swapfile.chunkSize+t.StartOffset, t.Size(), func() {
  103. })
  104. mem.Free(data)
  105. }
  106. sc.usage = newChunkWrittenIntervalList()
  107. }