s3_upload.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
package s3_backend

import (
	"fmt"
	"net/url"
	"os"
	"sync/atomic"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"

	"github.com/chrislusf/seaweedfs/weed/glog"
)
  11. func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey string,
  12. attributes map[string]string,
  13. fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
  14. //open the file
  15. f, err := os.Open(filename)
  16. if err != nil {
  17. return 0, fmt.Errorf("failed to open file %q, %v", filename, err)
  18. }
  19. defer f.Close()
  20. info, err := f.Stat()
  21. if err != nil {
  22. return 0, fmt.Errorf("failed to stat file %q, %v", filename, err)
  23. }
  24. fileSize = info.Size()
  25. partSize := int64(64 * 1024 * 1024) // The minimum/default allowed part size is 5MB
  26. for partSize*1000 < fileSize {
  27. partSize *= 4
  28. }
  29. // Create an uploader with the session and custom options
  30. uploader := s3manager.NewUploaderWithClient(sess, func(u *s3manager.Uploader) {
  31. u.PartSize = partSize
  32. u.Concurrency = 5
  33. })
  34. fileReader := &s3UploadProgressedReader{
  35. fp: f,
  36. size: fileSize,
  37. read: -fileSize,
  38. fn: fn,
  39. }
  40. // process tagging
  41. tags := ""
  42. for k, v := range attributes {
  43. if len(tags) > 0 {
  44. tags = tags + "&"
  45. }
  46. tags = tags + k + "=" + v
  47. }
  48. // Upload the file to S3.
  49. var result *s3manager.UploadOutput
  50. result, err = uploader.Upload(&s3manager.UploadInput{
  51. Bucket: aws.String(destBucket),
  52. Key: aws.String(destKey),
  53. Body: fileReader,
  54. ACL: aws.String("private"),
  55. ServerSideEncryption: aws.String("AES256"),
  56. StorageClass: aws.String("STANDARD_IA"),
  57. Tagging: aws.String(tags),
  58. })
  59. //in case it fails to upload
  60. if err != nil {
  61. return 0, fmt.Errorf("failed to upload file %s: %v", filename, err)
  62. }
  63. glog.V(1).Infof("file %s uploaded to %s\n", filename, result.Location)
  64. return
  65. }
// adapted from https://github.com/aws/aws-sdk-go/pull/1868
//
// s3UploadProgressedReader wraps an *os.File and reports cumulative read
// progress through a callback as the S3 upload manager consumes the file.
type s3UploadProgressedReader struct {
	fp   *os.File // the source file being uploaded
	size int64    // total file size in bytes; used to compute the percentage
	read int64    // cumulative bytes read, updated atomically in ReadAt (initialized negative by uploadToS3)
	fn   func(progressed int64, percentage float32) error // progress callback; may be nil
}
  73. func (r *s3UploadProgressedReader) Read(p []byte) (int, error) {
  74. return r.fp.Read(p)
  75. }
  76. func (r *s3UploadProgressedReader) ReadAt(p []byte, off int64) (int, error) {
  77. n, err := r.fp.ReadAt(p, off)
  78. if err != nil {
  79. return n, err
  80. }
  81. // Got the length have read( or means has uploaded), and you can construct your message
  82. atomic.AddInt64(&r.read, int64(n))
  83. if r.fn != nil {
  84. read := r.read
  85. if err := r.fn(read, float32(read*100)/float32(r.size)); err != nil {
  86. return n, err
  87. }
  88. }
  89. return n, err
  90. }
  91. func (r *s3UploadProgressedReader) Seek(offset int64, whence int) (int64, error) {
  92. return r.fp.Seek(offset, whence)
  93. }