12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- package pub_balancer
- import (
- "fmt"
- cmap "github.com/orcaman/concurrent-map/v2"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- )
- type BrokerStats struct {
- TopicPartitionCount int32
- PublisherCount int32
- SubscriberCount int32
- CpuUsagePercent int32
- TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition
- Topics []topic.Topic
- }
- type TopicPartitionStats struct {
- topic.TopicPartition
- PublisherCount int32
- SubscriberCount int32
- }
- func NewBrokerStats() *BrokerStats {
- return &BrokerStats{
- TopicPartitionStats: cmap.New[*TopicPartitionStats](),
- }
- }
- func (bs *BrokerStats) String() string {
- return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, Publishers:%d, Subscribers:%d CpuUsagePercent:%d, Stats:%+v}",
- bs.TopicPartitionCount, bs.PublisherCount, bs.SubscriberCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
- }
- func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
- bs.TopicPartitionCount = int32(len(stats.Stats))
- bs.CpuUsagePercent = stats.CpuUsagePercent
- var publisherCount, subscriberCount int32
- currentTopicPartitions := bs.TopicPartitionStats.Items()
- for _, topicPartitionStats := range stats.Stats {
- tps := &TopicPartitionStats{
- TopicPartition: topic.TopicPartition{
- Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name},
- Partition: topic.Partition{
- RangeStart: topicPartitionStats.Partition.RangeStart,
- RangeStop: topicPartitionStats.Partition.RangeStop,
- RingSize: topicPartitionStats.Partition.RingSize,
- UnixTimeNs: topicPartitionStats.Partition.UnixTimeNs,
- },
- },
- PublisherCount: topicPartitionStats.PublisherCount,
- SubscriberCount: topicPartitionStats.SubscriberCount,
- }
- publisherCount += topicPartitionStats.PublisherCount
- subscriberCount += topicPartitionStats.SubscriberCount
- key := tps.TopicPartition.String()
- bs.TopicPartitionStats.Set(key, tps)
- delete(currentTopicPartitions, key)
- }
- // remove the topic partitions that are not in the stats
- for key := range currentTopicPartitions {
- bs.TopicPartitionStats.Remove(key)
- }
- bs.PublisherCount = publisherCount
- bs.SubscriberCount = subscriberCount
- }
- func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
- tps := &TopicPartitionStats{
- TopicPartition: topic.TopicPartition{
- Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
- Partition: topic.Partition{
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
- RingSize: partition.RingSize,
- UnixTimeNs: partition.UnixTimeNs,
- },
- },
- PublisherCount: 0,
- SubscriberCount: 0,
- }
- key := tps.TopicPartition.String()
- if isAdd {
- bs.TopicPartitionStats.SetIfAbsent(key, tps)
- } else {
- bs.TopicPartitionStats.Remove(key)
- }
- }
|