broker_grpc_sub.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
  10. "time"
  11. )
  12. func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) (err error) {
  13. ctx := stream.Context()
  14. clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
  15. initMessage := req.GetInit()
  16. if initMessage == nil {
  17. glog.Errorf("missing init message")
  18. return fmt.Errorf("missing init message")
  19. }
  20. t := topic.FromPbTopic(req.GetInit().Topic)
  21. partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
  22. glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
  23. localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
  24. if getOrGenErr != nil {
  25. return getOrGenErr
  26. }
  27. localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
  28. glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
  29. isConnected := true
  30. sleepIntervalCount := 0
  31. var counter int64
  32. defer func() {
  33. isConnected = false
  34. localTopicPartition.Subscribers.RemoveSubscriber(clientName)
  35. glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
  36. if localTopicPartition.MaybeShutdownLocalPartition() {
  37. b.localTopicManager.RemoveLocalPartition(t, partition)
  38. }
  39. }()
  40. var startPosition log_buffer.MessagePosition
  41. if req.GetInit() != nil && req.GetInit().GetPartitionOffset() != nil {
  42. startPosition = getRequestPosition(req.GetInit().GetPartitionOffset())
  43. }
  44. return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
  45. if !isConnected {
  46. return false
  47. }
  48. sleepIntervalCount++
  49. if sleepIntervalCount > 32 {
  50. sleepIntervalCount = 32
  51. }
  52. time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
  53. // Check if the client has disconnected by monitoring the context
  54. select {
  55. case <-ctx.Done():
  56. err := ctx.Err()
  57. if err == context.Canceled {
  58. // Client disconnected
  59. return false
  60. }
  61. glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
  62. return false
  63. default:
  64. // Continue processing the request
  65. }
  66. return true
  67. }, func(logEntry *filer_pb.LogEntry) (bool, error) {
  68. // reset the sleep interval count
  69. sleepIntervalCount = 0
  70. if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
  71. Data: &mq_pb.DataMessage{
  72. Key: logEntry.Key,
  73. Value: logEntry.Data,
  74. TsNs: logEntry.TsNs,
  75. },
  76. }}); err != nil {
  77. glog.Errorf("Error sending data: %v", err)
  78. return false, err
  79. }
  80. counter++
  81. return false, nil
  82. })
  83. }
  84. func getRequestPosition(offset *mq_pb.PartitionOffset) (startPosition log_buffer.MessagePosition) {
  85. if offset.StartTsNs != 0 {
  86. startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
  87. }
  88. if offset.StartType == mq_pb.PartitionOffsetStartType_EARLIEST {
  89. startPosition = log_buffer.NewMessagePosition(1, -3)
  90. } else if offset.StartType == mq_pb.PartitionOffsetStartType_LATEST {
  91. startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
  92. }
  93. return
  94. }