broker_grpc_lookup.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. )
  10. // LookupTopicBrokers returns the brokers that are serving the topic
  11. func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
  12. if !b.isLockOwner() {
  13. proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
  14. resp, err = client.LookupTopicBrokers(ctx, request)
  15. return nil
  16. })
  17. if proxyErr != nil {
  18. return nil, proxyErr
  19. }
  20. return resp, err
  21. }
  22. t := topic.FromPbTopic(request.Topic)
  23. ret := &mq_pb.LookupTopicBrokersResponse{}
  24. conf := &mq_pb.ConfigureTopicResponse{}
  25. ret.Topic = request.Topic
  26. if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil {
  27. glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err)
  28. } else {
  29. err = b.ensureTopicActiveAssignments(t, conf)
  30. ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments
  31. }
  32. return ret, err
  33. }
  34. func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
  35. if !b.isLockOwner() {
  36. proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
  37. resp, err = client.ListTopics(ctx, request)
  38. return nil
  39. })
  40. if proxyErr != nil {
  41. return nil, proxyErr
  42. }
  43. return resp, err
  44. }
  45. ret := &mq_pb.ListTopicsResponse{}
  46. knownTopics := make(map[string]struct{})
  47. for brokerStatsItem := range b.PubBalancer.Brokers.IterBuffered() {
  48. _, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
  49. for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
  50. topicPartitionStat := topicPartitionStatsItem.Val
  51. topic := &mq_pb.Topic{
  52. Namespace: topicPartitionStat.TopicPartition.Namespace,
  53. Name: topicPartitionStat.TopicPartition.Name,
  54. }
  55. topicKey := fmt.Sprintf("%s/%s", topic.Namespace, topic.Name)
  56. if _, found := knownTopics[topicKey]; found {
  57. continue
  58. }
  59. knownTopics[topicKey] = struct{}{}
  60. ret.Topics = append(ret.Topics, topic)
  61. }
  62. }
  63. return ret, nil
  64. }
  65. func (b *MessageQueueBroker) isLockOwner() bool {
  66. return b.lockAsBalancer.LockOwner() == b.option.BrokerAddress().String()
  67. }