s3_write.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package S3Sink
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "io"
  7. "github.com/aws/aws-sdk-go/aws"
  8. "github.com/aws/aws-sdk-go/aws/awserr"
  9. "github.com/aws/aws-sdk-go/service/s3"
  10. "github.com/chrislusf/seaweedfs/weed/filer2"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. )
  15. func (s3sink *S3Sink) deleteObject(key string) error {
  16. input := &s3.DeleteObjectInput{
  17. Bucket: aws.String(s3sink.bucket),
  18. Key: aws.String(key),
  19. }
  20. result, err := s3sink.conn.DeleteObject(input)
  21. if err == nil {
  22. glog.V(0).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
  23. } else {
  24. glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
  25. }
  26. return err
  27. }
  28. func (s3sink *S3Sink) createMultipartUpload(key string, entry *filer_pb.Entry) (uploadId string, err error) {
  29. input := &s3.CreateMultipartUploadInput{
  30. Bucket: aws.String(s3sink.bucket),
  31. Key: aws.String(key),
  32. ContentType: aws.String(entry.Attributes.Mime),
  33. }
  34. result, err := s3sink.conn.CreateMultipartUpload(input)
  35. if err == nil {
  36. glog.V(0).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
  37. } else {
  38. glog.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
  39. return "", err
  40. }
  41. return *result.UploadId, err
  42. }
  43. func (s3sink *S3Sink) abortMultipartUpload(key, uploadId string) error {
  44. input := &s3.AbortMultipartUploadInput{
  45. Bucket: aws.String(s3sink.bucket),
  46. Key: aws.String(key),
  47. UploadId: aws.String(uploadId),
  48. }
  49. result, err := s3sink.conn.AbortMultipartUpload(input)
  50. if err != nil {
  51. if aerr, ok := err.(awserr.Error); ok {
  52. switch aerr.Code() {
  53. case s3.ErrCodeNoSuchUpload:
  54. glog.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error())
  55. default:
  56. glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
  57. }
  58. } else {
  59. // Print the error, cast err to awserr.Error to get the Code and
  60. // Message from an error.
  61. glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
  62. }
  63. return err
  64. }
  65. glog.V(0).Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result)
  66. return nil
  67. }
  68. // To complete multipart upload
  69. func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId string, parts []*s3.CompletedPart) error {
  70. input := &s3.CompleteMultipartUploadInput{
  71. Bucket: aws.String(s3sink.bucket),
  72. Key: aws.String(key),
  73. UploadId: aws.String(uploadId),
  74. MultipartUpload: &s3.CompletedMultipartUpload{
  75. Parts: parts,
  76. },
  77. }
  78. result, err := s3sink.conn.CompleteMultipartUpload(input)
  79. if err == nil {
  80. glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
  81. } else {
  82. glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
  83. }
  84. return err
  85. }
  86. // To upload a part
  87. func (s3sink *S3Sink) uploadPart(ctx context.Context, key, uploadId string, partId int, chunk *filer2.ChunkView) (*s3.CompletedPart, error) {
  88. var readSeeker io.ReadSeeker
  89. readSeeker, err := s3sink.buildReadSeeker(ctx, chunk)
  90. if err != nil {
  91. glog.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
  92. return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
  93. }
  94. input := &s3.UploadPartInput{
  95. Body: readSeeker,
  96. Bucket: aws.String(s3sink.bucket),
  97. Key: aws.String(key),
  98. PartNumber: aws.Int64(int64(partId)),
  99. UploadId: aws.String(uploadId),
  100. }
  101. result, err := s3sink.conn.UploadPart(input)
  102. if err == nil {
  103. glog.V(0).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
  104. } else {
  105. glog.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
  106. }
  107. part := &s3.CompletedPart{
  108. ETag: result.ETag,
  109. PartNumber: aws.Int64(int64(partId)),
  110. }
  111. return part, err
  112. }
  113. // To upload a part by copying byte range from an existing object as data source
  114. func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySource string, sourceStart, sourceStop int) error {
  115. input := &s3.UploadPartCopyInput{
  116. Bucket: aws.String(s3sink.bucket),
  117. CopySource: aws.String(fmt.Sprintf("/%s/%s", s3sink.bucket, copySource)),
  118. CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", sourceStart, sourceStop)),
  119. Key: aws.String(key),
  120. PartNumber: aws.Int64(partId),
  121. UploadId: aws.String(uploadId),
  122. }
  123. result, err := s3sink.conn.UploadPartCopy(input)
  124. if err == nil {
  125. glog.V(0).Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result)
  126. } else {
  127. glog.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err)
  128. }
  129. return err
  130. }
  131. func (s3sink *S3Sink) buildReadSeeker(ctx context.Context, chunk *filer2.ChunkView) (io.ReadSeeker, error) {
  132. fileUrl, err := s3sink.filerSource.LookupFileId(ctx, chunk.FileId)
  133. if err != nil {
  134. return nil, err
  135. }
  136. buf := make([]byte, chunk.Size)
  137. util.ReadUrl(fileUrl, chunk.Offset, int(chunk.Size), buf, true)
  138. return bytes.NewReader(buf), nil
  139. }