broker_grpc_sub_follow.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package broker
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/filer"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "io"
  11. )
  12. func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_SubscribeFollowMeServer) (err error) {
  13. var req *mq_pb.SubscribeFollowMeRequest
  14. req, err = stream.Recv()
  15. if err != nil {
  16. return err
  17. }
  18. initMessage := req.GetInit()
  19. if initMessage == nil {
  20. return fmt.Errorf("missing init message")
  21. }
  22. // create an in-memory offset
  23. var lastOffset int64
  24. // follow each published messages
  25. for {
  26. // receive a message
  27. req, err = stream.Recv()
  28. if err != nil {
  29. if err == io.EOF {
  30. err = nil
  31. break
  32. }
  33. glog.V(0).Infof("topic %v partition %v subscribe stream error: %v", initMessage.Topic, initMessage.Partition, err)
  34. break
  35. }
  36. // Process the received message
  37. if ackMessage := req.GetAck(); ackMessage != nil {
  38. lastOffset = ackMessage.TsNs
  39. // println("sub follower got offset", lastOffset)
  40. } else if closeMessage := req.GetClose(); closeMessage != nil {
  41. glog.V(0).Infof("topic %v partition %v subscribe stream closed: %v", initMessage.Topic, initMessage.Partition, closeMessage)
  42. return nil
  43. } else {
  44. glog.Errorf("unknown message: %v", req)
  45. }
  46. }
  47. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
  48. if lastOffset > 0 {
  49. err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset)
  50. }
  51. glog.V(0).Infof("shut down follower for %v offset %d", initMessage, lastOffset)
  52. return err
  53. }
  54. func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (offset int64, err error) {
  55. t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.PartitionOffset.Partition)
  56. partitionDir := topic.PartitionDir(t, p)
  57. offsetFileName := fmt.Sprintf("%s.offset", initMessage.ConsumerGroup)
  58. err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  59. data, err := filer.ReadInsideFiler(client, partitionDir, offsetFileName)
  60. if err != nil {
  61. return err
  62. }
  63. if len(data) != 8 {
  64. return fmt.Errorf("no offset found")
  65. }
  66. offset = int64(util.BytesToUint64(data))
  67. return nil
  68. })
  69. return offset, err
  70. }
  71. func (b *MessageQueueBroker) saveConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string, offset int64) error {
  72. partitionDir := topic.PartitionDir(t, p)
  73. offsetFileName := fmt.Sprintf("%s.offset", consumerGroup)
  74. offsetBytes := make([]byte, 8)
  75. util.Uint64toBytes(offsetBytes, uint64(offset))
  76. return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  77. glog.V(0).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset)
  78. return filer.SaveInsideFiler(client, partitionDir, offsetFileName, offsetBytes)
  79. })
  80. }