broker_stats.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. package pub_balancer
  2. import (
  3. "fmt"
  4. cmap "github.com/orcaman/concurrent-map/v2"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. )
  8. type BrokerStats struct {
  9. TopicPartitionCount int32
  10. PublisherCount int32
  11. SubscriberCount int32
  12. CpuUsagePercent int32
  13. TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition
  14. Topics []topic.Topic
  15. }
  16. type TopicPartitionStats struct {
  17. topic.TopicPartition
  18. PublisherCount int32
  19. SubscriberCount int32
  20. }
  21. func NewBrokerStats() *BrokerStats {
  22. return &BrokerStats{
  23. TopicPartitionStats: cmap.New[*TopicPartitionStats](),
  24. }
  25. }
  26. func (bs *BrokerStats) String() string {
  27. return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, Publishers:%d, Subscribers:%d CpuUsagePercent:%d, Stats:%+v}",
  28. bs.TopicPartitionCount, bs.PublisherCount, bs.SubscriberCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
  29. }
  30. func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
  31. bs.TopicPartitionCount = int32(len(stats.Stats))
  32. bs.CpuUsagePercent = stats.CpuUsagePercent
  33. var publisherCount, subscriberCount int32
  34. currentTopicPartitions := bs.TopicPartitionStats.Items()
  35. for _, topicPartitionStats := range stats.Stats {
  36. tps := &TopicPartitionStats{
  37. TopicPartition: topic.TopicPartition{
  38. Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name},
  39. Partition: topic.Partition{
  40. RangeStart: topicPartitionStats.Partition.RangeStart,
  41. RangeStop: topicPartitionStats.Partition.RangeStop,
  42. RingSize: topicPartitionStats.Partition.RingSize,
  43. UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs,
  44. },
  45. },
  46. PublisherCount: topicPartitionStats.PublisherCount,
  47. SubscriberCount: topicPartitionStats.SubscriberCount,
  48. }
  49. publisherCount += topicPartitionStats.PublisherCount
  50. subscriberCount += topicPartitionStats.SubscriberCount
  51. key := tps.TopicPartition.String()
  52. bs.TopicPartitionStats.Set(key, tps)
  53. delete(currentTopicPartitions, key)
  54. }
  55. // remove the topic partitions that are not in the stats
  56. for key := range currentTopicPartitions {
  57. bs.TopicPartitionStats.Remove(key)
  58. }
  59. bs.PublisherCount = publisherCount
  60. bs.SubscriberCount = subscriberCount
  61. }
  62. func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
  63. tps := &TopicPartitionStats{
  64. TopicPartition: topic.TopicPartition{
  65. Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
  66. Partition: topic.Partition{
  67. RangeStart: partition.RangeStart,
  68. RangeStop: partition.RangeStop,
  69. RingSize: partition.RingSize,
  70. UnixTimeNs: partition.UnixTimeNs,
  71. },
  72. },
  73. PublisherCount: 0,
  74. SubscriberCount: 0,
  75. }
  76. key := tps.TopicPartition.String()
  77. if isAdd {
  78. bs.TopicPartitionStats.SetIfAbsent(key, tps)
  79. } else {
  80. bs.TopicPartitionStats.Remove(key)
  81. }
  82. }