lookup.go 888 B

12345678910111213141516171819202122232425262728293031323334
  1. package sub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. )
  8. func (sub *TopicSubscriber) doLookup(brokerAddress string) error {
  9. err := pb.WithBrokerGrpcClient(true,
  10. brokerAddress,
  11. sub.SubscriberConfig.GrpcDialOption,
  12. func(client mq_pb.SeaweedMessagingClient) error {
  13. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  14. &mq_pb.LookupTopicBrokersRequest{
  15. Topic: &mq_pb.Topic{
  16. Namespace: sub.ContentConfig.Namespace,
  17. Name: sub.ContentConfig.Topic,
  18. },
  19. IsForPublish: false,
  20. })
  21. if err != nil {
  22. return err
  23. }
  24. sub.brokerPartitionAssignments = lookupResp.BrokerPartitionAssignments
  25. return nil
  26. })
  27. if err != nil {
  28. return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
  29. }
  30. return nil
  31. }