123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- package s3api
- import (
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
- "modernc.org/strutil"
- "net/http"
- "net/url"
- "strconv"
- "strings"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/util"
- )
// Values compared against the metadata and tagging directive request headers.
const (
	// DirectiveCopy is the "COPY" directive value.
	// NOTE(review): presumably asks to keep the source's metadata/tags — confirm.
	DirectiveCopy = "COPY"
	// DirectiveReplace is the "REPLACE" directive value; replaceDirective
	// reports true for a header that equals it.
	DirectiveReplace = "REPLACE"
)
- func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
- dstBucket, dstObject := s3_constants.GetBucketAndObject(r)
- // Copy source path.
- cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
- if err != nil {
- // Save unescaped string as is.
- cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
- }
- srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
- glog.V(3).Infof("CopyObjectHandler %s %s => %s %s", srcBucket, srcObject, dstBucket, dstObject)
- replaceMeta, replaceTagging := replaceDirective(r.Header)
- if (srcBucket == dstBucket && srcObject == dstObject || cpSrcPath == "") && (replaceMeta || replaceTagging) {
- fullPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject))
- dir, name := fullPath.DirAndName()
- entry, err := s3a.getEntry(dir, name)
- if err != nil || entry.IsDirectory {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
- return
- }
- entry.Extended, err = processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
- if err != nil {
- glog.Errorf("CopyObjectHandler ValidateTags error %s: %v", r.URL, err)
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidTag)
- return
- }
- err = s3a.touch(dir, name, entry)
- if err != nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
- return
- }
- writeSuccessResponseXML(w, r, CopyObjectResult{
- ETag: fmt.Sprintf("%x", entry.Attributes.Md5),
- LastModified: time.Now().UTC(),
- })
- return
- }
- // If source object is empty or bucket is empty, reply back invalid copy source.
- if srcObject == "" || srcBucket == "" {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
- return
- }
- srcPath := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, srcBucket, srcObject))
- dir, name := srcPath.DirAndName()
- if entry, err := s3a.getEntry(dir, name); err != nil || entry.IsDirectory {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
- return
- }
- if srcBucket == dstBucket && srcObject == dstObject {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopyDest)
- return
- }
- dstUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, dstBucket, urlPathEscape(dstObject))
- srcUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject))
- _, _, resp, err := util.DownloadFile(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false))
- if err != nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
- return
- }
- defer util.CloseResponse(resp)
- tagErr := processMetadata(r.Header, resp.Header, replaceMeta, replaceTagging, s3a.getTags, dir, name)
- if tagErr != nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
- return
- }
- glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
- destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)
- etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body, destination)
- if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, r, errCode)
- return
- }
- setEtag(w, etag)
- response := CopyObjectResult{
- ETag: etag,
- LastModified: time.Now().UTC(),
- }
- writeSuccessResponseXML(w, r, response)
- }
// pathToBucketAndObject splits a copy-source path of the form
// "[/]bucket/object" into its bucket and object parts.
//
// At most one leading slash is removed. The returned object always starts
// with "/"; when the path has no object part the object is "/".
func pathToBucketAndObject(path string) (bucket, object string) {
	trimmed := strings.TrimPrefix(path, "/")

	slash := strings.IndexByte(trimmed, '/')
	if slash < 0 {
		// No separator: the whole remainder is the bucket name.
		return trimmed, "/"
	}

	// Keep the separator so the object keeps its leading slash.
	return trimmed[:slash], trimmed[slash:]
}
// CopyPartResult is the XML response body written by CopyObjectPartHandler
// after a part has been copied.
type CopyPartResult struct {
	// LastModified is set by the handler to the current UTC time.
	LastModified time.Time `xml:"LastModified"`
	// ETag is the entity tag returned by the filer upload of the part.
	ETag string `xml:"ETag"`
}
- func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
- // https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjctsUsingRESTMPUapi.html
- // https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
- dstBucket, dstObject := s3_constants.GetBucketAndObject(r)
- // Copy source path.
- cpSrcPath, err := url.QueryUnescape(r.Header.Get("X-Amz-Copy-Source"))
- if err != nil {
- // Save unescaped string as is.
- cpSrcPath = r.Header.Get("X-Amz-Copy-Source")
- }
- srcBucket, srcObject := pathToBucketAndObject(cpSrcPath)
- // If source object is empty or bucket is empty, reply back invalid copy source.
- if srcObject == "" || srcBucket == "" {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
- return
- }
- uploadID := r.URL.Query().Get("uploadId")
- partIDString := r.URL.Query().Get("partNumber")
- partID, err := strconv.Atoi(partIDString)
- if err != nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
- return
- }
- glog.V(3).Infof("CopyObjectPartHandler %s %s => %s part %d", srcBucket, srcObject, dstBucket, partID)
- // check partID with maximum part ID for multipart objects
- if partID > globalMaxPartID {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxParts)
- return
- }
- rangeHeader := r.Header.Get("x-amz-copy-source-range")
- dstUrl := fmt.Sprintf("http://%s%s/%s/%04d.part",
- s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(dstBucket), uploadID, partID)
- srcUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, srcBucket, urlPathEscape(srcObject))
- resp, dataReader, err := util.ReadUrlAsReaderCloser(srcUrl, s3a.maybeGetFilerJwtAuthorizationToken(false), rangeHeader)
- if err != nil {
- s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
- return
- }
- defer util.CloseResponse(resp)
- defer dataReader.Close()
- glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
- destination := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, dstBucket, dstObject)
- etag, errCode := s3a.putToFiler(r, dstUrl, dataReader, destination)
- if errCode != s3err.ErrNone {
- s3err.WriteErrorResponse(w, r, errCode)
- return
- }
- setEtag(w, etag)
- response := CopyPartResult{
- ETag: etag,
- LastModified: time.Now().UTC(),
- }
- writeSuccessResponseXML(w, r, response)
- }
- func replaceDirective(reqHeader http.Header) (replaceMeta, replaceTagging bool) {
- return reqHeader.Get(s3_constants.AmzUserMetaDirective) == DirectiveReplace, reqHeader.Get(s3_constants.AmzObjectTaggingDirective) == DirectiveReplace
- }
- func processMetadata(reqHeader, existing http.Header, replaceMeta, replaceTagging bool, getTags func(parentDirectoryPath string, entryName string) (tags map[string]string, err error), dir, name string) (err error) {
- if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) == 0 {
- if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 {
- reqHeader[s3_constants.AmzStorageClass] = sc
- }
- }
- if !replaceMeta {
- for header, _ := range reqHeader {
- if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) {
- delete(reqHeader, header)
- }
- }
- for k, v := range existing {
- if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
- reqHeader[k] = v
- }
- }
- }
- if !replaceTagging {
- for header, _ := range reqHeader {
- if strings.HasPrefix(header, s3_constants.AmzObjectTagging) {
- delete(reqHeader, header)
- }
- }
- found := false
- for k, _ := range existing {
- if strings.HasPrefix(k, s3_constants.AmzObjectTaggingPrefix) {
- found = true
- break
- }
- }
- if found {
- tags, err := getTags(dir, name)
- if err != nil {
- return err
- }
- var tagArr []string
- for k, v := range tags {
- tagArr = append(tagArr, fmt.Sprintf("%s=%s", k, v))
- }
- tagStr := strutil.JoinFields(tagArr, "&")
- reqHeader.Set(s3_constants.AmzObjectTagging, tagStr)
- }
- }
- return
- }
- func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, replaceMeta, replaceTagging bool) (metadata map[string][]byte, err error) {
- metadata = make(map[string][]byte)
- if sc := existing[s3_constants.AmzStorageClass]; len(sc) > 0 {
- metadata[s3_constants.AmzStorageClass] = sc
- }
- if sc := reqHeader.Get(s3_constants.AmzStorageClass); len(sc) > 0 {
- metadata[s3_constants.AmzStorageClass] = []byte(sc)
- }
- if replaceMeta {
- for header, values := range reqHeader {
- if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) {
- for _, value := range values {
- metadata[header] = []byte(value)
- }
- }
- }
- } else {
- for k, v := range existing {
- if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
- metadata[k] = v
- }
- }
- }
- if replaceTagging {
- if tags := reqHeader.Get(s3_constants.AmzObjectTagging); tags != "" {
- parsedTags, err := parseTagsHeader(tags)
- if err != nil {
- return nil, err
- }
- err = ValidateTags(parsedTags)
- if err != nil {
- return nil, err
- }
- for k, v := range parsedTags {
- metadata[s3_constants.AmzObjectTagging+"-"+k] = []byte(v)
- }
- }
- } else {
- for k, v := range existing {
- if strings.HasPrefix(k, s3_constants.AmzObjectTagging) {
- metadata[k] = v
- }
- }
- delete(metadata, s3_constants.AmzTagCount)
- }
- return
- }
|