123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- package sub_client
- import (
- "context"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "io"
- "reflect"
- )
- func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}) error {
- // connect to the partition broker
- return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
- subscribeClient, err := client.SubscribeMessage(context.Background())
- if err != nil {
- return fmt.Errorf("create subscribe client: %v", err)
- }
- perPartitionConcurrency := sub.SubscriberConfig.PerPartitionConcurrency
- if perPartitionConcurrency <= 0 {
- perPartitionConcurrency = 1
- }
- var stopTsNs int64
- if !sub.ContentConfig.StopTime.IsZero() {
- stopTsNs = sub.ContentConfig.StopTime.UnixNano()
- }
- if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
- Message: &mq_pb.SubscribeMessageRequest_Init{
- Init: &mq_pb.SubscribeMessageRequest_InitMessage{
- ConsumerGroup: sub.SubscriberConfig.ConsumerGroup,
- ConsumerId: sub.SubscriberConfig.ConsumerGroupInstanceId,
- Topic: sub.ContentConfig.Topic.ToPbTopic(),
- PartitionOffset: &mq_pb.PartitionOffset{
- Partition: assigned.Partition,
- StartTsNs: sub.ContentConfig.StartTime.UnixNano(),
- StopTsNs: stopTsNs,
- StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
- },
- Filter: sub.ContentConfig.Filter,
- FollowerBroker: assigned.FollowerBroker,
- Concurrency: perPartitionConcurrency,
- },
- },
- }); err != nil {
- glog.V(0).Infof("subscriber %s connected to partition %+v at %v: %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker, err)
- }
- glog.V(0).Infof("subscriber %s connected to partition %+v at %v", sub.ContentConfig.Topic, assigned.Partition, assigned.LeaderBroker)
- if sub.OnCompletionFunc != nil {
- defer sub.OnCompletionFunc()
- }
- type KeyedOffset struct {
- Key []byte
- Offset int64
- }
- partitionOffsetChan := make(chan KeyedOffset, 1024)
- defer func() {
- close(partitionOffsetChan)
- }()
- executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency))
- go func() {
- for {
- select {
- case <-stopCh:
- subscribeClient.CloseSend()
- return
- case ack, ok := <-partitionOffsetChan:
- if !ok {
- subscribeClient.CloseSend()
- return
- }
- subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
- Message: &mq_pb.SubscribeMessageRequest_Ack{
- Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
- Key: ack.Key,
- Sequence: ack.Offset,
- },
- },
- })
- }
- }
- }()
- var lastErr error
- for lastErr == nil {
- // glog.V(0).Infof("subscriber %s/%s/%s waiting for message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
- resp, err := subscribeClient.Recv()
- if err != nil {
- return fmt.Errorf("subscribe recv: %v", err)
- }
- if resp.Message == nil {
- glog.V(0).Infof("subscriber %s/%s received nil message", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
- continue
- }
- switch m := resp.Message.(type) {
- case *mq_pb.SubscribeMessageResponse_Data:
- if m.Data.Ctrl != nil {
- glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
- continue
- }
- if len(m.Data.Key) == 0 {
- fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m))
- continue
- }
- executors.Execute(func() {
- processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
- if processErr == nil {
- partitionOffsetChan <- KeyedOffset{
- Key: m.Data.Key,
- Offset: m.Data.TsNs,
- }
- } else {
- lastErr = processErr
- }
- })
- case *mq_pb.SubscribeMessageResponse_Ctrl:
- // glog.V(0).Infof("subscriber %s/%s/%s received control %+v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, m.Ctrl)
- if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
- return io.EOF
- }
- }
- }
- return lastErr
- })
- }
|