// File: balancer.go

  package balancer

  import (
  	"fmt"

  	cmap "github.com/orcaman/concurrent-map/v2"
  	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  )
  const (
  	// MaxPartitionCount is 8 * 9 * 5 * 7 = 2520, the least common multiple
  	// of the integers 1 through 10, so it divides evenly by any of them.
  	// NOTE(review): presumably chosen so partition ranges split evenly for
  	// small partition counts — confirm against callers.
  	MaxPartitionCount = 8 * 9 * 5 * 7 // 2520
  )
  10. type Balancer struct {
  11. Brokers cmap.ConcurrentMap[string, *BrokerStats]
  12. }
  13. type BrokerStats struct {
  14. TopicPartitionCount int32
  15. ConsumerCount int32
  16. CpuUsagePercent int32
  17. Stats cmap.ConcurrentMap[string, *TopicPartitionStats]
  18. }
  19. func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
  20. bs.TopicPartitionCount = int32(len(stats.Stats))
  21. bs.CpuUsagePercent = stats.CpuUsagePercent
  22. var consumerCount int32
  23. currentTopicPartitions := bs.Stats.Items()
  24. for _, topicPartitionStats := range stats.Stats {
  25. tps := &TopicPartitionStats{
  26. TopicPartition: TopicPartition{
  27. Namespace: topicPartitionStats.Topic.Namespace,
  28. Topic: topicPartitionStats.Topic.Name,
  29. RangeStart: topicPartitionStats.Partition.RangeStart,
  30. RangeStop: topicPartitionStats.Partition.RangeStop,
  31. },
  32. ConsumerCount: topicPartitionStats.ConsumerCount,
  33. IsLeader: topicPartitionStats.IsLeader,
  34. }
  35. consumerCount += topicPartitionStats.ConsumerCount
  36. key := tps.TopicPartition.String()
  37. bs.Stats.Set(key, tps)
  38. delete(currentTopicPartitions, key)
  39. }
  40. // remove the topic partitions that are not in the stats
  41. for key := range currentTopicPartitions {
  42. bs.Stats.Remove(key)
  43. }
  44. bs.ConsumerCount = consumerCount
  45. }
  // TopicPartition identifies one partition of a topic by namespace, topic
  // name and a numeric range.
  type TopicPartition struct {
  	// Namespace is the namespace the topic belongs to.
  	Namespace string
  	// Topic is the topic name.
  	Topic string
  	// RangeStart and RangeStop bound the partition's range.
  	// NOTE(review): whether RangeStop is inclusive is not visible here — confirm.
  	RangeStart int32
  	RangeStop  int32
  }
  52. type TopicPartitionStats struct {
  53. TopicPartition
  54. ConsumerCount int32
  55. IsLeader bool
  56. }
  57. func NewBalancer() *Balancer {
  58. return &Balancer{
  59. Brokers: cmap.New[*BrokerStats](),
  60. }
  61. }
  62. func NewBrokerStats() *BrokerStats {
  63. return &BrokerStats{
  64. Stats: cmap.New[*TopicPartitionStats](),
  65. }
  66. }
  67. func (tp *TopicPartition) String() string {
  68. return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop)
  69. }