s3api_object_handlers_multipart.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. package s3api
  2. import (
  3. "crypto/sha1"
  4. "encoding/xml"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "strconv"
  10. "strings"
  11. "github.com/aws/aws-sdk-go/aws"
  12. "github.com/aws/aws-sdk-go/service/s3"
  13. "github.com/google/uuid"
  14. "github.com/seaweedfs/seaweedfs/weed/glog"
  15. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  16. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  17. weed_server "github.com/seaweedfs/seaweedfs/weed/server"
  18. stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
  19. )
  // Upper bounds applied by the S3 multipart and listing handlers.
const (
	// maxObjectListSizeLimit limits the number of objects in a listObjectsResponse.
	maxObjectListSizeLimit = 10000

	// maxUploadsList limits the number of uploads in a listUploadsResponse.
	maxUploadsList = 10000

	// maxPartsList limits the number of parts in a listPartsResponse.
	maxPartsList = 10000

	// globalMaxPartID is the largest part number accepted when a part is uploaded.
	globalMaxPartID = 100000
)
  26. // NewMultipartUploadHandler - New multipart upload.
  27. func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  28. bucket, object := s3_constants.GetBucketAndObject(r)
  29. createMultipartUploadInput := &s3.CreateMultipartUploadInput{
  30. Bucket: aws.String(bucket),
  31. Key: objectKey(aws.String(object)),
  32. Metadata: make(map[string]*string),
  33. }
  34. metadata := weed_server.SaveAmzMetaData(r, nil, false)
  35. for k, v := range metadata {
  36. createMultipartUploadInput.Metadata[k] = aws.String(string(v))
  37. }
  38. contentType := r.Header.Get("Content-Type")
  39. if contentType != "" {
  40. createMultipartUploadInput.ContentType = &contentType
  41. }
  42. response, errCode := s3a.createMultipartUpload(createMultipartUploadInput)
  43. glog.V(2).Info("NewMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
  44. if errCode != s3err.ErrNone {
  45. s3err.WriteErrorResponse(w, r, errCode)
  46. return
  47. }
  48. writeSuccessResponseXML(w, r, response)
  49. }
  50. // CompleteMultipartUploadHandler - Completes multipart upload.
  51. func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  52. // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
  53. bucket, object := s3_constants.GetBucketAndObject(r)
  54. parts := &CompleteMultipartUpload{}
  55. if err := xmlDecoder(r.Body, parts, r.ContentLength); err != nil {
  56. s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
  57. return
  58. }
  59. // Get upload id.
  60. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  61. err := s3a.checkUploadId(object, uploadID)
  62. if err != nil {
  63. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  64. return
  65. }
  66. response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
  67. Bucket: aws.String(bucket),
  68. Key: objectKey(aws.String(object)),
  69. UploadId: aws.String(uploadID),
  70. }, parts)
  71. glog.V(2).Info("CompleteMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)), errCode)
  72. if errCode != s3err.ErrNone {
  73. s3err.WriteErrorResponse(w, r, errCode)
  74. return
  75. }
  76. stats_collect.RecordBucketActiveTime(bucket)
  77. stats_collect.S3UploadedObjectsCounter.WithLabelValues(bucket).Inc()
  78. writeSuccessResponseXML(w, r, response)
  79. }
  80. // AbortMultipartUploadHandler - Aborts multipart upload.
  81. func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
  82. bucket, object := s3_constants.GetBucketAndObject(r)
  83. // Get upload id.
  84. uploadID, _, _, _ := getObjectResources(r.URL.Query())
  85. err := s3a.checkUploadId(object, uploadID)
  86. if err != nil {
  87. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  88. return
  89. }
  90. response, errCode := s3a.abortMultipartUpload(&s3.AbortMultipartUploadInput{
  91. Bucket: aws.String(bucket),
  92. Key: objectKey(aws.String(object)),
  93. UploadId: aws.String(uploadID),
  94. })
  95. if errCode != s3err.ErrNone {
  96. s3err.WriteErrorResponse(w, r, errCode)
  97. return
  98. }
  99. glog.V(2).Info("AbortMultipartUploadHandler", string(s3err.EncodeXMLResponse(response)))
  100. //https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
  101. s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
  102. s3err.PostLog(r, http.StatusNoContent, s3err.ErrNone)
  103. }
  104. // ListMultipartUploadsHandler - Lists multipart uploads.
  105. func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
  106. bucket, _ := s3_constants.GetBucketAndObject(r)
  107. prefix, keyMarker, uploadIDMarker, delimiter, maxUploads, encodingType := getBucketMultipartResources(r.URL.Query())
  108. if maxUploads < 0 {
  109. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxUploads)
  110. return
  111. }
  112. if keyMarker != "" {
  113. // Marker not common with prefix is not implemented.
  114. if !strings.HasPrefix(keyMarker, prefix) {
  115. s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
  116. return
  117. }
  118. }
  119. response, errCode := s3a.listMultipartUploads(&s3.ListMultipartUploadsInput{
  120. Bucket: aws.String(bucket),
  121. Delimiter: aws.String(delimiter),
  122. EncodingType: aws.String(encodingType),
  123. KeyMarker: aws.String(keyMarker),
  124. MaxUploads: aws.Int64(int64(maxUploads)),
  125. Prefix: aws.String(prefix),
  126. UploadIdMarker: aws.String(uploadIDMarker),
  127. })
  128. glog.V(2).Infof("ListMultipartUploadsHandler %s errCode=%d", string(s3err.EncodeXMLResponse(response)), errCode)
  129. if errCode != s3err.ErrNone {
  130. s3err.WriteErrorResponse(w, r, errCode)
  131. return
  132. }
  133. // TODO handle encodingType
  134. writeSuccessResponseXML(w, r, response)
  135. }
  136. // ListObjectPartsHandler - Lists object parts in a multipart upload.
  137. func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
  138. bucket, object := s3_constants.GetBucketAndObject(r)
  139. uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
  140. if partNumberMarker < 0 {
  141. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPartNumberMarker)
  142. return
  143. }
  144. if maxParts < 0 {
  145. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
  146. return
  147. }
  148. err := s3a.checkUploadId(object, uploadID)
  149. if err != nil {
  150. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  151. return
  152. }
  153. response, errCode := s3a.listObjectParts(&s3.ListPartsInput{
  154. Bucket: aws.String(bucket),
  155. Key: objectKey(aws.String(object)),
  156. MaxParts: aws.Int64(int64(maxParts)),
  157. PartNumberMarker: aws.Int64(int64(partNumberMarker)),
  158. UploadId: aws.String(uploadID),
  159. })
  160. if errCode != s3err.ErrNone {
  161. s3err.WriteErrorResponse(w, r, errCode)
  162. return
  163. }
  164. glog.V(2).Infof("ListObjectPartsHandler %s count=%d", string(s3err.EncodeXMLResponse(response)), len(response.Part))
  165. writeSuccessResponseXML(w, r, response)
  166. }
  167. // PutObjectPartHandler - Put an object part in a multipart upload.
  168. func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
  169. bucket, object := s3_constants.GetBucketAndObject(r)
  170. uploadID := r.URL.Query().Get("uploadId")
  171. err := s3a.checkUploadId(object, uploadID)
  172. if err != nil {
  173. s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchUpload)
  174. return
  175. }
  176. partIDString := r.URL.Query().Get("partNumber")
  177. partID, err := strconv.Atoi(partIDString)
  178. if err != nil {
  179. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
  180. return
  181. }
  182. if partID > globalMaxPartID {
  183. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
  184. return
  185. }
  186. dataReader := r.Body
  187. rAuthType := getRequestAuthType(r)
  188. if s3a.iam.isEnabled() {
  189. var s3ErrCode s3err.ErrorCode
  190. switch rAuthType {
  191. case authTypeStreamingSigned:
  192. dataReader, s3ErrCode = s3a.iam.newChunkedReader(r)
  193. case authTypeSignedV2, authTypePresignedV2:
  194. _, s3ErrCode = s3a.iam.isReqAuthenticatedV2(r)
  195. case authTypePresigned, authTypeSigned:
  196. _, s3ErrCode = s3a.iam.reqSignatureV4Verify(r)
  197. }
  198. if s3ErrCode != s3err.ErrNone {
  199. s3err.WriteErrorResponse(w, r, s3ErrCode)
  200. return
  201. }
  202. } else {
  203. if authTypeStreamingSigned == rAuthType {
  204. s3err.WriteErrorResponse(w, r, s3err.ErrAuthNotSetup)
  205. return
  206. }
  207. }
  208. defer dataReader.Close()
  209. glog.V(2).Infof("PutObjectPartHandler %s %s %04d", bucket, uploadID, partID)
  210. uploadUrl := s3a.genPartUploadUrl(bucket, uploadID, partID)
  211. if partID == 1 && r.Header.Get("Content-Type") == "" {
  212. dataReader = mimeDetect(r, dataReader)
  213. }
  214. destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
  215. etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader, destination, bucket)
  216. if errCode != s3err.ErrNone {
  217. s3err.WriteErrorResponse(w, r, errCode)
  218. return
  219. }
  220. setEtag(w, etag)
  221. writeSuccessResponseEmpty(w, r)
  222. }
  223. func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
  224. return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder)
  225. }
  226. func (s3a *S3ApiServer) genPartUploadUrl(bucket, uploadID string, partID int) string {
  227. return fmt.Sprintf("http://%s%s/%s/%04d_%s.part",
  228. s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString())
  229. }
  230. // Generate uploadID hash string from object
  231. func (s3a *S3ApiServer) generateUploadID(object string) string {
  232. if strings.HasPrefix(object, "/") {
  233. object = object[1:]
  234. }
  235. h := sha1.New()
  236. h.Write([]byte(object))
  237. return fmt.Sprintf("%x", h.Sum(nil))
  238. }
  239. // Check object name and uploadID when processing multipart uploading
  240. func (s3a *S3ApiServer) checkUploadId(object string, id string) error {
  241. hash := s3a.generateUploadID(object)
  242. if !strings.HasPrefix(id, hash) {
  243. glog.Errorf("object %s and uploadID %s are not matched", object, id)
  244. return fmt.Errorf("object %s and uploadID %s are not matched", object, id)
  245. }
  246. return nil
  247. }
  248. // Parse bucket url queries for ?uploads
  249. func getBucketMultipartResources(values url.Values) (prefix, keyMarker, uploadIDMarker, delimiter string, maxUploads int, encodingType string) {
  250. prefix = values.Get("prefix")
  251. keyMarker = values.Get("key-marker")
  252. uploadIDMarker = values.Get("upload-id-marker")
  253. delimiter = values.Get("delimiter")
  254. if values.Get("max-uploads") != "" {
  255. maxUploads, _ = strconv.Atoi(values.Get("max-uploads"))
  256. } else {
  257. maxUploads = maxUploadsList
  258. }
  259. encodingType = values.Get("encoding-type")
  260. return
  261. }
  262. // Parse object url queries
  263. func getObjectResources(values url.Values) (uploadID string, partNumberMarker, maxParts int, encodingType string) {
  264. uploadID = values.Get("uploadId")
  265. partNumberMarker, _ = strconv.Atoi(values.Get("part-number-marker"))
  266. if values.Get("max-parts") != "" {
  267. maxParts, _ = strconv.Atoi(values.Get("max-parts"))
  268. } else {
  269. maxParts = maxPartsList
  270. }
  271. encodingType = values.Get("encoding-type")
  272. return
  273. }
  // xmlDecoder decodes a single XML document from body into v.
//
// When size is positive, at most size bytes are read from body; otherwise the
// body is read without a limit. Any charset named in the XML declaration is
// accepted and the input is passed to the decoder unchanged.
func xmlDecoder(body io.Reader, v interface{}, size int64) error {
	src := body
	if size > 0 {
		src = io.LimitReader(body, size)
	}

	dec := xml.NewDecoder(src)
	// Hand the bytes through as-is, whatever encoding label was declared.
	dec.CharsetReader = func(label string, input io.Reader) (io.Reader, error) {
		return input, nil
	}
	return dec.Decode(v)
}
  // XML request body types for completing a multipart upload.
type (
	// CompleteMultipartUpload is the decoded body of a complete-multipart-upload
	// request: the list of its <Part> elements.
	CompleteMultipartUpload struct {
		Parts []CompletedPart `xml:"Part"`
	}

	// CompletedPart is a single <Part> entry, carrying the part's ETag and
	// part number.
	CompletedPart struct {
		ETag       string
		PartNumber int
	}
)