s3_upload.go 2.4 KB

package s3_backend

import (
	"fmt"
	"os"
	"sync/atomic"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"

	"github.com/chrislusf/seaweedfs/weed/glog"
)

func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey string, fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {

	// open the file
	f, err := os.Open(filename)
	if err != nil {
		return 0, fmt.Errorf("failed to open file %q, %v", filename, err)
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		return 0, fmt.Errorf("failed to stat file %q, %v", filename, err)
	}

	fileSize = info.Size()

	partSize := int64(64 * 1024 * 1024) // The minimum/default allowed part size is 5MB
	// grow the part size for large files so the number of parts stays bounded
	for partSize*1000 < fileSize {
		partSize *= 4
	}

	// Create an uploader with the session and custom options
	uploader := s3manager.NewUploaderWithClient(sess, func(u *s3manager.Uploader) {
		u.PartSize = partSize
		u.Concurrency = 5
	})

	fileReader := &s3UploadProgressedReader{
		fp:   f,
		size: fileSize,
		// start negative so progress is not double-counted: the SDK reads each part
		// once to sign and once to send (see the PR referenced below)
		read: -fileSize,
		fn:   fn,
	}

	// Upload the file to S3.
	var result *s3manager.UploadOutput
	result, err = uploader.Upload(&s3manager.UploadInput{
		Bucket:       aws.String(destBucket),
		Key:          aws.String(destKey),
		Body:         fileReader,
		StorageClass: aws.String("STANDARD_IA"),
	})

	// in case it fails to upload
	if err != nil {
		return 0, fmt.Errorf("failed to upload file %s: %v", filename, err)
	}
	glog.V(1).Infof("file %s uploaded to %s\n", filename, result.Location)

	return
}

// adapted from https://github.com/aws/aws-sdk-go/pull/1868
type s3UploadProgressedReader struct {
	fp   *os.File
	size int64
	read int64
	fn   func(progressed int64, percentage float32) error
}

func (r *s3UploadProgressedReader) Read(p []byte) (int, error) {
	return r.fp.Read(p)
}

func (r *s3UploadProgressedReader) ReadAt(p []byte, off int64) (int, error) {
	n, err := r.fp.ReadAt(p, off)
	if err != nil {
		return n, err
	}

	// accumulate the number of bytes read so far (which reflects how much has been
	// uploaded) and report progress through the callback
	atomic.AddInt64(&r.read, int64(n))

	if r.fn != nil {
		read := r.read
		if err := r.fn(read, float32(read*100)/float32(r.size)); err != nil {
			return n, err
		}
	}

	return n, err
}

func (r *s3UploadProgressedReader) Seek(offset int64, whence int) (int64, error) {
	return r.fp.Seek(offset, whence)
}