partition.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package topic
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. "time"
  6. )
  7. const PartitionCount = 4096
  8. type Partition struct {
  9. RangeStart int32
  10. RangeStop int32 // exclusive
  11. RingSize int32
  12. UnixTimeNs int64 // in nanoseconds
  13. }
  14. func NewPartition(rangeStart, rangeStop, ringSize int32, unixTimeNs int64) *Partition {
  15. return &Partition{
  16. RangeStart: rangeStart,
  17. RangeStop: rangeStop,
  18. RingSize: ringSize,
  19. UnixTimeNs: unixTimeNs,
  20. }
  21. }
  22. func (partition Partition) Equals(other Partition) bool {
  23. if partition.RangeStart != other.RangeStart {
  24. return false
  25. }
  26. if partition.RangeStop != other.RangeStop {
  27. return false
  28. }
  29. if partition.RingSize != other.RingSize {
  30. return false
  31. }
  32. if partition.UnixTimeNs != other.UnixTimeNs {
  33. return false
  34. }
  35. return true
  36. }
  37. func FromPbPartition(partition *mq_pb.Partition) Partition {
  38. return Partition{
  39. RangeStart: partition.RangeStart,
  40. RangeStop: partition.RangeStop,
  41. RingSize: partition.RingSize,
  42. UnixTimeNs: partition.UnixTimeNs,
  43. }
  44. }
  45. func SplitPartitions(targetCount int32, ts int64) []*Partition {
  46. partitions := make([]*Partition, 0, targetCount)
  47. partitionSize := PartitionCount / targetCount
  48. for i := int32(0); i < targetCount; i++ {
  49. partitionStop := (i + 1) * partitionSize
  50. if i == targetCount-1 {
  51. partitionStop = PartitionCount
  52. }
  53. partitions = append(partitions, &Partition{
  54. RangeStart: i * partitionSize,
  55. RangeStop: partitionStop,
  56. RingSize: PartitionCount,
  57. UnixTimeNs: ts,
  58. })
  59. }
  60. return partitions
  61. }
  62. func (partition Partition) ToPbPartition() *mq_pb.Partition {
  63. return &mq_pb.Partition{
  64. RangeStart: partition.RangeStart,
  65. RangeStop: partition.RangeStop,
  66. RingSize: partition.RingSize,
  67. UnixTimeNs: partition.UnixTimeNs,
  68. }
  69. }
  70. func (partition Partition) Overlaps(partition2 Partition) bool {
  71. if partition.RangeStart >= partition2.RangeStop {
  72. return false
  73. }
  74. if partition.RangeStop <= partition2.RangeStart {
  75. return false
  76. }
  77. return true
  78. }
  79. func (partition Partition) String() string {
  80. return fmt.Sprintf("%04d-%04d", partition.RangeStart, partition.RangeStop)
  81. }
  82. func ParseTopicVersion(name string) (t time.Time, err error) {
  83. return time.Parse(PartitionGenerationFormat, name)
  84. }
  85. func ParsePartitionBoundary(name string) (start, stop int32) {
  86. _, err := fmt.Sscanf(name, "%04d-%04d", &start, &stop)
  87. if err != nil {
  88. return 0, 0
  89. }
  90. return start, stop
  91. }
  92. func PartitionDir(t Topic, p Partition) string {
  93. partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(PartitionGenerationFormat)
  94. return fmt.Sprintf("%s/%s/%04d-%04d", t.Dir(), partitionGeneration, p.RangeStart, p.RangeStop)
  95. }