s3_storage_client.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package s3
  2. import (
  3. "fmt"
  4. "github.com/aws/aws-sdk-go/aws"
  5. "github.com/aws/aws-sdk-go/aws/credentials"
  6. "github.com/aws/aws-sdk-go/aws/session"
  7. "github.com/aws/aws-sdk-go/service/s3"
  8. "github.com/aws/aws-sdk-go/service/s3/s3iface"
  9. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  10. "github.com/chrislusf/seaweedfs/weed/remote_storage"
  11. "github.com/chrislusf/seaweedfs/weed/util"
  12. )
  13. func init() {
  14. remote_storage.RemoteStorageClientMakers["s3"] = new(s3RemoteStorageMaker)
  15. }
  16. type s3RemoteStorageMaker struct{}
  17. func (s s3RemoteStorageMaker) Make(conf *filer_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
  18. client := &s3RemoteStorageClient{
  19. conf: conf,
  20. }
  21. config := &aws.Config{
  22. Region: aws.String(conf.S3Region),
  23. Endpoint: aws.String(conf.S3Endpoint),
  24. S3ForcePathStyle: aws.Bool(true),
  25. }
  26. if conf.S3AccessKey != "" && conf.S3SecretKey != "" {
  27. config.Credentials = credentials.NewStaticCredentials(conf.S3AccessKey, conf.S3SecretKey, "")
  28. }
  29. sess, err := session.NewSession(config)
  30. if err != nil {
  31. return nil, fmt.Errorf("create aws session: %v", err)
  32. }
  33. client.conn = s3.New(sess)
  34. return client, nil
  35. }
  36. type s3RemoteStorageClient struct {
  37. conf *filer_pb.RemoteConf
  38. conn s3iface.S3API
  39. }
  40. func (s s3RemoteStorageClient) Traverse(remote remote_storage.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
  41. _, bucket, pathKey := remote.NameBucketPath()
  42. pathKey = pathKey[1:]
  43. listInput := &s3.ListObjectsV2Input{
  44. Bucket: aws.String(bucket),
  45. ContinuationToken: nil,
  46. Delimiter: nil, // not aws.String("/"), iterate through all entries
  47. EncodingType: nil,
  48. ExpectedBucketOwner: nil,
  49. FetchOwner: nil,
  50. MaxKeys: nil, // aws.Int64(1000),
  51. Prefix: aws.String(pathKey),
  52. RequestPayer: nil,
  53. StartAfter: nil,
  54. }
  55. isLastPage := false
  56. for !isLastPage && err == nil {
  57. listErr := s.conn.ListObjectsV2Pages(listInput, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
  58. for _, content := range page.Contents {
  59. key := (*content.Key)
  60. if len(pathKey) == 0 {
  61. key = "/" + key
  62. } else {
  63. key = key[len(pathKey):]
  64. }
  65. dir, name := util.FullPath(key).DirAndName()
  66. if err := visitFn(dir, name, false, &filer_pb.RemoteEntry{
  67. LastModifiedAt: (*content.LastModified).Unix(),
  68. Size: *content.Size,
  69. ETag: *content.ETag,
  70. StorageName: s.conf.Name,
  71. }); err != nil {
  72. return false
  73. }
  74. }
  75. listInput.ContinuationToken = page.NextContinuationToken
  76. isLastPage = lastPage
  77. return true
  78. })
  79. if listErr != nil {
  80. err = fmt.Errorf("list %v: %v", remote, listErr)
  81. }
  82. }
  83. return
  84. }