chunked_file_writer.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package page_writer
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/glog"
  4. "io"
  5. "os"
  6. "sync"
  7. )
// LogicChunkIndex is a chunk index in the logical (file offset) space.
type LogicChunkIndex int

// ActualChunkIndex is a chunk index within the backing temp file, assigned
// in the order chunks are first written.
type ActualChunkIndex int
// ChunkedFileWriter assumes the write requests will come in within chunks,
// i.e. a single WriteAt call does not straddle a chunk boundary.
// It spills written data into a lazily-created temp file, packing logical
// chunks into temp-file slots in first-write order.
type ChunkedFileWriter struct {
	dir                     string                                  // directory in which the temp file is created
	file                    *os.File                                // backing temp file, created on first WriteAt
	logicToActualChunkIndex map[LogicChunkIndex]ActualChunkIndex    // logical chunk -> slot in the temp file
	chunkUsages             []*ChunkWrittenIntervalList             // per actual chunk: which byte ranges have been written
	ChunkSize               int64                                   // size of each chunk in bytes
	sync.Mutex                                                      // guards file, logicToActualChunkIndex, chunkUsages
}

// compile-time check that ChunkedFileWriter satisfies io.WriterAt
var _ = io.WriterAt(&ChunkedFileWriter{})
  20. func NewChunkedFileWriter(dir string, chunkSize int64) *ChunkedFileWriter {
  21. return &ChunkedFileWriter{
  22. dir: dir,
  23. file: nil,
  24. logicToActualChunkIndex: make(map[LogicChunkIndex]ActualChunkIndex),
  25. ChunkSize: chunkSize,
  26. }
  27. }
  28. func (cw *ChunkedFileWriter) WriteAt(p []byte, off int64) (n int, err error) {
  29. cw.Lock()
  30. defer cw.Unlock()
  31. if cw.file == nil {
  32. cw.file, err = os.CreateTemp(cw.dir, "")
  33. if err != nil {
  34. glog.Errorf("create temp file: %v", err)
  35. return
  36. }
  37. }
  38. actualOffset, chunkUsage := cw.toActualWriteOffset(off)
  39. n, err = cw.file.WriteAt(p, actualOffset)
  40. if err == nil {
  41. startOffset := off % cw.ChunkSize
  42. chunkUsage.MarkWritten(startOffset, startOffset+int64(n))
  43. }
  44. return
  45. }
