package client

import (
	"context"
	"log"
	"sync"
	"sync/atomic"
	"time"

	flatbuffers "github.com/google/flatbuffers/go"
	"github.com/seaweedfs/seaweedfs/weed/mq/messages"
	"github.com/seaweedfs/seaweedfs/weed/mq/segment"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

const (
	// batchCountLimit is the number of flatbuffers builders pre-allocated per
	// processor. doFlush blocks while all of them are checked out.
	batchCountLimit = 3
)

// PublishStreamProcessor queues messages handed to AddMessage and publishes
// them in batches over a gRPC stream from a background goroutine started by
// NewPublishStreamProcessor.
type PublishStreamProcessor struct {
	// attributes
	ProducerId     int32
	ProducerEpoch  int32
	grpcDialOption grpc.DialOption

	// input
	sync.Mutex

	// timeout is how long the upload loop waits for a new event before it
	// flushes whatever is pending.
	timeout time.Duration

	// convert into bytes
	messagesChan           chan *messages.Message
	builders               chan *flatbuffers.Builder
	batchMessageCountLimit int

	// messagesSequence is the sequence number given to the next message added
	// to a batch. It is only touched from doFlush.
	messagesSequence int64

	// done channel
	doneChan chan struct{}
}

// UploadProcess pairs a flatbuffers builder with the batch builder that
// writes into it.
type UploadProcess struct {
	bufferBuilder *flatbuffers.Builder
	batchBuilder  *segment.MessageBatchBuilder
}

// NewPublishStreamProcessor creates a processor that flushes a batch once
// batchMessageCountLimit messages are pending, or once timeout elapses without
// a new event, and starts its background upload goroutine.
func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration) *PublishStreamProcessor {
	t := &PublishStreamProcessor{
		grpcDialOption:         grpc.WithTransportCredentials(insecure.NewCredentials()),
		batchMessageCountLimit: batchMessageCountLimit,
		builders:               make(chan *flatbuffers.Builder, batchCountLimit),
		messagesChan:           make(chan *messages.Message, 1024),
		doneChan:               make(chan struct{}),
		timeout:                timeout,
	}
	// Fill the builder pool; doFlush borrows one builder per batch.
	for i := 0; i < batchCountLimit; i++ {
		t.builders <- flatbuffers.NewBuilder(4 * 1024 * 1024)
	}
	go t.doLoopUpload()
	return t
}

// AddMessage queues m for publishing. It blocks while the internal queue
// (1024 entries) is full and always returns nil.
func (p *PublishStreamProcessor) AddMessage(m *messages.Message) error {
	p.messagesChan <- m
	return nil
}

// Shutdown signals the upload loop to flush its pending messages and stop.
// The send is on an unbuffered channel, so the call blocks until the upload
// loop receives the signal. It always returns nil.
func (p *PublishStreamProcessor) Shutdown() error {
	p.doneChan <- struct{}{}
	return nil
}

// doFlush encodes batch into a single message batch and sends it on stream.
// An empty batch is a no-op. The error returned is the one from stream.Send.
func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, batch []*messages.Message) error {
	if len(batch) == 0 {
		return nil
	}

	// Borrow a builder from the pool and always hand it back, whatever the
	// outcome of the send.
	builder := <-p.builders
	defer func() {
		p.builders <- builder
	}()

	bb := segment.NewMessageBatchBuilder(builder)
	for _, m := range batch {
		bb.AddMessage(p.messagesSequence, m.Ts.UnixNano(), m.Properties, m.Key, m.Content)
		p.messagesSequence++
	}
	// NOTE(review): the trailing 3 and 4 are hard-coded; their meaning is not
	// visible from this file.
	bb.BuildMessageBatch(p.ProducerId, p.ProducerEpoch, 3, 4)

	return stream.Send(&mq_pb.PublishRequest{
		Data: &mq_pb.PublishRequest_DataMessage{
			Message: bb.GetBytes(),
		},
	})
}

// doLoopUpload is the background upload loop. It opens a publish stream to the
// broker and flushes pending messages when the batch is full, when the timeout
// elapses, or when Shutdown is called. A failed flush returns the error to
// util.RetryForever, whose error callback asks for another attempt; the
// messages that were not sent stay in the pending slice across attempts.
func (p *PublishStreamProcessor) doLoopUpload() {
	brokerGrpcAddress := "localhost:17777"

	// TODO parallelize the uploading with separate uploader
	pending := make([]*messages.Message, 0, p.batchMessageCountLimit)

	util.RetryForever("publish message", func() error {
		return pb.WithBrokerGrpcClient(false, brokerGrpcAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			stream, err := client.PublishMessage(ctx)
			if err != nil {
				log.Printf("grpc PublishMessage: %v", err)
				return err
			}

			// A negative status tells the response goroutine to stop. Deferred
			// after cancel, so it runs first and the goroutine sees the flag
			// before the cancellation error surfaces from Recv.
			var atomicStatus int64
			defer atomic.StoreInt64(&atomicStatus, -1)

			// Consume broker responses until the stream fails or this attempt
			// ends.
			go func() {
				for {
					resp, recvErr := stream.Recv()
					if atomic.LoadInt64(&atomicStatus) < 0 {
						return
					}
					if recvErr != nil {
						log.Printf("response error: %v", recvErr)
						return
					}
					log.Printf("response: %v", resp.AckSequence)
				}
			}()

			// retry previously failed messages
			if len(pending) >= p.batchMessageCountLimit {
				if flushErr := p.doFlush(stream, pending); flushErr != nil {
					return flushErr
				}
				pending = pending[:0]
			}

			for {
				select {
				case m := <-p.messagesChan:
					pending = append(pending, m)
					if len(pending) >= p.batchMessageCountLimit {
						if flushErr := p.doFlush(stream, pending); flushErr != nil {
							return flushErr
						}
						pending = pending[:0]
					}
				case <-time.After(p.timeout):
					if flushErr := p.doFlush(stream, pending); flushErr != nil {
						return flushErr
					}
					pending = pending[:0]
				case <-p.doneChan:
					// Only messages already taken off messagesChan are
					// flushed; anything still queued there is left behind.
					if flushErr := p.doFlush(stream, pending); flushErr != nil {
						return flushErr
					}
					pending = pending[:0]
					println("$ stopping ...")
					// Return rather than break: a bare break here would only
					// leave the select and the loop would keep running.
					// NOTE(review): returning cancels the stream context right
					// after the final Send; confirm the broker has received
					// the last batch by then.
					return nil
				}
			}
		})
	}, func(err error) (shouldContinue bool) {
		log.Printf("failed with grpc %s: %v", brokerGrpcAddress, err)
		return true
	})
}