command_s3_clean_uploads.go 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/util"
  6. "io"
  7. "math"
  8. "time"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. )
  11. func init() {
  12. Commands = append(Commands, &commandS3CleanUploads{})
  13. }
  14. type commandS3CleanUploads struct {
  15. }
  16. func (c *commandS3CleanUploads) Name() string {
  17. return "s3.clean.uploads"
  18. }
  19. func (c *commandS3CleanUploads) Help() string {
  20. return `clean up stale multipart uploads
  21. Example:
  22. s3.clean.uploads -replication 001
  23. `
  24. }
  25. func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  26. bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  27. uploadedTimeAgo := bucketCommand.Duration("timeAgo", 24*time.Hour, "created time before now. \"1.5h\" or \"2h45m\". Valid time units are \"m\", \"h\"")
  28. if err = bucketCommand.Parse(args); err != nil {
  29. return nil
  30. }
  31. var filerBucketsPath string
  32. filerBucketsPath, err = readFilerBucketsPath(commandEnv)
  33. if err != nil {
  34. return fmt.Errorf("read buckets: %v", err)
  35. }
  36. var buckets []string
  37. err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
  38. buckets = append(buckets, entry.Name)
  39. return nil
  40. }, "", false, math.MaxUint32)
  41. if err != nil {
  42. return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)
  43. }
  44. for _, bucket:= range buckets {
  45. c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo)
  46. }
  47. return err
  48. }
  49. func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration) error {
  50. uploadsDir := filerBucketsPath+"/"+bucket+"/.uploads"
  51. var staleUploads []string
  52. now := time.Now()
  53. err := filer_pb.List(commandEnv, uploadsDir, "", func(entry *filer_pb.Entry, isLast bool) error {
  54. ctime := time.Unix(entry.Attributes.Crtime, 0)
  55. if ctime.Add(timeAgo).Before(now) {
  56. staleUploads = append(staleUploads, entry.Name)
  57. }
  58. return nil
  59. }, "", false, math.MaxUint32)
  60. if err != nil {
  61. return fmt.Errorf("list uploads under %v: %v", uploadsDir, err)
  62. }
  63. for _, staleUpload:= range staleUploads {
  64. deleteUrl := fmt.Sprintf("http://%s:%d%s/%s?recursive=true&ignoreRecursiveError=true",commandEnv.option.FilerHost, commandEnv.option.FilerPort,uploadsDir, staleUpload)
  65. fmt.Fprintf(writer, "purge %s\n", deleteUrl)
  66. err = util.Delete(deleteUrl, "")
  67. if err != nil {
  68. return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err)
  69. }
  70. }
  71. return nil
  72. }