123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- package s3_backend
import (
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage/backend"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/util"
)
- var (
- _ backend.DataStorageBackend = &S3Backend{}
- )
- func init() {
- backend.StorageBackends = append(backend.StorageBackends, &S3Backend{})
- }
- type S3Backend struct {
- conn s3iface.S3API
- region string
- bucket string
- dir string
- vid needle.VolumeId
- key string
- }
- func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
- bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
- getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{
- Bucket: &s3backend.bucket,
- Key: &s3backend.key,
- Range: &bytesRange,
- })
- if getObjectErr != nil {
- return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr)
- }
- defer getObjectOutput.Body.Close()
- return getObjectOutput.Body.Read(p)
- }
- func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) {
- panic("implement me")
- }
- func (s3backend S3Backend) Truncate(off int64) error {
- panic("implement me")
- }
- func (s3backend S3Backend) Close() error {
- return nil
- }
- func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) {
- headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{
- Bucket: &s3backend.bucket,
- Key: &s3backend.key,
- })
- if headObjectErr != nil {
- return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr)
- }
- datSize = int64(*headObjectOutput.ContentLength)
- modTime = *headObjectOutput.LastModified
- return
- }
- func (s3backend S3Backend) String() string {
- return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key)
- }
- func (s3backend *S3Backend) GetName() string {
- return "s3"
- }
- func (s3backend *S3Backend) GetSinkToDirectory() string {
- return s3backend.dir
- }
- func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error {
- glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
- glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
- glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
- return s3backend.initialize(
- configuration.GetString("aws_access_key_id"),
- configuration.GetString("aws_secret_access_key"),
- configuration.GetString("region"),
- configuration.GetString("bucket"),
- configuration.GetString("directory"),
- vid,
- )
- }
- func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string,
- vid needle.VolumeId) (err error) {
- s3backend.region = region
- s3backend.bucket = bucket
- s3backend.dir = dir
- s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
- s3backend.vid = vid
- s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid)
- if strings.HasPrefix(s3backend.key, "/") {
- s3backend.key = s3backend.key[1:]
- }
- return err
- }
|