s3api_object_copy_handlers.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package s3api
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  6. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  7. "modernc.org/strutil"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/seaweedfs/seaweedfs/weed/util"
  14. )
  // Directive values carried by the metadata and tagging directive headers of a
  // copy request.
  const (
      // DirectiveCopy is the "COPY" directive value. It is not referenced by the
      // code visible in this file; presumably it names the default behavior of
      // keeping the source object's metadata/tags (TODO confirm against callers).
      DirectiveCopy = "COPY"
      // DirectiveReplace is the "REPLACE" directive value. replaceDirective
      // compares the directive headers against it.
      DirectiveReplace = "REPLACE"
  )
  19. func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
  20. dstBucket, dstObject := s3_constants.GetBucketAndObject(r)
  21. // Copy source path.
  22. cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
  23. if err != nil {
  24. // Save unescaped string as is.
  25. cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
  26. }
  27. srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
  28. glog.V(3).Infof("CopyObjectHandler %s %s => %s %s", srcBucket, srcObject, dstBucket, dstObject)
  29. replaceMeta, replaceTagging := replaceDirective(r.Header)
  30. if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && (replaceMeta || replaceTagging) {
  31. fullPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
  32. dir, name := fullPath.DirAndName()
  33. entry, err := s3a.getEntry(dir, name)
  34. if err != nil || entry.IsDirectory {
  35. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
  36. return
  37. }
  38. entry.Extended, err = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
  39. entry.Attributes.Mtime = time.Now().Unix()
  40. if err != nil {
  41. glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, err)
  42. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
  43. return
  44. }
  45. err = s3a.touch(dir, name, entry)
  46. if err != nil {
  47. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
  48. return
  49. }
  50. writeSuccessResponseXML(w, r, CopyObjectResult{
  51. ETag: fmt.Sprintf("%x", entry.Attributes.Md5),
  52. LastModified: time.Now().UTC(),
  53. })
  54. return
  55. }
  56. // If source object is empty or bucket is empty, reply back invalid copy source.
  57. if srcObject == "" || srcBucket == "" {
  58. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
  59. return
  60. }
  61. srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
  62. dir, name := srcPath.DirAndName()
  63. if entry, err := s3a.getEntry(dir, name); err != nil || entry.IsDirectory {
  64. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
  65. return
  66. }
  67. if srcBucket == dstBucket && srcObject == dstObject {
  68. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopyDest)
  69. return
  70. }
  71. dstUrl := fmt.Sprintf("http://%s%s/%s%s",
  72. s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, dstBucket, urlEscapeObject(dstObject))
  73. srcUrl := fmt.Sprintf("http://%s%s/%s%s",
  74. s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject))
  75. _, _, resp, err := util.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false))
  76. if err != nil {
  77. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
  78. return
  79. }
  80. defer util.CloseResponse(resp)
  81. tagErr := processMetadata(r.Header, resp.Header, replaceMeta, replaceTagging, s3a.getTags, dir, name)
  82. if tagErr != nil {
  83. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
  84. return
  85. }
  86. glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
  87. destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)
  88. etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body, destination, dstBucket)
  89. if errCode != s3err.ErrNone {
  90. s3err.WriteErrorResponse(w, r, errCode)
  91. return
  92. }
  93. setEtag(w, etag)
  94. response := CopyObjectResult{
  95. ETag: etag,
  96. LastModified: time.Now().UTC(),
  97. }
  98. writeSuccessResponseXML(w, r, response)
  99. }
  // pathToBucketAndObject splits a copy-source path of the form
  // "[/]bucket/object/key" into its bucket name and object key.
  //
  // A single leading "/" is ignored. The bucket is everything before the first
  // remaining "/", and the object is the rest of the path including that "/".
  // When the path has no "/" after the bucket, the object is reported as "/",
  // so the returned object is never the empty string.
  func pathToBucketAndObject(path string) (bucket, object string) {
      trimmed := strings.TrimPrefix(path, "/")
      slash := strings.IndexByte(trimmed, '/')
      if slash < 0 {
          // Bucket only: no object part was supplied.
          return trimmed, "/"
      }
      return trimmed[:slash], trimmed[slash:]
  }
  // CopyPartResult is the XML body that CopyObjectPartHandler writes after a
  // part has been copied successfully.
  type CopyPartResult struct {
      // LastModified is serialized as the <LastModified> element; the handler
      // fills it with the current UTC time.
      LastModified time.Time `xml:"LastModified"`
      // ETag is serialized as the <ETag> element; the handler fills it with the
      // etag returned for the uploaded part.
      ETag string `xml:"ETag"`
  }
  112. func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
  113. // https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjctsUsingRESTMPUapi.html
  114. // https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
  115. dstBucket, dstObject := s3_constants.GetBucketAndObject(r)
  116. // Copy source path.
  117. cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
  118. if err != nil {
  119. // Save unescaped string as is.
  120. cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
  121. }
  122. srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
  123. // If source object is empty or bucket is empty, reply back invalid copy source.
  124. if srcObject == "" || srcBucket == "" {
  125. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
  126. return
  127. }
  128. uploadID := r.URL.Query().Get("uploadId")
  129. partIDString := r.URL.Query().Get("partNumber")
  130. partID, err := strconv.Atoi(partIDString)
  131. if err != nil {
  132. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
  133. return
  134. }
  135. glog.V(3).Infof("CopyObjectPartHandler %s %s => %s part %d", srcBucket, srcObject, dstBucket, partID)
  136. // check partID with maximum part ID for multipart objects
  137. if partID > globalMaxPartID {
  138. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
  139. return
  140. }
  141. rangeHeader := r.Header.Get("x-amz-copy-source-range")
  142. dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
  143. s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID)
  144. srcUrl := fmt.Sprintf("http://%s%s/%s%s",
  145. s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlEscapeObject(srcObject))
  146. resp, dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false), rangeHeader)
  147. if err != nil {
  148. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
  149. return
  150. }
  151. defer util.CloseResponse(resp)
  152. defer dataReader.Close()
  153. glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
  154. destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)
  155. etag, errCode := s3a.putToFiler(r, dstUrl, dataReader, destination, dstBucket)
  156. if errCode != s3err.ErrNone {
  157. s3err.WriteErrorResponse(w, r, errCode)
  158. return
  159. }
  160. setEtag(w, etag)
  161. response := CopyPartResult{
  162. ETag: etag,
  163. LastModified: time.Now().UTC(),
  164. }
  165. writeSuccessResponseXML(w, r, response)
  166. }
  167. func replaceDirective(reqHeader http.Header) (replaceMeta, replaceTagging bool) {
  168. return reqHeader.Get(s3_constants.AmzUserMetaDirective) == DirectiveReplace, reqHeader.Get(s3_constants.AmzObjectTaggingDirective) == DirectiveReplace
  169. }
  170. func processMetadata(reqHeader, existing http.Header, replaceMeta, replaceTagging bool, getTags func(parentDirectoryPath string, entryName string) (tags map[string]string, err error), dir, name string) (err error) {
  171. if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) == 0 {
  172. if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 {
  173. reqHeader[s3_constants.AmzStorageClass] = sc
  174. }
  175. }
  176. if !replaceMeta {
  177. for header, _ := range reqHeader {
  178. if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) {
  179. delete(reqHeader, header)
  180. }
  181. }
  182. for k, v := range existing {
  183. if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
  184. reqHeader[k] = v
  185. }
  186. }
  187. }
  188. if !replaceTagging {
  189. for header, _ := range reqHeader {
  190. if strings.HasPrefix(header, s3_constants.AmzObjectTagging) {
  191. delete(reqHeader, header)
  192. }
  193. }
  194. found := false
  195. for k, _ := range existing {
  196. if strings.HasPrefix(k, s3_constants.AmzObjectTaggingPrefix) {
  197. found = true
  198. break
  199. }
  200. }
  201. if found {
  202. tags, err := getTags(dir, name)
  203. if err != nil {
  204. return err
  205. }
  206. var tagArr []string
  207. for k, v := range tags {
  208. tagArr = append(tagArr, fmt.Sprintf("%s=%s", k, v))
  209. }
  210. tagStr := strutil.JoinFields(tagArr, "&")
  211. reqHeader.Set(s3_constants.AmzObjectTagging, tagStr)
  212. }
  213. }
  214. return
  215. }
  216. func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, replaceMeta, replaceTagging bool) (metadata map[string][]byte, err error) {
  217. metadata = make(map[string][]byte)
  218. if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 {
  219. metadata[s3_constants.AmzStorageClass] = sc
  220. }
  221. if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) > 0 {
  222. metadata[s3_constants.AmzStorageClass] = []byte(sc)
  223. }
  224. if replaceMeta {
  225. for header, values := range reqHeader {
  226. if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) {
  227. for _, value := range values {
  228. metadata[header] = []byte(value)
  229. }
  230. }
  231. }
  232. } else {
  233. for k, v := range existing {
  234. if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
  235. metadata[k] = v
  236. }
  237. }
  238. }
  239. if replaceTagging {
  240. if tags := reqHeader.Get(s3_constants.AmzObjectTagging); tags != "" {
  241. parsedTags, err := parseTagsHeader(tags)
  242. if err != nil {
  243. return nil, err
  244. }
  245. err = ValidateTags(parsedTags)
  246. if err != nil {
  247. return nil, err
  248. }
  249. for k, v := range parsedTags {
  250. metadata[s3_constants.AmzObjectTagging+"-"+k] = []byte(v)
  251. }
  252. }
  253. } else {
  254. for k, v := range existing {
  255. if strings.HasPrefix(k, s3_constants.AmzObjectTagging) {
  256. metadata[k] = v
  257. }
  258. }
  259. delete(metadata, s3_constants.AmzTagCount)
  260. }
  261. return
  262. }