s3api_object_multipart_handlers.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. package s3api
  2. import (
  3. "fmt"
  4. "net/http"
  5. "net/url"
  6. "strconv"
  7. "strings"
  8. "github.com/aws/aws-sdk-go/aws"
  9. "github.com/aws/aws-sdk-go/service/s3"
  10. )
  11. const (
  12. maxObjectListSizeLimit = 10000 // Limit number of objects in a listObjectsResponse.
  13. maxUploadsList = 10000 // Limit number of uploads in a listUploadsResponse.
  14. maxPartsList = 10000 // Limit number of parts in a listPartsResponse.
  15. globalMaxPartID = 100000
  16. )
  17. // NewMultipartUploadHandler - New multipart upload.
  18. func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  19. bucket, object := getBucketAndObject(r)
  20. response, errCode := s3a.createMultipartUpload(&s3.CreateMultipartUploadInput{
  21. Bucket: aws.String(bucket),
  22. Key: objectKey(aws.String(object)),
  23. })
  24. if errCode != ErrNone {
  25. writeErrorResponse(w, errCode, r.URL)
  26. return
  27. }
  28. // println("NewMultipartUploadHandler", string(encodeResponse(response)))
  29. writeSuccessResponseXML(w, encodeResponse(response))
  30. }
  31. // CompleteMultipartUploadHandler - Completes multipart upload.
  32. func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  33. bucket, object := getBucketAndObject(r)
  34. // Get upload id.
  35. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  36. response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
  37. Bucket: aws.String(bucket),
  38. Key: objectKey(aws.String(object)),
  39. UploadId: aws.String(uploadID),
  40. })
  41. // println("CompleteMultipartUploadHandler", string(encodeResponse(response)), errCode)
  42. if errCode != ErrNone {
  43. writeErrorResponse(w, errCode, r.URL)
  44. return
  45. }
  46. writeSuccessResponseXML(w, encodeResponse(response))
  47. }
  48. // AbortMultipartUploadHandler - Aborts multipart upload.
  49. func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  50. bucket, object := getBucketAndObject(r)
  51. // Get upload id.
  52. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  53. response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
  54. Bucket: aws.String(bucket),
  55. Key: objectKey(aws.String(object)),
  56. UploadId: aws.String(uploadID),
  57. })
  58. if errCode != ErrNone {
  59. writeErrorResponse(w, errCode, r.URL)
  60. return
  61. }
  62. // println("AbortMultipartUploadHandler", string(encodeResponse(response)))
  63. writeSuccessResponseXML(w, encodeResponse(response))
  64. }
  65. // ListMultipartUploadsHandler - Lists multipart uploads.
  66. func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
  67. bucket, _ := getBucketAndObject(r)
  68. prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, encodingType := getBucketMultipartResources(r.URL.Query())
  69. if maxUploads < 0 {
  70. writeErrorResponse(w, ErrInvalidMaxUploads, r.URL)
  71. return
  72. }
  73. if keyMarker != "" {
  74. // Marker not common with prefix is not implemented.
  75. if !strings.HasPrefix(keyMarker, prefix) {
  76. writeErrorResponse(w, ErrNotImplemented, r.URL)
  77. return
  78. }
  79. }
  80. response, errCode := s3a.listMultipartUploads(&s3.ListMultipartUploadsInput{
  81. Bucket: aws.String(bucket),
  82. Delimiter: aws.String(delimiter),
  83. EncodingType: aws.String(encodingType),
  84. KeyMarker: aws.String(keyMarker),
  85. MaxUploads: aws.Int64(int64(maxUploads)),
  86. Prefix: aws.String(prefix),
  87. UploadIdMarker: aws.String(uploadIDMarker),
  88. })
  89. if errCode != ErrNone {
  90. writeErrorResponse(w, errCode, r.URL)
  91. return
  92. }
  93. // TODO handle encodingType
  94. // println("ListMultipartUploadsHandler", string(encodeResponse(response)))
  95. writeSuccessResponseXML(w, encodeResponse(response))
  96. }
  97. // ListObjectPartsHandler - Lists object parts in a multipart upload.
  98. func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
  99. bucket, object := getBucketAndObject(r)
  100. uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
  101. if partNumberMarker < 0 {
  102. writeErrorResponse(w, ErrInvalidPartNumberMarker, r.URL)
  103. return
  104. }
  105. if maxParts < 0 {
  106. writeErrorResponse(w, ErrInvalidMaxParts, r.URL)
  107. return
  108. }
  109. response, errCode := s3a.listObjectParts(&s3.ListPartsInput{
  110. Bucket: aws.String(bucket),
  111. Key: objectKey(aws.String(object)),
  112. MaxParts: aws.Int64(int64(maxParts)),
  113. PartNumberMarker: aws.Int64(int64(partNumberMarker)),
  114. UploadId: aws.String(uploadID),
  115. })
  116. if errCode != ErrNone {
  117. writeErrorResponse(w, errCode, r.URL)
  118. return
  119. }
  120. // println("ListObjectPartsHandler", string(encodeResponse(response)))
  121. writeSuccessResponseXML(w, encodeResponse(response))
  122. }
  123. // PutObjectPartHandler - Put an object part in a multipart upload.
  124. func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
  125. bucket, _ := getBucketAndObject(r)
  126. uploadID := r.URL.Query().Get("uploadId")
  127. exists, err := s3a.exists(s3a.genUploadsFolder(bucket), uploadID, true)
  128. if !exists {
  129. writeErrorResponse(w, ErrNoSuchUpload, r.URL)
  130. return
  131. }
  132. partIDString := r.URL.Query().Get("partNumber")
  133. partID, err := strconv.Atoi(partIDString)
  134. if err != nil {
  135. writeErrorResponse(w, ErrInvalidPart, r.URL)
  136. return
  137. }
  138. if partID > globalMaxPartID {
  139. writeErrorResponse(w, ErrInvalidMaxParts, r.URL)
  140. return
  141. }
  142. dataReader := r.Body
  143. if s3a.iam.isEnabled() {
  144. rAuthType := getRequestAuthType(r)
  145. var s3ErrCode ErrorCode
  146. switch rAuthType {
  147. case authTypeStreamingSigned:
  148. dataReader, s3ErrCode = s3a.iam.newSignV4ChunkedReader(r)
  149. case authTypeSignedV2, authTypePresignedV2:
  150. _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r)
  151. case authTypePresigned, authTypeSigned:
  152. _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
  153. }
  154. if s3ErrCode != ErrNone {
  155. writeErrorResponse(w, s3ErrCode, r.URL)
  156. return
  157. }
  158. }
  159. defer dataReader.Close()
  160. uploadUrl := fmt.Sprintf("http://%s%s/%s/%04d.part?collection=%s",
  161. s3a.option.Filer, s3a.genUploadsFolder(bucket), uploadID, partID-1, bucket)
  162. etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
  163. if errCode != ErrNone {
  164. writeErrorResponse(w, errCode, r.URL)
  165. return
  166. }
  167. setEtag(w, etag)
  168. writeSuccessResponseEmpty(w)
  169. }
  170. func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
  171. return fmt.Sprintf("%s/%s/.uploads", s3a.option.BucketsPath, bucket)
  172. }
  173. // Parse bucket url queries for ?uploads
  174. func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) {
  175. prefix = values.Get("prefix")
  176. keyMarker = values.Get("key-marker")
  177. uploadIDMarker = values.Get("upload-id-marker")
  178. delimiter = values.Get("delimiter")
  179. if values.Get("max-uploads") != "" {
  180. maxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
  181. } else {
  182. maxUploads = maxUploadsList
  183. }
  184. encodingType = values.Get("encoding-type")
  185. return
  186. }
  187. // Parse object url queries
  188. func getObjectResources(values url.Values) (uploadID string, partNumberMarker, maxParts int, encodingType string) {
  189. uploadID = values.Get("uploadId")
  190. partNumberMarker, _ = strconv.Atoi(values.Get("part-number-marker"))
  191. if values.Get("max-parts") != "" {
  192. maxParts, _ = strconv.Atoi(values.Get("max-parts"))
  193. } else {
  194. maxParts = maxPartsList
  195. }
  196. encodingType = values.Get("encoding-type")
  197. return
  198. }
  199. type byCompletedPartNumber []*s3.CompletedPart
  200. func (a byCompletedPartNumber) Len() int { return len(a) }
  201. func (a byCompletedPartNumber) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
  202. func (a byCompletedPartNumber) Less(i, j int) bool { return *a[i].PartNumber < *a[j].PartNumber }