partition.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package topic
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. )
  6. const PartitionCount = 4096
  7. type Partition struct {
  8. RangeStart int32
  9. RangeStop int32 // exclusive
  10. RingSize int32
  11. UnixTimeNs int64 // in nanoseconds
  12. }
  13. func NewPartition(rangeStart, rangeStop, ringSize int32, unixTimeNs int64) *Partition {
  14. return &Partition{
  15. RangeStart: rangeStart,
  16. RangeStop: rangeStop,
  17. RingSize: ringSize,
  18. UnixTimeNs: unixTimeNs,
  19. }
  20. }
  21. func (partition Partition) Equals(other Partition) bool {
  22. if partition.RangeStart != other.RangeStart {
  23. return false
  24. }
  25. if partition.RangeStop != other.RangeStop {
  26. return false
  27. }
  28. if partition.RingSize != other.RingSize {
  29. return false
  30. }
  31. if partition.UnixTimeNs != other.UnixTimeNs {
  32. return false
  33. }
  34. return true
  35. }
  36. func FromPbPartition(partition *mq_pb.Partition) Partition {
  37. return Partition{
  38. RangeStart: partition.RangeStart,
  39. RangeStop: partition.RangeStop,
  40. RingSize: partition.RingSize,
  41. UnixTimeNs: partition.UnixTimeNs,
  42. }
  43. }
  44. func SplitPartitions(targetCount int32, ts int64) []*Partition {
  45. partitions := make([]*Partition, 0, targetCount)
  46. partitionSize := PartitionCount / targetCount
  47. for i := int32(0); i < targetCount; i++ {
  48. partitionStop := (i + 1) * partitionSize
  49. if i == targetCount-1 {
  50. partitionStop = PartitionCount
  51. }
  52. partitions = append(partitions, &Partition{
  53. RangeStart: i * partitionSize,
  54. RangeStop: partitionStop,
  55. RingSize: PartitionCount,
  56. UnixTimeNs: ts,
  57. })
  58. }
  59. return partitions
  60. }
  61. func (partition Partition) ToPbPartition() *mq_pb.Partition {
  62. return &mq_pb.Partition{
  63. RangeStart: partition.RangeStart,
  64. RangeStop: partition.RangeStop,
  65. RingSize: partition.RingSize,
  66. UnixTimeNs: partition.UnixTimeNs,
  67. }
  68. }
  69. func (partition Partition) Overlaps(partition2 Partition) bool {
  70. if partition.RangeStart >= partition2.RangeStop {
  71. return false
  72. }
  73. if partition.RangeStop <= partition2.RangeStart {
  74. return false
  75. }
  76. return true
  77. }
  78. func (partition Partition) String() string {
  79. return fmt.Sprintf("%04d-%04d", partition.RangeStart, partition.RangeStop)
  80. }
  81. func ToString(partition *mq_pb.Partition) string {
  82. return fmt.Sprintf("%04d-%04d", partition.RangeStart, partition.RangeStop)
  83. }