// ReadDataAt copies previously written data overlapping [off, off+len(p))
// into p, and returns the maximum logical offset (exclusive) filled in.
// A zero return means nothing in the requested range has been written.
// NOTE(review): only the chunk containing off is consulted, consistent with
// the chunk-aligned request assumption on the type.
func (cw *ChunkedFileWriter) ReadDataAt(p []byte, off int64) (maxStop int64) {
	cw.Lock()
	defer cw.Unlock()

	// no temp file yet means nothing was ever written
	if cw.file == nil {
		return
	}

	logicChunkIndex := off / cw.ChunkSize
	actualChunkIndex, chunkUsage := cw.toActualReadOffset(off)
	if chunkUsage != nil {
		// walk the written intervals of this chunk (head/tail are sentinels)
		for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
			// clip the interval (chunk-relative) against the requested logical range
			logicStart := max(off, logicChunkIndex*cw.ChunkSize+t.StartOffset)
			logicStop := min(off+int64(len(p)), logicChunkIndex*cw.ChunkSize+t.stopOffset)
			if logicStart < logicStop {
				// translate the logical position into the temp-file slot of this chunk
				actualStart := logicStart - logicChunkIndex*cw.ChunkSize + int64(actualChunkIndex)*cw.ChunkSize
				_, err := cw.file.ReadAt(p[logicStart-off:logicStop-off], actualStart)
				if err != nil {
					glog.Errorf("reading temp file: %v", err)
					break
				}
				CheckByteZero("temp file writer read", p, logicStart-off, logicStop-off)
				maxStop = max(maxStop, logicStop)
			}
		}
	}
	return
}
  72. func (cw *ChunkedFileWriter) toActualWriteOffset(logicOffset int64) (actualOffset int64, chunkUsage *ChunkWrittenIntervalList) {
  73. logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
  74. offsetRemainder := logicOffset % cw.ChunkSize
  75. existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
  76. if found {
  77. return int64(existingActualChunkIndex)*cw.ChunkSize + offsetRemainder, cw.chunkUsages[existingActualChunkIndex]
  78. }
  79. cw.logicToActualChunkIndex[logicChunkIndex] = ActualChunkIndex(len(cw.chunkUsages))
  80. chunkUsage = newChunkWrittenIntervalList()
  81. cw.chunkUsages = append(cw.chunkUsages, chunkUsage)
  82. return int64(len(cw.chunkUsages)-1)*cw.ChunkSize + offsetRemainder, chunkUsage
  83. }
  84. func (cw *ChunkedFileWriter) toActualReadOffset(logicOffset int64) (actualChunkIndex ActualChunkIndex, chunkUsage *ChunkWrittenIntervalList) {
  85. logicChunkIndex := LogicChunkIndex(logicOffset / cw.ChunkSize)
  86. existingActualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
  87. if found {
  88. return existingActualChunkIndex, cw.chunkUsages[existingActualChunkIndex]
  89. }
  90. return 0, nil
  91. }
  92. func (cw *ChunkedFileWriter) ProcessEachInterval(process func(file *os.File, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval)) {
  93. for logicChunkIndex, actualChunkIndex := range cw.logicToActualChunkIndex {
  94. chunkUsage := cw.chunkUsages[actualChunkIndex]
  95. for t := chunkUsage.head.next; t != chunkUsage.tail; t = t.next {
  96. if !t.flushed {
  97. process(cw.file, logicChunkIndex, t)
  98. }
  99. }
  100. }
  101. }
  102. // Destroy releases used resources
  103. func (cw *ChunkedFileWriter) Destroy() {
  104. if cw.file != nil {
  105. cw.file.Close()
  106. os.Remove(cw.file.Name())
  107. cw.file = nil
  108. }
  109. cw.logicToActualChunkIndex = make(map[LogicChunkIndex]ActualChunkIndex)
  110. cw.chunkUsages = cw.chunkUsages[:0]
  111. }
// FileIntervalReader streams one written interval [startOffset, stopOffset)
// of the temp file as an io.Reader.
type FileIntervalReader struct {
	f           *os.File // the ChunkedFileWriter's temp file
	startOffset int64    // absolute temp-file offset where the interval begins
	stopOffset  int64    // absolute temp-file offset where the interval ends (exclusive)
	position    int64    // bytes already served, relative to startOffset
}

// compile-time check that FileIntervalReader satisfies io.Reader
var _ = io.Reader(&FileIntervalReader{})
  119. func NewFileIntervalReader(cw *ChunkedFileWriter, logicChunkIndex LogicChunkIndex, interval *ChunkWrittenInterval) *FileIntervalReader {
  120. actualChunkIndex, found := cw.logicToActualChunkIndex[logicChunkIndex]
  121. if !found {
  122. // this should never happen
  123. return nil
  124. }
  125. return &FileIntervalReader{
  126. f: cw.file,
  127. startOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.StartOffset,
  128. stopOffset: int64(actualChunkIndex)*cw.ChunkSize + interval.stopOffset,
  129. position: 0,
  130. }
  131. }
  132. func (fr *FileIntervalReader) Read(p []byte) (n int, err error) {
  133. readSize := minInt(len(p), int(fr.stopOffset-fr.startOffset-fr.position))
  134. n, err = fr.f.ReadAt(p[:readSize], fr.startOffset+fr.position)
  135. if err == nil || err == io.EOF {
  136. fr.position += int64(n)
  137. if fr.stopOffset-fr.startOffset-fr.position == 0 {
  138. // return a tiny bit faster
  139. err = io.EOF
  140. return
  141. }
  142. }
  143. return
  144. }