s3_download.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package s3_backend
  2. import (
  3. "fmt"
  4. "os"
  5. "sync/atomic"
  6. "github.com/aws/aws-sdk-go/aws"
  7. "github.com/aws/aws-sdk-go/service/s3"
  8. "github.com/aws/aws-sdk-go/service/s3/s3iface"
  9. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. )
  12. func downloadFromS3(sess s3iface.S3API, destFileName string, sourceBucket string, sourceKey string,
  13. fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
  14. fileSize, err = getFileSize(sess, sourceBucket, sourceKey)
  15. if err != nil {
  16. return
  17. }
  18. //open the file
  19. f, err := os.OpenFile(destFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
  20. if err != nil {
  21. return 0, fmt.Errorf("failed to open file %q, %v", destFileName, err)
  22. }
  23. defer f.Close()
  24. // Create a downloader with the session and custom options
  25. downloader := s3manager.NewDownloaderWithClient(sess, func(u *s3manager.Downloader) {
  26. u.PartSize = int64(64 * 1024 * 1024)
  27. u.Concurrency = 5
  28. })
  29. fileWriter := &s3DownloadProgressedWriter{
  30. fp: f,
  31. size: fileSize,
  32. written: 0,
  33. fn: fn,
  34. }
  35. // Download the file from S3.
  36. fileSize, err = downloader.Download(fileWriter, &s3.GetObjectInput{
  37. Bucket: aws.String(sourceBucket),
  38. Key: aws.String(sourceKey),
  39. })
  40. if err != nil {
  41. return fileSize, fmt.Errorf("failed to download /buckets/%s%s to %s: %v", sourceBucket, sourceKey, destFileName, err)
  42. }
  43. glog.V(1).Infof("downloaded file %s\n", destFileName)
  44. return
  45. }
  // s3DownloadProgressedWriter writes downloaded chunks to a local file at the
  // requested offsets and reports cumulative progress through an optional
  // callback.
  //
  // Adapted from https://github.com/aws/aws-sdk-go/pull/1868
  // and https://petersouter.xyz/s3-download-progress-bar-in-golang/
  type s3DownloadProgressedWriter struct {
  	// size is the expected total number of bytes; it is the denominator of the
  	// reported percentage.
  	size int64
  	// written is the number of bytes written so far. It is only updated
  	// through sync/atomic.
  	written int64
  	// fn, when non-nil, is called after every successful write with the
  	// cumulative byte count and the percentage completed.
  	fn func(progressed int64, percentage float32) error
  	// fp is the destination file.
  	fp *os.File
  }

  // WriteAt writes p to the destination file at offset off, adds the number of
  // bytes written to the running total and, when a callback is set, reports the
  // new total and percentage to it.
  //
  // It returns the byte count from the file write. A file write error is
  // returned as is and the callback is not invoked; an error from the callback
  // is returned together with the byte count already written.
  func (w *s3DownloadProgressedWriter) WriteAt(p []byte, off int64) (int, error) {
  	n, err := w.fp.WriteAt(p, off)
  	if err != nil {
  		return n, err
  	}

  	// Take the total from AddInt64's return value rather than re-reading
  	// w.written: the downloader in this file runs with Concurrency = 5, so a
  	// plain read here would race with other goroutines updating the counter.
  	written := atomic.AddInt64(&w.written, int64(n))
  	if w.fn == nil {
  		return n, nil
  	}

  	// With no known total the ratio would be NaN or +Inf; report 0 instead.
  	var percentage float32
  	if w.size > 0 {
  		percentage = float32(written*100) / float32(w.size)
  	}
  	if err := w.fn(written, percentage); err != nil {
  		return n, err
  	}
  	return n, nil
  }
  69. func getFileSize(svc s3iface.S3API, bucket string, key string) (filesize int64, error error) {
  70. params := &s3.HeadObjectInput{
  71. Bucket: aws.String(bucket),
  72. Key: aws.String(key),
  73. }
  74. resp, err := svc.HeadObject(params)
  75. if err != nil {
  76. return 0, err
  77. }
  78. return *resp.ContentLength, nil
  79. }