package s3api

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/mem"
)

// mimeDetect sniffs up to the first 512 bytes of dataReader, stores the
// detected type in the request's Content-Type header, and returns a reader
// that yields the complete stream again (sniffed prefix followed by the rest).
// If nothing could be read, the header is left untouched and dataReader is
// returned as is, wrapped in a no-op closer.
func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser {
	mimeBuffer := make([]byte, 512)
	// io.ReadFull is used instead of a single Read because one Read may return
	// fewer bytes than are available, which would make the sniffing see a
	// truncated prefix. The error is intentionally dropped: a body shorter than
	// the buffer reports io.EOF / io.ErrUnexpectedEOF, which is expected here.
	// NOTE(review): other read errors are assumed to surface again when the
	// caller reads the returned stream - confirm for the readers passed in.
	size, _ := io.ReadFull(dataReader, mimeBuffer)
	if size > 0 {
		r.Header.Set("Content-Type", http.DetectContentType(mimeBuffer[:size]))
		return io.NopCloser(io.MultiReader(bytes.NewReader(mimeBuffer[:size]), dataReader))
	}
	return io.NopCloser(dataReader)
}

// urlEscapeObject collapses repeated slashes in object, path-escapes every
// segment, and guarantees that the result starts with a "/".
func urlEscapeObject(object string) string {
	t := urlPathEscape(removeDuplicateSlashes(object))
	if strings.HasPrefix(t, "/") {
		return t
	}
	return "/" + t
}

// urlPathEscape applies url.PathEscape to each "/"-separated segment of
// object, leaving the separators themselves unescaped.
func urlPathEscape(object string) string {
	parts := strings.Split(object, "/")
	for i, part := range parts {
		parts[i] = url.PathEscape(part)
	}
	return strings.Join(parts, "/")
}

// removeDuplicateSlashes replaces every run of consecutive "/" characters in
// object with a single "/".
func removeDuplicateSlashes(object string) string {
	var result strings.Builder
	result.Grow(len(object))

	isLastSlash := false
	for _, r := range object {
		if r == '/' {
			if !isLastSlash {
				result.WriteRune(r)
			}
			isLastSlash = true
			continue
		}
		result.WriteRune(r)
		isLastSlash = false
	}
	return result.String()
}

// toFilerUrl builds the filer HTTP URL for object inside bucket. The object
// key is normalized (duplicate slashes removed, segments path-escaped) and
// appended directly after the bucket name, so no separator is inserted
// between the two.
func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string {
	object = urlPathEscape(removeDuplicateSlashes(object))
	destUrl := fmt.Sprintf("http://%s%s/%s%s",
		s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, object)
	return destUrl
}

// GetObjectHandler serves an object GET by proxying the request to the filer.
// Requests whose URL path ends with "/" are rejected with ErrNotImplemented.
func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {

	bucket, object := s3_constants.GetBucketAndObject(r)
	glog.V(3).Infof("GetObjectHandler %s %s", bucket, object)

	if strings.HasSuffix(r.URL.Path, "/") {
		s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
		return
	}

	destUrl := s3a.toFilerUrl(bucket, object)

	s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)
}

// HeadObjectHandler serves an object HEAD by proxying the request to the filer.
func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {

	bucket, object := s3_constants.GetBucketAndObject(r)
	glog.V(3).Infof("HeadObjectHandler %s %s", bucket, object)

	destUrl := s3a.toFilerUrl(bucket, object)

	s3a.proxyToFiler(w, r, destUrl, false, passThroughResponse)
}

