command_mq_topic_desc.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "io"
  9. )
  10. func init() {
  11. Commands = append(Commands, &commandMqTopicDescribe{})
  12. }
  13. type commandMqTopicDescribe struct {
  14. }
  15. func (c *commandMqTopicDescribe) Name() string {
  16. return "mq.topic.describe"
  17. }
  18. func (c *commandMqTopicDescribe) Help() string {
  19. return `describe a topic`
  20. }
  21. func (c *commandMqTopicDescribe) HasTag(CommandTag) bool {
  22. return false
  23. }
  24. func (c *commandMqTopicDescribe) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
  25. // parse parameters
  26. mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  27. namespace := mqCommand.String("namespace", "", "namespace name")
  28. topicName := mqCommand.String("topic", "", "topic name")
  29. if err := mqCommand.Parse(args); err != nil {
  30. return err
  31. }
  32. // find the broker balancer
  33. brokerBalancer, err := findBrokerBalancer(commandEnv)
  34. if err != nil {
  35. return err
  36. }
  37. fmt.Fprintf(writer, "current balancer: %s\n", brokerBalancer)
  38. return pb.WithBrokerGrpcClient(false, brokerBalancer, commandEnv.option.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  39. resp, err := client.LookupTopicBrokers(context.Background(), &mq_pb.LookupTopicBrokersRequest{
  40. Topic: &mq_pb.Topic{
  41. Namespace: *namespace,
  42. Name: *topicName,
  43. },
  44. })
  45. if err != nil {
  46. return err
  47. }
  48. for _, assignment := range resp.BrokerPartitionAssignments {
  49. fmt.Fprintf(writer, " %+v\n", assignment)
  50. }
  51. return nil
  52. })
  53. }