lookup.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package pub_client
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. )
  8. func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error {
  9. if p.config.CreateTopic {
  10. err := pb.WithBrokerGrpcClient(true,
  11. brokerAddress,
  12. p.grpcDialOption,
  13. func(client mq_pb.SeaweedMessagingClient) error {
  14. _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
  15. Topic: &mq_pb.Topic{
  16. Namespace: p.namespace,
  17. Name: p.topic,
  18. },
  19. PartitionCount: p.config.CreateTopicPartitionCount,
  20. })
  21. return err
  22. })
  23. if err != nil {
  24. return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err)
  25. }
  26. }
  27. err := pb.WithBrokerGrpcClient(true,
  28. brokerAddress,
  29. p.grpcDialOption,
  30. func(client mq_pb.SeaweedMessagingClient) error {
  31. lookupResp, err := client.LookupTopicBrokers(context.Background(),
  32. &mq_pb.LookupTopicBrokersRequest{
  33. Topic: &mq_pb.Topic{
  34. Namespace: p.namespace,
  35. Name: p.topic,
  36. },
  37. IsForPublish: true,
  38. })
  39. if p.config.CreateTopic && err != nil {
  40. _, err = client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
  41. Topic: &mq_pb.Topic{
  42. Namespace: p.namespace,
  43. Name: p.topic,
  44. },
  45. PartitionCount: p.config.CreateTopicPartitionCount,
  46. })
  47. if err != nil {
  48. return err
  49. }
  50. lookupResp, err = client.LookupTopicBrokers(context.Background(),
  51. &mq_pb.LookupTopicBrokersRequest{
  52. Topic: &mq_pb.Topic{
  53. Namespace: p.namespace,
  54. Name: p.topic,
  55. },
  56. IsForPublish: true,
  57. })
  58. }
  59. if err != nil {
  60. return err
  61. }
  62. for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
  63. // partition => publishClient
  64. publishClient, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker)
  65. if err != nil {
  66. return err
  67. }
  68. p.partition2Broker.Insert(
  69. brokerPartitionAssignment.Partition.RangeStart,
  70. brokerPartitionAssignment.Partition.RangeStop,
  71. publishClient)
  72. }
  73. return nil
  74. })
  75. if err != nil {
  76. return fmt.Errorf("lookup topic %s/%s: %v", p.namespace, p.topic, err)
  77. }
  78. return nil
  79. }