upload_content.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package operation
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "mime"
  10. "mime/multipart"
  11. "net/http"
  12. "net/textproto"
  13. "path/filepath"
  14. "strings"
  15. "time"
  16. "github.com/chrislusf/seaweedfs/weed/glog"
  17. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  18. "github.com/chrislusf/seaweedfs/weed/security"
  19. "github.com/chrislusf/seaweedfs/weed/util"
  20. )
  21. type UploadResult struct {
  22. Name string `json:"name,omitempty"`
  23. Size uint32 `json:"size,omitempty"`
  24. Error string `json:"error,omitempty"`
  25. ETag string `json:"eTag,omitempty"`
  26. CipherKey []byte `json:"cipherKey,omitempty"`
  27. Mime string `json:"mime,omitempty"`
  28. Gzip uint32 `json:"gzip,omitempty"`
  29. ContentMd5 string `json:"contentMd5,omitempty"`
  30. }
  31. func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
  32. return &filer_pb.FileChunk{
  33. FileId: fileId,
  34. Offset: offset,
  35. Size: uint64(uploadResult.Size),
  36. Mtime: time.Now().UnixNano(),
  37. ETag: uploadResult.ETag,
  38. CipherKey: uploadResult.CipherKey,
  39. IsCompressed: uploadResult.Gzip > 0,
  40. }
  41. }
  42. // HTTPClient interface for testing
  43. type HTTPClient interface {
  44. Do(req *http.Request) (*http.Response, error)
  45. }
  46. var (
  47. HttpClient HTTPClient
  48. )
  49. func init() {
  50. HttpClient = &http.Client{Transport: &http.Transport{
  51. MaxIdleConnsPerHost: 1024,
  52. }}
  53. }
  54. var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
  55. // Upload sends a POST request to a volume server to upload the content with adjustable compression level
  56. func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
  57. uploadResult, err = retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
  58. return
  59. }
  60. // Upload sends a POST request to a volume server to upload the content with fast compression
  61. func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
  62. uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt)
  63. return
  64. }
  65. func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
  66. data, err = ioutil.ReadAll(reader)
  67. if err != nil {
  68. err = fmt.Errorf("read input: %v", err)
  69. return
  70. }
  71. uploadResult, uploadErr := retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
  72. return uploadResult, uploadErr, data
  73. }
  74. func retriedUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
  75. for i:=0; i< 3; i++ {
  76. uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
  77. if err == nil {
  78. return
  79. } else {
  80. glog.Warningf("uploading to %s: %v", uploadUrl, err)
  81. }
  82. }
  83. return
  84. }
  85. func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
  86. contentIsGzipped := isInputCompressed
  87. shouldGzipNow := false
  88. if !isInputCompressed {
  89. if mtype == "" {
  90. mtype = http.DetectContentType(data)
  91. // println("detect1 mimetype to", mtype)
  92. if mtype == "application/octet-stream" {
  93. mtype = ""
  94. }
  95. }
  96. if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeCompressed {
  97. shouldGzipNow = true
  98. } else if !iAmSure && mtype == "" && len(data) > 128 {
  99. var compressed []byte
  100. compressed, err = util.GzipData(data[0:128])
  101. shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90%
  102. }
  103. }
  104. var clearDataLen int
  105. // gzip if possible
  106. // this could be double copying
  107. clearDataLen = len(data)
  108. if shouldGzipNow {
  109. compressed, compressErr := util.GzipData(data)
  110. // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
  111. if compressErr == nil {
  112. data = compressed
  113. contentIsGzipped = true
  114. }
  115. } else if isInputCompressed {
  116. // just to get the clear data length
  117. clearData, err := util.DecompressData(data)
  118. if err == nil {
  119. clearDataLen = len(clearData)
  120. }
  121. }
  122. if cipher {
  123. // encrypt(gzip(data))
  124. // encrypt
  125. cipherKey := util.GenCipherKey()
  126. encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
  127. if encryptionErr != nil {
  128. err = fmt.Errorf("encrypt input: %v", encryptionErr)
  129. return
  130. }
  131. // upload data
  132. uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
  133. _, err = w.Write(encryptedData)
  134. return
  135. }, "", false, len(encryptedData), "", nil, jwt)
  136. if uploadResult != nil {
  137. uploadResult.Name = filename
  138. uploadResult.Mime = mtype
  139. uploadResult.CipherKey = cipherKey
  140. }
  141. } else {
  142. // upload data
  143. uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
  144. _, err = w.Write(data)
  145. return
  146. }, filename, contentIsGzipped, 0, mtype, pairMap, jwt)
  147. }
  148. if uploadResult == nil {
  149. return
  150. }
  151. uploadResult.Size = uint32(clearDataLen)
  152. if contentIsGzipped {
  153. uploadResult.Gzip = 1
  154. }
  155. return uploadResult, err
  156. }
  157. func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, originalDataSize int, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
  158. body_buf := bytes.NewBufferString("")
  159. body_writer := multipart.NewWriter(body_buf)
  160. h := make(textproto.MIMEHeader)
  161. h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename)))
  162. if mtype == "" {
  163. mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename)))
  164. }
  165. if mtype != "" {
  166. h.Set("Content-Type", mtype)
  167. }
  168. if isGzipped {
  169. h.Set("Content-Encoding", "gzip")
  170. }
  171. file_writer, cp_err := body_writer.CreatePart(h)
  172. if cp_err != nil {
  173. glog.V(0).Infoln("error creating form file", cp_err.Error())
  174. return nil, cp_err
  175. }
  176. if err := fillBufferFunction(file_writer); err != nil {
  177. glog.V(0).Infoln("error copying data", err)
  178. return nil, err
  179. }
  180. content_type := body_writer.FormDataContentType()
  181. if err := body_writer.Close(); err != nil {
  182. glog.V(0).Infoln("error closing body", err)
  183. return nil, err
  184. }
  185. req, postErr := http.NewRequest("POST", uploadUrl, body_buf)
  186. if postErr != nil {
  187. glog.V(1).Infof("failing to upload to %s: %v", uploadUrl, postErr)
  188. return nil, fmt.Errorf("failing to upload to %s: %v", uploadUrl, postErr)
  189. }
  190. req.Header.Set("Content-Type", content_type)
  191. for k, v := range pairMap {
  192. req.Header.Set(k, v)
  193. }
  194. if jwt != "" {
  195. req.Header.Set("Authorization", "BEARER "+string(jwt))
  196. }
  197. resp, post_err := HttpClient.Do(req)
  198. if post_err != nil {
  199. glog.V(1).Infof("failing to upload to %v: %v", uploadUrl, post_err)
  200. return nil, fmt.Errorf("failing to upload to %v: %v", uploadUrl, post_err)
  201. }
  202. defer resp.Body.Close()
  203. var ret UploadResult
  204. etag := getEtag(resp)
  205. if resp.StatusCode == http.StatusNoContent {
  206. ret.ETag = etag
  207. return &ret, nil
  208. }
  209. resp_body, ra_err := ioutil.ReadAll(resp.Body)
  210. if ra_err != nil {
  211. return nil, ra_err
  212. }
  213. unmarshal_err := json.Unmarshal(resp_body, &ret)
  214. if unmarshal_err != nil {
  215. glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body))
  216. return nil, unmarshal_err
  217. }
  218. if ret.Error != "" {
  219. return nil, errors.New(ret.Error)
  220. }
  221. ret.ETag = etag
  222. ret.ContentMd5 = resp.Header.Get("Content-MD5")
  223. return &ret, nil
  224. }
  225. func getEtag(r *http.Response) (etag string) {
  226. etag = r.Header.Get("ETag")
  227. if strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") {
  228. etag = etag[1 : len(etag)-1]
  229. }
  230. return
  231. }