broker_grpc_sub_follow.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "io"
  11. "time"
  12. )
  13. func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_SubscribeFollowMeServer) (err error) {
  14. var req *mq_pb.SubscribeFollowMeRequest
  15. req, err = stream.Recv()
  16. if err != nil {
  17. return err
  18. }
  19. initMessage := req.GetInit()
  20. if initMessage == nil {
  21. return fmt.Errorf("missing init message")
  22. }
  23. // create an in-memory offset
  24. var lastOffset int64
  25. // follow each published messages
  26. for {
  27. // receive a message
  28. req, err = stream.Recv()
  29. if err != nil {
  30. if err == io.EOF {
  31. err = nil
  32. break
  33. }
  34. glog.V(0).Infof("topic %v partition %v subscribe stream error: %v", initMessage.Topic, initMessage.Partition, err)
  35. break
  36. }
  37. // Process the received message
  38. if ackMessage := req.GetAck(); ackMessage != nil {
  39. lastOffset = ackMessage.TsNs
  40. // println("sub follower got offset", lastOffset)
  41. } else if closeMessage := req.GetClose(); closeMessage != nil {
  42. glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
  43. return nil
  44. } else {
  45. glog.Errorf("unknown message: %v", req)
  46. }
  47. }
  48. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
  49. if lastOffset > 0 {
  50. err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset)
  51. }
  52. glog.V(0).Infof("shut down follower for %v offset %d", initMessage, lastOffset)
  53. return err
  54. }
  55. func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (offset int64, err error) {
  56. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.PartitionOffset.Partition)
  57. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  58. partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  59. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
  60. offsetFileName := fmt.Sprintf("%s.offset", initMessage.ConsumerGroup)
  61. err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  62. data, err := filer.ReadInsideFiler(client, partitionDir, offsetFileName)
  63. if err != nil {
  64. return err
  65. }
  66. if len(data) != 8 {
  67. return fmt.Errorf("no offset found")
  68. }
  69. offset = int64(util.BytesToUint64(data))
  70. return nil
  71. })
  72. return offset, err
  73. }
  74. func (b *MessageQueueBroker) saveConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string, offset int64) error {
  75. topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
  76. partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
  77. partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
  78. offsetFileName := fmt.Sprintf("%s.offset", consumerGroup)
  79. offsetBytes := make([]byte, 8)
  80. util.Uint64toBytes(offsetBytes, uint64(offset))
  81. return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  82. glog.V(0).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset)
  83. return filer.SaveInsideFiler(client, partitionDir, offsetFileName, offsetBytes)
  84. })
  85. }