s3_backend.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package s3_backend
import (
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage/backend"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/util"
)
  13. var (
  14. _ backend.DataStorageBackend = &S3Backend{}
  15. )
  16. func init() {
  17. backend.StorageBackends = append(backend.StorageBackends, &S3Backend{})
  18. }
  19. type S3Backend struct {
  20. conn s3iface.S3API
  21. region string
  22. bucket string
  23. dir string
  24. vid needle.VolumeId
  25. key string
  26. }
  27. func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
  28. bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
  29. getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{
  30. Bucket: &s3backend.bucket,
  31. Key: &s3backend.key,
  32. Range: &bytesRange,
  33. })
  34. if getObjectErr != nil {
  35. return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr)
  36. }
  37. defer getObjectOutput.Body.Close()
  38. return getObjectOutput.Body.Read(p)
  39. }
  40. func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) {
  41. panic("implement me")
  42. }
  43. func (s3backend S3Backend) Truncate(off int64) error {
  44. panic("implement me")
  45. }
  46. func (s3backend S3Backend) Close() error {
  47. return nil
  48. }
  49. func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) {
  50. headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{
  51. Bucket: &s3backend.bucket,
  52. Key: &s3backend.key,
  53. })
  54. if headObjectErr != nil {
  55. return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr)
  56. }
  57. datSize = int64(*headObjectOutput.ContentLength)
  58. modTime = *headObjectOutput.LastModified
  59. return
  60. }
  61. func (s3backend S3Backend) String() string {
  62. return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key)
  63. }
  64. func (s3backend *S3Backend) GetName() string {
  65. return "s3"
  66. }
  67. func (s3backend *S3Backend) GetSinkToDirectory() string {
  68. return s3backend.dir
  69. }
  70. func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error {
  71. glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
  72. glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
  73. glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
  74. return s3backend.initialize(
  75. configuration.GetString("aws_access_key_id"),
  76. configuration.GetString("aws_secret_access_key"),
  77. configuration.GetString("region"),
  78. configuration.GetString("bucket"),
  79. configuration.GetString("directory"),
  80. vid,
  81. )
  82. }
  83. func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string,
  84. vid needle.VolumeId) (err error) {
  85. s3backend.region = region
  86. s3backend.bucket = bucket
  87. s3backend.dir = dir
  88. s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
  89. s3backend.vid = vid
  90. s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid)
  91. if strings.HasPrefix(s3backend.key, "/") {
  92. s3backend.key = s3backend.key[1:]
  93. }
  94. return err
  95. }