123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- package broker
- import (
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"
- "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
- "io"
- "time"
- )
- type memBuffer struct {
- buf []byte
- startTime time.Time
- stopTime time.Time
- }
- func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error) {
- var req *mq_pb.PublishFollowMeRequest
- req, err = stream.Recv()
- if err != nil {
- return err
- }
- initMessage := req.GetInit()
- if initMessage == nil {
- return fmt.Errorf("missing init message")
- }
- // create an in-memory queue of buffered messages
- inMemoryBuffers := buffered_queue.NewBufferedQueue[memBuffer](4)
- logBuffer := b.buildFollowerLogBuffer(inMemoryBuffers)
- lastFlushTsNs := time.Now().UnixNano()
- // follow each published messages
- for {
- // receive a message
- req, err = stream.Recv()
- if err != nil {
- if err == io.EOF {
- err = nil
- break
- }
- glog.V(0).Infof("topic %v partition %v publish stream error: %v", initMessage.Topic, initMessage.Partition, err)
- break
- }
- // Process the received message
- if dataMessage := req.GetData(); dataMessage != nil {
- // TODO: change this to DataMessage
- // log the message
- logBuffer.AddToBuffer(dataMessage)
- // send back the ack
- if err := stream.Send(&mq_pb.PublishFollowMeResponse{
- AckTsNs: dataMessage.TsNs,
- }); err != nil {
- glog.Errorf("Error sending response %v: %v", dataMessage, err)
- }
- // println("ack", string(dataMessage.Key), dataMessage.TsNs)
- } else if closeMessage := req.GetClose(); closeMessage != nil {
- glog.V(0).Infof("topic %v partition %v publish stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
- break
- } else if flushMessage := req.GetFlush(); flushMessage != nil {
- glog.V(0).Infof("topic %v partition %v publish stream flushed: %v", initMessage.Topic, initMessage.Partition, flushMessage)
- lastFlushTsNs = flushMessage.TsNs
- // drop already flushed messages
- for mem, found := inMemoryBuffers.PeekHead(); found; mem, found = inMemoryBuffers.PeekHead() {
- if mem.stopTime.UnixNano() <= flushMessage.TsNs {
- inMemoryBuffers.Dequeue()
- // println("dropping flushed messages: ", mem.startTime.UnixNano(), mem.stopTime.UnixNano(), len(mem.buf))
- } else {
- break
- }
- }
- } else {
- glog.Errorf("unknown message: %v", req)
- }
- }
- t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
- logBuffer.ShutdownLogBuffer()
- // wait until all messages are sent to inMemoryBuffers
- for !logBuffer.IsAllFlushed() {
- time.Sleep(113 * time.Millisecond)
- }
- topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
- partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
- partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
- // flush the remaining messages
- inMemoryBuffers.CloseInput()
- for mem, found := inMemoryBuffers.Dequeue(); found; mem, found = inMemoryBuffers.Dequeue() {
- if len(mem.buf) == 0 {
- continue
- }
- startTime, stopTime := mem.startTime.UTC(), mem.stopTime.UTC()
- if stopTime.UnixNano() <= lastFlushTsNs {
- glog.V(0).Infof("dropping remaining data at %v %v", t, p)
- continue
- }
- // TODO trim data earlier than lastFlushTsNs
- targetFile := fmt.Sprintf("%s/%s", partitionDir, startTime.Format(topic.TIME_FORMAT))
- for {
- if err := b.appendToFile(targetFile, mem.buf); err != nil {
- glog.V(0).Infof("metadata log write failed %s: %v", targetFile, err)
- time.Sleep(737 * time.Millisecond)
- } else {
- break
- }
- }
- glog.V(0).Infof("flushed remaining data at %v to %s size %d", mem.stopTime.UnixNano(), targetFile, len(mem.buf))
- }
- glog.V(0).Infof("shut down follower for %v %v", t, p)
- return err
- }
- func (b *MessageQueueBroker) buildFollowerLogBuffer(inMemoryBuffers *buffered_queue.BufferedQueue[memBuffer]) *log_buffer.LogBuffer {
- lb := log_buffer.NewLogBuffer("follower",
- 2*time.Minute, func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
- if len(buf) == 0 {
- return
- }
- inMemoryBuffers.Enqueue(memBuffer{
- buf: buf,
- startTime: startTime,
- stopTime: stopTime,
- })
- glog.V(0).Infof("queue up %d~%d size %d", startTime.UnixNano(), stopTime.UnixNano(), len(buf))
- }, nil, func() {
- })
- return lb
- }
|