broker_grpc_lookup.go 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package broker
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  10. )
  11. // LookupTopicBrokers returns the brokers that are serving the topic
  12. func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
  13. if !b.isLockOwner() {
  14. proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
  15. resp, err = client.LookupTopicBrokers(ctx, request)
  16. return nil
  17. })
  18. if proxyErr != nil {
  19. return nil, proxyErr
  20. }
  21. return resp, err
  22. }
  23. t := topic.FromPbTopic(request.Topic)
  24. ret := &mq_pb.LookupTopicBrokersResponse{}
  25. conf := &mq_pb.ConfigureTopicResponse{}
  26. ret.Topic = request.Topic
  27. if conf, err = b.fca.ReadTopicConfFromFiler(t); err != nil {
  28. glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err)
  29. } else {
  30. err = b.ensureTopicActiveAssignments(t, conf)
  31. ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments
  32. }
  33. return ret, err
  34. }
  35. func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
  36. if !b.isLockOwner() {
  37. proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
  38. resp, err = client.ListTopics(ctx, request)
  39. return nil
  40. })
  41. if proxyErr != nil {
  42. return nil, proxyErr
  43. }
  44. return resp, err
  45. }
  46. ret := &mq_pb.ListTopicsResponse{}
  47. knownTopics := make(map[string]struct{})
  48. for brokerStatsItem := range b.PubBalancer.Brokers.IterBuffered() {
  49. _, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
  50. for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
  51. topicPartitionStat := topicPartitionStatsItem.Val
  52. topic := &schema_pb.Topic{
  53. Namespace: topicPartitionStat.TopicPartition.Namespace,
  54. Name: topicPartitionStat.TopicPartition.Name,
  55. }
  56. topicKey := fmt.Sprintf("%s/%s", topic.Namespace, topic.Name)
  57. if _, found := knownTopics[topicKey]; found {
  58. continue
  59. }
  60. knownTopics[topicKey] = struct{}{}
  61. ret.Topics = append(ret.Topics, topic)
  62. }
  63. }
  64. return ret, nil
  65. }
  66. func (b *MessageQueueBroker) isLockOwner() bool {
  67. return b.lockAsBalancer.LockOwner() == b.option.BrokerAddress().String()
  68. }