broker_grpc_lookup.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. "google.golang.org/grpc/codes"
  7. "google.golang.org/grpc/status"
  8. )
  9. // FindTopicBrokers returns the brokers that are serving the topic
  10. //
  11. // 1. lock the topic
  12. //
  13. // 2. find the topic partitions on the filer
  14. // 2.1 if the topic is not found, return error
  15. // 2.1.1 if the request is_for_subscribe, return error not found
  16. // 2.1.2 if the request is_for_publish, create the topic
  17. // 2.2 if the topic is found, return the brokers
  18. //
  19. // 3. unlock the topic
  20. func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
  21. if b.currentBalancer == "" {
  22. return nil, status.Errorf(codes.Unavailable, "no balancer")
  23. }
  24. if !b.lockAsBalancer.IsLocked() {
  25. proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
  26. resp, err = client.LookupTopicBrokers(ctx, request)
  27. return nil
  28. })
  29. if proxyErr != nil {
  30. return nil, proxyErr
  31. }
  32. return resp, err
  33. }
  34. ret := &mq_pb.LookupTopicBrokersResponse{}
  35. ret.Topic = request.Topic
  36. ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish, 6)
  37. return ret, err
  38. }
  39. func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
  40. if b.currentBalancer == "" {
  41. return nil, status.Errorf(codes.Unavailable, "no balancer")
  42. }
  43. if !b.lockAsBalancer.IsLocked() {
  44. proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
  45. resp, err = client.ListTopics(ctx, request)
  46. return nil
  47. })
  48. if proxyErr != nil {
  49. return nil, proxyErr
  50. }
  51. return resp, err
  52. }
  53. ret := &mq_pb.ListTopicsResponse{}
  54. knownTopics := make(map[string]struct{})
  55. for brokerStatsItem := range b.Balancer.Brokers.IterBuffered() {
  56. _, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
  57. for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
  58. topicPartitionStat := topicPartitionStatsItem.Val
  59. topic := &mq_pb.Topic{
  60. Namespace: topicPartitionStat.TopicPartition.Namespace,
  61. Name: topicPartitionStat.TopicPartition.Name,
  62. }
  63. topicKey := fmt.Sprintf("%s/%s", topic.Namespace, topic.Name)
  64. if _, found := knownTopics[topicKey]; found {
  65. continue
  66. }
  67. knownTopics[topicKey] = struct{}{}
  68. ret.Topics = append(ret.Topics, topic)
  69. }
  70. }
  71. return ret, nil
  72. }