balancer.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package balancer
  2. import (
  3. "fmt"
  4. cmap "github.com/orcaman/concurrent-map/v2"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. )
  8. const (
  9. MaxPartitionCount = 8 * 9 * 5 * 7 //2520
  10. LockBrokerBalancer = "broker_balancer"
  11. )
  12. type Balancer struct {
  13. Brokers cmap.ConcurrentMap[string, *BrokerStats]
  14. }
  15. type BrokerStats struct {
  16. TopicPartitionCount int32
  17. ConsumerCount int32
  18. CpuUsagePercent int32
  19. Stats cmap.ConcurrentMap[string, *TopicPartitionStats]
  20. }
  21. func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
  22. bs.TopicPartitionCount = int32(len(stats.Stats))
  23. bs.CpuUsagePercent = stats.CpuUsagePercent
  24. var consumerCount int32
  25. currentTopicPartitions := bs.Stats.Items()
  26. for _, topicPartitionStats := range stats.Stats {
  27. tps := &TopicPartitionStats{
  28. TopicPartition: topic.TopicPartition{
  29. Namespace: topicPartitionStats.Topic.Namespace,
  30. Topic: topicPartitionStats.Topic.Name,
  31. RangeStart: topicPartitionStats.Partition.RangeStart,
  32. RangeStop: topicPartitionStats.Partition.RangeStop,
  33. },
  34. ConsumerCount: topicPartitionStats.ConsumerCount,
  35. IsLeader: topicPartitionStats.IsLeader,
  36. }
  37. consumerCount += topicPartitionStats.ConsumerCount
  38. key := tps.TopicPartition.String()
  39. bs.Stats.Set(key, tps)
  40. delete(currentTopicPartitions, key)
  41. }
  42. // remove the topic partitions that are not in the stats
  43. for key := range currentTopicPartitions {
  44. bs.Stats.Remove(key)
  45. }
  46. bs.ConsumerCount = consumerCount
  47. }
  48. func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) {
  49. tps := &TopicPartitionStats{
  50. TopicPartition: topic.TopicPartition{
  51. Namespace: t.Namespace,
  52. Topic: t.Name,
  53. RangeStart: partition.RangeStart,
  54. RangeStop: partition.RangeStop,
  55. },
  56. ConsumerCount: 0,
  57. IsLeader: true,
  58. }
  59. key := tps.TopicPartition.String()
  60. bs.Stats.Set(key, tps)
  61. }
  62. func (bs *BrokerStats) String() string {
  63. return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}",
  64. bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.Stats.Items())
  65. }
  66. type TopicPartitionStats struct {
  67. topic.TopicPartition
  68. ConsumerCount int32
  69. IsLeader bool
  70. }
  71. func NewBalancer() *Balancer {
  72. return &Balancer{
  73. Brokers: cmap.New[*BrokerStats](),
  74. }
  75. }
  76. func NewBrokerStats() *BrokerStats {
  77. return &BrokerStats{
  78. Stats: cmap.New[*TopicPartitionStats](),
  79. }
  80. }