package pub_client import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "log" "sort" "sync" "sync/atomic" "time" ) type EachPartitionError struct { *mq_pb.BrokerPartitionAssignment Err error generation int } type EachPartitionPublishJob struct { *mq_pb.BrokerPartitionAssignment stopChan chan bool wg sync.WaitGroup generation int inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage] } func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error { if err := p.doConfigureTopic(); err != nil { return fmt.Errorf("configure topic %s: %v", p.config.Topic, err) } log.Printf("start scheduler thread for topic %s", p.config.Topic) generation := 0 var errChan chan EachPartitionError for { glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic) if assignments, err := p.doLookupTopicPartitions(); err == nil { generation++ glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments)) if errChan == nil { errChan = make(chan EachPartitionError, len(assignments)) } p.onEachAssignments(generation, assignments, errChan) } else { glog.Errorf("lookup topic %s: %v", p.config.Topic, err) time.Sleep(5 * time.Second) continue } if generation == 1 { wg.Done() } // wait for any error to happen. If so, consume all remaining errors, and retry for { select { case eachErr := <-errChan: glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err) if eachErr.generation < generation { continue } break } } } } func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) { // TODO assuming this is not re-configured so the partitions are fixed. sort.Slice(assignments, func(i, j int) bool { return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart }) var jobs []*EachPartitionPublishJob hasExistingJob := len(p.jobs) == len(assignments) for i, assignment := range assignments { if assignment.LeaderBroker == "" { continue } if hasExistingJob { var existingJob *EachPartitionPublishJob existingJob = p.jobs[i] if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker { existingJob.generation = generation jobs = append(jobs, existingJob) continue } else { if existingJob.LeaderBroker != "" { close(existingJob.stopChan) existingJob.LeaderBroker = "" existingJob.wg.Wait() } } } // start a go routine to publish to this partition job := &EachPartitionPublishJob{ BrokerPartitionAssignment: assignment, stopChan: make(chan bool, 1), generation: generation, inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024), } job.wg.Add(1) go func(job *EachPartitionPublishJob) { defer job.wg.Done() if err := p.doPublishToPartition(job); err != nil { errChan <- EachPartitionError{assignment, err, generation} } }(job) jobs = append(jobs, job) // TODO assuming this is not re-configured so the partitions are fixed. // better just re-use the existing job p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue) } p.jobs = jobs } func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error { log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition) grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption) if err != nil { return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err) } brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection) stream, err := brokerClient.PublishMessage(context.Background()) if err != nil { return fmt.Errorf("create publish client: %v", err) } publishClient := &PublishClient{ SeaweedMessaging_PublishMessageClient: stream, Broker: job.LeaderBroker, } if err = publishClient.Send(&mq_pb.PublishMessageRequest{ Message: &mq_pb.PublishMessageRequest_Init{ Init: &mq_pb.PublishMessageRequest_InitMessage{ Topic: p.config.Topic.ToPbTopic(), Partition: job.Partition, AckInterval: 128, FollowerBroker: job.FollowerBroker, PublisherName: p.config.PublisherName, }, }, }); err != nil { return fmt.Errorf("send init message: %v", err) } // process the hello message resp, err := stream.Recv() if err != nil { return fmt.Errorf("recv init response: %v", err) } if resp.Error != "" { return fmt.Errorf("init response error: %v", resp.Error) } var publishedTsNs int64 hasMoreData := int32(1) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() for { ackResp, err := publishClient.Recv() if err != nil { e, _ := status.FromError(err) if e.Code() == codes.Unknown && e.Message() == "EOF" { log.Printf("publish to %s EOF", publishClient.Broker) return } publishClient.Err = err log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err) return } if ackResp.Error != "" { publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error) log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error) return } if ackResp.AckSequence > 0 { log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData)) } if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 { return } } }() publishCounter := 0 for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() { if data.Ctrl != nil && data.Ctrl.IsClose { // need to set this before sending to brokers, to avoid timing issue atomic.StoreInt32(&hasMoreData, 0) } if err := publishClient.Send(&mq_pb.PublishMessageRequest{ Message: &mq_pb.PublishMessageRequest_Data{ Data: data, }, }); err != nil { return fmt.Errorf("send publish data: %v", err) } publishCounter++ atomic.StoreInt64(&publishedTsNs, data.TsNs) } if publishCounter > 0 { wg.Wait() } else { // CloseSend would cancel the context on the server side if err := publishClient.CloseSend(); err != nil { return fmt.Errorf("close send: %v", err) } } log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition) return nil } func (p *TopicPublisher) doConfigureTopic() (err error) { if len(p.config.Brokers) == 0 { return fmt.Errorf("no bootstrap brokers") } var lastErr error for _, brokerAddress := range p.config.Brokers { err = pb.WithBrokerGrpcClient(false, brokerAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{ Topic: p.config.Topic.ToPbTopic(), PartitionCount: p.config.PartitionCount, RecordType: p.config.RecordType, // TODO schema upgrade }) return err }) if err == nil { lastErr = nil return nil } else { lastErr = fmt.Errorf("%s: %v", brokerAddress, err) } } if lastErr != nil { return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err) } return nil } func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) { if len(p.config.Brokers) == 0 { return nil, fmt.Errorf("no bootstrap brokers") } var lastErr error for _, brokerAddress := range p.config.Brokers { err := pb.WithBrokerGrpcClient(false, brokerAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { lookupResp, err := client.LookupTopicBrokers(context.Background(), &mq_pb.LookupTopicBrokersRequest{ Topic: p.config.Topic.ToPbTopic(), }) glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp) if err != nil { return err } if len(lookupResp.BrokerPartitionAssignments) == 0 { return fmt.Errorf("no broker partition assignments") } assignments = lookupResp.BrokerPartitionAssignments return nil }) if err == nil { return assignments, nil } else { lastErr = err } } return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr) }