123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- package shell
- import (
- "flag"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
- "github.com/seaweedfs/seaweedfs/weed/security"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "io"
- "math"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- )
- func init() {
- Commands = append(Commands, &commandS3CleanUploads{})
- }
- type commandS3CleanUploads struct {
- }
- func (c *commandS3CleanUploads) Name() string {
- return "s3.clean.uploads"
- }
- func (c *commandS3CleanUploads) Help() string {
- return `clean up stale multipart uploads
- Example:
- s3.clean.uploads -timeAgo 1.5h
- `
- }
- func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
- bucketCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
- uploadedTimeAgo := bucketCommand.Duration("timeAgo", 24*time.Hour, "created time before now. \"1.5h\" or \"2h45m\". Valid time units are \"m\", \"h\"")
- if err = bucketCommand.Parse(args); err != nil {
- return nil
- }
- signingKey := util.GetViper().GetString("jwt.signing.key")
- var filerBucketsPath string
- filerBucketsPath, err = readFilerBucketsPath(commandEnv)
- if err != nil {
- return fmt.Errorf("read buckets: %v", err)
- }
- var buckets []string
- err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
- buckets = append(buckets, entry.Name)
- return nil
- }, "", false, math.MaxUint32)
- if err != nil {
- return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)
- }
- for _, bucket := range buckets {
- if err := c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo, signingKey); err != nil {
- fmt.Fprintf(writer, fmt.Sprintf("failed cleanup uploads for bucket %s: %v", bucket, err))
- }
- }
- return err
- }
- func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io.Writer, filerBucketsPath string, bucket string, timeAgo time.Duration, signingKey string) error {
- uploadsDir := filerBucketsPath + "/" + bucket + "/" + s3_constants.MultipartUploadsFolder
- var staleUploads []string
- now := time.Now()
- err := filer_pb.List(commandEnv, uploadsDir, "", func(entry *filer_pb.Entry, isLast bool) error {
- ctime := time.Unix(entry.Attributes.Crtime, 0)
- if ctime.Add(timeAgo).Before(now) {
- staleUploads = append(staleUploads, entry.Name)
- }
- return nil
- }, "", false, math.MaxUint32)
- if err != nil {
- return fmt.Errorf("list uploads under %v: %v", uploadsDir, err)
- }
- var encodedJwt security.EncodedJwt
- if signingKey != "" {
- encodedJwt = security.GenJwtForFilerServer(security.SigningKey(signingKey), 15*60)
- }
- for _, staleUpload := range staleUploads {
- deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload)
- fmt.Fprintf(writer, "purge %s\n", deleteUrl)
- err = util.Delete(deleteUrl, string(encodedJwt))
- if err != nil && err.Error() != "" {
- return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err)
- }
- }
- return nil
- }
|