command_mq_topic_compact.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package shell
  2. import (
  3. "flag"
  4. "github.com/seaweedfs/seaweedfs/weed/filer_client"
  5. "github.com/seaweedfs/seaweedfs/weed/mq/logstore"
  6. "github.com/seaweedfs/seaweedfs/weed/mq/schema"
  7. "github.com/seaweedfs/seaweedfs/weed/mq/topic"
  8. "github.com/seaweedfs/seaweedfs/weed/operation"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "google.golang.org/grpc"
  11. "io"
  12. "time"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandMqTopicCompact{})
  16. }
  17. type commandMqTopicCompact struct {
  18. }
  19. func (c *commandMqTopicCompact) Name() string {
  20. return "mq.topic.compact"
  21. }
  22. func (c *commandMqTopicCompact) Help() string {
  23. return `compact the topic storage into parquet format
  24. Example:
  25. mq.topic.compact -namespace <namespace> -topic <topic_name> -timeAgo <time_ago>
  26. `
  27. }
  28. func (c *commandMqTopicCompact) HasTag(tag CommandTag) bool {
  29. return ResourceHeavy == tag
  30. }
  31. func (c *commandMqTopicCompact) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
  32. // parse parameters
  33. mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  34. namespace := mqCommand.String("namespace", "", "namespace name")
  35. topicName := mqCommand.String("topic", "", "topic name")
  36. timeAgo := mqCommand.Duration("timeAgo", 2*time.Minute, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  37. replication := mqCommand.String("replication", "", "replication type")
  38. collection := mqCommand.String("collection", "", "optional collection name")
  39. dataCenter := mqCommand.String("dataCenter", "", "optional data center name")
  40. diskType := mqCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
  41. maxMB := mqCommand.Int("maxMB", 4, "split files larger than the limit")
  42. if err := mqCommand.Parse(args); err != nil {
  43. return err
  44. }
  45. storagePreference := &operation.StoragePreference{
  46. Replication: *replication,
  47. Collection: *collection,
  48. DataCenter: *dataCenter,
  49. DiskType: *diskType,
  50. MaxMB: *maxMB,
  51. }
  52. // read topic configuration
  53. fca := &filer_client.FilerClientAccessor{
  54. GetFiler: func() pb.ServerAddress {
  55. return commandEnv.option.FilerAddress
  56. },
  57. GetGrpcDialOption: func() grpc.DialOption {
  58. return commandEnv.option.GrpcDialOption
  59. },
  60. }
  61. t := topic.NewTopic(*namespace, *topicName)
  62. topicConf, err := fca.ReadTopicConfFromFiler(t)
  63. if err != nil {
  64. return err
  65. }
  66. // get record type
  67. recordType := topicConf.GetRecordType()
  68. recordType = schema.NewRecordTypeBuilder(recordType).
  69. WithField(logstore.SW_COLUMN_NAME_TS, schema.TypeInt64).
  70. WithField(logstore.SW_COLUMN_NAME_KEY, schema.TypeBytes).
  71. RecordTypeEnd()
  72. // compact the topic partition versions
  73. if err = logstore.CompactTopicPartitions(commandEnv, t, *timeAgo, recordType, storagePreference); err != nil {
  74. return err
  75. }
  76. return nil
  77. }