coordinator.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package sub_coordinator
  2. import (
  3. cmap "github.com/orcaman/concurrent-map/v2"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. )
  7. type TopicConsumerGroups struct {
  8. // map a consumer group name to a consumer group
  9. ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
  10. }
  11. // Coordinator coordinates the instances in the consumer group for one topic.
  12. // It is responsible for:
  13. // 1. (Maybe) assigning partitions when a consumer instance is up/down.
  14. type Coordinator struct {
  15. // map topic name to consumer groups
  16. TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
  17. balancer *pub_balancer.Balancer
  18. }
  19. func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
  20. return &Coordinator{
  21. TopicSubscribers: cmap.New[*TopicConsumerGroups](),
  22. balancer: balancer,
  23. }
  24. }
  25. func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups {
  26. topicName := toTopicName(topic)
  27. tcg, _ := c.TopicSubscribers.Get(topicName)
  28. if tcg == nil && createIfMissing {
  29. tcg = &TopicConsumerGroups{
  30. ConsumerGroups: cmap.New[*ConsumerGroup](),
  31. }
  32. if !c.TopicSubscribers.SetIfAbsent(topicName, tcg) {
  33. tcg, _ = c.TopicSubscribers.Get(topicName)
  34. }
  35. }
  36. return tcg
  37. }
  38. func (c *Coordinator) RemoveTopic(topic *mq_pb.Topic) {
  39. topicName := toTopicName(topic)
  40. c.TopicSubscribers.Remove(topicName)
  41. }
  42. func toTopicName(topic *mq_pb.Topic) string {
  43. topicName := topic.Namespace + "." + topic.Name
  44. return topicName
  45. }
  46. func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance {
  47. tcg := c.GetTopicConsumerGroups(topic, true)
  48. cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
  49. if cg == nil {
  50. cg = NewConsumerGroup(topic, c.balancer)
  51. if !tcg.ConsumerGroups.SetIfAbsent(consumerGroup, cg) {
  52. cg, _ = tcg.ConsumerGroups.Get(consumerGroup)
  53. }
  54. }
  55. cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance)
  56. if cgi == nil {
  57. cgi = NewConsumerGroupInstance(consumerGroupInstance)
  58. if !cg.ConsumerGroupInstances.SetIfAbsent(consumerGroupInstance, cgi) {
  59. cgi, _ = cg.ConsumerGroupInstances.Get(consumerGroupInstance)
  60. }
  61. }
  62. cg.OnAddConsumerGroupInstance(consumerGroupInstance, topic)
  63. return cgi
  64. }
  65. func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) {
  66. tcg := c.GetTopicConsumerGroups(topic, false)
  67. if tcg == nil {
  68. return
  69. }
  70. cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
  71. if cg == nil {
  72. return
  73. }
  74. cg.ConsumerGroupInstances.Remove(consumerGroupInstance)
  75. cg.OnRemoveConsumerGroupInstance(consumerGroupInstance, topic)
  76. if cg.ConsumerGroupInstances.Count() == 0 {
  77. tcg.ConsumerGroups.Remove(consumerGroup)
  78. }
  79. if tcg.ConsumerGroups.Count() == 0 {
  80. c.RemoveTopic(topic)
  81. }
  82. }
  83. func (c *Coordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) {
  84. tcg, _ := c.TopicSubscribers.Get(toTopicName(topic))
  85. if tcg == nil {
  86. return
  87. }
  88. for _, cg := range tcg.ConsumerGroups.Items() {
  89. cg.OnPartitionListChange(assignments)
  90. }
  91. }
  92. // OnSubAddBroker is called when a broker is added to the balancer
  93. func (c *Coordinator) OnSubAddBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
  94. }
  95. // OnSubRemoveBroker is called when a broker is removed from the balancer
  96. func (c *Coordinator) OnSubRemoveBroker(broker string, brokerStats *pub_balancer.BrokerStats) {
  97. }