s3api_object_handlers.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package s3api
  2. import (
  3. "crypto/md5"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "net/http"
  9. "strings"
  10. "github.com/gorilla/mux"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/server"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. )
  15. var (
  16. client *http.Client
  17. )
  18. func init() {
  19. client = &http.Client{Transport: &http.Transport{
  20. MaxIdleConnsPerHost: 1024,
  21. }}
  22. }
  23. func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
  24. // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
  25. vars := mux.Vars(r)
  26. bucket := vars["bucket"]
  27. object := getObject(vars)
  28. _, err := validateContentMd5(r.Header)
  29. if err != nil {
  30. writeErrorResponse(w, ErrInvalidDigest, r.URL)
  31. return
  32. }
  33. rAuthType := getRequestAuthType(r)
  34. dataReader := r.Body
  35. if rAuthType == authTypeStreamingSigned {
  36. dataReader = newSignV4ChunkedReader(r)
  37. }
  38. defer dataReader.Close()
  39. uploadUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s",
  40. s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
  41. etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
  42. if errCode != ErrNone {
  43. writeErrorResponse(w, errCode, r.URL)
  44. return
  45. }
  46. setEtag(w, etag)
  47. writeSuccessResponseEmpty(w)
  48. }
  49. func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
  50. vars := mux.Vars(r)
  51. bucket := vars["bucket"]
  52. object := getObject(vars)
  53. if strings.HasSuffix(r.URL.Path, "/") {
  54. writeErrorResponse(w, ErrNotImplemented, r.URL)
  55. return
  56. }
  57. destUrl := fmt.Sprintf("http://%s%s/%s%s",
  58. s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
  59. s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
  60. }
  61. func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
  62. vars := mux.Vars(r)
  63. bucket := vars["bucket"]
  64. object := getObject(vars)
  65. destUrl := fmt.Sprintf("http://%s%s/%s%s",
  66. s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
  67. s3a.proxyToFiler(w, r, destUrl, passThroughResponse)
  68. }
  69. func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
  70. vars := mux.Vars(r)
  71. bucket := vars["bucket"]
  72. object := getObject(vars)
  73. destUrl := fmt.Sprintf("http://%s%s/%s%s",
  74. s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
  75. s3a.proxyToFiler(w, r, destUrl, func(proxyResonse *http.Response, w http.ResponseWriter) {
  76. for k, v := range proxyResonse.Header {
  77. w.Header()[k] = v
  78. }
  79. w.WriteHeader(http.StatusNoContent)
  80. })
  81. }
  82. // DeleteMultipleObjectsHandler - Delete multiple objects
  83. func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
  84. // TODO
  85. writeErrorResponse(w, ErrNotImplemented, r.URL)
  86. }
  87. func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResonse *http.Response, w http.ResponseWriter)) {
  88. glog.V(2).Infof("s3 proxying %s to %s", r.Method, destUrl)
  89. proxyReq, err := http.NewRequest(r.Method, destUrl, r.Body)
  90. if err != nil {
  91. glog.Errorf("NewRequest %s: %v", destUrl, err)
  92. writeErrorResponse(w, ErrInternalError, r.URL)
  93. return
  94. }
  95. proxyReq.Header.Set("Host", s3a.option.Filer)
  96. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  97. proxyReq.Header.Set("Etag-MD5", "True")
  98. for header, values := range r.Header {
  99. for _, value := range values {
  100. proxyReq.Header.Add(header, value)
  101. }
  102. }
  103. resp, postErr := client.Do(proxyReq)
  104. if postErr != nil {
  105. glog.Errorf("post to filer: %v", postErr)
  106. writeErrorResponse(w, ErrInternalError, r.URL)
  107. return
  108. }
  109. defer util.CloseResponse(resp)
  110. responseFn(resp, w)
  111. }
  112. func passThroughResponse(proxyResonse *http.Response, w http.ResponseWriter) {
  113. for k, v := range proxyResonse.Header {
  114. w.Header()[k] = v
  115. }
  116. w.WriteHeader(proxyResonse.StatusCode)
  117. io.Copy(w, proxyResonse.Body)
  118. }
  119. func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader) (etag string, code ErrorCode) {
  120. hash := md5.New()
  121. var body io.Reader = io.TeeReader(dataReader, hash)
  122. proxyReq, err := http.NewRequest("PUT", uploadUrl, body)
  123. if err != nil {
  124. glog.Errorf("NewRequest %s: %v", uploadUrl, err)
  125. return "", ErrInternalError
  126. }
  127. proxyReq.Header.Set("Host", s3a.option.Filer)
  128. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  129. for header, values := range r.Header {
  130. for _, value := range values {
  131. proxyReq.Header.Add(header, value)
  132. }
  133. }
  134. resp, postErr := client.Do(proxyReq)
  135. if postErr != nil {
  136. glog.Errorf("post to filer: %v", postErr)
  137. return "", ErrInternalError
  138. }
  139. defer resp.Body.Close()
  140. etag = fmt.Sprintf("%x", hash.Sum(nil))
  141. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  142. if ra_err != nil {
  143. glog.Errorf("upload to filer response read: %v", ra_err)
  144. return etag, ErrInternalError
  145. }
  146. var ret weed_server.FilerPostResult
  147. unmarshal_err := json.Unmarshal(resp_body, &ret)
  148. if unmarshal_err != nil {
  149. glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
  150. return "", ErrInternalError
  151. }
  152. if ret.Error != "" {
  153. glog.Errorf("upload to filer error: %v", ret.Error)
  154. return "", ErrInternalError
  155. }
  156. return etag, ErrNone
  157. }
  158. func setEtag(w http.ResponseWriter, etag string) {
  159. if etag != "" {
  160. if strings.HasPrefix(etag, "\"") {
  161. w.Header().Set("ETag", etag)
  162. } else {
  163. w.Header().Set("ETag", "\""+etag+"\"")
  164. }
  165. }
  166. }
  167. func getObject(vars map[string]string) string {
  168. object := vars["object"]
  169. if !strings.HasPrefix(object, "/") {
  170. object = "/" + object
  171. }
  172. return object
  173. }