subscriber.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package msgclient
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/messaging/broker"
  8. "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
  9. "google.golang.org/grpc"
  10. )
  11. type Subscriber struct {
  12. subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
  13. subscriberCancels []context.CancelFunc
  14. subscriberId string
  15. }
  16. func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
  17. // read topic configuration
  18. topicConfiguration := &messaging_pb.TopicConfiguration{
  19. PartitionCount: 4,
  20. }
  21. subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
  22. subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount)
  23. for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
  24. if partitionId >= 0 && i != partitionId {
  25. continue
  26. }
  27. tp := broker.TopicPartition{
  28. Namespace: namespace,
  29. Topic: topic,
  30. Partition: int32(i),
  31. }
  32. grpcClientConn, err := mc.findBroker(tp)
  33. if err != nil {
  34. return nil, err
  35. }
  36. ctx, cancel := context.WithCancel(context.Background())
  37. client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime)
  38. if err != nil {
  39. return nil, err
  40. }
  41. subscriberClients[i] = client
  42. subscriberCancels[i] = cancel
  43. }
  44. return &Subscriber{
  45. subscriberClients: subscriberClients,
  46. subscriberCancels: subscriberCancels,
  47. subscriberId: subscriberId,
  48. }, nil
  49. }
  50. func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
  51. stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
  52. if err != nil {
  53. return
  54. }
  55. // send init message
  56. err = stream.Send(&messaging_pb.SubscriberMessage{
  57. Init: &messaging_pb.SubscriberMessage_InitMessage{
  58. Namespace: tp.Namespace,
  59. Topic: tp.Topic,
  60. Partition: tp.Partition,
  61. StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
  62. TimestampNs: startTime.UnixNano(),
  63. SubscriberId: subscriberId,
  64. },
  65. })
  66. if err != nil {
  67. return
  68. }
  69. return stream, nil
  70. }
  71. func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error {
  72. for {
  73. resp, listenErr := subscriberClient.Recv()
  74. if listenErr == io.EOF {
  75. return nil
  76. }
  77. if listenErr != nil {
  78. println(listenErr.Error())
  79. return listenErr
  80. }
  81. if resp.Data == nil {
  82. // this could be heartbeat from broker
  83. continue
  84. }
  85. processFn(resp.Data)
  86. }
  87. }
  88. // Subscribe starts goroutines to process the messages
  89. func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
  90. var wg sync.WaitGroup
  91. for i := 0; i < len(s.subscriberClients); i++ {
  92. if s.subscriberClients[i] != nil {
  93. wg.Add(1)
  94. go func(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient) {
  95. defer wg.Done()
  96. doSubscribe(subscriberClient, processFn)
  97. }(s.subscriberClients[i])
  98. }
  99. }
  100. wg.Wait()
  101. }
  102. func (s *Subscriber) Shutdown() {
  103. for i := 0; i < len(s.subscriberClients); i++ {
  104. if s.subscriberCancels[i] != nil {
  105. s.subscriberCancels[i]()
  106. }
  107. }
  108. }