http_util.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. package util
  2. import (
  3. "compress/gzip"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "strings"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. )
  13. var (
  14. client *http.Client
  15. Transport *http.Transport
  16. )
  17. func init() {
  18. Transport = &http.Transport{
  19. MaxIdleConns: 1024,
  20. MaxIdleConnsPerHost: 1024,
  21. }
  22. client = &http.Client{
  23. Transport: Transport,
  24. }
  25. }
  26. func Post(url string, values url.Values) ([]byte, error) {
  27. r, err := client.PostForm(url, values)
  28. if err != nil {
  29. return nil, err
  30. }
  31. defer r.Body.Close()
  32. b, err := io.ReadAll(r.Body)
  33. if r.StatusCode >= 400 {
  34. if err != nil {
  35. return nil, fmt.Errorf("%s: %d - %s", url, r.StatusCode, string(b))
  36. } else {
  37. return nil, fmt.Errorf("%s: %s", url, r.Status)
  38. }
  39. }
  40. if err != nil {
  41. return nil, err
  42. }
  43. return b, nil
  44. }
  45. // github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
  46. // may need increasing http.Client.Timeout
  47. func Get(url string) ([]byte, bool, error) {
  48. request, err := http.NewRequest("GET", url, nil)
  49. request.Header.Add("Accept-Encoding", "gzip")
  50. response, err := client.Do(request)
  51. if err != nil {
  52. return nil, true, err
  53. }
  54. defer response.Body.Close()
  55. var reader io.ReadCloser
  56. switch response.Header.Get("Content-Encoding") {
  57. case "gzip":
  58. reader, err = gzip.NewReader(response.Body)
  59. defer reader.Close()
  60. default:
  61. reader = response.Body
  62. }
  63. b, err := io.ReadAll(reader)
  64. if response.StatusCode >= 400 {
  65. retryable := response.StatusCode >= 500
  66. return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
  67. }
  68. if err != nil {
  69. return nil, false, err
  70. }
  71. return b, false, nil
  72. }
  73. func Head(url string) (http.Header, error) {
  74. r, err := client.Head(url)
  75. if err != nil {
  76. return nil, err
  77. }
  78. defer CloseResponse(r)
  79. if r.StatusCode >= 400 {
  80. return nil, fmt.Errorf("%s: %s", url, r.Status)
  81. }
  82. return r.Header, nil
  83. }
  84. func Delete(url string, jwt string) error {
  85. req, err := http.NewRequest("DELETE", url, nil)
  86. if jwt != "" {
  87. req.Header.Set("Authorization", "BEARER "+string(jwt))
  88. }
  89. if err != nil {
  90. return err
  91. }
  92. resp, e := client.Do(req)
  93. if e != nil {
  94. return e
  95. }
  96. defer resp.Body.Close()
  97. body, err := io.ReadAll(resp.Body)
  98. if err != nil {
  99. return err
  100. }
  101. switch resp.StatusCode {
  102. case http.StatusNotFound, http.StatusAccepted, http.StatusOK:
  103. return nil
  104. }
  105. m := make(map[string]interface{})
  106. if e := json.Unmarshal(body, &m); e == nil {
  107. if s, ok := m["error"].(string); ok {
  108. return errors.New(s)
  109. }
  110. }
  111. return errors.New(string(body))
  112. }
  113. func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
  114. req, err := http.NewRequest("DELETE", url, nil)
  115. if jwt != "" {
  116. req.Header.Set("Authorization", "BEARER "+string(jwt))
  117. }
  118. if err != nil {
  119. return
  120. }
  121. resp, err := client.Do(req)
  122. if err != nil {
  123. return
  124. }
  125. defer resp.Body.Close()
  126. body, err = io.ReadAll(resp.Body)
  127. if err != nil {
  128. return
  129. }
  130. httpStatus = resp.StatusCode
  131. return
  132. }
  133. func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
  134. r, err := client.PostForm(url, values)
  135. if err != nil {
  136. return err
  137. }
  138. defer CloseResponse(r)
  139. if r.StatusCode != 200 {
  140. return fmt.Errorf("%s: %s", url, r.Status)
  141. }
  142. for {
  143. n, err := r.Body.Read(allocatedBytes)
  144. if n > 0 {
  145. eachBuffer(allocatedBytes[:n])
  146. }
  147. if err != nil {
  148. if err == io.EOF {
  149. return nil
  150. }
  151. return err
  152. }
  153. }
  154. }
  155. func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error {
  156. r, err := client.PostForm(url, values)
  157. if err != nil {
  158. return err
  159. }
  160. defer CloseResponse(r)
  161. if r.StatusCode != 200 {
  162. return fmt.Errorf("%s: %s", url, r.Status)
  163. }
  164. return readFn(r.Body)
  165. }
  166. func DownloadFile(fileUrl string, jwt string) (filename string, header http.Header, resp *http.Response, e error) {
  167. req, err := http.NewRequest("GET", fileUrl, nil)
  168. if err != nil {
  169. return "", nil, nil, err
  170. }
  171. if len(jwt) > 0 {
  172. req.Header.Set("Authorization", "BEARER "+jwt)
  173. }
  174. response, err := client.Do(req)
  175. if err != nil {
  176. return "", nil, nil, err
  177. }
  178. header = response.Header
  179. contentDisposition := response.Header["Content-Disposition"]
  180. if len(contentDisposition) > 0 {
  181. idx := strings.Index(contentDisposition[0], "filename=")
  182. if idx != -1 {
  183. filename = contentDisposition[0][idx+len("filename="):]
  184. filename = strings.Trim(filename, "\"")
  185. }
  186. }
  187. resp = response
  188. return
  189. }
  190. func Do(req *http.Request) (resp *http.Response, err error) {
  191. return client.Do(req)
  192. }
  193. func NormalizeUrl(url string) string {
  194. if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") {
  195. return url
  196. }
  197. return "http://" + url
  198. }
  199. func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
  200. if cipherKey != nil {
  201. var n int
  202. _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
  203. n = copy(buf, data)
  204. })
  205. return int64(n), err
  206. }
  207. req, err := http.NewRequest("GET", fileUrl, nil)
  208. if err != nil {
  209. return 0, err
  210. }
  211. if !isFullChunk {
  212. req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  213. } else {
  214. req.Header.Set("Accept-Encoding", "gzip")
  215. }
  216. r, err := client.Do(req)
  217. if err != nil {
  218. return 0, err
  219. }
  220. defer r.Body.Close()
  221. if r.StatusCode >= 400 {
  222. return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
  223. }
  224. var reader io.ReadCloser
  225. contentEncoding := r.Header.Get("Content-Encoding")
  226. switch contentEncoding {
  227. case "gzip":
  228. reader, err = gzip.NewReader(r.Body)
  229. defer reader.Close()
  230. default:
  231. reader = r.Body
  232. }
  233. var (
  234. i, m int
  235. n int64
  236. )
  237. // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199
  238. // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318
  239. for {
  240. m, err = reader.Read(buf[i:])
  241. i += m
  242. n += int64(m)
  243. if err == io.EOF {
  244. return n, nil
  245. }
  246. if err != nil {
  247. return n, err
  248. }
  249. if n == int64(len(buf)) {
  250. break
  251. }
  252. }
  253. // drains the response body to avoid memory leak
  254. data, _ := io.ReadAll(reader)
  255. if len(data) != 0 {
  256. glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data))
  257. }
  258. return n, err
  259. }
  260. func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
  261. if cipherKey != nil {
  262. return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
  263. }
  264. req, err := http.NewRequest("GET", fileUrl, nil)
  265. if err != nil {
  266. return false, err
  267. }
  268. if isFullChunk {
  269. req.Header.Add("Accept-Encoding", "gzip")
  270. } else {
  271. req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  272. }
  273. r, err := client.Do(req)
  274. if err != nil {
  275. return true, err
  276. }
  277. defer CloseResponse(r)
  278. if r.StatusCode >= 400 {
  279. retryable = r.StatusCode >= 500
  280. return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
  281. }
  282. var reader io.ReadCloser
  283. contentEncoding := r.Header.Get("Content-Encoding")
  284. switch contentEncoding {
  285. case "gzip":
  286. reader, err = gzip.NewReader(r.Body)
  287. defer reader.Close()
  288. default:
  289. reader = r.Body
  290. }
  291. var (
  292. m int
  293. )
  294. buf := make([]byte, 64*1024)
  295. for {
  296. m, err = reader.Read(buf)
  297. fn(buf[:m])
  298. if err == io.EOF {
  299. return false, nil
  300. }
  301. if err != nil {
  302. return true, err
  303. }
  304. }
  305. }
  306. func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
  307. encryptedData, retryable, err := Get(fileUrl)
  308. if err != nil {
  309. return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
  310. }
  311. decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
  312. if err != nil {
  313. return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
  314. }
  315. if isContentCompressed {
  316. decryptedData, err = DecompressData(decryptedData)
  317. if err != nil {
  318. glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
  319. }
  320. }
  321. if len(decryptedData) < int(offset)+size {
  322. return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
  323. }
  324. if isFullChunk {
  325. fn(decryptedData)
  326. } else {
  327. fn(decryptedData[int(offset) : int(offset)+size])
  328. }
  329. return false, nil
  330. }
  331. func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (io.ReadCloser, error) {
  332. req, err := http.NewRequest("GET", fileUrl, nil)
  333. if err != nil {
  334. return nil, err
  335. }
  336. if rangeHeader != "" {
  337. req.Header.Add("Range", rangeHeader)
  338. } else {
  339. req.Header.Add("Accept-Encoding", "gzip")
  340. }
  341. if len(jwt) > 0 {
  342. req.Header.Set("Authorization", "BEARER "+jwt)
  343. }
  344. r, err := client.Do(req)
  345. if err != nil {
  346. return nil, err
  347. }
  348. if r.StatusCode >= 400 {
  349. return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
  350. }
  351. var reader io.ReadCloser
  352. contentEncoding := r.Header.Get("Content-Encoding")
  353. switch contentEncoding {
  354. case "gzip":
  355. reader, err = gzip.NewReader(r.Body)
  356. defer reader.Close()
  357. default:
  358. reader = r.Body
  359. }
  360. return reader, nil
  361. }
  362. func CloseResponse(resp *http.Response) {
  363. reader := &CountingReader{reader: resp.Body}
  364. io.Copy(io.Discard, reader)
  365. resp.Body.Close()
  366. if reader.BytesRead > 0 {
  367. glog.V(1).Infof("response leftover %d bytes", reader.BytesRead)
  368. }
  369. }
  370. func CloseRequest(req *http.Request) {
  371. reader := &CountingReader{reader: req.Body}
  372. io.Copy(io.Discard, reader)
  373. req.Body.Close()
  374. if reader.BytesRead > 0 {
  375. glog.V(1).Infof("request leftover %d bytes", reader.BytesRead)
  376. }
  377. }
  378. type CountingReader struct {
  379. reader io.Reader
  380. BytesRead int
  381. }
  382. func (r *CountingReader) Read(p []byte) (n int, err error) {
  383. n, err = r.reader.Read(p)
  384. r.BytesRead += n
  385. return n, err
  386. }