s3_storage_client.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package s3
  2. import (
  3. "fmt"
  4. "github.com/aws/aws-sdk-go/aws"
  5. "github.com/aws/aws-sdk-go/aws/credentials"
  6. "github.com/aws/aws-sdk-go/aws/session"
  7. "github.com/aws/aws-sdk-go/service/s3"
  8. "github.com/aws/aws-sdk-go/service/s3/s3iface"
  9. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  10. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  11. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  12. "github.com/chrislusf/seaweedfs/weed/util"
  13. )
  14. func init() {
  15. remote_storage.RemoteStorageClientMakers["s3"] = new(s3RemoteStorageMaker)
  16. }
  // s3RemoteStorageMaker is the stateless factory type for S3-backed remote
  // storage clients; it carries no fields and exists only to host Make.
  type s3RemoteStorageMaker struct{}
  18. func (s s3RemoteStorageMaker) Make(conf *filer_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
  19. client := &s3RemoteStorageClient{
  20. conf: conf,
  21. }
  22. config := &aws.Config{
  23. Region: aws.String(conf.S3Region),
  24. Endpoint: aws.String(conf.S3Endpoint),
  25. S3ForcePathStyle: aws.Bool(true),
  26. }
  27. if conf.S3AccessKey != "" && conf.S3SecretKey != "" {
  28. config.Credentials = credentials.NewStaticCredentials(conf.S3AccessKey, conf.S3SecretKey, "")
  29. }
  30. sess, err := session.NewSession(config)
  31. if err != nil {
  32. return nil, fmt.Errorf("create aws session: %v", err)
  33. }
  34. client.conn = s3.New(sess)
  35. return client, nil
  36. }
  37. type s3RemoteStorageClient struct {
  38. conf *filer_pb.RemoteConf
  39. conn s3iface.S3API
  40. }
  41. func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
  42. pathKey := remote.Path[1:]
  43. listInput := &s3.ListObjectsV2Input{
  44. Bucket: aws.String(remote.Bucket),
  45. ContinuationToken: nil,
  46. Delimiter: nil, // not aws.String("/"), iterate through all entries
  47. EncodingType: nil,
  48. ExpectedBucketOwner: nil,
  49. FetchOwner: nil,
  50. MaxKeys: nil, // aws.Int64(1000),
  51. Prefix: aws.String(pathKey),
  52. RequestPayer: nil,
  53. StartAfter: nil,
  54. }
  55. isLastPage := false
  56. for !isLastPage && err == nil {
  57. listErr := s.conn.ListObjectsV2Pages(listInput, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  58. for _, content := range page.Contents {
  59. key := *content.Key
  60. if len(pathKey) == 0 {
  61. key = "/" + key
  62. } else {
  63. key = key[len(pathKey):]
  64. }
  65. dir, name := util.FullPath(key).DirAndName()
  66. if err := visitFn(dir, name, false, &filer_pb.RemoteEntry{
  67. LastModifiedAt: (*content.LastModified).Unix(),
  68. Size: *content.Size,
  69. ETag: *content.ETag,
  70. StorageName: s.conf.Name,
  71. }); err != nil {
  72. return false
  73. }
  74. }
  75. listInput.ContinuationToken = page.NextContinuationToken
  76. isLastPage = lastPage
  77. return true
  78. })
  79. if listErr != nil {
  80. err = fmt.Errorf("list %v: %v", remote, listErr)
  81. }
  82. }
  83. return
  84. }
  85. func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error) {
  86. downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) {
  87. u.PartSize = int64(4 * 1024 * 1024)
  88. u.Concurrency = 1
  89. })
  90. dataSlice := make([]byte, int(size))
  91. writerAt := aws.NewWriteAtBuffer(dataSlice)
  92. _, err = downloader.Download(writerAt, &s3.GetObjectInput{
  93. Bucket: aws.String(loc.Bucket),
  94. Key: aws.String(loc.Path[1:]),
  95. Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)),
  96. })
  97. if err != nil {
  98. return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
  99. }
  100. return writerAt.Bytes(), nil
  101. }