on_each_partition.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package sub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "io"
  10. "reflect"
  11. )
  12. func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}) error {
  13. // connect to the partition broker
  14. return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  15. subscribeClient, err := client.SubscribeMessage(context.Background())
  16. if err != nil {
  17. return fmt.Errorf("create subscribe client: %v", err)
  18. }
  19. perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency
  20. if perPartitionConcurrency <= 0 {
  21. perPartitionConcurrency = 1
  22. }
  23. var stopTsNs int64
  24. if !sub.ContentConfig.StopTime.IsZero() {
  25. stopTsNs = sub.ContentConfig.StopTime.UnixNano()
  26. }
  27. if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
  28. Message: &mq_pb.SubscribeMessageRequest_Init{
  29. Init: &mq_pb.SubscribeMessageRequest_InitMessage{
  30. ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
  31. ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
  32. Topic: sub.ContentConfig.Topic.ToPbTopic(),
  33. PartitionOffset: &mq_pb.PartitionOffset{
  34. Partition: assigned.Partition,
  35. StartTsNs: sub.ContentConfig.StartTime.UnixNano(),
  36. StopTsNs: stopTsNs,
  37. StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
  38. },
  39. Filter: sub.ContentConfig.Filter,
  40. FollowerBroker: assigned.FollowerBroker,
  41. Concurrency: perPartitionConcurrency,
  42. },
  43. },
  44. }); err != nil {
  45. glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
  46. }
  47. glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
  48. if sub.OnCompletionFunc != nil {
  49. defer sub.OnCompletionFunc()
  50. }
  51. type KeyedOffset struct {
  52. Key []byte
  53. Offset int64
  54. }
  55. partitionOffsetChan := make(chan KeyedOffset, 1024)
  56. defer func() {
  57. close(partitionOffsetChan)
  58. }()
  59. executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency))
  60. go func() {
  61. for {
  62. select {
  63. case <-stopCh:
  64. subscribeClient.CloseSend()
  65. return
  66. case ack, ok := <-partitionOffsetChan:
  67. if !ok {
  68. subscribeClient.CloseSend()
  69. return
  70. }
  71. subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
  72. Message: &mq_pb.SubscribeMessageRequest_Ack{
  73. Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
  74. Key: ack.Key,
  75. Sequence: ack.Offset,
  76. },
  77. },
  78. })
  79. }
  80. }
  81. }()
  82. var lastErr error
  83. for lastErr == nil {
  84. // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
  85. resp, err := subscribeClient.Recv()
  86. if err != nil {
  87. return fmt.Errorf("subscribe recv: %v", err)
  88. }
  89. if resp.Message == nil {
  90. glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
  91. continue
  92. }
  93. switch m := resp.Message.(type) {
  94. case *mq_pb.SubscribeMessageResponse_Data:
  95. if m.Data.Ctrl != nil {
  96. glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
  97. continue
  98. }
  99. if len(m.Data.Key) == 0 {
  100. fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m))
  101. continue
  102. }
  103. executors.Execute(func() {
  104. processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
  105. if processErr == nil {
  106. partitionOffsetChan <- KeyedOffset{
  107. Key: m.Data.Key,
  108. Offset: m.Data.TsNs,
  109. }
  110. } else {
  111. lastErr = processErr
  112. }
  113. })
  114. case *mq_pb.SubscribeMessageResponse_Ctrl:
  115. // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
  116. if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
  117. return io.EOF
  118. }
  119. }
  120. }
  121. return lastErr
  122. })
  123. }