s3_upload.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package s3_backend
  2. import (
  3. "fmt"
  4. "github.com/aws/aws-sdk-go/aws"
  5. "github.com/aws/aws-sdk-go/service/s3/s3iface"
  6. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  7. "os"
  8. "sync"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. )
  11. func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey string, storageClass string, fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
  12. //open the file
  13. f, err := os.Open(filename)
  14. if err != nil {
  15. return 0, fmt.Errorf("failed to open file %q, %v", filename, err)
  16. }
  17. defer f.Close()
  18. info, err := f.Stat()
  19. if err != nil {
  20. return 0, fmt.Errorf("failed to stat file %q, %v", filename, err)
  21. }
  22. fileSize = info.Size()
  23. partSize := int64(64 * 1024 * 1024) // The minimum/default allowed part size is 5MB
  24. for partSize*1000 < fileSize {
  25. partSize *= 4
  26. }
  27. // Create an uploader with the session and custom options
  28. uploader := s3manager.NewUploaderWithClient(sess, func(u *s3manager.Uploader) {
  29. u.PartSize = partSize
  30. u.Concurrency = 5
  31. })
  32. fileReader := &s3UploadProgressedReader{
  33. fp: f,
  34. size: fileSize,
  35. signMap: map[int64]struct{}{},
  36. fn: fn,
  37. }
  38. // Upload the file to S3.
  39. var result *s3manager.UploadOutput
  40. result, err = uploader.Upload(&s3manager.UploadInput{
  41. Bucket: aws.String(destBucket),
  42. Key: aws.String(destKey),
  43. Body: fileReader,
  44. StorageClass: aws.String(storageClass),
  45. })
  46. //in case it fails to upload
  47. if err != nil {
  48. return 0, fmt.Errorf("failed to upload file %s: %v", filename, err)
  49. }
  50. glog.V(1).Infof("file %s uploaded to %s\n", filename, result.Location)
  51. return
  52. }
  // s3UploadProgressedReader wraps an *os.File and reports progress through fn
  // as the file is read with ReadAt.
  //
  // adapted from https://github.com/aws/aws-sdk-go/pull/1868
  // https://github.com/aws/aws-sdk-go/blob/main/example/service/s3/putObjectWithProcess/putObjWithProcess.go
  type s3UploadProgressedReader struct {
  	fp      *os.File           // underlying file being read
  	size    int64              // total size used as the denominator of the percentage
  	read    int64              // bytes counted so far; guarded by mux
  	signMap map[int64]struct{} // offsets that have already been read once; guarded by mux
  	mux     sync.Mutex         // protects read and signMap
  	fn      func(progressed int64, percentage float32) error // optional progress callback
  }

  // Read reads from the underlying file at its current position. It does not
  // update the progress counter and does not invoke the callback.
  func (r *s3UploadProgressedReader) Read(p []byte) (int, error) {
  	return r.fp.Read(p)
  }

  // ReadAt reads len(p) bytes from the underlying file starting at off and then
  // reports progress.
  //
  // If the file read returns an error, that error and the byte count are
  // returned as-is and neither the counter nor the callback is touched.
  // Otherwise the first read at a given offset is only recorded, and every later
  // read at that same offset adds its byte count to the running total. When fn
  // is set it is called after every successful read with the running total and
  // total*100/size; a non-nil error from fn is returned to the caller together
  // with the number of bytes read.
  func (r *s3UploadProgressedReader) ReadAt(p []byte, off int64) (int, error) {
  	n, err := r.fp.ReadAt(p, off)
  	if err != nil {
  		return n, err
  	}

  	r.mux.Lock()
  	if r.signMap == nil {
  		// Keep a reader built without signMap usable instead of panicking
  		// on the map write below.
  		r.signMap = make(map[int64]struct{})
  	}
  	// Ignore the first signature call: only repeat reads at an offset count.
  	if _, seen := r.signMap[off]; seen {
  		r.read += int64(n)
  	} else {
  		r.signMap[off] = struct{}{}
  	}
  	// Snapshot the counter while still holding the lock; reading r.read after
  	// Unlock would race with concurrent ReadAt calls.
  	read := r.read
  	r.mux.Unlock()

  	if r.fn != nil {
  		if err := r.fn(read, float32(read*100)/float32(r.size)); err != nil {
  			return n, err
  		}
  	}
  	return n, nil
  }

  // Seek sets the position of the underlying file for the next Read and returns
  // whatever the file's own Seek returns.
  func (r *s3UploadProgressedReader) Seek(offset int64, whence int) (int64, error) {
  	return r.fp.Seek(offset, whence)
  }