partition.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package topic
  2. import "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  3. const PartitionCount = 4096
  4. type Partition struct {
  5. RangeStart int32
  6. RangeStop int32 // exclusive
  7. RingSize int32
  8. UnixTimeNs int64 // in nanoseconds
  9. }
  10. func (partition Partition) Equals(other Partition) bool {
  11. if partition.RangeStart != other.RangeStart {
  12. return false
  13. }
  14. if partition.RangeStop != other.RangeStop {
  15. return false
  16. }
  17. if partition.RingSize != other.RingSize {
  18. return false
  19. }
  20. if partition.UnixTimeNs != other.UnixTimeNs {
  21. return false
  22. }
  23. return true
  24. }
  25. func FromPbPartition(partition *mq_pb.Partition) Partition {
  26. return Partition{
  27. RangeStart: partition.RangeStart,
  28. RangeStop: partition.RangeStop,
  29. RingSize: partition.RingSize,
  30. UnixTimeNs: partition.UnixTimeNs,
  31. }
  32. }
  33. func SplitPartitions(targetCount int32, ts int64) []*Partition {
  34. partitions := make([]*Partition, 0, targetCount)
  35. partitionSize := PartitionCount / targetCount
  36. for i := int32(0); i < targetCount; i++ {
  37. partitionStop := (i + 1) * partitionSize
  38. if i == targetCount-1 {
  39. partitionStop = PartitionCount
  40. }
  41. partitions = append(partitions, &Partition{
  42. RangeStart: i * partitionSize,
  43. RangeStop: partitionStop,
  44. RingSize: PartitionCount,
  45. UnixTimeNs: ts,
  46. })
  47. }
  48. return partitions
  49. }
  50. func (partition Partition) ToPbPartition() *mq_pb.Partition {
  51. return &mq_pb.Partition{
  52. RangeStart: partition.RangeStart,
  53. RangeStop: partition.RangeStop,
  54. RingSize: partition.RingSize,
  55. UnixTimeNs: partition.UnixTimeNs,
  56. }
  57. }