consumer_group.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package sub_coordinator
  2. import (
  3. "fmt"
  4. cmap "github.com/orcaman/concurrent-map/v2"
  5. "github.com/seaweedfs/seaweedfs/weed/filer_client"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "time"
  10. )
  11. type ConsumerGroup struct {
  12. topic topic.Topic
  13. // map a consumer group instance id to a consumer group instance
  14. ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
  15. Market *Market
  16. reBalanceTimer *time.Timer
  17. filerClientAccessor *filer_client.FilerClientAccessor
  18. stopCh chan struct{}
  19. }
  20. func NewConsumerGroup(t *mq_pb.Topic, reblanceSeconds int32, filerClientAccessor *filer_client.FilerClientAccessor) *ConsumerGroup {
  21. cg := &ConsumerGroup{
  22. topic: topic.FromPbTopic(t),
  23. ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),
  24. filerClientAccessor: filerClientAccessor,
  25. stopCh: make(chan struct{}),
  26. }
  27. if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
  28. var partitions []topic.Partition
  29. for _, assignment := range conf.BrokerPartitionAssignments {
  30. partitions = append(partitions, topic.FromPbPartition(assignment.Partition))
  31. }
  32. cg.Market = NewMarket(partitions, time.Duration(reblanceSeconds)*time.Second)
  33. } else {
  34. glog.V(0).Infof("fail to read topic conf from filer: %v", err)
  35. return nil
  36. }
  37. go func() {
  38. for {
  39. select {
  40. case adjustment := <-cg.Market.AdjustmentChan:
  41. cgi, found := cg.ConsumerGroupInstances.Get(string(adjustment.consumer))
  42. if !found {
  43. glog.V(0).Infof("consumer group instance %s not found", adjustment.consumer)
  44. continue
  45. }
  46. if adjustment.isAssign {
  47. if conf, err := cg.filerClientAccessor.ReadTopicConfFromFiler(cg.topic); err == nil {
  48. for _, assignment := range conf.BrokerPartitionAssignments {
  49. if adjustment.partition.Equals(topic.FromPbPartition(assignment.Partition)) {
  50. cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
  51. Message: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment_{
  52. Assignment: &mq_pb.SubscriberToSubCoordinatorResponse_Assignment{
  53. PartitionAssignment: &mq_pb.BrokerPartitionAssignment{
  54. Partition: adjustment.partition.ToPbPartition(),
  55. LeaderBroker: assignment.LeaderBroker,
  56. FollowerBroker: assignment.FollowerBroker,
  57. },
  58. },
  59. },
  60. }
  61. glog.V(0).Infof("send assignment %v to %s", adjustment.partition, adjustment.consumer)
  62. break
  63. }
  64. }
  65. }
  66. } else {
  67. cgi.ResponseChan <- &mq_pb.SubscriberToSubCoordinatorResponse{
  68. Message: &mq_pb.SubscriberToSubCoordinatorResponse_UnAssignment_{
  69. UnAssignment: &mq_pb.SubscriberToSubCoordinatorResponse_UnAssignment{
  70. Partition: adjustment.partition.ToPbPartition(),
  71. },
  72. },
  73. }
  74. glog.V(0).Infof("send unassignment %v to %s", adjustment.partition, adjustment.consumer)
  75. }
  76. case <-cg.stopCh:
  77. return
  78. }
  79. }
  80. }()
  81. return cg
  82. }
  83. func (cg *ConsumerGroup) AckAssignment(cgi *ConsumerGroupInstance, assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage) {
  84. fmt.Printf("ack assignment %v\n", assignment)
  85. cg.Market.ConfirmAdjustment(&Adjustment{
  86. consumer: cgi.InstanceId,
  87. partition: topic.FromPbPartition(assignment.Partition),
  88. isAssign: true,
  89. })
  90. }
  91. func (cg *ConsumerGroup) AckUnAssignment(cgi *ConsumerGroupInstance, assignment *mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage) {
  92. fmt.Printf("ack unassignment %v\n", assignment)
  93. cg.Market.ConfirmAdjustment(&Adjustment{
  94. consumer: cgi.InstanceId,
  95. partition: topic.FromPbPartition(assignment.Partition),
  96. isAssign: false,
  97. })
  98. }
  99. func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartitionAssignment) {
  100. }
  101. func (cg *ConsumerGroup) Shutdown() {
  102. close(cg.stopCh)
  103. }