upload_pipeline.go (5.5 KB)
  1. package page_writer
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/util"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
// LogicChunkIndex identifies a chunk by its position within the file:
// index = offset / ChunkSize.
type LogicChunkIndex int

// UploadPipeline buffers writes into fixed-size in-memory chunks, seals
// chunks that are complete (or must be evicted), and uploads sealed chunks
// asynchronously through a limited-concurrency executor.
type UploadPipeline struct {
	filepath             util.FullPath // file path, used in log messages
	ChunkSize            int64         // size of each logical chunk in bytes
	writableChunks       map[LogicChunkIndex]PageChunk // chunks still accepting writes
	writableChunksLock   sync.Mutex                    // guards writableChunks
	sealedChunks         map[LogicChunkIndex]*SealedChunk // chunks queued for / being uploaded
	sealedChunksLock     sync.Mutex                       // guards sealedChunks
	uploaders            *util.LimitedConcurrentExecutor  // bounds concurrent uploads
	uploaderCount        int32                            // number of in-flight uploads (updated via sync/atomic)
	uploaderCountCond    *sync.Cond                       // broadcast when an upload finishes
	saveToStorageFn      SaveToStorageFunc                // persists a sealed chunk's content
	activeReadChunks     map[LogicChunkIndex]int          // per-chunk active reader counts; presumably what IsLocked consults — TODO confirm
	activeReadChunksLock sync.Mutex                       // guards activeReadChunks
	bufferChunkLimit     int // max writable chunks buffered before the fullest one is flushed
}
// SealedChunk is a chunk that no longer accepts writes and is being
// uploaded and/or read. Its backing memory is released when the
// reference count drops to zero (see FreeReference).
type SealedChunk struct {
	chunk            PageChunk
	referenceCounter int // track uploading or reading processes
}
  30. func (sc *SealedChunk) FreeReference(messageOnFree string) {
  31. sc.referenceCounter--
  32. if sc.referenceCounter == 0 {
  33. glog.V(4).Infof("Free sealed chunk: %s", messageOnFree)
  34. sc.chunk.FreeResource()
  35. }
  36. }
  37. func NewUploadPipeline(writers *util.LimitedConcurrentExecutor, chunkSize int64, saveToStorageFn SaveToStorageFunc, bufferChunkLimit int) *UploadPipeline {
  38. return &UploadPipeline{
  39. ChunkSize: chunkSize,
  40. writableChunks: make(map[LogicChunkIndex]PageChunk),
  41. sealedChunks: make(map[LogicChunkIndex]*SealedChunk),
  42. uploaders: writers,
  43. uploaderCountCond: sync.NewCond(&sync.Mutex{}),
  44. saveToStorageFn: saveToStorageFn,
  45. activeReadChunks: make(map[LogicChunkIndex]int),
  46. bufferChunkLimit: bufferChunkLimit,
  47. }
  48. }
  49. func (up *UploadPipeline) SaveDataAt(p []byte, off int64) (n int) {
  50. up.writableChunksLock.Lock()
  51. defer up.writableChunksLock.Unlock()
  52. logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)
  53. memChunk, found := up.writableChunks[logicChunkIndex]
  54. if !found {
  55. if len(up.writableChunks) < up.bufferChunkLimit {
  56. memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
  57. } else {
  58. fullestChunkIndex, fullness := LogicChunkIndex(-1), int64(0)
  59. for lci, mc := range up.writableChunks {
  60. chunkFullness := mc.WrittenSize()
  61. if fullness < chunkFullness {
  62. fullestChunkIndex = lci
  63. fullness = chunkFullness
  64. }
  65. }
  66. up.moveToSealed(up.writableChunks[fullestChunkIndex], fullestChunkIndex)
  67. delete(up.writableChunks, fullestChunkIndex)
  68. fmt.Printf("flush chunk %d with %d bytes written", logicChunkIndex, fullness)
  69. memChunk = NewMemChunk(logicChunkIndex, up.ChunkSize)
  70. }
  71. up.writableChunks[logicChunkIndex] = memChunk
  72. }
  73. n = memChunk.WriteDataAt(p, off)
  74. up.maybeMoveToSealed(memChunk, logicChunkIndex)
  75. return
  76. }
