command_mq_topic_desc.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package shell
  2. import (
  3. "context"
  4. "flag"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  9. "io"
  10. )
  11. func init() {
  12. Commands = append(Commands, &commandMqTopicDescribe{})
  13. }
  14. type commandMqTopicDescribe struct {
  15. }
  16. func (c *commandMqTopicDescribe) Name() string {
  17. return "mq.topic.describe"
  18. }
  19. func (c *commandMqTopicDescribe) Help() string {
  20. return `describe a topic`
  21. }
  22. func (c *commandMqTopicDescribe) HasTag(CommandTag) bool {
  23. return false
  24. }
  25. func (c *commandMqTopicDescribe) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
  26. // parse parameters
  27. mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  28. namespace := mqCommand.String("namespace", "", "namespace name")
  29. topicName := mqCommand.String("topic", "", "topic name")
  30. if err := mqCommand.Parse(args); err != nil {
  31. return err
  32. }
  33. // find the broker balancer
  34. brokerBalancer, err := findBrokerBalancer(commandEnv)
  35. if err != nil {
  36. return err
  37. }
  38. fmt.Fprintf(writer, "current balancer: %s\n", brokerBalancer)
  39. return pb.WithBrokerGrpcClient(false, brokerBalancer, commandEnv.option.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  40. resp, err := client.LookupTopicBrokers(context.Background(), &mq_pb.LookupTopicBrokersRequest{
  41. Topic: &schema_pb.Topic{
  42. Namespace: *namespace,
  43. Name: *topicName,
  44. },
  45. })
  46. if err != nil {
  47. return err
  48. }
  49. for _, assignment := range resp.BrokerPartitionAssignments {
  50. fmt.Fprintf(writer, " %+v\n", assignment)
  51. }
  52. return nil
  53. })
  54. }