sub_coordinator.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package sub_coordinator
  2. import (
  3. "fmt"
  4. cmap "github.com/orcaman/concurrent-map/v2"
  5. "github.com/seaweedfs/seaweedfs/weed/filer_client"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. )
  8. type TopicConsumerGroups struct {
  9. // map a consumer group name to a consumer group
  10. ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
  11. }
  12. // SubCoordinator coordinates the instances in the consumer group for one topic.
  13. // It is responsible for:
  14. // 1. (Maybe) assigning partitions when a consumer instance is up/down.
  15. type SubCoordinator struct {
  16. // map topic name to consumer groups
  17. TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
  18. FilerClientAccessor *filer_client.FilerClientAccessor
  19. }
  20. func NewSubCoordinator() *SubCoordinator {
  21. return &SubCoordinator{
  22. TopicSubscribers: cmap.New[*TopicConsumerGroups](),
  23. }
  24. }
  25. func (c *SubCoordinator) GetTopicConsumerGroups(topic *mq_pb.Topic, createIfMissing bool) *TopicConsumerGroups {
  26. topicName := toTopicName(topic)
  27. tcg, _ := c.TopicSubscribers.Get(topicName)
  28. if tcg == nil && createIfMissing {
  29. tcg = &TopicConsumerGroups{
  30. ConsumerGroups: cmap.New[*ConsumerGroup](),
  31. }
  32. if !c.TopicSubscribers.SetIfAbsent(topicName, tcg) {
  33. tcg, _ = c.TopicSubscribers.Get(topicName)
  34. }
  35. }
  36. return tcg
  37. }
  38. func (c *SubCoordinator) RemoveTopic(topic *mq_pb.Topic) {
  39. topicName := toTopicName(topic)
  40. c.TopicSubscribers.Remove(topicName)
  41. }
  42. func toTopicName(topic *mq_pb.Topic) string {
  43. topicName := topic.Namespace + "." + topic.Name
  44. return topicName
  45. }
  46. func (c *SubCoordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) (*ConsumerGroup, *ConsumerGroupInstance, error) {
  47. tcg := c.GetTopicConsumerGroups(initMessage.Topic, true)
  48. cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
  49. if cg == nil {
  50. cg = NewConsumerGroup(initMessage.Topic, initMessage.RebalanceSeconds, c.FilerClientAccessor)
  51. if cg != nil {
  52. tcg.ConsumerGroups.SetIfAbsent(initMessage.ConsumerGroup, cg)
  53. }
  54. cg, _ = tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
  55. }
  56. if cg == nil {
  57. return nil, nil, fmt.Errorf("fail to create consumer group %s: topic %s not found", initMessage.ConsumerGroup, initMessage.Topic)
  58. }
  59. cgi, _ := cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
  60. if cgi == nil {
  61. cgi = NewConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.MaxPartitionCount)
  62. if !cg.ConsumerGroupInstances.SetIfAbsent(initMessage.ConsumerGroupInstanceId, cgi) {
  63. cgi, _ = cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
  64. }
  65. }
  66. cgi.MaxPartitionCount = initMessage.MaxPartitionCount
  67. cg.Market.AddConsumerInstance(cgi)
  68. return cg, cgi, nil
  69. }
  70. func (c *SubCoordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) {
  71. tcg := c.GetTopicConsumerGroups(initMessage.Topic, false)
  72. if tcg == nil {
  73. return
  74. }
  75. cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
  76. if cg == nil {
  77. return
  78. }
  79. cg.ConsumerGroupInstances.Remove(initMessage.ConsumerGroupInstanceId)
  80. cg.Market.RemoveConsumerInstance(ConsumerGroupInstanceId(initMessage.ConsumerGroupInstanceId))
  81. if cg.ConsumerGroupInstances.Count() == 0 {
  82. tcg.ConsumerGroups.Remove(initMessage.ConsumerGroup)
  83. cg.Shutdown()
  84. }
  85. if tcg.ConsumerGroups.Count() == 0 {
  86. c.RemoveTopic(initMessage.Topic)
  87. }
  88. }
  89. func (c *SubCoordinator) OnPartitionChange(topic *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) {
  90. tcg, _ := c.TopicSubscribers.Get(toTopicName(topic))
  91. if tcg == nil {
  92. return
  93. }
  94. for _, cg := range tcg.ConsumerGroups.Items() {
  95. cg.OnPartitionListChange(assignments)
  96. }
  97. }