http_global_client_util.go 11 KB


  1. package http
  2. import (
  3. "compress/gzip"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  9. "io"
  10. "net/http"
  11. "net/url"
  12. "strings"
  13. "time"
  14. "github.com/seaweedfs/seaweedfs/weed/glog"
  15. )
  16. var ErrNotFound = fmt.Errorf("not found")
  17. func Post(url string, values url.Values) ([]byte, error) {
  18. r, err := GetGlobalHttpClient().PostForm(url, values)
  19. if err != nil {
  20. return nil, err
  21. }
  22. defer r.Body.Close()
  23. b, err := io.ReadAll(r.Body)
  24. if r.StatusCode >= 400 {
  25. if err != nil {
  26. return nil, fmt.Errorf("%s: %d - %s", url, r.StatusCode, string(b))
  27. } else {
  28. return nil, fmt.Errorf("%s: %s", url, r.Status)
  29. }
  30. }
  31. if err != nil {
  32. return nil, err
  33. }
  34. return b, nil
  35. }
  36. // github.com/seaweedfs/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
  37. // may need increasing http.Client.Timeout
  38. func Get(url string) ([]byte, bool, error) {
  39. return GetAuthenticated(url, "")
  40. }
  41. func GetAuthenticated(url, jwt string) ([]byte, bool, error) {
  42. request, err := http.NewRequest(http.MethodGet, url, nil)
  43. if err != nil {
  44. return nil, true, err
  45. }
  46. maybeAddAuth(request, jwt)
  47. request.Header.Add("Accept-Encoding", "gzip")
  48. response, err := GetGlobalHttpClient().Do(request)
  49. if err != nil {
  50. return nil, true, err
  51. }
  52. defer CloseResponse(response)
  53. var reader io.ReadCloser
  54. switch response.Header.Get("Content-Encoding") {
  55. case "gzip":
  56. reader, err = gzip.NewReader(response.Body)
  57. if err != nil {
  58. return nil, true, err
  59. }
  60. defer reader.Close()
  61. default:
  62. reader = response.Body
  63. }
  64. b, err := io.ReadAll(reader)
  65. if response.StatusCode >= 400 {
  66. retryable := response.StatusCode >= 500
  67. return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
  68. }
  69. if err != nil {
  70. return nil, false, err
  71. }
  72. return b, false, nil
  73. }
  74. func Head(url string) (http.Header, error) {
  75. r, err := GetGlobalHttpClient().Head(url)
  76. if err != nil {
  77. return nil, err
  78. }
  79. defer CloseResponse(r)
  80. if r.StatusCode >= 400 {
  81. return nil, fmt.Errorf("%s: %s", url, r.Status)
  82. }
  83. return r.Header, nil
  84. }
  85. func maybeAddAuth(req *http.Request, jwt string) {
  86. if jwt != "" {
  87. req.Header.Set("Authorization", "BEARER "+string(jwt))
  88. }
  89. }
  90. func Delete(url string, jwt string) error {
  91. req, err := http.NewRequest(http.MethodDelete, url, nil)
  92. maybeAddAuth(req, jwt)
  93. if err != nil {
  94. return err
  95. }
  96. resp, e := GetGlobalHttpClient().Do(req)
  97. if e != nil {
  98. return e
  99. }
  100. defer resp.Body.Close()
  101. body, err := io.ReadAll(resp.Body)
  102. if err != nil {
  103. return err
  104. }
  105. switch resp.StatusCode {
  106. case http.StatusNotFound, http.StatusAccepted, http.StatusOK:
  107. return nil
  108. }
  109. m := make(map[string]interface{})
  110. if e := json.Unmarshal(body, &m); e == nil {
  111. if s, ok := m["error"].(string); ok {
  112. return errors.New(s)
  113. }
  114. }
  115. return errors.New(string(body))
  116. }
  117. func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
  118. req, err := http.NewRequest(http.MethodDelete, url, nil)
  119. maybeAddAuth(req, jwt)
  120. if err != nil {
  121. return
  122. }
  123. resp, err := GetGlobalHttpClient().Do(req)
  124. if err != nil {
  125. return
  126. }
  127. defer resp.Body.Close()
  128. body, err = io.ReadAll(resp.Body)
  129. if err != nil {
  130. return
  131. }
  132. httpStatus = resp.StatusCode
  133. return
  134. }
  135. func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
  136. r, err := GetGlobalHttpClient().PostForm(url, values)
  137. if err != nil {
  138. return err
  139. }
  140. defer CloseResponse(r)
  141. if r.StatusCode != 200 {
  142. return fmt.Errorf("%s: %s", url, r.Status)
  143. }
  144. for {
  145. n, err := r.Body.Read(allocatedBytes)
  146. if n > 0 {
  147. eachBuffer(allocatedBytes[:n])
  148. }
  149. if err != nil {
  150. if err == io.EOF {
  151. return nil
  152. }
  153. return err
  154. }
  155. }
  156. }
  157. func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error {
  158. r, err := GetGlobalHttpClient().PostForm(url, values)
  159. if err != nil {
  160. return err
  161. }
  162. defer CloseResponse(r)
  163. if r.StatusCode != 200 {
  164. return fmt.Errorf("%s: %s", url, r.Status)
  165. }
  166. return readFn(r.Body)
  167. }
  168. func DownloadFile(fileUrl string, jwt string) (filename string, header http.Header, resp *http.Response, e error) {
  169. req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  170. if err != nil {
  171. return "", nil, nil, err
  172. }
  173. maybeAddAuth(req, jwt)
  174. response, err := GetGlobalHttpClient().Do(req)
  175. if err != nil {
  176. return "", nil, nil, err
  177. }
  178. header = response.Header
  179. contentDisposition := response.Header["Content-Disposition"]
  180. if len(contentDisposition) > 0 {
  181. idx := strings.Index(contentDisposition[0], "filename=")
  182. if idx != -1 {
  183. filename = contentDisposition[0][idx+len("filename="):]
  184. filename = strings.Trim(filename, "\"")
  185. }
  186. }
  187. resp = response
  188. return
  189. }
  190. func Do(req *http.Request) (resp *http.Response, err error) {
  191. return GetGlobalHttpClient().Do(req)
  192. }
  193. func NormalizeUrl(url string) (string, error) {
  194. return GetGlobalHttpClient().NormalizeHttpScheme(url)
  195. }
  196. func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
  197. if cipherKey != nil {
  198. var n int
  199. _, err := readEncryptedUrl(fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
  200. n = copy(buf, data)
  201. })
  202. return int64(n), err
  203. }
  204. req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  205. if err != nil {
  206. return 0, err
  207. }
  208. if !isFullChunk {
  209. req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  210. } else {
  211. req.Header.Set("Accept-Encoding", "gzip")
  212. }
  213. r, err := GetGlobalHttpClient().Do(req)
  214. if err != nil {
  215. return 0, err
  216. }
  217. defer CloseResponse(r)
  218. if r.StatusCode >= 400 {
  219. return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
  220. }
  221. var reader io.ReadCloser
  222. contentEncoding := r.Header.Get("Content-Encoding")
  223. switch contentEncoding {
  224. case "gzip":
  225. reader, err = gzip.NewReader(r.Body)
  226. if err != nil {
  227. return 0, err
  228. }
  229. defer reader.Close()
  230. default:
  231. reader = r.Body
  232. }
  233. var (
  234. i, m int
  235. n int64
  236. )
  237. // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199
  238. // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318
  239. for {
  240. m, err = reader.Read(buf[i:])
  241. i += m
  242. n += int64(m)
  243. if err == io.EOF {
  244. return n, nil
  245. }
  246. if err != nil {
  247. return n, err
  248. }
  249. if n == int64(len(buf)) {
  250. break
  251. }
  252. }
  253. // drains the response body to avoid memory leak
  254. data, _ := io.ReadAll(reader)
  255. if len(data) != 0 {
  256. glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data))
  257. }
  258. return n, err
  259. }
  260. func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
  261. return ReadUrlAsStreamAuthenticated(fileUrl, "", cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
  262. }
  263. func ReadUrlAsStreamAuthenticated(fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
  264. if cipherKey != nil {
  265. return readEncryptedUrl(fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
  266. }
  267. req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  268. maybeAddAuth(req, jwt)
  269. if err != nil {
  270. return false, err
  271. }
  272. if isFullChunk {
  273. req.Header.Add("Accept-Encoding", "gzip")
  274. } else {
  275. req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
  276. }
  277. r, err := GetGlobalHttpClient().Do(req)
  278. if err != nil {
  279. return true, err
  280. }
  281. defer CloseResponse(r)
  282. if r.StatusCode >= 400 {
  283. if r.StatusCode == http.StatusNotFound {
  284. return true, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrNotFound)
  285. }
  286. retryable = r.StatusCode >= 499
  287. return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
  288. }
  289. var reader io.ReadCloser
  290. contentEncoding := r.Header.Get("Content-Encoding")
  291. switch contentEncoding {
  292. case "gzip":
  293. reader, err = gzip.NewReader(r.Body)
  294. defer reader.Close()
  295. default:
  296. reader = r.Body
  297. }
  298. var (
  299. m int
  300. )
  301. buf := mem.Allocate(64 * 1024)
  302. defer mem.Free(buf)
  303. for {
  304. m, err = reader.Read(buf)
  305. if m > 0 {
  306. fn(buf[:m])
  307. }
  308. if err == io.EOF {
  309. return false, nil
  310. }
  311. if err != nil {
  312. return true, err
  313. }
  314. }
  315. }
  316. func readEncryptedUrl(fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
  317. encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt)
  318. if err != nil {
  319. return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
  320. }
  321. decryptedData, err := util.Decrypt(encryptedData, util.CipherKey(cipherKey))
  322. if err != nil {
  323. return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
  324. }
  325. if isContentCompressed {
  326. decryptedData, err = util.DecompressData(decryptedData)
  327. if err != nil {
  328. glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
  329. }
  330. }
  331. if len(decryptedData) < int(offset)+size {
  332. return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
  333. }
  334. if isFullChunk {
  335. fn(decryptedData)
  336. } else {
  337. fn(decryptedData[int(offset) : int(offset)+size])
  338. }
  339. return false, nil
  340. }
  341. func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*http.Response, io.ReadCloser, error) {
  342. req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
  343. if err != nil {
  344. return nil, nil, err
  345. }
  346. if rangeHeader != "" {
  347. req.Header.Add("Range", rangeHeader)
  348. } else {
  349. req.Header.Add("Accept-Encoding", "gzip")
  350. }
  351. maybeAddAuth(req, jwt)
  352. r, err := GetGlobalHttpClient().Do(req)
  353. if err != nil {
  354. return nil, nil, err
  355. }
  356. if r.StatusCode >= 400 {
  357. CloseResponse(r)
  358. return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
  359. }
  360. var reader io.ReadCloser
  361. contentEncoding := r.Header.Get("Content-Encoding")
  362. switch contentEncoding {
  363. case "gzip":
  364. reader, err = gzip.NewReader(r.Body)
  365. if err != nil {
  366. return nil, nil, err
  367. }
  368. default:
  369. reader = r.Body
  370. }
  371. return r, reader, nil
  372. }
  373. func CloseResponse(resp *http.Response) {
  374. if resp == nil || resp.Body == nil {
  375. return
  376. }
  377. reader := &CountingReader{reader: resp.Body}
  378. io.Copy(io.Discard, reader)
  379. resp.Body.Close()
  380. if reader.BytesRead > 0 {
  381. glog.V(1).Infof("response leftover %d bytes", reader.BytesRead)
  382. }
  383. }
  384. func CloseRequest(req *http.Request) {
  385. reader := &CountingReader{reader: req.Body}
  386. io.Copy(io.Discard, reader)
  387. req.Body.Close()
  388. if reader.BytesRead > 0 {
  389. glog.V(1).Infof("request leftover %d bytes", reader.BytesRead)
  390. }
  391. }
  392. type CountingReader struct {
  393. reader io.Reader
  394. BytesRead int
  395. }
  396. func (r *CountingReader) Read(p []byte) (n int, err error) {
  397. n, err = r.reader.Read(p)
  398. r.BytesRead += n
  399. return n, err
  400. }
  401. func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
  402. var shouldRetry bool
  403. for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
  404. for _, urlString := range urlStrings {
  405. n = 0
  406. if strings.Contains(urlString, "%") {
  407. urlString = url.PathEscape(urlString)
  408. }
  409. shouldRetry, err = ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
  410. if n < len(buffer) {
  411. x := copy(buffer[n:], data)
  412. n += x
  413. }
  414. })
  415. if !shouldRetry {
  416. break
  417. }
  418. if err != nil {
  419. glog.V(0).Infof("read %s failed, err: %v", urlString, err)
  420. } else {
  421. break
  422. }
  423. }
  424. if err != nil && shouldRetry {
  425. glog.V(0).Infof("retry reading in %v", waitTime)
  426. time.Sleep(waitTime)
  427. } else {
  428. break
  429. }
  430. }
  431. return n, err
  432. }