command_mq_topic_desc.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "io"
  9. )
  10. func init() {
  11. Commands = append(Commands, &commandMqTopicDescribe{})
  12. }
  13. type commandMqTopicDescribe struct {
  14. }
  15. func (c *commandMqTopicDescribe) Name() string {
  16. return "mq.topic.describe"
  17. }
  18. func (c *commandMqTopicDescribe) Help() string {
  19. return `describe a topic`
  20. }
  21. func (c *commandMqTopicDescribe) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
  22. // parse parameters
  23. mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  24. namespace := mqCommand.String("namespace", "", "namespace name")
  25. topicName := mqCommand.String("topic", "", "topic name")
  26. if err := mqCommand.Parse(args); err != nil {
  27. return err
  28. }
  29. // find the broker balancer
  30. brokerBalancer, err := findBrokerBalancer(commandEnv)
  31. if err != nil {
  32. return err
  33. }
  34. fmt.Fprintf(writer, "current balancer: %s\n", brokerBalancer)
  35. return pb.WithBrokerGrpcClient(false, brokerBalancer, commandEnv.option.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  36. resp, err := client.LookupTopicBrokers(context.Background(), &mq_pb.LookupTopicBrokersRequest{
  37. Topic: &mq_pb.Topic{
  38. Namespace: *namespace,
  39. Name: *topicName,
  40. },
  41. })
  42. if err != nil {
  43. return err
  44. }
  45. for _, assignment := range resp.BrokerPartitionAssignments {
  46. fmt.Fprintf(writer, " %+v\n", assignment)
  47. }
  48. return nil
  49. })
  50. }