command_mq_topic_configure.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package shell
  2. import (
  3. "context"
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
  10. "io"
  11. )
  12. func init() {
  13. Commands = append(Commands, &commandMqTopicConfigure{})
  14. }
  15. type commandMqTopicConfigure struct {
  16. }
  17. func (c *commandMqTopicConfigure) Name() string {
  18. return "mq.topic.configure"
  19. }
  20. func (c *commandMqTopicConfigure) Help() string {
  21. return `configure a topic with a given name
  22. Example:
  23. mq.topic.configure -namespace <namespace> -topic <topic_name> -partition_count <partition_count>
  24. `
  25. }
  26. func (c *commandMqTopicConfigure) HasTag(CommandTag) bool {
  27. return false
  28. }
  29. func (c *commandMqTopicConfigure) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
  30. // parse parameters
  31. mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  32. namespace := mqCommand.String("namespace", "", "namespace name")
  33. topicName := mqCommand.String("topic", "", "topic name")
  34. partitionCount := mqCommand.Int("partitionCount", 6, "partition count")
  35. if err := mqCommand.Parse(args); err != nil {
  36. return err
  37. }
  38. // find the broker balancer
  39. brokerBalancer, err := findBrokerBalancer(commandEnv)
  40. if err != nil {
  41. return err
  42. }
  43. fmt.Fprintf(writer, "current balancer: %s\n", brokerBalancer)
  44. // create topic
  45. return pb.WithBrokerGrpcClient(false, brokerBalancer, commandEnv.option.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
  46. resp, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
  47. Topic: &schema_pb.Topic{
  48. Namespace: *namespace,
  49. Name: *topicName,
  50. },
  51. PartitionCount: int32(*partitionCount),
  52. })
  53. if err != nil {
  54. return err
  55. }
  56. output, _ := json.MarshalIndent(resp, "", " ")
  57. fmt.Fprintf(writer, "response:\n%+v\n", string(output))
  58. return nil
  59. })
  60. }