http_util.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. package util
  2. import (
  3. "compress/gzip"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  8. "io"
  9. "net/http"
  10. "net/url"
  11. "strings"
  12. "time"
  13. "github.com/seaweedfs/seaweedfs/weed/glog"
  14. )
  15. var (
  16. client *http.Client
  17. Transport *http.Transport
  18. )
  19. func init() {
  20. Transport = &http.Transport{
  21. MaxIdleConns: 1024,
  22. MaxIdleConnsPerHost: 1024,
  23. }
  24. client = &http.Client{
  25. Transport: Transport,
  26. }
  27. }
  // Post submits values as an application/x-www-form-urlencoded POST to url
  // using the shared package client and returns the whole response body.
  //
  // A status >= 400 is returned as an error. When the body was read
  // successfully it is included in the message ("<url>: <code> - <body>") so
  // the server's explanation is not lost; if reading it failed, only the
  // status line is reported. A body-read failure on a successful status is
  // returned unchanged.
  func Post(url string, values url.Values) ([]byte, error) {
  	r, err := client.PostForm(url, values)
  	if err != nil {
  		return nil, err
  	}
  	defer r.Body.Close()
  	b, err := io.ReadAll(r.Body)
  	if r.StatusCode >= 400 {
  		if err == nil {
  			return nil, fmt.Errorf("%s: %d - %s", url, r.StatusCode, string(b))
  		}
  		return nil, fmt.Errorf("%s: %s", url, r.Status)
  	}
  	if err != nil {
  		return nil, err
  	}
  	return b, nil
  }
  47. // github.com/seaweedfs/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
  48. // may need increasing http.Client.Timeout
  49. func Get(url string) ([]byte, bool, error) {
  50. return GetAuthenticated(url, "")
  51. }
  // GetAuthenticated performs a GET on url, sending jwt as a bearer token when
  // it is non-empty and advertising gzip support. A gzip-encoded response is
  // decompressed before being returned.
  //
  // It returns the body, whether a failure is worth retrying, and an error.
  // Request construction, transport and gzip-header failures are reported as
  // retryable. A status >= 400 is an error that is retryable only for 5xx. A
  // body-read failure on a successful status is not retryable.
  func GetAuthenticated(url, jwt string) ([]byte, bool, error) {
  	req, err := http.NewRequest(http.MethodGet, url, nil)
  	if err != nil {
  		return nil, true, err
  	}
  	maybeAddAuth(req, jwt)
  	req.Header.Add("Accept-Encoding", "gzip")

  	resp, err := client.Do(req)
  	if err != nil {
  		return nil, true, err
  	}
  	defer CloseResponse(resp)

  	var body io.ReadCloser = resp.Body
  	if resp.Header.Get("Content-Encoding") == "gzip" {
  		gz, gzErr := gzip.NewReader(resp.Body)
  		if gzErr != nil {
  			return nil, true, gzErr
  		}
  		defer gz.Close()
  		body = gz
  	}

  	data, readErr := io.ReadAll(body)
  	if resp.StatusCode >= 400 {
  		return nil, resp.StatusCode >= 500, fmt.Errorf("%s: %s", url, resp.Status)
  	}
  	if readErr != nil {
  		return nil, false, readErr
  	}
  	return data, false, nil
  }
  // Head issues a HEAD request to url and returns the response headers.
  // A status >= 400 is converted into an error carrying the status line.
  func Head(url string) (http.Header, error) {
  	resp, err := client.Head(url)
  	if err != nil {
  		return nil, err
  	}
  	defer CloseResponse(resp)
  	if resp.StatusCode < 400 {
  		return resp.Header, nil
  	}
  	return nil, fmt.Errorf("%s: %s", url, resp.Status)
  }
  // maybeAddAuth sets "Authorization: BEARER <jwt>" on req (replacing any
  // existing value) when jwt is non-empty; an empty jwt leaves req untouched.
  // req must be non-nil whenever jwt is non-empty.
  func maybeAddAuth(req *http.Request, jwt string) {
  	if jwt == "" {
  		return
  	}
  	req.Header.Set("Authorization", "BEARER "+jwt)
  }
  // Delete sends a DELETE to url, authenticated with jwt when it is non-empty.
  //
  // 200 OK, 202 Accepted and 404 Not Found all count as success. For any other
  // status the body is inspected: a JSON object with a string "error" field
  // yields that message; otherwise the raw body becomes the error text, or the
  // status line when the body is empty.
  func Delete(url string, jwt string) error {
  	req, err := http.NewRequest(http.MethodDelete, url, nil)
  	if err != nil {
  		return err
  	}
  	// req is nil when NewRequest fails, so auth is only added after the check.
  	maybeAddAuth(req, jwt)
  	resp, err := client.Do(req)
  	if err != nil {
  		return err
  	}
  	defer resp.Body.Close()
  	body, err := io.ReadAll(resp.Body)
  	if err != nil {
  		return err
  	}
  	switch resp.StatusCode {
  	case http.StatusNotFound, http.StatusAccepted, http.StatusOK:
  		return nil
  	}
  	m := make(map[string]interface{})
  	if json.Unmarshal(body, &m) == nil {
  		if s, ok := m["error"].(string); ok {
  			return errors.New(s)
  		}
  	}
  	if len(body) == 0 {
  		// Avoid returning an error with an empty message.
  		return fmt.Errorf("%s: %s", url, resp.Status)
  	}
  	return errors.New(string(body))
  }
  // DeleteProxied sends a DELETE to url (authenticated with jwt when non-empty)
  // and returns the raw response body and status code so the caller can relay
  // them. Non-2xx statuses are not treated as errors here. err is only set for
  // request construction, transport or body-read failures, in which case
  // httpStatus is 0 (body may hold a partial read).
  func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
  	req, err := http.NewRequest(http.MethodDelete, url, nil)
  	if err != nil {
  		return nil, 0, err
  	}
  	// req is nil when NewRequest fails, so auth is only added after the check.
  	maybeAddAuth(req, jwt)
  	resp, err := client.Do(req)
  	if err != nil {
  		return nil, 0, err
  	}
  	defer resp.Body.Close()
  	body, err = io.ReadAll(resp.Body)
  	if err != nil {
  		return body, 0, err
  	}
  	return body, resp.StatusCode, nil
  }
  // GetBufferStream POSTs values as a form to url and streams a 200 OK
  // response body through eachBuffer, one Read at a time, using
  // allocatedBytes as the read buffer. The slice passed to eachBuffer aliases
  // allocatedBytes and is overwritten by the next Read, so eachBuffer must
  // copy anything it keeps. Any other status is returned as an error.
  //
  // allocatedBytes must be non-empty. io.Reader implementations may return
  // (0, nil) for a zero-length buffer, which would make the read loop spin
  // forever, so an empty buffer is rejected before any request is sent.
  func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
  	if len(allocatedBytes) == 0 {
  		return fmt.Errorf("%s: empty read buffer", url)
  	}
  	r, err := client.PostForm(url, values)
  	if err != nil {
  		return err
  	}
  	defer CloseResponse(r)
  	if r.StatusCode != 200 {
  		return fmt.Errorf("%s: %s", url, r.Status)
  	}
  	for {
  		n, err := r.Body.Read(allocatedBytes)
  		if n > 0 {
  			eachBuffer(allocatedBytes[:n])
  		}
  		if err == io.EOF {
  			return nil
  		}
  		if err != nil {
  			return err
  		}
  	}
  }
  // GetUrlStream POSTs values as a form to url and, if the server answers
  // 200 OK, hands the open response body to readFn and returns readFn's
  // result. Any other status is returned as an error. The body is released
  // via CloseResponse once readFn returns.
  func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error {
  	resp, err := client.PostForm(url, values)
  	if err != nil {
  		return err
  	}
  	defer CloseResponse(resp)
  	if resp.StatusCode == 200 {
  		return readFn(resp.Body)
  	}
  	return fmt.Errorf("%s: %s", url, resp.Status)
  }
  179. func DownloadFile(fileUrl string, jwt string) (filename string, header http.Header, resp *http.Response, e error) {
  180. req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  181. if err != nil {
  182. return "", nil, nil, err
  183. }
  184. maybeAddAuth(req, jwt)
  185. response, err := client.Do(req)
  186. if err != nil {
  187. return "", nil, nil, err
  188. }
  189. header = response.Header
  190. contentDisposition := response.Header["Content-Disposition"]
  191. if len(contentDisposition) > 0 {
  192. idx := strings.Index(contentDisposition[0], "filename=")
  193. if idx != -1 {
  194. filename = contentDisposition[0][idx+len("filename="):]
  195. filename = strings.Trim(filename, "\"")
  196. }
  197. }
  198. resp = response
  199. return
  200. }
  // Do sends req through the package's shared client and its pooled
  // Transport. As with http.Client.Do, the caller must close resp.Body.
  func Do(req *http.Request) (resp *http.Response, err error) {
  	resp, err = client.Do(req)
  	return resp, err
  }
  // NormalizeUrl returns url unchanged when it already begins with "http://"
  // or "https://" (case-sensitive); otherwise it prepends "http://".
  func NormalizeUrl(url string) string {
  	for _, scheme := range [...]string{"http://", "https://"} {
  		if strings.HasPrefix(url, scheme) {
  			return url
  		}
  	}
  	return "http://" + url
  }
  // ReadUrl reads chunk data from fileUrl into buf and returns the number of
  // bytes copied.
  //
  // When cipherKey is non-nil the request is delegated to readEncryptedUrl
  // and the data it yields is copied into buf. Otherwise a plain GET is
  // issued. For a partial chunk a Range header for [offset, offset+size) is
  // sent; for a full chunk gzip encoding is accepted and decoded here.
  // A status >= 400 is an error.
  //
  // At most len(buf) bytes are returned, and reaching EOF first is not an
  // error. Decoded bytes left over once buf is full are discarded and their
  // count logged at V(1).
  func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
  	if cipherKey != nil {
  		var n int
  		_, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
  			n = copy(buf, data)
  		})
  		return int64(n), err
  	}

  	req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  	if err != nil {
  		return 0, err
  	}
  	if !isFullChunk {
  		req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  	} else {
  		req.Header.Set("Accept-Encoding", "gzip")
  	}

  	r, err := client.Do(req)
  	if err != nil {
  		return 0, err
  	}
  	defer CloseResponse(r)
  	if r.StatusCode >= 400 {
  		return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
  	}

  	var reader io.ReadCloser
  	contentEncoding := r.Header.Get("Content-Encoding")
  	switch contentEncoding {
  	case "gzip":
  		reader, err = gzip.NewReader(r.Body)
  		if err != nil {
  			return 0, err
  		}
  		defer reader.Close()
  	default:
  		reader = r.Body
  	}

  	// Fill buf. Modelled on bytes.Buffer.ReadFrom
  	// (https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199,
  	// commit c170b14c2c1cfb2fd853a37add92a82fd6eb4318).
  	var n int
  	for {
  		m, readErr := reader.Read(buf[n:])
  		n += m
  		if readErr == io.EOF {
  			return int64(n), nil
  		}
  		if readErr != nil {
  			return int64(n), readErr
  		}
  		if n == len(buf) {
  			break
  		}
  	}

  	// buf is full. Count and discard the rest of the decoded stream without
  	// buffering it in memory; CloseResponse then drains and closes the raw body.
  	if remaining, _ := io.Copy(io.Discard, reader); remaining != 0 {
  		glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, remaining)
  	}
  	return int64(n), nil
  }
  274. func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
  275. return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
  276. }
  // ReadUrlAsStreamAuthenticated fetches fileUrl (sending jwt as a bearer
  // token when non-empty) and passes the body to fn piece by piece, using a
  // buffer of 64*1024 bytes requested from mem.Allocate. The slice given to fn
  // is reused between calls, so fn must copy anything it keeps.
  //
  // Encrypted content (cipherKey != nil) is delegated to readEncryptedUrl.
  // For a partial chunk a Range header for [offset, offset+size) is sent; for
  // a full chunk gzip encoding is accepted and decoded here.
  //
  // retryable reports whether the caller may retry. Transport failures,
  // gzip-header and body-read failures, 404, and statuses >= 499 are
  // retryable. Other statuses >= 400 and request construction failures are not.
  func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
  	if cipherKey != nil {
  		return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
  	}
  	req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  	if err != nil {
  		return false, err
  	}
  	// req is nil when NewRequest fails, so auth is only added after the check.
  	maybeAddAuth(req, jwt)
  	if isFullChunk {
  		req.Header.Add("Accept-Encoding", "gzip")
  	} else {
  		req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  	}
  	r, err := client.Do(req)
  	if err != nil {
  		return true, err
  	}
  	defer CloseResponse(r)
  	if r.StatusCode >= 400 {
  		retryable = r.StatusCode == http.StatusNotFound || r.StatusCode >= 499
  		return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
  	}

  	var reader io.ReadCloser
  	switch r.Header.Get("Content-Encoding") {
  	case "gzip":
  		gz, gzErr := gzip.NewReader(r.Body)
  		if gzErr != nil {
  			// Without this check reader would hold a nil *gzip.Reader and the
  			// first Read would panic.
  			return true, fmt.Errorf("%s: %w", fileUrl, gzErr)
  		}
  		defer gz.Close()
  		reader = gz
  	default:
  		reader = r.Body
  	}

  	buf := mem.Allocate(64 * 1024)
  	defer mem.Free(buf)
  	for {
  		m, readErr := reader.Read(buf)
  		if m > 0 {
  			fn(buf[:m])
  		}
  		if readErr == io.EOF {
  			return false, nil
  		}
  		if readErr != nil {
  			return true, readErr
  		}
  	}
  }
  // readEncryptedUrl downloads the whole encrypted object at fileUrl via
  // GetAuthenticated, decrypts it with cipherKey and, if isContentCompressed,
  // decompresses it. It then passes either the full plaintext (isFullChunk)
  // or the [offset, offset+size) window to fn.
  //
  // The returned bool is the retryable flag from the download. Decryption
  // and range errors are never retryable. A decompression failure is only
  // logged, and whatever DecompressData returned is used. Underlying errors
  // are wrapped with %w so callers can inspect them with errors.Is/As.
  func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
  	encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt)
  	if err != nil {
  		return retryable, fmt.Errorf("fetch %s: %w", fileUrl, err)
  	}
  	decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
  	if err != nil {
  		return false, fmt.Errorf("decrypt %s: %w", fileUrl, err)
  	}
  	if isContentCompressed {
  		decryptedData, err = DecompressData(decryptedData)
  		if err != nil {
  			glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
  		}
  	}
  	// A negative offset or size would pass the length check but make the
  	// slice expression below panic.
  	if offset < 0 || size < 0 || len(decryptedData) < int(offset)+size {
  		return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
  	}
  	if isFullChunk {
  		fn(decryptedData)
  	} else {
  		fn(decryptedData[int(offset) : int(offset)+size])
  	}
  	return false, nil
  }
  352. func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*http.Response, io.ReadCloser, error) {
  353. req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  354. if err != nil {
  355. return nil, nil, err
  356. }
  357. if rangeHeader != "" {
  358. req.Header.Add("Range", rangeHeader)
  359. } else {
  360. req.Header.Add("Accept-Encoding", "gzip")
  361. }
  362. maybeAddAuth(req, jwt)
  363. r, err := client.Do(req)
  364. if err != nil {
  365. return nil, nil, err
  366. }
  367. if r.StatusCode >= 400 {
  368. CloseResponse(r)
  369. return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
  370. }
  371. var reader io.ReadCloser
  372. contentEncoding := r.Header.Get("Content-Encoding")
  373. switch contentEncoding {
  374. case "gzip":
  375. reader, err = gzip.NewReader(r.Body)
  376. if err != nil {
  377. return nil, nil, err
  378. }
  379. default:
  380. reader = r.Body
  381. }
  382. return r, reader, nil
  383. }
  // CloseResponse discards whatever is left of resp.Body, closes it, and logs
  // the number of discarded bytes at verbosity 1. Reading the body to EOF
  // before closing lets net/http reuse the connection. A nil resp or nil Body
  // is ignored.
  func CloseResponse(resp *http.Response) {
  	if resp != nil && resp.Body != nil {
  		counted := &CountingReader{reader: resp.Body}
  		_, _ = io.Copy(io.Discard, counted) // best effort: the body is closed next regardless
  		resp.Body.Close()
  		if leftover := counted.BytesRead; leftover > 0 {
  			glog.V(1).Infof("response leftover %d bytes", leftover)
  		}
  	}
  }
  // CloseRequest discards whatever is left of req.Body, closes it, and logs
  // the number of unread bytes at verbosity 1. A nil request or a request
  // without a body is ignored, matching CloseResponse's nil handling.
  func CloseRequest(req *http.Request) {
  	if req == nil || req.Body == nil {
  		return
  	}
  	reader := &CountingReader{reader: req.Body}
  	io.Copy(io.Discard, reader)
  	req.Body.Close()
  	if reader.BytesRead > 0 {
  		glog.V(1).Infof("request leftover %d bytes", reader.BytesRead)
  	}
  }
  403. type CountingReader struct {
  404. reader io.Reader
  405. BytesRead int
  406. }
  407. func (r *CountingReader) Read(p []byte) (n int, err error) {
  408. n, err = r.reader.Read(p)
  409. r.BytesRead += n
  410. return n, err
  411. }
  // RetriedFetchChunkData reads one chunk into buffer, trying each URL in
  // urlStrings in turn (with readDeleted=true appended). If every URL fails
  // with a retryable error, the whole list is retried after a back-off that
  // starts at one second and grows by half each round while it stays below
  // RetryWaitTime.
  //
  // The search stops at the first success or non-retryable error. n is the
  // number of bytes copied into buffer by the last attempt; data beyond
  // len(buffer) is dropped. err is the last attempt's error.
  func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
  	var shouldRetry bool
  	for waitTime := time.Second; waitTime < RetryWaitTime; waitTime += waitTime / 2 {
  		for _, urlString := range urlStrings {
  			n = 0
  			// NOTE(review): url.PathEscape also escapes '/' and ',', so applying
  			// it to a whole URL looks suspicious; confirm the intended input.
  			if strings.Contains(urlString, "%") {
  				urlString = url.PathEscape(urlString)
  			}
  			shouldRetry, err = ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
  				if n < len(buffer) {
  					n += copy(buffer[n:], data)
  				}
  			})
  			if err == nil || !shouldRetry {
  				break
  			}
  			glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  		}
  		if err == nil || !shouldRetry {
  			break
  		}
  		// The loop ends before another round when the next wait would reach
  		// RetryWaitTime, so sleeping now would only delay the failure.
  		if next := waitTime + waitTime/2; next >= RetryWaitTime {
  			break
  		}
  		glog.V(0).Infof("retry reading in %v", waitTime)
  		time.Sleep(waitTime)
  	}
  	return n, err
  }