subscribe.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package sub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. "sync"
  8. )
  9. // Subscribe subscribes to a topic's specified partitions.
  10. // If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
  11. func (sub *TopicSubscriber) Subscribe() error {
  12. var wg sync.WaitGroup
  13. for _, brokerPartitionAssignment := range sub.brokerPartitionAssignments {
  14. brokerAddress := brokerPartitionAssignment.LeaderBroker
  15. grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, sub.SubscriberConfig.GrpcDialOption)
  16. if err != nil {
  17. return fmt.Errorf("dial broker %s: %v", brokerAddress, err)
  18. }
  19. brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
  20. subscribeClient, err := brokerClient.Subscribe(context.Background(), &mq_pb.SubscribeRequest{
  21. Consumer: &mq_pb.SubscribeRequest_Consumer{
  22. ConsumerGroup: sub.SubscriberConfig.GroupId,
  23. ConsumerId: sub.SubscriberConfig.GroupInstanceId,
  24. },
  25. Cursor: &mq_pb.SubscribeRequest_Cursor{
  26. Topic: &mq_pb.Topic{
  27. Namespace: sub.ContentConfig.Namespace,
  28. Name: sub.ContentConfig.Topic,
  29. },
  30. Partition: &mq_pb.Partition{
  31. RingSize: brokerPartitionAssignment.Partition.RingSize,
  32. RangeStart: brokerPartitionAssignment.Partition.RangeStart,
  33. RangeStop: brokerPartitionAssignment.Partition.RangeStop,
  34. },
  35. Filter: sub.ContentConfig.Filter,
  36. },
  37. })
  38. if err != nil {
  39. return fmt.Errorf("create subscribe client: %v", err)
  40. }
  41. wg.Add(1)
  42. go func() {
  43. defer wg.Done()
  44. if sub.OnCompletionFunc != nil {
  45. defer sub.OnCompletionFunc()
  46. }
  47. for {
  48. resp, err := subscribeClient.Recv()
  49. if err != nil {
  50. fmt.Printf("subscribe error: %v\n", err)
  51. return
  52. }
  53. if resp.Message == nil {
  54. continue
  55. }
  56. switch m := resp.Message.(type) {
  57. case *mq_pb.SubscribeResponse_Data:
  58. if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) {
  59. return
  60. }
  61. case *mq_pb.SubscribeResponse_Ctrl:
  62. // ignore
  63. }
  64. }
  65. }()
  66. }
  67. wg.Wait()
  68. return nil
  69. }