partition.go 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package topic
  2. import "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  3. const PartitionCount = 4096
  4. type Partition struct {
  5. RangeStart int32
  6. RangeStop int32 // exclusive
  7. RingSize int32
  8. UnixTimeNs int64 // in nanoseconds
  9. }
  10. func NewPartition(rangeStart, rangeStop, ringSize int32, unixTimeNs int64) *Partition {
  11. return &Partition{
  12. RangeStart: rangeStart,
  13. RangeStop: rangeStop,
  14. RingSize: ringSize,
  15. UnixTimeNs: unixTimeNs,
  16. }
  17. }
  18. func (partition Partition) Equals(other Partition) bool {
  19. if partition.RangeStart != other.RangeStart {
  20. return false
  21. }
  22. if partition.RangeStop != other.RangeStop {
  23. return false
  24. }
  25. if partition.RingSize != other.RingSize {
  26. return false
  27. }
  28. if partition.UnixTimeNs != other.UnixTimeNs {
  29. return false
  30. }
  31. return true
  32. }
  33. func FromPbPartition(partition *mq_pb.Partition) Partition {
  34. return Partition{
  35. RangeStart: partition.RangeStart,
  36. RangeStop: partition.RangeStop,
  37. RingSize: partition.RingSize,
  38. UnixTimeNs: partition.UnixTimeNs,
  39. }
  40. }
  41. func SplitPartitions(targetCount int32, ts int64) []*Partition {
  42. partitions := make([]*Partition, 0, targetCount)
  43. partitionSize := PartitionCount / targetCount
  44. for i := int32(0); i < targetCount; i++ {
  45. partitionStop := (i + 1) * partitionSize
  46. if i == targetCount-1 {
  47. partitionStop = PartitionCount
  48. }
  49. partitions = append(partitions, &Partition{
  50. RangeStart: i * partitionSize,
  51. RangeStop: partitionStop,
  52. RingSize: PartitionCount,
  53. UnixTimeNs: ts,
  54. })
  55. }
  56. return partitions
  57. }
  58. func (partition Partition) ToPbPartition() *mq_pb.Partition {
  59. return &mq_pb.Partition{
  60. RangeStart: partition.RangeStart,
  61. RangeStop: partition.RangeStop,
  62. RingSize: partition.RingSize,
  63. UnixTimeNs: partition.UnixTimeNs,
  64. }
  65. }