balancer.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package balancer
  2. import (
  3. "fmt"
  4. cmap "github.com/orcaman/concurrent-map/v2"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. )
  7. const (
  8. MaxPartitionCount = 8 * 9 * 5 * 7 //2520
  9. LockBrokerBalancer = "broker_balancer"
  10. )
  11. type Balancer struct {
  12. Brokers cmap.ConcurrentMap[string, *BrokerStats]
  13. }
  14. type BrokerStats struct {
  15. TopicPartitionCount int32
  16. ConsumerCount int32
  17. CpuUsagePercent int32
  18. Stats cmap.ConcurrentMap[string, *TopicPartitionStats]
  19. }
  20. func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
  21. bs.TopicPartitionCount = int32(len(stats.Stats))
  22. bs.CpuUsagePercent = stats.CpuUsagePercent
  23. var consumerCount int32
  24. currentTopicPartitions := bs.Stats.Items()
  25. for _, topicPartitionStats := range stats.Stats {
  26. tps := &TopicPartitionStats{
  27. TopicPartition: TopicPartition{
  28. Namespace: topicPartitionStats.Topic.Namespace,
  29. Topic: topicPartitionStats.Topic.Name,
  30. RangeStart: topicPartitionStats.Partition.RangeStart,
  31. RangeStop: topicPartitionStats.Partition.RangeStop,
  32. },
  33. ConsumerCount: topicPartitionStats.ConsumerCount,
  34. IsLeader: topicPartitionStats.IsLeader,
  35. }
  36. consumerCount += topicPartitionStats.ConsumerCount
  37. key := tps.TopicPartition.String()
  38. bs.Stats.Set(key, tps)
  39. delete(currentTopicPartitions, key)
  40. }
  41. // remove the topic partitions that are not in the stats
  42. for key := range currentTopicPartitions {
  43. bs.Stats.Remove(key)
  44. }
  45. bs.ConsumerCount = consumerCount
  46. }
  47. func (bs *BrokerStats) String() string {
  48. return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}",
  49. bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.Stats.Items())
  50. }
  51. type TopicPartition struct {
  52. Namespace string
  53. Topic string
  54. RangeStart int32
  55. RangeStop int32
  56. }
  57. type TopicPartitionStats struct {
  58. TopicPartition
  59. ConsumerCount int32
  60. IsLeader bool
  61. }
  62. func NewBalancer() *Balancer {
  63. return &Balancer{
  64. Brokers: cmap.New[*BrokerStats](),
  65. }
  66. }
  67. func NewBrokerStats() *BrokerStats {
  68. return &BrokerStats{
  69. Stats: cmap.New[*TopicPartitionStats](),
  70. }
  71. }
  72. func (tp *TopicPartition) String() string {
  73. return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop)
  74. }