// dirty_pages_temp_file.go (4.2 KB)

  1. package filesys
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  6. "io"
  7. "os"
  8. "sync"
  9. "time"
  10. )
  11. type TempFileDirtyPages struct {
  12. f *File
  13. tf *os.File
  14. writtenIntervals *WrittenContinuousIntervals
  15. writeOnly bool
  16. writeWaitGroup sync.WaitGroup
  17. pageAddLock sync.Mutex
  18. chunkAddLock sync.Mutex
  19. lastErr error
  20. collection string
  21. replication string
  22. }
  23. func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages {
  24. tempFile := &TempFileDirtyPages{
  25. f: file,
  26. writeOnly: writeOnly,
  27. writtenIntervals: &WrittenContinuousIntervals{},
  28. }
  29. return tempFile
  30. }
  31. func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
  32. pages.pageAddLock.Lock()
  33. defer pages.pageAddLock.Unlock()
  34. if pages.tf == nil {
  35. tf, err := os.CreateTemp(pages.f.wfs.option.getTempFilePageDir(), "")
  36. if err != nil {
  37. glog.Errorf("create temp file: %v", err)
  38. pages.lastErr = err
  39. return
  40. }
  41. pages.tf = tf
  42. pages.writtenIntervals.tempFile = tf
  43. pages.writtenIntervals.lastOffset = 0
  44. }
  45. writtenOffset := pages.writtenIntervals.lastOffset
  46. dataSize := int64(len(data))
  47. // glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
  48. if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
  49. pages.lastErr = err
  50. } else {
  51. pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
  52. pages.writtenIntervals.lastOffset += dataSize
  53. }
  54. // pages.writtenIntervals.debug()
  55. return
  56. }
  57. func (pages *TempFileDirtyPages) FlushData() error {
  58. pages.saveExistingPagesToStorage()
  59. pages.writeWaitGroup.Wait()
  60. if pages.lastErr != nil {
  61. return fmt.Errorf("flush data: %v", pages.lastErr)
  62. }
  63. pages.pageAddLock.Lock()
  64. defer pages.pageAddLock.Unlock()
  65. if pages.tf != nil {
  66. pages.writtenIntervals.tempFile = nil
  67. pages.writtenIntervals.lists = nil
  68. pages.tf.Close()
  69. os.Remove(pages.tf.Name())
  70. pages.tf = nil
  71. }
  72. return nil
  73. }
  74. func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
  75. pageSize := pages.f.wfs.option.ChunkSizeLimit
  76. // glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
  77. for _, list := range pages.writtenIntervals.lists {
  78. listStopOffset := list.Offset() + list.Size()
  79. for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
  80. start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
  81. if start >= stop {
  82. continue
  83. }
  84. // glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists))
  85. pages.saveToStorage(list.ToReader(start, stop), start, stop-start)
  86. }
  87. }
  88. }
  89. func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
  90. mtime := time.Now().UnixNano()
  91. pages.writeWaitGroup.Add(1)
  92. writer := func() {
  93. defer pages.writeWaitGroup.Done()
  94. reader = io.LimitReader(reader, size)
  95. chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset)
  96. if err != nil {
  97. glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
  98. pages.lastErr = err
  99. return
  100. }
  101. chunk.Mtime = mtime
  102. pages.collection, pages.replication = collection, replication
  103. pages.chunkAddLock.Lock()
  104. defer pages.chunkAddLock.Unlock()
  105. pages.f.addChunks([]*filer_pb.FileChunk{chunk})
  106. glog.V(3).Infof("%s saveToStorage %s [%d,%d)", pages.f.fullpath(), chunk.FileId, offset, offset+size)
  107. }
  108. if pages.f.wfs.concurrentWriters != nil {
  109. pages.f.wfs.concurrentWriters.Execute(writer)
  110. } else {
  111. go writer()
  112. }
  113. }
  114. func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
  115. return pages.writtenIntervals.ReadDataAt(data, startOffset)
  116. }
  117. func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
  118. return pages.collection, pages.replication
  119. }
  120. func (pages *TempFileDirtyPages) SetWriteOnly(writeOnly bool) {
  121. if pages.writeOnly {
  122. pages.writeOnly = writeOnly
  123. }
  124. }
  125. func (pages *TempFileDirtyPages) GetWriteOnly() (writeOnly bool) {
  126. return pages.writeOnly
  127. }