123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- package pub_client
- import (
- "context"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "log"
- "sort"
- "sync"
- "sync/atomic"
- "time"
- )
- type EachPartitionError struct {
- *mq_pb.BrokerPartitionAssignment
- Err error
- generation int
- }
- type EachPartitionPublishJob struct {
- *mq_pb.BrokerPartitionAssignment
- stopChan chan bool
- wg sync.WaitGroup
- generation int
- inputQueue *buffered_queue.BufferedQueue[*mq_pb.DataMessage]
- }
- func (p *TopicPublisher) startSchedulerThread(wg *sync.WaitGroup) error {
- if err := p.doConfigureTopic(); err != nil {
- return fmt.Errorf("configure topic %s: %v", p.config.Topic, err)
- }
- log.Printf("start scheduler thread for topic %s", p.config.Topic)
- generation := 0
- var errChan chan EachPartitionError
- for {
- glog.V(0).Infof("lookup partitions gen %d topic %s", generation+1, p.config.Topic)
- if assignments, err := p.doLookupTopicPartitions(); err == nil {
- generation++
- glog.V(0).Infof("start generation %d with %d assignments", generation, len(assignments))
- if errChan == nil {
- errChan = make(chan EachPartitionError, len(assignments))
- }
- p.onEachAssignments(generation, assignments, errChan)
- } else {
- glog.Errorf("lookup topic %s: %v", p.config.Topic, err)
- time.Sleep(5 * time.Second)
- continue
- }
- if generation == 1 {
- wg.Done()
- }
- // wait for any error to happen. If so, consume all remaining errors, and retry
- for {
- select {
- case eachErr := <-errChan:
- glog.Errorf("gen %d publish to topic %s partition %v: %v", eachErr.generation, p.config.Topic, eachErr.Partition, eachErr.Err)
- if eachErr.generation < generation {
- continue
- }
- break
- }
- }
- }
- }
- func (p *TopicPublisher) onEachAssignments(generation int, assignments []*mq_pb.BrokerPartitionAssignment, errChan chan EachPartitionError) {
- // TODO assuming this is not re-configured so the partitions are fixed.
- sort.Slice(assignments, func(i, j int) bool {
- return assignments[i].Partition.RangeStart < assignments[j].Partition.RangeStart
- })
- var jobs []*EachPartitionPublishJob
- hasExistingJob := len(p.jobs) == len(assignments)
- for i, assignment := range assignments {
- if assignment.LeaderBroker == "" {
- continue
- }
- if hasExistingJob {
- var existingJob *EachPartitionPublishJob
- existingJob = p.jobs[i]
- if existingJob.BrokerPartitionAssignment.LeaderBroker == assignment.LeaderBroker {
- existingJob.generation = generation
- jobs = append(jobs, existingJob)
- continue
- } else {
- if existingJob.LeaderBroker != "" {
- close(existingJob.stopChan)
- existingJob.LeaderBroker = ""
- existingJob.wg.Wait()
- }
- }
- }
- // start a go routine to publish to this partition
- job := &EachPartitionPublishJob{
- BrokerPartitionAssignment: assignment,
- stopChan: make(chan bool, 1),
- generation: generation,
- inputQueue: buffered_queue.NewBufferedQueue[*mq_pb.DataMessage](1024),
- }
- job.wg.Add(1)
- go func(job *EachPartitionPublishJob) {
- defer job.wg.Done()
- if err := p.doPublishToPartition(job); err != nil {
- errChan <- EachPartitionError{assignment, err, generation}
- }
- }(job)
- jobs = append(jobs, job)
- // TODO assuming this is not re-configured so the partitions are fixed.
- // better just re-use the existing job
- p.partition2Buffer.Insert(assignment.Partition.RangeStart, assignment.Partition.RangeStop, job.inputQueue)
- }
- p.jobs = jobs
- }
- func (p *TopicPublisher) doPublishToPartition(job *EachPartitionPublishJob) error {
- log.Printf("connecting to %v for topic partition %+v", job.LeaderBroker, job.Partition)
- grpcConnection, err := pb.GrpcDial(context.Background(), job.LeaderBroker, true, p.grpcDialOption)
- if err != nil {
- return fmt.Errorf("dial broker %s: %v", job.LeaderBroker, err)
- }
- brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
- stream, err := brokerClient.PublishMessage(context.Background())
- if err != nil {
- return fmt.Errorf("create publish client: %v", err)
- }
- publishClient := &PublishClient{
- SeaweedMessaging_PublishMessageClient: stream,
- Broker: job.LeaderBroker,
- }
- if err = publishClient.Send(&mq_pb.PublishMessageRequest{
- Message: &mq_pb.PublishMessageRequest_Init{
- Init: &mq_pb.PublishMessageRequest_InitMessage{
- Topic: p.config.Topic.ToPbTopic(),
- Partition: job.Partition,
- AckInterval: 128,
- FollowerBroker: job.FollowerBroker,
- PublisherName: p.config.PublisherName,
- },
- },
- }); err != nil {
- return fmt.Errorf("send init message: %v", err)
- }
- // process the hello message
- resp, err := stream.Recv()
- if err != nil {
- return fmt.Errorf("recv init response: %v", err)
- }
- if resp.Error != "" {
- return fmt.Errorf("init response error: %v", resp.Error)
- }
- var publishedTsNs int64
- hasMoreData := int32(1)
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- defer wg.Done()
- for {
- ackResp, err := publishClient.Recv()
- if err != nil {
- e, _ := status.FromError(err)
- if e.Code() == codes.Unknown && e.Message() == "EOF" {
- log.Printf("publish to %s EOF", publishClient.Broker)
- return
- }
- publishClient.Err = err
- log.Printf("publish1 to %s error: %v\n", publishClient.Broker, err)
- return
- }
- if ackResp.Error != "" {
- publishClient.Err = fmt.Errorf("ack error: %v", ackResp.Error)
- log.Printf("publish2 to %s error: %v\n", publishClient.Broker, ackResp.Error)
- return
- }
- if ackResp.AckSequence > 0 {
- log.Printf("ack %d published %d hasMoreData:%d", ackResp.AckSequence, atomic.LoadInt64(&publishedTsNs), atomic.LoadInt32(&hasMoreData))
- }
- if atomic.LoadInt64(&publishedTsNs) <= ackResp.AckSequence && atomic.LoadInt32(&hasMoreData) == 0 {
- return
- }
- }
- }()
- publishCounter := 0
- for data, hasData := job.inputQueue.Dequeue(); hasData; data, hasData = job.inputQueue.Dequeue() {
- if data.Ctrl != nil && data.Ctrl.IsClose {
- // need to set this before sending to brokers, to avoid timing issue
- atomic.StoreInt32(&hasMoreData, 0)
- }
- if err := publishClient.Send(&mq_pb.PublishMessageRequest{
- Message: &mq_pb.PublishMessageRequest_Data{
- Data: data,
- },
- }); err != nil {
- return fmt.Errorf("send publish data: %v", err)
- }
- publishCounter++
- atomic.StoreInt64(&publishedTsNs, data.TsNs)
- }
- if publishCounter > 0 {
- wg.Wait()
- } else {
- // CloseSend would cancel the context on the server side
- if err := publishClient.CloseSend(); err != nil {
- return fmt.Errorf("close send: %v", err)
- }
- }
- log.Printf("published %d messages to %v for topic partition %+v", publishCounter, job.LeaderBroker, job.Partition)
- return nil
- }
- func (p *TopicPublisher) doConfigureTopic() (err error) {
- if len(p.config.Brokers) == 0 {
- return fmt.Errorf("no bootstrap brokers")
- }
- var lastErr error
- for _, brokerAddress := range p.config.Brokers {
- err = pb.WithBrokerGrpcClient(false,
- brokerAddress,
- p.grpcDialOption,
- func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
- Topic: p.config.Topic.ToPbTopic(),
- PartitionCount: p.config.PartitionCount,
- RecordType: p.config.RecordType, // TODO schema upgrade
- })
- return err
- })
- if err == nil {
- lastErr = nil
- return nil
- } else {
- lastErr = fmt.Errorf("%s: %v", brokerAddress, err)
- }
- }
- if lastErr != nil {
- return fmt.Errorf("doConfigureTopic %s: %v", p.config.Topic, err)
- }
- return nil
- }
- func (p *TopicPublisher) doLookupTopicPartitions() (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
- if len(p.config.Brokers) == 0 {
- return nil, fmt.Errorf("no bootstrap brokers")
- }
- var lastErr error
- for _, brokerAddress := range p.config.Brokers {
- err := pb.WithBrokerGrpcClient(false,
- brokerAddress,
- p.grpcDialOption,
- func(client mq_pb.SeaweedMessagingClient) error {
- lookupResp, err := client.LookupTopicBrokers(context.Background(),
- &mq_pb.LookupTopicBrokersRequest{
- Topic: p.config.Topic.ToPbTopic(),
- })
- glog.V(0).Infof("lookup topic %s: %v", p.config.Topic, lookupResp)
- if err != nil {
- return err
- }
- if len(lookupResp.BrokerPartitionAssignments) == 0 {
- return fmt.Errorf("no broker partition assignments")
- }
- assignments = lookupResp.BrokerPartitionAssignments
- return nil
- })
- if err == nil {
- return assignments, nil
- } else {
- lastErr = err
- }
- }
- return nil, fmt.Errorf("lookup topic %s: %v", p.config.Topic, lastErr)
- }
|