broker_grpc_pub_follow.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
  8. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  9. "io"
  10. "time"
  11. )
  12. type memBuffer struct {
  13. buf []byte
  14. startTime time.Time
  15. stopTime time.Time
  16. }
  17. func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error) {
  18. var req *mq_pb.PublishFollowMeRequest
  19. req, err = stream.Recv()
  20. if err != nil {
  21. return err
  22. }
  23. initMessage := req.GetInit()
  24. if initMessage == nil {
  25. return fmt.Errorf("missing init message")
  26. }
  27. // create an in-memory queue of buffered messages
  28. inMemoryBuffers := buffered_queue.NewBufferedQueue[memBuffer](4)
  29. logBuffer := b.buildFollowerLogBuffer(inMemoryBuffers)
  30. lastFlushTsNs := time.Now().UnixNano()
  31. // follow each published messages
  32. for {
  33. // receive a message
  34. req, err = stream.Recv()
  35. if err != nil {
  36. if err == io.EOF {
  37. err = nil
  38. break
  39. }
  40. glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
  41. break
  42. }
  43. // Process the received message
  44. if dataMessage := req.GetData(); dataMessage != nil {
  45. // TODO: change this to DataMessage
  46. // log the message
  47. logBuffer.AddToBuffer(dataMessage)
  48. // send back the ack
  49. if err := stream.Send(&mq_pb.PublishFollowMeResponse{
  50. AckTsNs: dataMessage.TsNs,
  51. }); err != nil {
  52. glog.Errorf("Error sending response %v: %v", dataMessage, err)
  53. }
  54. // println("ack", string(dataMessage.Key), dataMessage.TsNs)
  55. } else if closeMessage := req.GetClose(); closeMessage != nil {
  56. glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
  57. break
  58. } else if flushMessage := req.GetFlush(); flushMessage != nil {
  59. glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage)
  60. lastFlushTsNs = flushMessage.TsNs
  61. // drop already flushed messages
  62. for mem, found := inMemoryBuffers.PeekHead(); found; mem, found = inMemoryBuffers.PeekHead() {
  63. if mem.stopTime.UnixNano() <= flushMessage.TsNs {
  64. inMemoryBuffers.Dequeue()
  65. // println("dropping flushed messages: ", mem.startTime.UnixNano(), mem.stopTime.UnixNano(), len(mem.buf))
  66. } else {
  67. break
  68. }
  69. }
  70. } else {
  71. glog.Errorf("unknown message: %v", req)
  72. }
  73. }
  74. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
  75. logBuffer.ShutdownLogBuffer()
  76. // wait until all messages are sent to inMemoryBuffers
  77. for !logBuffer.IsAllFlushed() {
  78. time.Sleep(113 * time.Millisecond)
  79. }
  80. partitionDir := topic.PartitionDir(t, p)
  81. // flush the remaining messages
  82. inMemoryBuffers.CloseInput()
  83. for mem, found := inMemoryBuffers.Dequeue(); found; mem, found = inMemoryBuffers.Dequeue() {
  84. if len(mem.buf) == 0 {
  85. continue
  86. }
  87. startTime, stopTime := mem.startTime.UTC(), mem.stopTime.UTC()
  88. if stopTime.UnixNano() <= lastFlushTsNs {
  89. glog.V(0).Infof("dropping remaining data at %v %v", t, p)
  90. continue
  91. }
  92. // TODO trim data earlier than lastFlushTsNs
  93. targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
  94. for {
  95. if err := b.appendToFile(targetFile, mem.buf); err != nil {
  96. glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
  97. time.Sleep(737 * time.Millisecond)
  98. } else {
  99. break
  100. }
  101. }
  102. glog.V(0).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf))
  103. }
  104. glog.V(0).Infof("shut down follower for %v %v", t, p)
  105. return err
  106. }
  107. func (b *MessageQueueBroker) buildFollowerLogBuffer(inMemoryBuffers *buffered_queue.BufferedQueue[memBuffer]) *log_buffer.LogBuffer {
  108. lb := log_buffer.NewLogBuffer("follower",
  109. 2*time.Minute, func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
  110. if len(buf) == 0 {
  111. return
  112. }
  113. inMemoryBuffers.Enqueue(memBuffer{
  114. buf: buf,
  115. startTime: startTime,
  116. stopTime: stopTime,
  117. })
  118. glog.V(0).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf))
  119. }, nil, func() {
  120. })
  121. return lb
  122. }