page_chunk_mem.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package page_writer
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/util"
  4. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  5. "sync"
  6. "sync/atomic"
  7. )
  8. var (
  9. _ = PageChunk(&MemChunk{})
  10. memChunkCounter int64
  11. )
  12. type MemChunk struct {
  13. sync.RWMutex
  14. buf []byte
  15. usage *ChunkWrittenIntervalList
  16. chunkSize int64
  17. logicChunkIndex LogicChunkIndex
  18. }
  19. func NewMemChunk(logicChunkIndex LogicChunkIndex, chunkSize int64) *MemChunk {
  20. atomic.AddInt64(&memChunkCounter, 1)
  21. return &MemChunk{
  22. logicChunkIndex: logicChunkIndex,
  23. chunkSize: chunkSize,
  24. buf: mem.Allocate(int(chunkSize)),
  25. usage: newChunkWrittenIntervalList(),
  26. }
  27. }
  28. func (mc *MemChunk) FreeResource() {
  29. mc.Lock()
  30. defer mc.Unlock()
  31. atomic.AddInt64(&memChunkCounter, -1)
  32. mem.Free(mc.buf)
  33. }
  34. func (mc *MemChunk) WriteDataAt(src []byte, offset int64) (n int) {
  35. mc.Lock()
  36. defer mc.Unlock()
  37. innerOffset := offset % mc.chunkSize
  38. n = copy(mc.buf[innerOffset:], src)
  39. mc.usage.MarkWritten(innerOffset, innerOffset+int64(n))
  40. return
  41. }
  42. func (mc *MemChunk) ReadDataAt(p []byte, off int64) (maxStop int64) {
  43. mc.RLock()
  44. defer mc.RUnlock()
  45. memChunkBaseOffset := int64(mc.logicChunkIndex) * mc.chunkSize
  46. for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
  47. logicStart := max(off, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset)
  48. logicStop := min(off+int64(len(p)), memChunkBaseOffset+t.stopOffset)
  49. if logicStart < logicStop {
  50. copy(p[logicStart-off:logicStop-off], mc.buf[logicStart-memChunkBaseOffset:logicStop-memChunkBaseOffset])
  51. maxStop = max(maxStop, logicStop)
  52. }
  53. }
  54. return
  55. }
  56. func (mc *MemChunk) IsComplete() bool {
  57. mc.RLock()
  58. defer mc.RUnlock()
  59. return mc.usage.IsComplete(mc.chunkSize)
  60. }
  61. func (mc *MemChunk) WrittenSize() int64 {
  62. mc.RLock()
  63. defer mc.RUnlock()
  64. return mc.usage.WrittenSize()
  65. }
  66. func (mc *MemChunk) SaveContent(saveFn SaveToStorageFunc) {
  67. mc.RLock()
  68. defer mc.RUnlock()
  69. if saveFn == nil {
  70. return
  71. }
  72. for t := mc.usage.head.next; t != mc.usage.tail; t = t.next {
  73. reader := util.NewBytesReader(mc.buf[t.StartOffset:t.stopOffset])
  74. saveFn(reader, int64(mc.logicChunkIndex)*mc.chunkSize+t.StartOffset, t.Size(), func() {
  75. })
  76. }
  77. }