message_pipeline.go

package messages

import (
	"log"
	"sync"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/grpc"
)
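
// OnMessageFunc is a callback type that receives a single message.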
type OnMessageFunc func(message *Message)
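
// MessagePipeline batches incoming messages into MessageBuffers and hands
// sealed buffers to a MessageBufferMover through a bounded worker pool.
// atomicPipelineStatus values used in this file: 0 = running, -1 = shutdown
// requested, -2 = draining remaining messages, -100 = fully stopped.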
type MessagePipeline struct {
	// atomic status
	atomicPipelineStatus int64 // -1: stop

	// attributes
	ProducerId        int32
	ProducerEpoch     int32
	grpcDialOption    grpc.DialOption
	emptyBuffersChan  chan *MessageBuffer
	sealedBuffersChan chan *MessageBuffer
	movedBuffersChan  chan MessageBufferReference
	onMessageFn       OnMessageFunc
	mover             MessageBufferMover
	moverPool         *util.LimitedAsyncExecutor

	// control pipeline
	doneChan              chan struct{}
	batchSize             int
	timeout               time.Duration
	incomingMessageLock   sync.Mutex
	incomingMessageBuffer *MessageBuffer
	messageSequence       int64
}
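
// NewMessagePipeline creates a pipeline with workerCount concurrent movers,
// sealing a buffer once it holds batchSize messages or after the timeout
// elapses, and starts the background upload loop.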
func NewMessagePipeline(producerId int32, workerCount int, batchSize int, timeout time.Duration, mover MessageBufferMover) *MessagePipeline {
	t := &MessagePipeline{
		ProducerId:        producerId,
		emptyBuffersChan:  make(chan *MessageBuffer, workerCount),
		sealedBuffersChan: make(chan *MessageBuffer, workerCount),
		movedBuffersChan:  make(chan MessageBufferReference, workerCount),
		doneChan:          make(chan struct{}),
		batchSize:         batchSize,
		timeout:           timeout,
		moverPool:         util.NewLimitedAsyncExecutor(workerCount),
		mover:             mover,
	}
	go t.doLoopUpload()
	return t
}
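
// NextMessageBufferReference blocks until the next sealed buffer has been
// moved and returns its MessageBufferReference.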
func (mp *MessagePipeline) NextMessageBufferReference() MessageBufferReference {
	return mp.moverPool.NextFuture().Await().(MessageBufferReference)
}
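
// AddMessage appends a message to the current buffer, reusing an empty buffer
// when one is available, and seals the buffer once batchSize is reached.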
func (mp *MessagePipeline) AddMessage(message *Message) {
	mp.incomingMessageLock.Lock()
	defer mp.incomingMessageLock.Unlock()

	// get existing message buffer or create a new one
	if mp.incomingMessageBuffer == nil {
		select {
		case mp.incomingMessageBuffer = <-mp.emptyBuffersChan:
		default:
			mp.incomingMessageBuffer = NewMessageBuffer()
		}
		mp.incomingMessageBuffer.Reset(mp.messageSequence)
	}

	// add one message
	mp.incomingMessageBuffer.AddMessage(message)
	mp.messageSequence++

	// seal the message buffer if full
	if mp.incomingMessageBuffer.Len() >= mp.batchSize {
		mp.incomingMessageBuffer.Seal(mp.ProducerId, mp.ProducerEpoch, 0, 0)
		mp.sealedBuffersChan <- mp.incomingMessageBuffer
		mp.incomingMessageBuffer = nil
	}
}
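
// doLoopUpload runs in its own goroutine: it submits sealed buffers to the
// mover pool, flushes the partially filled buffer on each timeout tick, and
// drives the shutdown state machine (-1 -> -2 -> -100).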
func (mp *MessagePipeline) doLoopUpload() {
	mp.mover.Setup()
	defer mp.mover.TearDown()

	ticker := time.NewTicker(mp.timeout)
	defer ticker.Stop()

uploadLoop:
	for {
		status := atomic.LoadInt64(&mp.atomicPipelineStatus)
		if status == -100 {
			break uploadLoop
		} else if status == -1 {
			// entering shutting down mode
			atomic.StoreInt64(&mp.atomicPipelineStatus, -2)
			mp.incomingMessageLock.Lock()
			mp.doFlushIncomingMessages()
			mp.incomingMessageLock.Unlock()
		}
		select {
		case messageBuffer := <-mp.sealedBuffersChan:
			ticker.Reset(mp.timeout)
			mp.moverPool.Execute(func() any {
				var output MessageBufferReference
				util.RetryForever("message mover", func() error {
					if messageReference, flushErr := mp.mover.MoveBuffer(messageBuffer); flushErr != nil {
						return flushErr
					} else {
						output = messageReference
					}
					return nil
				}, func(err error) (shouldContinue bool) {
					log.Printf("failed: %v", err)
					return true
				})
				return output
			})
		case <-ticker.C:
			if atomic.LoadInt64(&mp.atomicPipelineStatus) == -2 {
				break uploadLoop
			}
			mp.incomingMessageLock.Lock()
			mp.doFlushIncomingMessages()
			mp.incomingMessageLock.Unlock()
		}
	}

	// mark the pipeline as fully stopped and signal downstream readers
	atomic.StoreInt64(&mp.atomicPipelineStatus, -100)
	close(mp.movedBuffersChan)
}
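
// doFlushIncomingMessages seals and enqueues the partially filled buffer, if
// any; callers must hold incomingMessageLock.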
func (mp *MessagePipeline) doFlushIncomingMessages() {
	if mp.incomingMessageBuffer == nil || mp.incomingMessageBuffer.Len() == 0 {
		return
	}
	mp.incomingMessageBuffer.Seal(mp.ProducerId, mp.ProducerEpoch, 0, 0)
	mp.sealedBuffersChan <- mp.incomingMessageBuffer
	mp.incomingMessageBuffer = nil
}
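
// ShutdownStart requests a graceful shutdown: remaining messages are flushed
// before the upload loop exits.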
func (mp *MessagePipeline) ShutdownStart() {
	if atomic.LoadInt64(&mp.atomicPipelineStatus) == 0 {
		atomic.StoreInt64(&mp.atomicPipelineStatus, -1)
	}
}
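
// ShutdownWait blocks, polling the status, until the pipeline has fully stopped.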
func (mp *MessagePipeline) ShutdownWait() {
	for atomic.LoadInt64(&mp.atomicPipelineStatus) != -100 {
		time.Sleep(331 * time.Millisecond)
	}
}
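
// ShutdownImmediate stops the pipeline without flushing buffered messages.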
func (mp *MessagePipeline) ShutdownImmediate() {
	if atomic.LoadInt64(&mp.atomicPipelineStatus) == 0 {
		atomic.StoreInt64(&mp.atomicPipelineStatus, -100)
	}
}
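
// examplePipelineUsage is an illustrative sketch only, not part of the
// original pipeline code. It assumes the caller supplies a MessageBufferMover
// implementation and a *Message value; both types are defined elsewhere in
// this package.
func examplePipelineUsage(mover MessageBufferMover, msg *Message) MessageBufferReference {
	// One mover worker; seal a buffer at 16 messages or after 100ms of inactivity.
	pipeline := NewMessagePipeline(1, 1, 16, 100*time.Millisecond, mover)

	// Messages accumulate under the incoming-message lock; a full buffer is
	// sealed and handed to the mover pool asynchronously.
	pipeline.AddMessage(msg)

	// Block until the next sealed buffer has been moved.
	ref := pipeline.NextMessageBufferReference()

	// Flush whatever remains and wait for the upload loop to stop.
	pipeline.ShutdownStart()
	pipeline.ShutdownWait()
	return ref
}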