http_util.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. package util
import (
	"compress/gzip"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"mime"
	"net/http"
	"net/url"
	"strings"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/util/mem"
)
  14. var (
  15. client *http.Client
  16. Transport *http.Transport
  17. )
  18. func init() {
  19. Transport = &http.Transport{
  20. MaxIdleConns: 1024,
  21. MaxIdleConnsPerHost: 1024,
  22. }
  23. client = &http.Client{
  24. Transport: Transport,
  25. }
  26. }
  27. func Post(url string, values url.Values) ([]byte, error) {
  28. r, err := client.PostForm(url, values)
  29. if err != nil {
  30. return nil, err
  31. }
  32. defer r.Body.Close()
  33. b, err := io.ReadAll(r.Body)
  34. if r.StatusCode >= 400 {
  35. if err != nil {
  36. return nil, fmt.Errorf("%s: %d - %s", url, r.StatusCode, string(b))
  37. } else {
  38. return nil, fmt.Errorf("%s: %s", url, r.Status)
  39. }
  40. }
  41. if err != nil {
  42. return nil, err
  43. }
  44. return b, nil
  45. }
  46. // github.com/seaweedfs/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
  47. // may need increasing http.Client.Timeout
  48. func Get(url string) ([]byte, bool, error) {
  49. request, err := http.NewRequest("GET", url, nil)
  50. if err != nil {
  51. return nil, true, err
  52. }
  53. request.Header.Add("Accept-Encoding", "gzip")
  54. response, err := client.Do(request)
  55. if err != nil {
  56. return nil, true, err
  57. }
  58. defer CloseResponse(response)
  59. var reader io.ReadCloser
  60. switch response.Header.Get("Content-Encoding") {
  61. case "gzip":
  62. reader, err = gzip.NewReader(response.Body)
  63. if err != nil {
  64. return nil, true, err
  65. }
  66. defer reader.Close()
  67. default:
  68. reader = response.Body
  69. }
  70. b, err := io.ReadAll(reader)
  71. if response.StatusCode >= 400 {
  72. retryable := response.StatusCode >= 500
  73. return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
  74. }
  75. if err != nil {
  76. return nil, false, err
  77. }
  78. return b, false, nil
  79. }
  80. func Head(url string) (http.Header, error) {
  81. r, err := client.Head(url)
  82. if err != nil {
  83. return nil, err
  84. }
  85. defer CloseResponse(r)
  86. if r.StatusCode >= 400 {
  87. return nil, fmt.Errorf("%s: %s", url, r.Status)
  88. }
  89. return r.Header, nil
  90. }
  91. func Delete(url string, jwt string) error {
  92. req, err := http.NewRequest("DELETE", url, nil)
  93. if jwt != "" {
  94. req.Header.Set("Authorization", "BEARER "+string(jwt))
  95. }
  96. if err != nil {
  97. return err
  98. }
  99. resp, e := client.Do(req)
  100. if e != nil {
  101. return e
  102. }
  103. defer resp.Body.Close()
  104. body, err := io.ReadAll(resp.Body)
  105. if err != nil {
  106. return err
  107. }
  108. switch resp.StatusCode {
  109. case http.StatusNotFound, http.StatusAccepted, http.StatusOK:
  110. return nil
  111. }
  112. m := make(map[string]interface{})
  113. if e := json.Unmarshal(body, &m); e == nil {
  114. if s, ok := m["error"].(string); ok {
  115. return errors.New(s)
  116. }
  117. }
  118. return errors.New(string(body))
  119. }
  120. func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
  121. req, err := http.NewRequest("DELETE", url, nil)
  122. if jwt != "" {
  123. req.Header.Set("Authorization", "BEARER "+string(jwt))
  124. }
  125. if err != nil {
  126. return
  127. }
  128. resp, err := client.Do(req)
  129. if err != nil {
  130. return
  131. }
  132. defer resp.Body.Close()
  133. body, err = io.ReadAll(resp.Body)
  134. if err != nil {
  135. return
  136. }
  137. httpStatus = resp.StatusCode
  138. return
  139. }
  140. func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
  141. r, err := client.PostForm(url, values)
  142. if err != nil {
  143. return err
  144. }
  145. defer CloseResponse(r)
  146. if r.StatusCode != 200 {
  147. return fmt.Errorf("%s: %s", url, r.Status)
  148. }
  149. for {
  150. n, err := r.Body.Read(allocatedBytes)
  151. if n > 0 {
  152. eachBuffer(allocatedBytes[:n])
  153. }
  154. if err != nil {
  155. if err == io.EOF {
  156. return nil
  157. }
  158. return err
  159. }
  160. }
  161. }
  162. func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error {
  163. r, err := client.PostForm(url, values)
  164. if err != nil {
  165. return err
  166. }
  167. defer CloseResponse(r)
  168. if r.StatusCode != 200 {
  169. return fmt.Errorf("%s: %s", url, r.Status)
  170. }
  171. return readFn(r.Body)
  172. }
  173. func DownloadFile(fileUrl string, jwt string) (filename string, header http.Header, resp *http.Response, e error) {
  174. req, err := http.NewRequest("GET", fileUrl, nil)
  175. if err != nil {
  176. return "", nil, nil, err
  177. }
  178. if len(jwt) > 0 {
  179. req.Header.Set("Authorization", "BEARER "+jwt)
  180. }
  181. response, err := client.Do(req)
  182. if err != nil {
  183. return "", nil, nil, err
  184. }
  185. header = response.Header
  186. contentDisposition := response.Header["Content-Disposition"]
  187. if len(contentDisposition) > 0 {
  188. idx := strings.Index(contentDisposition[0], "filename=")
  189. if idx != -1 {
  190. filename = contentDisposition[0][idx+len("filename="):]
  191. filename = strings.Trim(filename, "\"")
  192. }
  193. }
  194. resp = response
  195. return
  196. }
  197. func Do(req *http.Request) (resp *http.Response, err error) {
  198. return client.Do(req)
  199. }
  200. func NormalizeUrl(url string) string {
  201. if strings.HasPrefix(url, "http://") || strings.HasPrefix(url, "https://") {
  202. return url
  203. }
  204. return "http://" + url
  205. }
  206. func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
  207. if cipherKey != nil {
  208. var n int
  209. _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
  210. n = copy(buf, data)
  211. })
  212. return int64(n), err
  213. }
  214. req, err := http.NewRequest("GET", fileUrl, nil)
  215. if err != nil {
  216. return 0, err
  217. }
  218. if !isFullChunk {
  219. req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  220. } else {
  221. req.Header.Set("Accept-Encoding", "gzip")
  222. }
  223. r, err := client.Do(req)
  224. if err != nil {
  225. return 0, err
  226. }
  227. defer CloseResponse(r)
  228. if r.StatusCode >= 400 {
  229. return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
  230. }
  231. var reader io.ReadCloser
  232. contentEncoding := r.Header.Get("Content-Encoding")
  233. switch contentEncoding {
  234. case "gzip":
  235. reader, err = gzip.NewReader(r.Body)
  236. if err != nil {
  237. return 0, err
  238. }
  239. defer reader.Close()
  240. default:
  241. reader = r.Body
  242. }
  243. var (
  244. i, m int
  245. n int64
  246. )
  247. // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199
  248. // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318
  249. for {
  250. m, err = reader.Read(buf[i:])
  251. i += m
  252. n += int64(m)
  253. if err == io.EOF {
  254. return n, nil
  255. }
  256. if err != nil {
  257. return n, err
  258. }
  259. if n == int64(len(buf)) {
  260. break
  261. }
  262. }
  263. // drains the response body to avoid memory leak
  264. data, _ := io.ReadAll(reader)
  265. if len(data) != 0 {
  266. glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data))
  267. }
  268. return n, err
  269. }
  270. func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
  271. if cipherKey != nil {
  272. return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
  273. }
  274. req, err := http.NewRequest("GET", fileUrl, nil)
  275. if err != nil {
  276. return false, err
  277. }
  278. if isFullChunk {
  279. req.Header.Add("Accept-Encoding", "gzip")
  280. } else {
  281. req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  282. }
  283. r, err := client.Do(req)
  284. if err != nil {
  285. return true, err
  286. }
  287. defer CloseResponse(r)
  288. if r.StatusCode >= 400 {
  289. retryable = r.StatusCode == http.StatusNotFound || r.StatusCode >= 500
  290. return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
  291. }
  292. var reader io.ReadCloser
  293. contentEncoding := r.Header.Get("Content-Encoding")
  294. switch contentEncoding {
  295. case "gzip":
  296. reader, err = gzip.NewReader(r.Body)
  297. defer reader.Close()
  298. default:
  299. reader = r.Body
  300. }
  301. var (
  302. m int
  303. )
  304. buf := mem.Allocate(64 * 1024)
  305. defer mem.Free(buf)
  306. for {
  307. m, err = reader.Read(buf)
  308. if m > 0 {
  309. fn(buf[:m])
  310. }
  311. if err == io.EOF {
  312. return false, nil
  313. }
  314. if err != nil {
  315. return true, err
  316. }
  317. }
  318. }
  319. func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
  320. encryptedData, retryable, err := Get(fileUrl)
  321. if err != nil {
  322. return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
  323. }
  324. decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
  325. if err != nil {
  326. return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
  327. }
  328. if isContentCompressed {
  329. decryptedData, err = DecompressData(decryptedData)
  330. if err != nil {
  331. glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
  332. }
  333. }
  334. if len(decryptedData) < int(offset)+size {
  335. return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
  336. }
  337. if isFullChunk {
  338. fn(decryptedData)
  339. } else {
  340. fn(decryptedData[int(offset) : int(offset)+size])
  341. }
  342. return false, nil
  343. }
  344. func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*http.Response, io.ReadCloser, error) {
  345. req, err := http.NewRequest("GET", fileUrl, nil)
  346. if err != nil {
  347. return nil, nil, err
  348. }
  349. if rangeHeader != "" {
  350. req.Header.Add("Range", rangeHeader)
  351. } else {
  352. req.Header.Add("Accept-Encoding", "gzip")
  353. }
  354. if len(jwt) > 0 {
  355. req.Header.Set("Authorization", "BEARER "+jwt)
  356. }
  357. r, err := client.Do(req)
  358. if err != nil {
  359. return nil, nil, err
  360. }
  361. if r.StatusCode >= 400 {
  362. CloseResponse(r)
  363. return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
  364. }
  365. var reader io.ReadCloser
  366. contentEncoding := r.Header.Get("Content-Encoding")
  367. switch contentEncoding {
  368. case "gzip":
  369. reader, err = gzip.NewReader(r.Body)
  370. if err != nil {
  371. return nil, nil, err
  372. }
  373. default:
  374. reader = r.Body
  375. }
  376. return r, reader, nil
  377. }
  378. func CloseResponse(resp *http.Response) {
  379. if resp == nil || resp.Body == nil {
  380. return
  381. }
  382. reader := &CountingReader{reader: resp.Body}
  383. io.Copy(io.Discard, reader)
  384. resp.Body.Close()
  385. if reader.BytesRead > 0 {
  386. glog.V(1).Infof("response leftover %d bytes", reader.BytesRead)
  387. }
  388. }
  389. func CloseRequest(req *http.Request) {
  390. reader := &CountingReader{reader: req.Body}
  391. io.Copy(io.Discard, reader)
  392. req.Body.Close()
  393. if reader.BytesRead > 0 {
  394. glog.V(1).Infof("request leftover %d bytes", reader.BytesRead)
  395. }
  396. }
  397. type CountingReader struct {
  398. reader io.Reader
  399. BytesRead int
  400. }
  401. func (r *CountingReader) Read(p []byte) (n int, err error) {
  402. n, err = r.reader.Read(p)
  403. r.BytesRead += n
  404. return n, err
  405. }