chunk_interval_list.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package page_writer
  2. import (
  3. "math"
  4. )
  // ChunkWrittenInterval marks one written interval within one page chunk.
  // It is also a node of the doubly linked list kept by ChunkWrittenIntervalList.
  type ChunkWrittenInterval struct {
  	// StartOffset is where the written range begins.
  	StartOffset int64
  	// stopOffset is where the written range ends; Size() is computed as
  	// stopOffset - StartOffset, so the end is exclusive.
  	stopOffset int64
  	// TsNs is stored with the interval and copied onto any pieces split off
  	// from it (presumably the write timestamp in nanoseconds; confirm with callers).
  	TsNs int64
  	// prev and next are the neighbouring nodes in the owning list.
  	prev, next *ChunkWrittenInterval
  }
  13. func (interval *ChunkWrittenInterval) Size() int64 {
  14. return interval.stopOffset - interval.StartOffset
  15. }
  16. func (interval *ChunkWrittenInterval) isComplete(chunkSize int64) bool {
  17. return interval.stopOffset-interval.StartOffset == chunkSize
  18. }
  19. // ChunkWrittenIntervalList mark written intervals within one page chunk
  20. type ChunkWrittenIntervalList struct {
  21. head *ChunkWrittenInterval
  22. tail *ChunkWrittenInterval
  23. }
  24. func newChunkWrittenIntervalList() *ChunkWrittenIntervalList {
  25. list := &ChunkWrittenIntervalList{
  26. head: &ChunkWrittenInterval{
  27. StartOffset: -1,
  28. stopOffset: -1,
  29. },
  30. tail: &ChunkWrittenInterval{
  31. StartOffset: math.MaxInt64,
  32. stopOffset: math.MaxInt64,
  33. },
  34. }
  35. list.head.next = list.tail
  36. list.tail.prev = list.head
  37. return list
  38. }
  39. func (list *ChunkWrittenIntervalList) MarkWritten(startOffset, stopOffset, tsNs int64) {
  40. if startOffset >= stopOffset {
  41. return
  42. }
  43. interval := &ChunkWrittenInterval{
  44. StartOffset: startOffset,
  45. stopOffset: stopOffset,
  46. TsNs: tsNs,
  47. }
  48. list.addInterval(interval)
  49. }
  50. func (list *ChunkWrittenIntervalList) IsComplete(chunkSize int64) bool {
  51. return list.size() == 1 && list.head.next.isComplete(chunkSize)
  52. }
  53. func (list *ChunkWrittenIntervalList) WrittenSize() (writtenByteCount int64) {
  54. for t := list.head; t != nil; t = t.next {
  55. writtenByteCount += t.Size()
  56. }
  57. return
  58. }
  59. func (list *ChunkWrittenIntervalList) addInterval(interval *ChunkWrittenInterval) {
  60. //t := list.head
  61. //for ; t.next != nil; t = t.next {
  62. // if t.TsNs > interval.TsNs {
  63. // println("writes is out of order", t.TsNs-interval.TsNs, "ns")
  64. // }
  65. //}
  66. p := list.head
  67. for ; p.next != nil && p.next.stopOffset <= interval.StartOffset; p = p.next {
  68. }
  69. q := list.tail
  70. for ; q.prev != nil && q.prev.StartOffset >= interval.stopOffset; q = q.prev {
  71. }
  72. // left side
  73. // interval after p.next start
  74. if p.next.StartOffset < interval.StartOffset {
  75. t := &ChunkWrittenInterval{
  76. StartOffset: p.next.StartOffset,
  77. stopOffset: interval.StartOffset,
  78. TsNs: p.next.TsNs,
  79. }
  80. p.next = t
  81. t.prev = p
  82. t.next = interval
  83. interval.prev = t
  84. } else {
  85. p.next = interval
  86. interval.prev = p
  87. }
  88. // right side
  89. // interval ends before p.prev
  90. if interval.stopOffset < q.prev.stopOffset {
  91. t := &ChunkWrittenInterval{
  92. StartOffset: interval.stopOffset,
  93. stopOffset: q.prev.stopOffset,
  94. TsNs: q.prev.TsNs,
  95. }
  96. q.prev = t
  97. t.next = q
  98. interval.next = t
  99. t.prev = interval
  100. } else {
  101. q.prev = interval
  102. interval.next = q
  103. }
  104. }
  105. func (list *ChunkWrittenIntervalList) size() int {
  106. var count int
  107. for t := list.head; t != nil; t = t.next {
  108. count++
  109. }
  110. return count - 2
  111. }