s3_sink.go

package S3Sink

import (
	"encoding/base64"
	"fmt"
	"strconv"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
	"github.com/seaweedfs/seaweedfs/weed/replication/source"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
// S3Sink replicates filer entries into an S3 (or S3-compatible) bucket.
type S3Sink struct {
	conn                          s3iface.S3API
	filerSource                   *source.FilerSource
	isIncremental                 bool
	keepPartSize                  bool
	s3DisableContentMD5Validation bool
	s3ForcePathStyle              bool
	uploaderConcurrency           int
	uploaderMaxUploadParts        int
	uploaderPartSizeMb            int
	region                        string
	bucket                        string
	dir                           string
	endpoint                      string
	acl                           string
}
func init() {
	sink.Sinks = append(sink.Sinks, &S3Sink{})
}

func (s3sink *S3Sink) GetName() string {
	return "s3"
}

func (s3sink *S3Sink) GetSinkToDirectory() string {
	return s3sink.dir
}

func (s3sink *S3Sink) IsIncremental() bool {
	return s3sink.isIncremental
}
func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
	configuration.SetDefault(prefix+"region", "us-east-2")
	configuration.SetDefault(prefix+"directory", "/")
	configuration.SetDefault(prefix+"keep_part_size", true)
	configuration.SetDefault(prefix+"uploader_max_upload_parts", 1000)
	configuration.SetDefault(prefix+"uploader_part_size_mb", 8)
	configuration.SetDefault(prefix+"uploader_concurrency", 8)
	configuration.SetDefault(prefix+"s3_disable_content_md5_validation", true)
	configuration.SetDefault(prefix+"s3_force_path_style", true)

	s3sink.region = configuration.GetString(prefix + "region")
	s3sink.bucket = configuration.GetString(prefix + "bucket")
	s3sink.dir = configuration.GetString(prefix + "directory")
	s3sink.endpoint = configuration.GetString(prefix + "endpoint")
	s3sink.acl = configuration.GetString(prefix + "acl")
	s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental")
	s3sink.keepPartSize = configuration.GetBool(prefix + "keep_part_size")
	s3sink.s3DisableContentMD5Validation = configuration.GetBool(prefix + "s3_disable_content_md5_validation")
	s3sink.s3ForcePathStyle = configuration.GetBool(prefix + "s3_force_path_style")
	s3sink.uploaderMaxUploadParts = configuration.GetInt(prefix + "uploader_max_upload_parts")
	// Read the same key the default above was set for ("uploader_part_size_mb");
	// reading "uploader_part_size" would silently bypass that default.
	s3sink.uploaderPartSizeMb = configuration.GetInt(prefix + "uploader_part_size_mb")
	s3sink.uploaderConcurrency = configuration.GetInt(prefix + "uploader_concurrency")

	glog.V(0).Infof("sink.s3.region: %v", s3sink.region)
	glog.V(0).Infof("sink.s3.bucket: %v", s3sink.bucket)
	glog.V(0).Infof("sink.s3.directory: %v", s3sink.dir)
	glog.V(0).Infof("sink.s3.endpoint: %v", s3sink.endpoint)
	glog.V(0).Infof("sink.s3.acl: %v", s3sink.acl)
	glog.V(0).Infof("sink.s3.is_incremental: %v", s3sink.isIncremental)
	glog.V(0).Infof("sink.s3.s3_disable_content_md5_validation: %v", s3sink.s3DisableContentMD5Validation)
	glog.V(0).Infof("sink.s3.s3_force_path_style: %v", s3sink.s3ForcePathStyle)
	glog.V(0).Infof("sink.s3.keep_part_size: %v", s3sink.keepPartSize)

	if s3sink.uploaderMaxUploadParts > s3manager.MaxUploadParts {
		// Log the configured value before clamping so the "old => new" message
		// actually shows the change.
		glog.Warningf("uploader_max_upload_parts is greater than the maximum number of parts allowed when uploading multiple parts to Amazon S3")
		glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v => %v", s3sink.uploaderMaxUploadParts, s3manager.MaxUploadParts)
		s3sink.uploaderMaxUploadParts = s3manager.MaxUploadParts
	} else {
		glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v", s3sink.uploaderMaxUploadParts)
	}
	glog.V(0).Infof("sink.s3.uploader_part_size_mb: %v", s3sink.uploaderPartSizeMb)
	glog.V(0).Infof("sink.s3.uploader_concurrency: %v", s3sink.uploaderConcurrency)

	return s3sink.initialize(
		configuration.GetString(prefix+"aws_access_key_id"),
		configuration.GetString(prefix+"aws_secret_access_key"),
	)
}
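
// Illustrative configuration sketch (not part of the original file): a minimal
// replication section based on the keys read in Initialize above. The section
// name follows the "sink.s3." log prefix; bucket and endpoint values here are
// example placeholders, not defaults.
//
//	[sink.s3]
//	aws_access_key_id     = ""              # empty: fall back to the SDK credential chain
//	aws_secret_access_key = ""
//	region                = "us-east-2"
//	bucket                = "my-backup-bucket"   # example value
//	directory             = "/"
//	endpoint              = ""              # set for S3-compatible services
//	acl                   = ""
//	is_incremental        = false
//	keep_part_size        = true
//	uploader_part_size_mb = 8
//	uploader_max_upload_parts = 1000
//	uploader_concurrency  = 8
//	s3_disable_content_md5_validation = true
//	s3_force_path_style   = true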
func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) {
	s3sink.filerSource = s
}
func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey string) error {
	config := &aws.Config{
		Region:                        aws.String(s3sink.region),
		Endpoint:                      aws.String(s3sink.endpoint),
		S3DisableContentMD5Validation: aws.Bool(s3sink.s3DisableContentMD5Validation),
		S3ForcePathStyle:              aws.Bool(s3sink.s3ForcePathStyle),
	}
	if awsAccessKeyId != "" && awsSecretAccessKey != "" {
		config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
	}

	sess, err := session.NewSession(config)
	if err != nil {
		return fmt.Errorf("create aws session: %v", err)
	}
	s3sink.conn = s3.New(sess)

	return nil
}
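
// Hedged usage note (not from the original source): when aws_access_key_id and
// aws_secret_access_key are left empty, no static credentials are attached and
// session.NewSession resolves credentials through the SDK's default chain,
// e.g. environment variables (illustrative values):
//
//	export AWS_ACCESS_KEY_ID=AKIAEXAMPLE
//	export AWS_SECRET_ACCESS_KEY=example-secret
//
// or a profile in ~/.aws/credentials, or an attached IAM role.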
func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
	key = cleanKey(key)

	if isDirectory {
		return nil
	}

	input := &s3.DeleteObjectInput{
		Bucket: aws.String(s3sink.bucket),
		Key:    aws.String(key),
	}

	result, err := s3sink.conn.DeleteObject(input)
	if err == nil {
		glog.V(2).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
	} else {
		glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
	}

	return err
}
func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) (err error) {
	key = cleanKey(key)

	if entry.IsDirectory {
		return nil
	}

	reader := filer.NewFileReader(s3sink.filerSource, entry)

	// Create an uploader with the session and custom options.
	uploader := s3manager.NewUploaderWithClient(s3sink.conn, func(u *s3manager.Uploader) {
		u.PartSize = int64(s3sink.uploaderPartSizeMb * 1024 * 1024)
		u.Concurrency = s3sink.uploaderConcurrency
		u.MaxUploadParts = s3sink.uploaderMaxUploadParts
	})

	if s3sink.keepPartSize {
		switch chunkCount := len(entry.Chunks); {
		case chunkCount > 1:
			// Mirror the source chunking: use the first chunk's size as the
			// multipart part size, as long as it exceeds the S3 minimum.
			if firstChunkSize := int64(entry.Chunks[0].Size); firstChunkSize > s3manager.MinUploadPartSize {
				uploader.PartSize = firstChunkSize
			}
		default:
			uploader.PartSize = 0
		}
	}

	// Record the source mtime as S3 user metadata unless it is already present.
	doSaveMtime := true
	if entry.Extended == nil {
		entry.Extended = make(map[string][]byte)
	} else if _, ok := entry.Extended[s3_constants.AmzUserMetaMtime]; ok {
		doSaveMtime = false
	}
	if doSaveMtime {
		entry.Extended[s3_constants.AmzUserMetaMtime] = []byte(strconv.FormatInt(entry.Attributes.Mtime, 10))
	}

	// Build the object tag set from the extended attributes ("k1=v1&k2=v2").
	tags := ""
	for k, v := range entry.Extended {
		if len(tags) > 0 {
			tags = tags + "&"
		}
		tags = tags + k + "=" + string(v)
	}

	// Upload the file to S3.
	uploadInput := s3manager.UploadInput{
		Bucket:  aws.String(s3sink.bucket),
		Key:     aws.String(key),
		Body:    reader,
		Tagging: aws.String(tags),
	}
	if len(entry.Attributes.Md5) > 0 {
		// Attributes.Md5 is already a []byte; base64-encode it for the Content-MD5 header.
		uploadInput.ContentMD5 = aws.String(base64.StdEncoding.EncodeToString(entry.Attributes.Md5))
	}
	_, err = uploader.Upload(&uploadInput)

	return err
}
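
// Worked example (illustrative numbers, not from the original source): with
// keep_part_size = true and a source file stored as several 16 MiB chunks,
// the uploader's PartSize is raised from the configured 8 MiB to 16 MiB,
// because 16 MiB exceeds s3manager.MinUploadPartSize (5 MiB), so the S3
// multipart layout mirrors the source chunking. A single-chunk file instead
// resets PartSize to 0 rather than mirroring the chunk size.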
// UpdateEntry handles an update by re-creating the object at the same key.
func (s3sink *S3Sink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
	key = cleanKey(key)
	return true, s3sink.CreateEntry(key, newEntry, signatures)
}
func cleanKey(key string) string {
	return strings.TrimPrefix(key, "/")
}
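
// Hypothetical test sketch (would live in s3_sink_test.go; not part of the
// original file) showing the intended behavior of cleanKey:
//
//	package S3Sink
//
//	import "testing"
//
//	func TestCleanKey(t *testing.T) {
//		if got := cleanKey("/dir/file.txt"); got != "dir/file.txt" {
//			t.Errorf("cleanKey should strip a single leading slash, got %q", got)
//		}
//		if got := cleanKey("already/clean"); got != "already/clean" {
//			t.Errorf("cleanKey should leave keys without a leading slash unchanged, got %q", got)
//		}
//	}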