s3api_object_multipart_handlers.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package s3api
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
  6. "net/http"
  7. "net/url"
  8. "strconv"
  9. "strings"
  10. "github.com/aws/aws-sdk-go/aws"
  11. "github.com/aws/aws-sdk-go/service/s3"
  12. )
  13. const (
  14. maxObjectListSizeLimit = 10000 // Limit number of objects in a listObjectsResponse.
  15. maxUploadsList = 10000 // Limit number of uploads in a listUploadsResponse.
  16. maxPartsList = 10000 // Limit number of parts in a listPartsResponse.
  17. globalMaxPartID = 100000
  18. )
  19. // NewMultipartUploadHandler - New multipart upload.
  20. func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  21. bucket, object := getBucketAndObject(r)
  22. response, errCode := s3a.createMultipartUpload(&s3.CreateMultipartUploadInput{
  23. Bucket: aws.String(bucket),
  24. Key: objectKey(aws.String(object)),
  25. })
  26. glog.V(2).Info("NewMultipartUploadHandler", string(encodeResponse(response)), errCode)
  27. if errCode != s3err.ErrNone {
  28. writeErrorResponse(w, errCode, r.URL)
  29. return
  30. }
  31. writeSuccessResponseXML(w, encodeResponse(response))
  32. }
  33. // CompleteMultipartUploadHandler - Completes multipart upload.
  34. func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  35. bucket, object := getBucketAndObject(r)
  36. // Get upload id.
  37. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  38. response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
  39. Bucket: aws.String(bucket),
  40. Key: objectKey(aws.String(object)),
  41. UploadId: aws.String(uploadID),
  42. })
  43. glog.V(2).Info("CompleteMultipartUploadHandler", string(encodeResponse(response)), errCode)
  44. if errCode != s3err.ErrNone {
  45. writeErrorResponse(w, errCode, r.URL)
  46. return
  47. }
  48. writeSuccessResponseXML(w, encodeResponse(response))
  49. }
  50. // AbortMultipartUploadHandler - Aborts multipart upload.
  51. func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  52. bucket, object := getBucketAndObject(r)
  53. // Get upload id.
  54. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  55. response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
  56. Bucket: aws.String(bucket),
  57. Key: objectKey(aws.String(object)),
  58. UploadId: aws.String(uploadID),
  59. })
  60. if errCode != s3err.ErrNone {
  61. writeErrorResponse(w, errCode, r.URL)
  62. return
  63. }
  64. glog.V(2).Info("AbortMultipartUploadHandler", string(encodeResponse(response)))
  65. writeSuccessResponseXML(w, encodeResponse(response))
  66. }
  67. // ListMultipartUploadsHandler - Lists multipart uploads.
  68. func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
  69. bucket, _ := getBucketAndObject(r)
  70. prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, encodingType := getBucketMultipartResources(r.URL.Query())
  71. if maxUploads < 0 {
  72. writeErrorResponse(w, s3err.ErrInvalidMaxUploads, r.URL)
  73. return
  74. }
  75. if keyMarker != "" {
  76. // Marker not common with prefix is not implemented.
  77. if !strings.HasPrefix(keyMarker, prefix) {
  78. writeErrorResponse(w, s3err.ErrNotImplemented, r.URL)
  79. return
  80. }
  81. }
  82. response, errCode := s3a.listMultipartUploads(&s3.ListMultipartUploadsInput{
  83. Bucket: aws.String(bucket),
  84. Delimiter: aws.String(delimiter),
  85. EncodingType: aws.String(encodingType),
  86. KeyMarker: aws.String(keyMarker),
  87. MaxUploads: aws.Int64(int64(maxUploads)),
  88. Prefix: aws.String(prefix),
  89. UploadIdMarker: aws.String(uploadIDMarker),
  90. })
  91. glog.V(2).Info("ListMultipartUploadsHandler", string(encodeResponse(response)), errCode)
  92. if errCode != s3err.ErrNone {
  93. writeErrorResponse(w, errCode, r.URL)
  94. return
  95. }
  96. // TODO handle encodingType
  97. writeSuccessResponseXML(w, encodeResponse(response))
  98. }
  99. // ListObjectPartsHandler - Lists object parts in a multipart upload.
  100. func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
  101. bucket, object := getBucketAndObject(r)
  102. uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
  103. if partNumberMarker < 0 {
  104. writeErrorResponse(w, s3err.ErrInvalidPartNumberMarker, r.URL)
  105. return
  106. }
  107. if maxParts < 0 {
  108. writeErrorResponse(w, s3err.ErrInvalidMaxParts, r.URL)
  109. return
  110. }
  111. response, errCode := s3a.listObjectParts(&s3.ListPartsInput{
  112. Bucket: aws.String(bucket),
  113. Key: objectKey(aws.String(object)),
  114. MaxParts: aws.Int64(int64(maxParts)),
  115. PartNumberMarker: aws.Int64(int64(partNumberMarker)),
  116. UploadId: aws.String(uploadID),
  117. })
  118. glog.V(2).Info("ListObjectPartsHandler", string(encodeResponse(response)), errCode)
  119. if errCode != s3err.ErrNone {
  120. writeErrorResponse(w, errCode, r.URL)
  121. return
  122. }
  123. writeSuccessResponseXML(w, encodeResponse(response))
  124. }
  125. // PutObjectPartHandler - Put an object part in a multipart upload.
  126. func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
  127. bucket, _ := getBucketAndObject(r)
  128. uploadID := r.URL.Query().Get("uploadId")
  129. exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
  130. if !exists {
  131. writeErrorResponse(w, s3err.ErrNoSuchUpload, r.URL)
  132. return
  133. }
  134. partIDString := r.URL.Query().Get("partNumber")
  135. partID, err := strconv.Atoi(partIDString)
  136. if err != nil {
  137. writeErrorResponse(w, s3err.ErrInvalidPart, r.URL)
  138. return
  139. }
  140. if partID > globalMaxPartID {
  141. writeErrorResponse(w, s3err.ErrInvalidMaxParts, r.URL)
  142. return
  143. }
  144. dataReader := r.Body
  145. if s3a.iam.isEnabled() {
  146. rAuthType := getRequestAuthType(r)
  147. var s3ErrCode s3err.ErrorCode
  148. switch rAuthType {
  149. case authTypeStreamingSigned:
  150. dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
  151. case authTypeSignedV2, authTypePresignedV2:
  152. _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r)
  153. case authTypePresigned, authTypeSigned:
  154. _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
  155. }
  156. if s3ErrCode != s3err.ErrNone {
  157. writeErrorResponse(w, s3ErrCode, r.URL)
  158. return
  159. }
  160. }
  161. defer dataReader.Close()
  162. uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
  163. s3a.option.Filer, s3a.genUploadsFolder(bucket), uploadID, partID, bucket)
  164. etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
  165. if errCode != s3err.ErrNone {
  166. writeErrorResponse(w, errCode, r.URL)
  167. return
  168. }
  169. setEtag(w, etag)
  170. writeSuccessResponseEmpty(w)
  171. }
  172. func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
  173. return fmt.Sprintf("%s/%s/.uploads", s3a.option.BucketsPath, bucket)
  174. }
  175. // Parse bucket url queries for ?uploads
  176. func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) {
  177. prefix = values.Get("prefix")
  178. keyMarker = values.Get("key-marker")
  179. uploadIDMarker = values.Get("upload-id-marker")
  180. delimiter = values.Get("delimiter")
  181. if values.Get("max-uploads") != "" {
  182. maxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
  183. } else {
  184. maxUploads = maxUploadsList
  185. }
  186. encodingType = values.Get("encoding-type")
  187. return
  188. }
  189. // Parse object url queries
  190. func getObjectResources(values url.Values) (uploadID string, partNumberMarker, maxParts int, encodingType string) {
  191. uploadID = values.Get("uploadId")
  192. partNumberMarker, _ = strconv.Atoi(values.Get("part-number-marker"))
  193. if values.Get("max-parts") != "" {
  194. maxParts, _ = strconv.Atoi(values.Get("max-parts"))
  195. } else {
  196. maxParts = maxPartsList
  197. }
  198. encodingType = values.Get("encoding-type")
  199. return
  200. }
  201. type byCompletedPartNumber []*s3.CompletedPart
  202. func (a byCompletedPartNumber) Len() int { return len(a) }
  203. func (a byCompletedPartNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  204. func (a byCompletedPartNumber) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }