consumer_group.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package sub_coordinator
  2. import (
  3. cmap "github.com/orcaman/concurrent-map/v2"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "time"
  9. )
  10. type ConsumerGroupInstance struct {
  11. InstanceId string
  12. // the consumer group instance may not have an active partition
  13. Partitions []*topic.Partition
  14. ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
  15. }
  16. type ConsumerGroup struct {
  17. topic topic.Topic
  18. // map a consumer group instance id to a consumer group instance
  19. ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
  20. mapping *PartitionConsumerMapping
  21. reBalanceTimer *time.Timer
  22. pubBalancer *pub_balancer.Balancer
  23. }
  24. func NewConsumerGroup(t *mq_pb.Topic, pubBalancer *pub_balancer.Balancer) *ConsumerGroup {
  25. return &ConsumerGroup{
  26. topic: topic.FromPbTopic(t),
  27. ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),
  28. mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount),
  29. pubBalancer: pubBalancer,
  30. }
  31. }
  32. func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
  33. return &ConsumerGroupInstance{
  34. InstanceId: instanceId,
  35. ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1),
  36. }
  37. }
  38. func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
  39. cg.onConsumerGroupInstanceChange("add consumer instance " + consumerGroupInstance)
  40. }
  41. func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
  42. cg.onConsumerGroupInstanceChange("remove consumer instance " + consumerGroupInstance)
  43. }
  44. func (cg *ConsumerGroup) onConsumerGroupInstanceChange(reason string) {
  45. if cg.reBalanceTimer != nil {
  46. cg.reBalanceTimer.Stop()
  47. cg.reBalanceTimer = nil
  48. }
  49. cg.reBalanceTimer = time.AfterFunc(5*time.Second, func() {
  50. cg.RebalanceConsumberGroupInstances(nil, reason)
  51. cg.reBalanceTimer = nil
  52. })
  53. }
  54. func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment) {
  55. if cg.reBalanceTimer != nil {
  56. cg.reBalanceTimer.Stop()
  57. cg.reBalanceTimer = nil
  58. }
  59. partitionSlotToBrokerList := pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount)
  60. for _, assignment := range assignments {
  61. partitionSlotToBrokerList.AddBroker(assignment.Partition, assignment.LeaderBroker)
  62. }
  63. cg.RebalanceConsumberGroupInstances(partitionSlotToBrokerList, "partition list change")
  64. }
  65. func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) {
  66. glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason)
  67. // collect current topic partitions
  68. partitionSlotToBrokerList := knownPartitionSlotToBrokerList
  69. if partitionSlotToBrokerList == nil {
  70. var found bool
  71. partitionSlotToBrokerList, found = cg.pubBalancer.TopicToBrokers.Get(cg.topic.String())
  72. if !found {
  73. glog.V(0).Infof("topic %s not found in balancer", cg.topic.String())
  74. return
  75. }
  76. }
  77. // collect current consumer group instance ids
  78. var consumerInstanceIds []string
  79. for _, consumerGroupInstance := range cg.ConsumerGroupInstances.Items() {
  80. consumerInstanceIds = append(consumerInstanceIds, consumerGroupInstance.InstanceId)
  81. }
  82. cg.mapping.BalanceToConsumerInstanceIds(partitionSlotToBrokerList, consumerInstanceIds)
  83. // convert cg.mapping currentMapping to map of consumer group instance id to partition slots
  84. consumerInstanceToPartitionSlots := make(map[string][]*PartitionSlotToConsumerInstance)
  85. for _, partitionSlot := range cg.mapping.currentMapping.PartitionSlots {
  86. consumerInstanceToPartitionSlots[partitionSlot.AssignedInstanceId] = append(consumerInstanceToPartitionSlots[partitionSlot.AssignedInstanceId], partitionSlot)
  87. }
  88. // notify consumer group instances
  89. for _, consumerGroupInstance := range cg.ConsumerGroupInstances.Items() {
  90. partitionSlots, found := consumerInstanceToPartitionSlots[consumerGroupInstance.InstanceId]
  91. if !found {
  92. partitionSlots = make([]*PartitionSlotToConsumerInstance, 0)
  93. }
  94. consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots)
  95. assignedPartitions := make([]*mq_pb.BrokerPartitionAssignment, len(partitionSlots))
  96. for i, partitionSlot := range partitionSlots {
  97. assignedPartitions[i] = &mq_pb.BrokerPartitionAssignment{
  98. Partition: &mq_pb.Partition{
  99. RangeStop: partitionSlot.RangeStop,
  100. RangeStart: partitionSlot.RangeStart,
  101. RingSize: partitionSlotToBrokerList.RingSize,
  102. UnixTimeNs: partitionSlot.UnixTimeNs,
  103. },
  104. LeaderBroker: partitionSlot.Broker,
  105. }
  106. }
  107. response := &mq_pb.SubscriberToSubCoordinatorResponse{
  108. Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
  109. Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
  110. PartitionAssignments: assignedPartitions,
  111. },
  112. },
  113. }
  114. println("sending response to", consumerGroupInstance.InstanceId, "...")
  115. consumerGroupInstance.ResponseChan <- response
  116. }
  117. }