filer_server_handlers.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package weed_server
  2. import (
  3. "errors"
  4. "net/http"
  5. "strings"
  6. "sync/atomic"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/security"
  10. "github.com/seaweedfs/seaweedfs/weed/util"
  11. "github.com/seaweedfs/seaweedfs/weed/stats"
  12. )
  13. func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
  14. start := time.Now()
  15. if r.Method == "OPTIONS" {
  16. OptionsHandler(w, r, false)
  17. return
  18. }
  19. isReadHttpCall := r.Method == "GET" || r.Method == "HEAD"
  20. if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) {
  21. writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
  22. return
  23. }
  24. // proxy to volume servers
  25. var fileId string
  26. if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") {
  27. fileId = r.RequestURI[len("/?proxyChunkId="):]
  28. }
  29. if fileId != "" {
  30. stats.FilerRequestCounter.WithLabelValues(stats.ChunkProxy).Inc()
  31. fs.proxyToVolumeServer(w, r, fileId)
  32. stats.FilerRequestHistogram.WithLabelValues(stats.ChunkProxy).Observe(time.Since(start).Seconds())
  33. return
  34. }
  35. stats.FilerRequestCounter.WithLabelValues(r.Method).Inc()
  36. defer func() {
  37. stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds())
  38. }()
  39. w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
  40. if r.Header.Get("Origin") != "" {
  41. w.Header().Set("Access-Control-Allow-Origin", "*")
  42. w.Header().Set("Access-Control-Allow-Credentials", "true")
  43. }
  44. switch r.Method {
  45. case "GET":
  46. fs.GetOrHeadHandler(w, r)
  47. case "HEAD":
  48. fs.GetOrHeadHandler(w, r)
  49. case "DELETE":
  50. if _, ok := r.URL.Query()["tagging"]; ok {
  51. fs.DeleteTaggingHandler(w, r)
  52. } else {
  53. fs.DeleteHandler(w, r)
  54. }
  55. case "POST", "PUT":
  56. // wait until in flight data is less than the limit
  57. contentLength := getContentLength(r)
  58. fs.inFlightDataLimitCond.L.Lock()
  59. inFlightDataSize := atomic.LoadInt64(&fs.inFlightDataSize)
  60. for fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit {
  61. glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, fs.option.ConcurrentUploadLimit)
  62. fs.inFlightDataLimitCond.Wait()
  63. inFlightDataSize = atomic.LoadInt64(&fs.inFlightDataSize)
  64. }
  65. fs.inFlightDataLimitCond.L.Unlock()
  66. atomic.AddInt64(&fs.inFlightDataSize, contentLength)
  67. defer func() {
  68. atomic.AddInt64(&fs.inFlightDataSize, -contentLength)
  69. fs.inFlightDataLimitCond.Signal()
  70. }()
  71. if r.Method == "PUT" {
  72. if _, ok := r.URL.Query()["tagging"]; ok {
  73. fs.PutTaggingHandler(w, r)
  74. } else {
  75. fs.PostHandler(w, r, contentLength)
  76. }
  77. } else { // method == "POST"
  78. fs.PostHandler(w, r, contentLength)
  79. }
  80. }
  81. }
  82. func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) {
  83. start := time.Now()
  84. stats.FilerRequestCounter.WithLabelValues(r.Method).Inc()
  85. defer func() {
  86. stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds())
  87. }()
  88. // We handle OPTIONS first because it never should be authenticated
  89. if r.Method == "OPTIONS" {
  90. OptionsHandler(w, r, true)
  91. return
  92. }
  93. if !fs.maybeCheckJwtAuthorization(r, false) {
  94. writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
  95. return
  96. }
  97. w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
  98. if r.Header.Get("Origin") != "" {
  99. w.Header().Set("Access-Control-Allow-Origin", "*")
  100. w.Header().Set("Access-Control-Allow-Credentials", "true")
  101. }
  102. switch r.Method {
  103. case "GET":
  104. fs.GetOrHeadHandler(w, r)
  105. case "HEAD":
  106. fs.GetOrHeadHandler(w, r)
  107. }
  108. }
  109. func OptionsHandler(w http.ResponseWriter, r *http.Request, isReadOnly bool) {
  110. if isReadOnly {
  111. w.Header().Add("Access-Control-Allow-Methods", "GET, OPTIONS")
  112. } else {
  113. w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
  114. }
  115. w.Header().Add("Access-Control-Allow-Headers", "*")
  116. }
  117. // maybeCheckJwtAuthorization returns true if access should be granted, false if it should be denied
  118. func (fs *FilerServer) maybeCheckJwtAuthorization(r *http.Request, isWrite bool) bool {
  119. var signingKey security.SigningKey
  120. if isWrite {
  121. if len(fs.filerGuard.SigningKey) == 0 {
  122. return true
  123. } else {
  124. signingKey = fs.filerGuard.SigningKey
  125. }
  126. } else {
  127. if len(fs.filerGuard.ReadSigningKey) == 0 {
  128. return true
  129. } else {
  130. signingKey = fs.filerGuard.ReadSigningKey
  131. }
  132. }
  133. tokenStr := security.GetJwt(r)
  134. if tokenStr == "" {
  135. glog.V(1).Infof("missing jwt from %s", r.RemoteAddr)
  136. return false
  137. }
  138. token, err := security.DecodeJwt(signingKey, tokenStr, &security.SeaweedFilerClaims{})
  139. if err != nil {
  140. glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err)
  141. return false
  142. }
  143. if !token.Valid {
  144. glog.V(1).Infof("jwt invalid from %s: %v", r.RemoteAddr, tokenStr)
  145. return false
  146. } else {
  147. return true
  148. }
  149. }