ec_locate.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package erasure_coding
  // Interval describes one contiguous piece of a read that lies entirely inside
  // a single block, as produced by LocateData.
  type Interval struct {
  	// BlockIndex is the position of the block within its area. LocateData
  	// restarts the count at zero when it moves from large blocks to small blocks.
  	BlockIndex int
  	// InnerBlockOffset is the byte offset inside the block where the piece starts.
  	InnerBlockOffset int64
  	// Size is the number of bytes the piece covers.
  	Size uint32
  	// IsLargeBlock is true when the block is a large block, false for a small block.
  	IsLargeBlock bool
  	// LargeBlockRowsCount is the number of large-block rows LocateData derived
  	// from the data size; ToShardIdAndOffset uses it to skip past the large
  	// blocks when the piece is in a small block.
  	LargeBlockRowsCount int
  }
  9. func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) {
  10. blockIndex, isLargeBlock, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, datSize, offset)
  11. // adding DataShardsCount*smallBlockLength to ensure we can derive the number of large block size from a shard size
  12. nLargeBlockRows := int((datSize + DataShardsCount*smallBlockLength) / (largeBlockLength * DataShardsCount))
  13. for size > 0 {
  14. interval := Interval{
  15. BlockIndex: blockIndex,
  16. InnerBlockOffset: innerBlockOffset,
  17. IsLargeBlock: isLargeBlock,
  18. LargeBlockRowsCount: nLargeBlockRows,
  19. }
  20. blockRemaining := largeBlockLength - innerBlockOffset
  21. if !isLargeBlock {
  22. blockRemaining = smallBlockLength - innerBlockOffset
  23. }
  24. if int64(size) <= blockRemaining {
  25. interval.Size = size
  26. intervals = append(intervals, interval)
  27. return
  28. }
  29. interval.Size = uint32(blockRemaining)
  30. intervals = append(intervals, interval)
  31. size -= interval.Size
  32. blockIndex += 1
  33. if isLargeBlock && blockIndex == nLargeBlockRows*DataShardsCount {
  34. isLargeBlock = false
  35. blockIndex = 0
  36. }
  37. innerBlockOffset = 0
  38. }
  39. return
  40. }
  41. func locateOffset(largeBlockLength, smallBlockLength int64, datSize int64, offset int64) (blockIndex int, isLargeBlock bool, innerBlockOffset int64) {
  42. largeRowSize := largeBlockLength * DataShardsCount
  43. nLargeBlockRows := datSize / (largeBlockLength * DataShardsCount)
  44. // if offset is within the large block area
  45. if offset < nLargeBlockRows*largeRowSize {
  46. isLargeBlock = true
  47. blockIndex, innerBlockOffset = locateOffsetWithinBlocks(largeBlockLength, offset)
  48. return
  49. }
  50. isLargeBlock = false
  51. offset -= nLargeBlockRows * largeRowSize
  52. blockIndex, innerBlockOffset = locateOffsetWithinBlocks(smallBlockLength, offset)
  53. return
  54. }
  // locateOffsetWithinBlocks divides offset by blockLength: the quotient is the
  // index of the block containing offset and the remainder is the position
  // inside that block. It panics with a division by zero when blockLength is 0.
  func locateOffsetWithinBlocks(blockLength int64, offset int64) (blockIndex int, innerBlockOffset int64) {
  	return int(offset / blockLength), offset % blockLength
  }
  60. func (interval Interval) ToShardIdAndOffset(largeBlockSize, smallBlockSize int64) (ShardId, int64) {
  61. ecFileOffset := interval.InnerBlockOffset
  62. rowIndex := interval.BlockIndex / DataShardsCount
  63. if interval.IsLargeBlock {
  64. ecFileOffset += int64(rowIndex) * largeBlockSize
  65. } else {
  66. ecFileOffset += int64(interval.LargeBlockRowsCount)*largeBlockSize + int64(rowIndex)*smallBlockSize
  67. }
  68. ecFileIndex := interval.BlockIndex % DataShardsCount
  69. return ShardId(ecFileIndex), ecFileOffset
  70. }