filechunks_read.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package filer
  2. import (
  3. "container/list"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  5. "golang.org/x/exp/slices"
  6. )
  7. func readResolvedChunks(chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval]) {
  8. var points []*Point
  9. for _, chunk := range chunks {
  10. if chunk.IsChunkManifest {
  11. println("This should not happen! A manifest chunk found:", chunk.GetFileIdString())
  12. }
  13. start, stop := max(chunk.Offset, startOffset), min(chunk.Offset+int64(chunk.Size), stopOffset)
  14. if start >= stop {
  15. continue
  16. }
  17. points = append(points, &Point{
  18. x: chunk.Offset,
  19. ts: chunk.ModifiedTsNs,
  20. chunk: chunk,
  21. isStart: true,
  22. })
  23. points = append(points, &Point{
  24. x: chunk.Offset + int64(chunk.Size),
  25. ts: chunk.ModifiedTsNs,
  26. chunk: chunk,
  27. isStart: false,
  28. })
  29. }
  30. slices.SortFunc(points, func(a, b *Point) int {
  31. if a.x != b.x {
  32. return int(a.x - b.x)
  33. }
  34. if a.ts != b.ts {
  35. return int(a.ts - b.ts)
  36. }
  37. if a.isStart {
  38. return 1
  39. }
  40. if b.isStart {
  41. return -1
  42. }
  43. return 0
  44. })
  45. var prevX int64
  46. queue := list.New() // points with higher ts are at the tail
  47. visibles = NewIntervalList[*VisibleInterval]()
  48. var prevPoint *Point
  49. for _, point := range points {
  50. if queue.Len() > 0 {
  51. prevPoint = queue.Back().Value.(*Point)
  52. } else {
  53. prevPoint = nil
  54. }
  55. if point.isStart {
  56. if prevPoint != nil {
  57. if point.x != prevX && prevPoint.ts < point.ts {
  58. addToVisibles(visibles, prevX, prevPoint, point)
  59. prevX = point.x
  60. }
  61. }
  62. // insert into queue
  63. if prevPoint == nil || prevPoint.ts < point.ts {
  64. queue.PushBack(point)
  65. prevX = point.x
  66. } else {
  67. for e := queue.Front(); e != nil; e = e.Next() {
  68. if e.Value.(*Point).ts > point.ts {
  69. queue.InsertBefore(point, e)
  70. break
  71. }
  72. }
  73. }
  74. } else {
  75. isLast := true
  76. for e := queue.Back(); e != nil; e = e.Prev() {
  77. if e.Value.(*Point).ts == point.ts {
  78. queue.Remove(e)
  79. break
  80. }
  81. isLast = false
  82. }
  83. if isLast && prevPoint != nil {
  84. addToVisibles(visibles, prevX, prevPoint, point)
  85. prevX = point.x
  86. }
  87. }
  88. }
  89. return
  90. }
  91. func addToVisibles(visibles *IntervalList[*VisibleInterval], prevX int64, startPoint *Point, point *Point) {
  92. if prevX < point.x {
  93. chunk := startPoint.chunk
  94. visible := &VisibleInterval{
  95. start: prevX,
  96. stop: point.x,
  97. fileId: chunk.GetFileIdString(),
  98. modifiedTsNs: chunk.ModifiedTsNs,
  99. offsetInChunk: prevX - chunk.Offset,
  100. chunkSize: chunk.Size,
  101. cipherKey: chunk.CipherKey,
  102. isGzipped: chunk.IsCompressed,
  103. }
  104. appendVisibleInterfal(visibles, visible)
  105. }
  106. }
  107. func appendVisibleInterfal(visibles *IntervalList[*VisibleInterval], visible *VisibleInterval) {
  108. visibles.AppendInterval(&Interval[*VisibleInterval]{
  109. StartOffset: visible.start,
  110. StopOffset: visible.stop,
  111. TsNs: visible.modifiedTsNs,
  112. Value: visible,
  113. })
  114. }
  115. type Point struct {
  116. x int64
  117. ts int64
  118. chunk *filer_pb.FileChunk
  119. isStart bool
  120. }