broker_grpc_admin.go 824 B

123456789101112131415161718192021222324252627
  1. package broker
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/cluster"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  7. )
  8. func (b *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
  9. ret := &mq_pb.FindBrokerLeaderResponse{}
  10. err := b.withMasterClient(false, b.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
  11. resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
  12. ClientType: cluster.BrokerType,
  13. FilerGroup: request.FilerGroup,
  14. })
  15. if err != nil {
  16. return err
  17. }
  18. if len(resp.ClusterNodes) == 0 {
  19. return nil
  20. }
  21. ret.Broker = resp.ClusterNodes[0].Address
  22. return nil
  23. })
  24. return ret, err
  25. }