broker_stats.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package pub_balancer
  2. import (
  3. "fmt"
  4. cmap "github.com/orcaman/concurrent-map/v2"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. )
  8. type BrokerStats struct {
  9. TopicPartitionCount int32
  10. ConsumerCount int32
  11. CpuUsagePercent int32
  12. TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition
  13. Topics []topic.Topic
  14. }
  15. type TopicPartitionStats struct {
  16. topic.TopicPartition
  17. ConsumerCount int32
  18. IsLeader bool
  19. }
  20. func NewBrokerStats() *BrokerStats {
  21. return &BrokerStats{
  22. TopicPartitionStats: cmap.New[*TopicPartitionStats](),
  23. }
  24. }
  25. func (bs *BrokerStats) String() string {
  26. return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}",
  27. bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
  28. }
  29. func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
  30. bs.TopicPartitionCount = int32(len(stats.Stats))
  31. bs.CpuUsagePercent = stats.CpuUsagePercent
  32. var consumerCount int32
  33. currentTopicPartitions := bs.TopicPartitionStats.Items()
  34. for _, topicPartitionStats := range stats.Stats {
  35. tps := &TopicPartitionStats{
  36. TopicPartition: topic.TopicPartition{
  37. Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name},
  38. Partition: topic.Partition{RangeStart: topicPartitionStats.Partition.RangeStart, RangeStop: topicPartitionStats.Partition.RangeStop, RingSize: topicPartitionStats.Partition.RingSize},
  39. },
  40. ConsumerCount: topicPartitionStats.ConsumerCount,
  41. IsLeader: topicPartitionStats.IsLeader,
  42. }
  43. consumerCount += topicPartitionStats.ConsumerCount
  44. key := tps.TopicPartition.String()
  45. bs.TopicPartitionStats.Set(key, tps)
  46. delete(currentTopicPartitions, key)
  47. }
  48. // remove the topic partitions that are not in the stats
  49. for key := range currentTopicPartitions {
  50. bs.TopicPartitionStats.Remove(key)
  51. }
  52. bs.ConsumerCount = consumerCount
  53. }
  54. func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) {
  55. tps := &TopicPartitionStats{
  56. TopicPartition: topic.TopicPartition{
  57. Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
  58. Partition: topic.Partition{RangeStart: partition.RangeStart, RangeStop: partition.RangeStop},
  59. },
  60. ConsumerCount: 0,
  61. IsLeader: true,
  62. }
  63. key := tps.TopicPartition.String()
  64. bs.TopicPartitionStats.Set(key, tps)
  65. }