// MaybeReadDataAt fills p with any buffered data at file offset off and
// returns the exclusive end offset of the bytes served (0 if nothing is
// buffered for that chunk). Sealed chunks are read first, then writable
// chunks; the larger stop offset wins, so the freshest writable data takes
// precedence where both exist.
func (up *UploadPipeline) MaybeReadDataAt(p []byte, off int64) (maxStop int64) {
	logicChunkIndex := LogicChunkIndex(off / up.ChunkSize)

	// read from sealed chunks first
	up.sealedChunksLock.Lock()
	sealedChunk, found := up.sealedChunks[logicChunkIndex]
	if found {
		// Take a reference under the lock so the uploader goroutine cannot
		// free the chunk while we are reading it.
		sealedChunk.referenceCounter++
	}
	up.sealedChunksLock.Unlock()
	if found {
		maxStop = sealedChunk.chunk.ReadDataAt(p, off)
		glog.V(4).Infof("%s read sealed memchunk [%d,%d)", up.filepath, off, maxStop)
		// NOTE(review): FreeReference decrements referenceCounter without
		// holding sealedChunksLock — looks racy against moveToSealed; confirm.
		sealedChunk.FreeReference(fmt.Sprintf("%s finish reading chunk %d", up.filepath, logicChunkIndex))
	}

	// read from writable chunks last
	up.writableChunksLock.Lock()
	defer up.writableChunksLock.Unlock()
	writableChunk, found := up.writableChunks[logicChunkIndex]
	if !found {
		return
	}
	writableMaxStop := writableChunk.ReadDataAt(p, off)
	glog.V(4).Infof("%s read writable memchunk [%d,%d)", up.filepath, off, writableMaxStop)
	maxStop = max(maxStop, writableMaxStop)

	return
}
// FlushAll seals every writable chunk so it gets uploaded, then blocks
// until all in-flight uploads complete.
// NOTE(review): writableChunksLock is held for the entire wait, so any
// concurrent SaveDataAt blocks until the flush finishes — confirm intended.
func (up *UploadPipeline) FlushAll() {
	up.writableChunksLock.Lock()
	defer up.writableChunksLock.Unlock()

	for logicChunkIndex, memChunk := range up.writableChunks {
		// moveToSealed also deletes the map entry; deleting entries while
		// ranging over a map is safe in Go.
		up.moveToSealed(memChunk, logicChunkIndex)
	}

	up.waitForCurrentWritersToComplete()
}
  111. func (up *UploadPipeline) maybeMoveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
  112. if memChunk.IsComplete() {
  113. up.moveToSealed(memChunk, logicChunkIndex)
  114. }
  115. }
  116. func (up *UploadPipeline) moveToSealed(memChunk PageChunk, logicChunkIndex LogicChunkIndex) {
  117. atomic.AddInt32(&up.uploaderCount, 1)
  118. glog.V(4).Infof("%s uploaderCount %d ++> %d", up.filepath, up.uploaderCount-1, up.uploaderCount)
  119. up.sealedChunksLock.Lock()
  120. if oldMemChunk, found := up.sealedChunks[logicChunkIndex]; found {
  121. oldMemChunk.FreeReference(fmt.Sprintf("%s replace chunk %d", up.filepath, logicChunkIndex))
  122. }
  123. sealedChunk := &SealedChunk{
  124. chunk: memChunk,
  125. referenceCounter: 1, // default 1 is for uploading process
  126. }
  127. up.sealedChunks[logicChunkIndex] = sealedChunk
  128. delete(up.writableChunks, logicChunkIndex)
  129. up.sealedChunksLock.Unlock()
  130. up.uploaders.Execute(func() {
  131. // first add to the file chunks
  132. sealedChunk.chunk.SaveContent(up.saveToStorageFn)
  133. // notify waiting process
  134. atomic.AddInt32(&up.uploaderCount, -1)
  135. glog.V(4).Infof("%s uploaderCount %d --> %d", up.filepath, up.uploaderCount+1, up.uploaderCount)
  136. // Lock and Unlock are not required,
  137. // but it may signal multiple times during one wakeup,
  138. // and the waiting goroutine may miss some of them!
  139. up.uploaderCountCond.L.Lock()
  140. up.uploaderCountCond.Broadcast()
  141. up.uploaderCountCond.L.Unlock()
  142. // wait for readers
  143. for up.IsLocked(logicChunkIndex) {
  144. time.Sleep(59 * time.Millisecond)
  145. }
  146. // then remove from sealed chunks
  147. up.sealedChunksLock.Lock()
  148. defer up.sealedChunksLock.Unlock()
  149. delete(up.sealedChunks, logicChunkIndex)
  150. sealedChunk.FreeReference(fmt.Sprintf("%s finished uploading chunk %d", up.filepath, logicChunkIndex))
  151. })
  152. }
// Shutdown is currently a no-op; no resources are torn down here.
func (up *UploadPipeline) Shutdown() {
}