consumer_group.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package sub_coordinator
  2. import (
  3. "fmt"
  4. cmap "github.com/orcaman/concurrent-map/v2"
  5. "github.com/seaweedfs/seaweedfs/weed/filer_client"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  10. "time"
  11. )
  12. type ConsumerGroup struct {
  13. topic topic.Topic
  14. // map a consumer group instance id to a consumer group instance
  15. ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
  16. Market *Market
  17. reBalanceTimer *time.Timer
  18. filerClientAccessor *filer_client.FilerClientAccessor
  19. stopCh chan struct{}
  20. }
  21. func NewConsumerGroup(t *schema_pb.Topic, reblanceSeconds int32, filerClientAccessor *filer_client.FilerClientAccessor) *ConsumerGroup {
  22. cg := &ConsumerGroup{
  23. topic: topic.FromPbTopic(t),
  24. ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),
  25. filerClientAccessor: filerClientAccessor,
  26. stopCh: make(chan struct{}),
  27. }
  28. if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
  29. var partitions []topic.Partition
  30. for _, assignment := range conf.BrokerPartitionAssignments {
  31. partitions = append(partitions, topic.FromPbPartition(assignment.Partition))
  32. }
  33. cg.Market = NewMarket(partitions, time.Duration(reblanceSeconds)*time.Second)
  34. } else {
  35. glog.V(0).Infof("fail to read topic conf from filer: %v", err)
  36. return nil
  37. }
  38. go func() {
  39. for {
  40. select {
  41. case adjustment := <-cg.Market.AdjustmentChan:
  42. cgi, found := cg.ConsumerGroupInstances.Get(string(adjustment.consumer))
  43. if !found {
  44. glog.V(0).Infof("consumer group instance %s not found", adjustment.consumer)
  45. continue
  46. }
  47. if adjustment.isAssign {
  48. if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
  49. for _, assignment := range conf.BrokerPartitionAssignments {
  50. if adjustment.partition.Equals(topic.FromPbPartition(assignment.Partition)) {
  51. cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
  52. Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
  53. Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
  54. PartitionAssignment: &mq_pb.BrokerPartitionAssignment{
  55. Partition: adjustment.partition.ToPbPartition(),
  56. LeaderBroker: assignment.LeaderBroker,
  57. FollowerBroker: assignment.FollowerBroker,
  58. },
  59. },
  60. },
  61. }
  62. glog.V(0).Infof("send assignment %v to %s", adjustment.partition, adjustment.consumer)
  63. break
  64. }
  65. }
  66. }
  67. } else {
  68. cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
  69. Message: &mq_pb.SubscriberToSubCoordinatorResponse_UnAssignment_{
  70. UnAssignment: &mq_pb.SubscriberToSubCoordinatorResponse_UnAssignment{
  71. Partition: adjustment.partition.ToPbPartition(),
  72. },
  73. },
  74. }
  75. glog.V(0).Infof("send unassignment %v to %s", adjustment.partition, adjustment.consumer)
  76. }
  77. case <-cg.stopCh:
  78. return
  79. }
  80. }
  81. }()
  82. return cg
  83. }
  84. func (cg *ConsumerGroup) AckAssignment(cgi *ConsumerGroupInstance, assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage) {
  85. fmt.Printf("ack assignment %v\n", assignment)
  86. cg.Market.ConfirmAdjustment(&Adjustment{
  87. consumer: cgi.InstanceId,
  88. partition: topic.FromPbPartition(assignment.Partition),
  89. isAssign: true,
  90. })
  91. }
  92. func (cg *ConsumerGroup) AckUnAssignment(cgi *ConsumerGroupInstance, assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) {
  93. fmt.Printf("ack unassignment %v\n", assignment)
  94. cg.Market.ConfirmAdjustment(&Adjustment{
  95. consumer: cgi.InstanceId,
  96. partition: topic.FromPbPartition(assignment.Partition),
  97. isAssign: false,
  98. })
  99. }
  100. func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment) {
  101. }
  102. func (cg *ConsumerGroup) Shutdown() {
  103. close(cg.stopCh)
  104. }