sub_coordinator.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package sub_coordinator
  2. import (
  3. "fmt"
  4. cmap "github.com/orcaman/concurrent-map/v2"
  5. "github.com/seaweedfs/seaweedfs/weed/filer_client"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  8. )
  9. type TopicConsumerGroups struct {
  10. // map a consumer group name to a consumer group
  11. ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
  12. }
  13. // SubCoordinator coordinates the instances in the consumer group for one topic.
  14. // It is responsible for:
  15. // 1. (Maybe) assigning partitions when a consumer instance is up/down.
  16. type SubCoordinator struct {
  17. // map topic name to consumer groups
  18. TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
  19. FilerClientAccessor *filer_client.FilerClientAccessor
  20. }
  21. func NewSubCoordinator() *SubCoordinator {
  22. return &SubCoordinator{
  23. TopicSubscribers: cmap.New[*TopicConsumerGroups](),
  24. }
  25. }
  26. func (c *SubCoordinator) GetTopicConsumerGroups(topic *schema_pb.Topic, createIfMissing bool) *TopicConsumerGroups {
  27. topicName := toTopicName(topic)
  28. tcg, _ := c.TopicSubscribers.Get(topicName)
  29. if tcg == nil && createIfMissing {
  30. tcg = &TopicConsumerGroups{
  31. ConsumerGroups: cmap.New[*ConsumerGroup](),
  32. }
  33. if !c.TopicSubscribers.SetIfAbsent(topicName, tcg) {
  34. tcg, _ = c.TopicSubscribers.Get(topicName)
  35. }
  36. }
  37. return tcg
  38. }
  39. func (c *SubCoordinator) RemoveTopic(topic *schema_pb.Topic) {
  40. topicName := toTopicName(topic)
  41. c.TopicSubscribers.Remove(topicName)
  42. }
  43. func toTopicName(topic *schema_pb.Topic) string {
  44. topicName := topic.Namespace + "." + topic.Name
  45. return topicName
  46. }
  47. func (c *SubCoordinator) AddSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) (*ConsumerGroup, *ConsumerGroupInstance, error) {
  48. tcg := c.GetTopicConsumerGroups(initMessage.Topic, true)
  49. cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
  50. if cg == nil {
  51. cg = NewConsumerGroup(initMessage.Topic, initMessage.RebalanceSeconds, c.FilerClientAccessor)
  52. if cg != nil {
  53. tcg.ConsumerGroups.SetIfAbsent(initMessage.ConsumerGroup, cg)
  54. }
  55. cg, _ = tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
  56. }
  57. if cg == nil {
  58. return nil, nil, fmt.Errorf("fail to create consumer group %s: topic %s not found", initMessage.ConsumerGroup, initMessage.Topic)
  59. }
  60. cgi, _ := cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
  61. if cgi == nil {
  62. cgi = NewConsumerGroupInstance(initMessage.ConsumerGroupInstanceId, initMessage.MaxPartitionCount)
  63. if !cg.ConsumerGroupInstances.SetIfAbsent(initMessage.ConsumerGroupInstanceId, cgi) {
  64. cgi, _ = cg.ConsumerGroupInstances.Get(initMessage.ConsumerGroupInstanceId)
  65. }
  66. }
  67. cgi.MaxPartitionCount = initMessage.MaxPartitionCount
  68. cg.Market.AddConsumerInstance(cgi)
  69. return cg, cgi, nil
  70. }
  71. func (c *SubCoordinator) RemoveSubscriber(initMessage *mq_pb.SubscriberToSubCoordinatorRequest_InitMessage) {
  72. tcg := c.GetTopicConsumerGroups(initMessage.Topic, false)
  73. if tcg == nil {
  74. return
  75. }
  76. cg, _ := tcg.ConsumerGroups.Get(initMessage.ConsumerGroup)
  77. if cg == nil {
  78. return
  79. }
  80. cg.ConsumerGroupInstances.Remove(initMessage.ConsumerGroupInstanceId)
  81. cg.Market.RemoveConsumerInstance(ConsumerGroupInstanceId(initMessage.ConsumerGroupInstanceId))
  82. if cg.ConsumerGroupInstances.Count() == 0 {
  83. tcg.ConsumerGroups.Remove(initMessage.ConsumerGroup)
  84. cg.Shutdown()
  85. }
  86. if tcg.ConsumerGroups.Count() == 0 {
  87. c.RemoveTopic(initMessage.Topic)
  88. }
  89. }
  90. func (c *SubCoordinator) OnPartitionChange(topic *schema_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) {
  91. tcg, _ := c.TopicSubscribers.Get(toTopicName(topic))
  92. if tcg == nil {
  93. return
  94. }
  95. for _, cg := range tcg.ConsumerGroups.Items() {
  96. cg.OnPartitionListChange(assignments)
  97. }
  98. }