coordinator.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package sub_coordinator
  2. import (
  3. cmap "github.com/orcaman/concurrent-map/v2"
  4. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. )
  7. type TopicConsumerGroups struct {
  8. // map a consumer group name to a consumer group
  9. ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
  10. }
  11. // Coordinator coordinates the instances in the consumer group for one topic.
  12. // It is responsible for:
  13. // 1. (Maybe) assigning partitions when a consumer instance is up/down.
  14. type Coordinator struct {
  15. // map topic name to consumer groups
  16. TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
  17. balancer *pub_balancer.Balancer
  18. }
  19. func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
  20. return &Coordinator{
  21. TopicSubscribers: cmap.New[*TopicConsumerGroups](),
  22. balancer: balancer,
  23. }
  24. }
  25. func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic) *TopicConsumerGroups {
  26. topicName := toTopicName(topic)
  27. tcg, _ := c.TopicSubscribers.Get(topicName)
  28. if tcg == nil {
  29. tcg = &TopicConsumerGroups{
  30. ConsumerGroups: cmap.New[*ConsumerGroup](),
  31. }
  32. c.TopicSubscribers.Set(topicName, tcg)
  33. }
  34. return tcg
  35. }
  36. func (c *Coordinator) RemoveTopic(topic *mq_pb.Topic) {
  37. topicName := toTopicName(topic)
  38. c.TopicSubscribers.Remove(topicName)
  39. }
  40. func toTopicName(topic *mq_pb.Topic) string {
  41. topicName := topic.Namespace + "." + topic.Name
  42. return topicName
  43. }
  44. func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance {
  45. tcg := c.GetTopicConsumerGroups(topic)
  46. cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
  47. if cg == nil {
  48. cg = NewConsumerGroup()
  49. tcg.ConsumerGroups.Set(consumerGroup, cg)
  50. }
  51. cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance)
  52. if cgi == nil {
  53. cgi = NewConsumerGroupInstance(consumerGroupInstance)
  54. cg.ConsumerGroupInstances.Set(consumerGroupInstance, cgi)
  55. }
  56. cg.OnAddConsumerGroupInstance(consumerGroupInstance, topic)
  57. return cgi
  58. }
  59. func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) {
  60. tcg, _ := c.TopicSubscribers.Get(toTopicName(topic))
  61. if tcg == nil {
  62. return
  63. }
  64. cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
  65. if cg == nil {
  66. return
  67. }
  68. cg.ConsumerGroupInstances.Remove(consumerGroupInstance)
  69. cg.OnRemoveConsumerGroupInstance(consumerGroupInstance, topic)
  70. if cg.ConsumerGroupInstances.Count() == 0 {
  71. tcg.ConsumerGroups.Remove(consumerGroup)
  72. }
  73. if tcg.ConsumerGroups.Count() == 0 {
  74. c.RemoveTopic(topic)
  75. }
  76. }