page_chunk_mem.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package page_writer
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "github.com/seaweedfs/seaweedfs/weed/util"
  6. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  7. )
  8. var (
  9. _ = PageChunk(&MemChunk{})
  10. memChunkCounter int64
  11. )
// MemChunk is a PageChunk implementation that keeps one chunk's data in an
// in-memory buffer. The embedded RWMutex is taken by the methods that touch
// buf and usage.
type MemChunk struct {
	sync.RWMutex
	buf             []byte                    // backing storage; obtained from mem.Allocate, released by FreeResource
	usage           *ChunkWrittenIntervalList // which byte ranges of buf have been written, with their timestamps
	chunkSize       int64                     // size requested for buf; also used to map file offsets into the chunk
	logicChunkIndex LogicChunkIndex           // chunk index; multiplied by chunkSize it gives the chunk's base file offset
	activityScore   *ActivityScore            // notified on every read and write
}
  20. func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
  21. atomic.AddInt64(&memChunkCounter, 1)
  22. return &MemChunk{
  23. logicChunkIndex: logicChunkIndex,
  24. chunkSize: chunkSize,
  25. buf: mem.Allocate(int(chunkSize)),
  26. usage: newChunkWrittenIntervalList(),
  27. activityScore: NewActivityScore(),
  28. }
  29. }
  30. func (mc *MemChunk) FreeResource() {
  31. mc.Lock()
  32. defer mc.Unlock()
  33. atomic.AddInt64(&memChunkCounter, -1)
  34. mem.Free(mc.buf)
  35. }
  36. func (mc *MemChunk) WriteDataAt(src []byte, offset int64, tsNs int64) (n int) {
  37. mc.Lock()
  38. defer mc.Unlock()
  39. innerOffset := offset % mc.chunkSize
  40. n = copy(mc.buf[innerOffset:], src)
  41. mc.usage.MarkWritten(innerOffset, innerOffset+int64(n), tsNs)
  42. mc.activityScore.MarkWrite()
  43. return
  44. }
  45. func (mc *MemChunk) ReadDataAt(p []byte, off int64, tsNs int64) (maxStop int64) {
  46. mc.RLock()
  47. defer mc.RUnlock()
  48. memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize
  49. for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
  50. logicStart := max(off, memChunkBaseOffset+t.StartOffset)
  51. logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
  52. if logicStart < logicStop {
  53. if t.TsNs >= tsNs {
  54. copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
  55. maxStop = max(maxStop, logicStop)
  56. } else {
  57. println("read old data1", tsNs-t.TsNs, "ns")
  58. }
  59. }
  60. }
  61. mc.activityScore.MarkRead()
  62. return
  63. }
  64. func (mc *MemChunk) IsComplete() bool {
  65. mc.RLock()
  66. defer mc.RUnlock()
  67. return mc.usage.IsComplete(mc.chunkSize)
  68. }
  69. func (mc *MemChunk) ActivityScore() int64 {
  70. return mc.activityScore.ActivityScore()
  71. }
  72. func (mc *MemChunk) WrittenSize() int64 {
  73. mc.RLock()
  74. defer mc.RUnlock()
  75. return mc.usage.WrittenSize()
  76. }
  77. func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
  78. mc.RLock()
  79. defer mc.RUnlock()
  80. if saveFn == nil {
  81. return
  82. }
  83. for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
  84. startOffset := t.StartOffset
  85. stopOffset := t.stopOffset
  86. tsNs := t.TsNs
  87. for t != mc.usage.tail && t.next.StartOffset == stopOffset {
  88. stopOffset = t.next.stopOffset
  89. t = t.next
  90. tsNs = max(tsNs, t.TsNs)
  91. }
  92. reader := util.NewBytesReader(mc.buf[startOffset:stopOffset])
  93. saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+startOffset, stopOffset-startOffset, tsNs, func() {
  94. })
  95. }
  96. }