local_manager.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package topic
  2. import (
  3. cmap "github.com/orcaman/concurrent-map/v2"
  4. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  5. "github.com/shirou/gopsutil/v3/cpu"
  6. "time"
  7. )
  8. // LocalTopicManager manages topics on local broker
  9. type LocalTopicManager struct {
  10. topics cmap.ConcurrentMap[string, *LocalTopic]
  11. }
  12. // NewLocalTopicManager creates a new LocalTopicManager
  13. func NewLocalTopicManager() *LocalTopicManager {
  14. return &LocalTopicManager{
  15. topics: cmap.New[*LocalTopic](),
  16. }
  17. }
  18. // AddTopic adds a topic to the local topic manager
  19. func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) {
  20. localTopic, ok := manager.topics.Get(topic.String())
  21. if !ok {
  22. localTopic = &LocalTopic{
  23. Topic: topic,
  24. Partitions: make([]*LocalPartition, 0),
  25. }
  26. }
  27. manager.topics.SetIfAbsent(topic.String(), localTopic)
  28. if localTopic.findPartition(localPartition.Partition) != nil {
  29. return
  30. }
  31. localTopic.Partitions = append(localTopic.Partitions, localPartition)
  32. }
  33. // GetTopic gets a topic from the local topic manager
  34. func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition {
  35. localTopic, ok := manager.topics.Get(topic.String())
  36. if !ok {
  37. return nil
  38. }
  39. return localTopic.findPartition(partition)
  40. }
  41. // RemoveTopic removes a topic from the local topic manager
  42. func (manager *LocalTopicManager) RemoveTopic(topic Topic) {
  43. manager.topics.Remove(topic.String())
  44. }
  45. func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) {
  46. localTopic, ok := manager.topics.Get(topic.String())
  47. if !ok {
  48. return false
  49. }
  50. return localTopic.removePartition(partition)
  51. }
  52. func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats {
  53. stats := &mq_pb.BrokerStats{
  54. Stats: make(map[string]*mq_pb.TopicPartitionStats),
  55. }
  56. manager.topics.IterCb(func(topic string, localTopic *LocalTopic) {
  57. for _, localPartition := range localTopic.Partitions {
  58. stats.Stats[topic] = &mq_pb.TopicPartitionStats{
  59. Topic: &mq_pb.Topic{
  60. Namespace: string(localTopic.Namespace),
  61. Name: localTopic.Name,
  62. },
  63. Partition: &mq_pb.Partition{
  64. RingSize: localPartition.RingSize,
  65. RangeStart: localPartition.RangeStart,
  66. RangeStop: localPartition.RangeStop,
  67. },
  68. ConsumerCount: localPartition.ConsumerCount,
  69. }
  70. }
  71. })
  72. // collect current broker's cpu usage
  73. usages, err := cpu.Percent(duration, false)
  74. if err == nil && len(usages) > 0 {
  75. stats.CpuUsagePercent = int32(usages[0])
  76. }
  77. return stats
  78. }