filer_server_handlers_proxy.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package weed_server
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/security"
  5. "github.com/seaweedfs/seaweedfs/weed/util"
  6. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  7. "io"
  8. "math/rand"
  9. "net/http"
  10. )
  11. var (
  12. client *http.Client
  13. )
  14. func init() {
  15. client = &http.Client{Transport: &http.Transport{
  16. MaxIdleConns: 1024,
  17. MaxIdleConnsPerHost: 1024,
  18. }}
  19. }
  20. func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) {
  21. encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite)
  22. if encodedJwt == "" {
  23. return
  24. }
  25. r.Header.Set("Authorization", "BEARER "+string(encodedJwt))
  26. }
  27. func (fs *FilerServer) maybeGetVolumeJwtAuthorizationToken(fileId string, isWrite bool) string {
  28. var encodedJwt security.EncodedJwt
  29. if isWrite {
  30. encodedJwt = security.GenJwtForVolumeServer(fs.volumeGuard.SigningKey, fs.volumeGuard.ExpiresAfterSec, fileId)
  31. } else {
  32. encodedJwt = security.GenJwtForVolumeServer(fs.volumeGuard.ReadSigningKey, fs.volumeGuard.ReadExpiresAfterSec, fileId)
  33. }
  34. return string(encodedJwt)
  35. }
  36. func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) {
  37. urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(fileId)
  38. if err != nil {
  39. glog.Errorf("locate %s: %v", fileId, err)
  40. w.WriteHeader(http.StatusInternalServerError)
  41. return
  42. }
  43. if len(urlStrings) == 0 {
  44. w.WriteHeader(http.StatusNotFound)
  45. return
  46. }
  47. proxyReq, err := http.NewRequest(r.Method, urlStrings[rand.Intn(len(urlStrings))], r.Body)
  48. if err != nil {
  49. glog.Errorf("NewRequest %s: %v", urlStrings[0], err)
  50. w.WriteHeader(http.StatusInternalServerError)
  51. return
  52. }
  53. proxyReq.Header.Set("Host", r.Host)
  54. proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
  55. for header, values := range r.Header {
  56. for _, value := range values {
  57. proxyReq.Header.Add(header, value)
  58. }
  59. }
  60. proxyResponse, postErr := client.Do(proxyReq)
  61. if postErr != nil {
  62. glog.Errorf("post to filer: %v", postErr)
  63. w.WriteHeader(http.StatusInternalServerError)
  64. return
  65. }
  66. defer util.CloseResponse(proxyResponse)
  67. for k, v := range proxyResponse.Header {
  68. w.Header()[k] = v
  69. }
  70. w.WriteHeader(proxyResponse.StatusCode)
  71. buf := mem.Allocate(128 * 1024)
  72. defer mem.Free(buf)
  73. io.CopyBuffer(w, proxyResponse.Body, buf)
  74. }