broker_grpc_pub_follow.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
  9. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  10. "io"
  11. "time"
  12. )
  13. type memBuffer struct {
  14. buf []byte
  15. startTime time.Time
  16. stopTime time.Time
  17. }
  18. func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error) {
  19. var req *mq_pb.PublishFollowMeRequest
  20. req, err = stream.Recv()
  21. if err != nil {
  22. return err
  23. }
  24. initMessage := req.GetInit()
  25. if initMessage == nil {
  26. return fmt.Errorf("missing init message")
  27. }
  28. // create an in-memory queue of buffered messages
  29. inMemoryBuffers := buffered_queue.NewBufferedQueue[memBuffer](4)
  30. logBuffer := b.buildFollowerLogBuffer(inMemoryBuffers)
  31. lastFlushTsNs := time.Now().UnixNano()
  32. // follow each published messages
  33. for {
  34. // receive a message
  35. req, err = stream.Recv()
  36. if err != nil {
  37. if err == io.EOF {
  38. err = nil
  39. break
  40. }
  41. glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
  42. break
  43. }
  44. // Process the received message
  45. if dataMessage := req.GetData(); dataMessage != nil {
  46. // TODO: change this to DataMessage
  47. // log the message
  48. logBuffer.AddToBuffer(dataMessage)
  49. // send back the ack
  50. if err := stream.Send(&mq_pb.PublishFollowMeResponse{
  51. AckTsNs: dataMessage.TsNs,
  52. }); err != nil {
  53. glog.Errorf("Error sending response %v: %v", dataMessage, err)
  54. }
  55. // println("ack", string(dataMessage.Key), dataMessage.TsNs)
  56. } else if closeMessage := req.GetClose(); closeMessage != nil {
  57. glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
  58. break
  59. } else if flushMessage := req.GetFlush(); flushMessage != nil {
  60. glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage)
  61. lastFlushTsNs = flushMessage.TsNs
  62. // drop already flushed messages
  63. for mem, found := inMemoryBuffers.PeekHead(); found; mem, found = inMemoryBuffers.PeekHead() {
  64. if mem.stopTime.UnixNano() <= flushMessage.TsNs {
  65. inMemoryBuffers.Dequeue()
  66. // println("dropping flushed messages: ", mem.startTime.UnixNano(), mem.stopTime.UnixNano(), len(mem.buf))
  67. } else {
  68. break
  69. }
  70. }
  71. } else {
  72. glog.Errorf("unknown message: %v", req)
  73. }
  74. }
  75. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
  76. logBuffer.ShutdownLogBuffer()
  77. // wait until all messages are sent to inMemoryBuffers
  78. for !logBuffer.IsAllFlushed() {
  79. time.Sleep(113 * time.Millisecond)
  80. }
  81. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  82. partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  83. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
  84. // flush the remaining messages
  85. inMemoryBuffers.CloseInput()
  86. for mem, found := inMemoryBuffers.Dequeue(); found; mem, found = inMemoryBuffers.Dequeue() {
  87. if len(mem.buf) == 0 {
  88. continue
  89. }
  90. startTime, stopTime := mem.startTime.UTC(), mem.stopTime.UTC()
  91. if stopTime.UnixNano() <= lastFlushTsNs {
  92. glog.V(0).Infof("dropping remaining data at %v %v", t, p)
  93. continue
  94. }
  95. // TODO trim data earlier than lastFlushTsNs
  96. targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
  97. for {
  98. if err := b.appendToFile(targetFile, mem.buf); err != nil {
  99. glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
  100. time.Sleep(737 * time.Millisecond)
  101. } else {
  102. break
  103. }
  104. }
  105. glog.V(0).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf))
  106. }
  107. glog.V(0).Infof("shut down follower for %v %v", t, p)
  108. return err
  109. }
  110. func (b *MessageQueueBroker) buildFollowerLogBuffer(inMemoryBuffers *buffered_queue.BufferedQueue[memBuffer]) *log_buffer.LogBuffer {
  111. lb := log_buffer.NewLogBuffer("follower",
  112. 2*time.Minute, func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
  113. if len(buf) == 0 {
  114. return
  115. }
  116. inMemoryBuffers.Enqueue(memBuffer{
  117. buf: buf,
  118. startTime: startTime,
  119. stopTime: stopTime,
  120. })
  121. glog.V(0).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf))
  122. }, nil, func() {
  123. })
  124. return lb
  125. }