broker_grpc_balance.go 752 B

123456789101112131415161718192021222324252627
  1. package broker
  2. import (
  3. "context"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  6. )
  7. func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) {
  8. if !b.isLockOwner() {
  9. proxyErr := b.withBrokerClient(false, pb.ServerAddress(b.lockAsBalancer.LockOwner()), func(client mq_pb.SeaweedMessagingClient) error {
  10. resp, err = client.BalanceTopics(ctx, request)
  11. return nil
  12. })
  13. if proxyErr != nil {
  14. return nil, proxyErr
  15. }
  16. return resp, err
  17. }
  18. ret := &mq_pb.BalanceTopicsResponse{}
  19. actions := b.Balancer.BalancePublishers()
  20. err = b.Balancer.ExecuteBalanceAction(actions, b.grpcDialOption)
  21. return ret, err
  22. }