// on_each_partition.go
  1. package sub_client
import (
	"context"
	"fmt"
	"io"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
  11. func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}) error {
  12. // connect to the partition broker
  13. return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  14. subscribeClient, err := client.SubscribeMessage(context.Background())
  15. if err != nil {
  16. return fmt.Errorf("create subscribe client: %v", err)
  17. }
  18. perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency
  19. if perPartitionConcurrency <= 0 {
  20. perPartitionConcurrency = 1
  21. }
  22. if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
  23. Message: &mq_pb.SubscribeMessageRequest_Init{
  24. Init: &mq_pb.SubscribeMessageRequest_InitMessage{
  25. ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
  26. ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
  27. Topic: sub.ContentConfig.Topic.ToPbTopic(),
  28. PartitionOffset: &mq_pb.PartitionOffset{
  29. Partition: assigned.Partition,
  30. StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
  31. },
  32. Filter: sub.ContentConfig.Filter,
  33. FollowerBroker: assigned.FollowerBroker,
  34. Concurrency: perPartitionConcurrency,
  35. },
  36. },
  37. }); err != nil {
  38. glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
  39. }
  40. glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
  41. if sub.OnCompletionFunc != nil {
  42. defer sub.OnCompletionFunc()
  43. }
  44. type KeyedOffset struct {
  45. Key []byte
  46. Offset int64
  47. }
  48. partitionOffsetChan := make(chan KeyedOffset, 1024)
  49. defer func() {
  50. close(partitionOffsetChan)
  51. }()
  52. executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency))
  53. go func() {
  54. for {
  55. select {
  56. case <-stopCh:
  57. subscribeClient.CloseSend()
  58. return
  59. case ack, ok := <-partitionOffsetChan:
  60. if !ok {
  61. subscribeClient.CloseSend()
  62. return
  63. }
  64. subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
  65. Message: &mq_pb.SubscribeMessageRequest_Ack{
  66. Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
  67. Key: ack.Key,
  68. Sequence: ack.Offset,
  69. },
  70. },
  71. })
  72. }
  73. }
  74. }()
  75. var lastErr error
  76. for lastErr == nil {
  77. // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
  78. resp, err := subscribeClient.Recv()
  79. if err != nil {
  80. return fmt.Errorf("subscribe recv: %v", err)
  81. }
  82. if resp.Message == nil {
  83. glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
  84. continue
  85. }
  86. switch m := resp.Message.(type) {
  87. case *mq_pb.SubscribeMessageResponse_Data:
  88. if m.Data.Ctrl != nil {
  89. glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
  90. continue
  91. }
  92. executors.Execute(func() {
  93. processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
  94. if processErr == nil {
  95. partitionOffsetChan <- KeyedOffset{
  96. Key: m.Data.Key,
  97. Offset: m.Data.TsNs,
  98. }
  99. } else {
  100. lastErr = processErr
  101. }
  102. })
  103. case *mq_pb.SubscribeMessageResponse_Ctrl:
  104. // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
  105. if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
  106. return io.EOF
  107. }
  108. }
  109. }
  110. return lastErr
  111. })
  112. }