filechunks_read.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package filer
  2. import (
  3. "container/list"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  5. "golang.org/x/exp/slices"
  6. )
  7. func readResolvedChunks(chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles *IntervalList[*VisibleInterval]) {
  8. var points []*Point
  9. for _, chunk := range chunks {
  10. if chunk.IsChunkManifest {
  11. println("This should not happen! A manifest chunk found:", chunk.GetFileIdString())
  12. }
  13. start, stop := max(chunk.Offset, startOffset), min(chunk.Offset+int64(chunk.Size), stopOffset)
  14. if start >= stop {
  15. continue
  16. }
  17. points = append(points, &Point{
  18. x: chunk.Offset,
  19. ts: chunk.ModifiedTsNs,
  20. chunk: chunk,
  21. isStart: true,
  22. })
  23. points = append(points, &Point{
  24. x: chunk.Offset + int64(chunk.Size),
  25. ts: chunk.ModifiedTsNs,
  26. chunk: chunk,
  27. isStart: false,
  28. })
  29. }
  30. slices.SortFunc(points, func(a, b *Point) bool {
  31. if a.x != b.x {
  32. return a.x < b.x
  33. }
  34. if a.ts != b.ts {
  35. return a.ts < b.ts
  36. }
  37. return !a.isStart
  38. })
  39. var prevX int64
  40. queue := list.New() // points with higher ts are at the tail
  41. visibles = NewIntervalList[*VisibleInterval]()
  42. var prevPoint *Point
  43. for _, point := range points {
  44. if queue.Len() > 0 {
  45. prevPoint = queue.Back().Value.(*Point)
  46. } else {
  47. prevPoint = nil
  48. }
  49. if point.isStart {
  50. if prevPoint != nil {
  51. if point.x != prevX && prevPoint.ts < point.ts {
  52. addToVisibles(visibles, prevX, prevPoint, point)
  53. prevX = point.x
  54. }
  55. }
  56. // insert into queue
  57. if prevPoint == nil || prevPoint.ts < point.ts {
  58. queue.PushBack(point)
  59. prevX = point.x
  60. } else {
  61. for e := queue.Front(); e != nil; e = e.Next() {
  62. if e.Value.(*Point).ts > point.ts {
  63. queue.InsertBefore(point, e)
  64. break
  65. }
  66. }
  67. }
  68. } else {
  69. isLast := true
  70. for e := queue.Back(); e != nil; e = e.Prev() {
  71. if e.Value.(*Point).ts == point.ts {
  72. queue.Remove(e)
  73. break
  74. }
  75. isLast = false
  76. }
  77. if isLast && prevPoint != nil {
  78. addToVisibles(visibles, prevX, prevPoint, point)
  79. prevX = point.x
  80. }
  81. }
  82. }
  83. return
  84. }
  85. func addToVisibles(visibles *IntervalList[*VisibleInterval], prevX int64, startPoint *Point, point *Point) {
  86. if prevX < point.x {
  87. chunk := startPoint.chunk
  88. visible := &VisibleInterval{
  89. start: prevX,
  90. stop: point.x,
  91. fileId: chunk.GetFileIdString(),
  92. modifiedTsNs: chunk.ModifiedTsNs,
  93. offsetInChunk: prevX - chunk.Offset,
  94. chunkSize: chunk.Size,
  95. cipherKey: chunk.CipherKey,
  96. isGzipped: chunk.IsCompressed,
  97. }
  98. appendVisibleInterfal(visibles, visible)
  99. }
  100. }
  101. func appendVisibleInterfal(visibles *IntervalList[*VisibleInterval], visible *VisibleInterval) {
  102. visibles.AppendInterval(&Interval[*VisibleInterval]{
  103. StartOffset: visible.start,
  104. StopOffset: visible.stop,
  105. TsNs: visible.modifiedTsNs,
  106. Value: visible,
  107. })
  108. }
  109. type Point struct {
  110. x int64
  111. ts int64
  112. chunk *filer_pb.FileChunk
  113. isStart bool
  114. }