// proxyToFiler forwards r to destUrl on the filer and translates the filer's
// answer into an S3 response.
//
// The incoming method, body and headers are reused for the outgoing request;
// selected query parameters are additionally forwarded as headers. isWrite is
// handed to maybeAddFilerJwtAuthorization. Filer status codes that have an S3
// error equivalent are written as S3 error responses; every other response is
// handed to responseFn, which writes it to w and returns the status code that
// is then recorded via s3err.PostLog.
func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, isWrite bool, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int)) {

	glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl)
	start := time.Now()

	proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
	if err != nil {
		glog.Errorf("NewRequest %s: %v", destUrl, err)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)

	// Forward pass-through query parameters (and the part number) as headers.
	for k, v := range r.URL.Query() {
		if _, ok := s3_constants.PassThroughHeaders[strings.ToLower(k)]; ok {
			proxyReq.Header[k] = v
		}
		if k == "partNumber" {
			proxyReq.Header[s3_constants.SeaweedFSPartNumber] = v
		}
	}
	// Request headers are copied last, so they win over anything set above.
	for header, values := range r.Header {
		proxyReq.Header[header] = values
	}

	// ensure that the Authorization header is overriding any previous
	// Authorization header which might be already present in proxyReq
	s3a.maybeAddFilerJwtAuthorization(proxyReq, isWrite)
	resp, postErr := s3a.client.Do(proxyReq)

	if postErr != nil {
		glog.Errorf("post to filer: %v", postErr)
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}
	defer util.CloseResponse(resp)

	if resp.StatusCode == http.StatusPreconditionFailed {
		s3err.WriteErrorResponse(w, r, s3err.ErrPreconditionFailed)
		return
	}

	if resp.StatusCode == http.StatusRequestedRangeNotSatisfiable {
		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
		return
	}

	if r.Method == http.MethodDelete {
		if resp.StatusCode == http.StatusNotFound {
			// this is normal
			responseStatusCode := responseFn(resp, w)
			s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
			return
		}
	}
	if resp.StatusCode == http.StatusNotFound {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
		return
	}

	TimeToFirstByte(r.Method, start, r)
	if resp.Header.Get(s3_constants.SeaweedFSIsDirectoryKey) == "true" {
		responseStatusCode := responseFn(resp, w)
		s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
		return
	}

	if resp.StatusCode == http.StatusInternalServerError {
		s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
		return
	}

	// when HEAD a directory, it should be reported as no such key
	// https://github.com/seaweedfs/seaweedfs/issues/3457
	if resp.ContentLength == -1 && resp.StatusCode != http.StatusNotModified {
		s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
		return
	}

	if resp.StatusCode == http.StatusBadRequest {
		// The filer puts the error kind in the body of a 400 response.
		respBody, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			// Best effort: an unreadable body falls through to the generic
			// invalid-request error below unless it happens to match.
			glog.V(1).Infof("read filer error response from %s: %v", destUrl, readErr)
		}
		switch string(respBody) {
		case "InvalidPart":
			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidPart)
		default:
			s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRequest)
		}
		resp.Body.Close()
		return
	}

	setUserMetadataKeyToLowercase(resp)

	responseStatusCode := responseFn(resp, w)
	s3err.PostLog(r, responseStatusCode, s3err.ErrNone)
}

// setUserMetadataKeyToLowercase re-keys every response header whose name
// starts with s3_constants.AmzUserMetaPrefix under its all-lowercase name and
// removes the original entry.
func setUserMetadataKeyToLowercase(resp *http.Response) {
	for key, value := range resp.Header {
		if strings.HasPrefix(key, s3_constants.AmzUserMetaPrefix) {
			resp.Header[strings.ToLower(key)] = value
			delete(resp.Header, key)
		}
	}
}

// passThroughResponse copies the headers, status and body of proxyResponse to
// w and returns the status code that was sent. A 200 response that carries a
// Content-Range header is reported as 206 Partial Content.
func passThroughResponse(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int) {
	for k, v := range proxyResponse.Header {
		w.Header()[k] = v
	}

	// Decide the status first and write the header exactly once: only the
	// first WriteHeader call on a ResponseWriter takes effect, and net/http
	// logs any further call as superfluous.
	statusCode = proxyResponse.StatusCode
	if statusCode == http.StatusOK && proxyResponse.Header.Get("Content-Range") != "" {
		statusCode = http.StatusPartialContent
	}
	w.WriteHeader(statusCode)

	buf := mem.Allocate(128 * 1024)
	defer mem.Free(buf)
	if n, err := io.CopyBuffer(w, proxyResponse.Body, buf); err != nil {
		glog.V(1).Infof("passthrough response read %d bytes: %v", n, err)
	}
	return statusCode
}