broker_grpc_sub.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package broker
  import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)
  17. func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_SubscribeMessageServer) error {
  18. req, err := stream.Recv()
  19. if err != nil {
  20. return err
  21. }
  22. if req.GetInit() == nil {
  23. glog.Errorf("missing init message")
  24. return fmt.Errorf("missing init message")
  25. }
  26. ctx := stream.Context()
  27. clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
  28. t := topic.FromPbTopic(req.GetInit().Topic)
  29. partition := topic.FromPbPartition(req.GetInit().GetPartitionOffset().GetPartition())
  30. glog.V(0).Infof("Subscriber %s on %v %v connected", req.GetInit().ConsumerId, t, partition)
  31. localTopicPartition, getOrGenErr := b.GetOrGenerateLocalPartition(t, partition)
  32. if getOrGenErr != nil {
  33. return getOrGenErr
  34. }
  35. localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
  36. glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
  37. isConnected := true
  38. sleepIntervalCount := 0
  39. var counter int64
  40. defer func() {
  41. isConnected = false
  42. localTopicPartition.Subscribers.RemoveSubscriber(clientName)
  43. glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
  44. if localTopicPartition.MaybeShutdownLocalPartition() {
  45. b.localTopicManager.RemoveLocalPartition(t, partition)
  46. }
  47. }()
  48. startPosition := b.getRequestPosition(req.GetInit())
  49. imt := sub_coordinator.NewInflightMessageTracker(int(req.GetInit().SlidingWindowSize))
  50. // connect to the follower
  51. var subscribeFollowMeStream mq_pb.SeaweedMessaging_SubscribeFollowMeClient
  52. glog.V(0).Infof("follower broker: %v", req.GetInit().FollowerBroker)
  53. if req.GetInit().FollowerBroker != "" {
  54. follower := req.GetInit().FollowerBroker
  55. if followerGrpcConnection, err := pb.GrpcDial(ctx, follower, true, b.grpcDialOption); err != nil {
  56. return fmt.Errorf("fail to dial %s: %v", follower, err)
  57. } else {
  58. defer func() {
  59. println("closing SubscribeFollowMe connection", follower)
  60. subscribeFollowMeStream.CloseSend()
  61. // followerGrpcConnection.Close()
  62. }()
  63. followerClient := mq_pb.NewSeaweedMessagingClient(followerGrpcConnection)
  64. if subscribeFollowMeStream, err = followerClient.SubscribeFollowMe(ctx); err != nil {
  65. return fmt.Errorf("fail to subscribe to %s: %v", follower, err)
  66. } else {
  67. if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
  68. Message: &mq_pb.SubscribeFollowMeRequest_Init{
  69. Init: &mq_pb.SubscribeFollowMeRequest_InitMessage{
  70. Topic: req.GetInit().Topic,
  71. Partition: req.GetInit().GetPartitionOffset().Partition,
  72. ConsumerGroup: req.GetInit().ConsumerGroup,
  73. },
  74. },
  75. }); err != nil {
  76. return fmt.Errorf("fail to send init to %s: %v", follower, err)
  77. }
  78. }
  79. }
  80. glog.V(0).Infof("follower %s connected", follower)
  81. }
  82. go func() {
  83. var lastOffset int64
  84. for {
  85. ack, err := stream.Recv()
  86. if err != nil {
  87. if err == io.EOF {
  88. // the client has called CloseSend(). This is to ack the close.
  89. stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Ctrl{
  90. Ctrl: &mq_pb.SubscribeMessageResponse_SubscribeCtrlMessage{
  91. IsEndOfStream: true,
  92. },
  93. }})
  94. break
  95. }
  96. glog.V(0).Infof("topic %v partition %v subscriber %s lastOffset %d error: %v", t, partition, clientName, lastOffset, err)
  97. break
  98. }
  99. if ack.GetAck().Key == nil {
  100. // skip ack for control messages
  101. continue
  102. }
  103. imt.AcknowledgeMessage(ack.GetAck().Key, ack.GetAck().Sequence)
  104. currentLastOffset := imt.GetOldestAckedTimestamp()
  105. // fmt.Printf("%+v recv (%s,%d), oldest %d\n", partition, string(ack.GetAck().Key), ack.GetAck().Sequence, currentLastOffset)
  106. if subscribeFollowMeStream != nil && currentLastOffset > lastOffset {
  107. if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
  108. Message: &mq_pb.SubscribeFollowMeRequest_Ack{
  109. Ack: &mq_pb.SubscribeFollowMeRequest_AckMessage{
  110. TsNs: currentLastOffset,
  111. },
  112. },
  113. }); err != nil {
  114. glog.Errorf("Error sending ack to follower: %v", err)
  115. break
  116. }
  117. lastOffset = currentLastOffset
  118. // fmt.Printf("%+v forwarding ack %d\n", partition, lastOffset)
  119. }
  120. }
  121. if lastOffset > 0 {
  122. glog.V(0).Infof("saveConsumerGroupOffset %v %v %v %v", t, partition, req.GetInit().ConsumerGroup, lastOffset)
  123. if err := b.saveConsumerGroupOffset(t, partition, req.GetInit().ConsumerGroup, lastOffset); err != nil {
  124. glog.Errorf("saveConsumerGroupOffset partition %v lastOffset %d: %v", partition, lastOffset, err)
  125. }
  126. }
  127. if subscribeFollowMeStream != nil {
  128. if err := subscribeFollowMeStream.Send(&mq_pb.SubscribeFollowMeRequest{
  129. Message: &mq_pb.SubscribeFollowMeRequest_Close{
  130. Close: &mq_pb.SubscribeFollowMeRequest_CloseMessage{},
  131. },
  132. }); err != nil {
  133. glog.Errorf("Error sending close to follower: %v", err)
  134. }
  135. }
  136. }()
  137. return localTopicPartition.Subscribe(clientName, startPosition, func() bool {
  138. if !isConnected {
  139. return false
  140. }
  141. sleepIntervalCount++
  142. if sleepIntervalCount > 32 {
  143. sleepIntervalCount = 32
  144. }
  145. time.Sleep(time.Duration(sleepIntervalCount) * 137 * time.Millisecond)
  146. // Check if the client has disconnected by monitoring the context
  147. select {
  148. case <-ctx.Done():
  149. err := ctx.Err()
  150. if errors.Is(err, context.Canceled) {
  151. // Client disconnected
  152. return false
  153. }
  154. glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
  155. return false
  156. default:
  157. // Continue processing the request
  158. }
  159. return true
  160. }, func(logEntry *filer_pb.LogEntry) (bool, error) {
  161. // reset the sleep interval count
  162. sleepIntervalCount = 0
  163. for imt.IsInflight(logEntry.Key) {
  164. time.Sleep(137 * time.Millisecond)
  165. }
  166. if logEntry.Key != nil {
  167. imt.EnflightMessage(logEntry.Key, logEntry.TsNs)
  168. }
  169. if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
  170. Data: &mq_pb.DataMessage{
  171. Key: logEntry.Key,
  172. Value: logEntry.Data,
  173. TsNs: logEntry.TsNs,
  174. },
  175. }}); err != nil {
  176. glog.Errorf("Error sending data: %v", err)
  177. return false, err
  178. }
  179. counter++
  180. return false, nil
  181. })
  182. }
  183. func (b *MessageQueueBroker) getRequestPosition(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (startPosition log_buffer.MessagePosition) {
  184. if initMessage == nil {
  185. return
  186. }
  187. offset := initMessage.GetPartitionOffset()
  188. if offset.StartTsNs != 0 {
  189. startPosition = log_buffer.NewMessagePosition(offset.StartTsNs, -2)
  190. return
  191. }
  192. if storedOffset, err := b.readConsumerGroupOffset(initMessage); err == nil {
  193. glog.V(0).Infof("resume from saved offset %v %v %v: %v", initMessage.Topic, initMessage.PartitionOffset.Partition, initMessage.ConsumerGroup, storedOffset)
  194. startPosition = log_buffer.NewMessagePosition(storedOffset, -2)
  195. return
  196. }
  197. if offset.StartType == schema_pb.PartitionOffsetStartType_EARLIEST {
  198. startPosition = log_buffer.NewMessagePosition(1, -3)
  199. } else if offset.StartType == schema_pb.PartitionOffsetStartType_LATEST {
  200. startPosition = log_buffer.NewMessagePosition(time.Now().UnixNano(), -4)
  201. }
  202. return
  203. }