publish_stream_processor.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package client
  2. import (
  3. "context"
  4. flatbuffers "github.com/google/flatbuffers/go"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/messages"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/segment"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "google.golang.org/grpc"
  11. "google.golang.org/grpc/credentials/insecure"
  12. "log"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
const (
	// batchCountLimit caps the number of pooled flatbuffers builders,
	// and therefore how many message batches can be under construction
	// (or in flight) at the same time.
	batchCountLimit = 3
)
// PublishStreamProcessor batches incoming messages and publishes them to a
// broker over a gRPC stream from a background goroutine (see doLoopUpload).
type PublishStreamProcessor struct {
	// attributes
	ProducerId    int32 // producer identity stamped into each message batch
	ProducerEpoch int32 // producer epoch stamped into each message batch
	grpcDialOption grpc.DialOption // insecure transport credentials by default (set in NewPublishStreamProcessor)
	// input
	sync.Mutex
	timeout time.Duration // max wait before flushing a partially filled batch
	// convert into bytes
	messagesChan chan *messages.Message   // incoming messages awaiting batching
	builders     chan *flatbuffers.Builder // pool of reusable builders; sized by batchCountLimit
	batchMessageCountLimit int   // flush once this many messages are buffered
	messagesSequence       int64 // monotonically increasing sequence assigned per message
	// done channel
	doneChan chan struct{} // Shutdown signal: flush remaining messages and stop
}
// UploadProcess pairs a flatbuffers builder with the batch builder that
// writes into it. NOTE(review): not referenced anywhere in this file —
// presumably used by other files in the package; confirm before removing.
type UploadProcess struct {
	bufferBuilder *flatbuffers.Builder
	batchBuilder  *segment.MessageBatchBuilder
}
  40. func NewPublishStreamProcessor(batchMessageCountLimit int, timeout time.Duration) *PublishStreamProcessor {
  41. t := &PublishStreamProcessor{
  42. grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
  43. batchMessageCountLimit: batchMessageCountLimit,
  44. builders: make(chan *flatbuffers.Builder, batchCountLimit),
  45. messagesChan: make(chan *messages.Message, 1024),
  46. doneChan: make(chan struct{}),
  47. timeout: timeout,
  48. }
  49. for i := 0; i < batchCountLimit; i++ {
  50. t.builders <- flatbuffers.NewBuilder(4 * 1024 * 1024)
  51. }
  52. go t.doLoopUpload()
  53. return t
  54. }
  55. func (p *PublishStreamProcessor) AddMessage(m *messages.Message) error {
  56. p.messagesChan <- m
  57. return nil
  58. }
  59. func (p *PublishStreamProcessor) Shutdown() error {
  60. p.doneChan <- struct{}{}
  61. return nil
  62. }
  63. func (p *PublishStreamProcessor) doFlush(stream mq_pb.SeaweedMessaging_PublishMessageClient, messages []*messages.Message) error {
  64. if len(messages) == 0 {
  65. return nil
  66. }
  67. builder := <-p.builders
  68. bb := segment.NewMessageBatchBuilder(builder)
  69. for _, m := range messages {
  70. bb.AddMessage(p.messagesSequence, m.Ts.UnixNano(), m.Properties, m.Key, m.Content)
  71. p.messagesSequence++
  72. }
  73. bb.BuildMessageBatch(p.ProducerId, p.ProducerEpoch, 3, 4)
  74. defer func() {
  75. p.builders <- builder
  76. }()
  77. return stream.Send(&mq_pb.PublishRequest{
  78. Data: &mq_pb.PublishRequest_DataMessage{
  79. Message: bb.GetBytes(),
  80. },
  81. })
  82. }
  83. func (p *PublishStreamProcessor) doLoopUpload() {
  84. brokerGrpcAddress := "localhost:17777"
  85. // TOOD parallelize the uploading with separate uploader
  86. messages := make([]*messages.Message, 0, p.batchMessageCountLimit)
  87. util.RetryForever("publish message", func() error {
  88. return pb.WithBrokerGrpcClient(false, brokerGrpcAddress, p.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  89. ctx, cancel := context.WithCancel(context.Background())
  90. defer cancel()
  91. stream, err := client.PublishMessage(ctx)
  92. if err != nil {
  93. log.Printf("grpc PublishMessage: %v", err)
  94. return err
  95. }
  96. var atomicStatus int64
  97. go func() {
  98. resp, err := stream.Recv()
  99. if err != nil {
  100. log.Printf("response error: %v", err)
  101. } else {
  102. log.Printf("response: %v", resp.AckSequence)
  103. }
  104. if atomic.LoadInt64(&atomicStatus) < 0 {
  105. return
  106. }
  107. }()
  108. var flushErr error
  109. // retry previously failed messages
  110. if len(messages) >= p.batchMessageCountLimit {
  111. flushErr = p.doFlush(stream, messages)
  112. if flushErr != nil {
  113. return flushErr
  114. }
  115. messages = messages[:0]
  116. }
  117. for {
  118. select {
  119. case m := <-p.messagesChan:
  120. messages = append(messages, m)
  121. if len(messages) >= p.batchMessageCountLimit {
  122. if flushErr = p.doFlush(stream, messages); flushErr != nil {
  123. return flushErr
  124. }
  125. messages = messages[:0]
  126. }
  127. case <-time.After(p.timeout):
  128. if flushErr = p.doFlush(stream, messages); flushErr != nil {
  129. return flushErr
  130. }
  131. messages = messages[:0]
  132. case <-p.doneChan:
  133. if flushErr = p.doFlush(stream, messages); flushErr != nil {
  134. return flushErr
  135. }
  136. messages = messages[:0]
  137. println("$ stopping ...")
  138. break
  139. }
  140. }
  141. // stop the response consuming goroutine
  142. atomic.StoreInt64(&atomicStatus, -1)
  143. return flushErr
  144. })
  145. }, func(err error) (shouldContinue bool) {
  146. log.Printf("failed with grpc %s: %v", brokerGrpcAddress, err)
  147. return true
  148. })
  149. }