filer_server_handlers.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package weed_server
  2. import (
  3. "errors"
  4. "net/http"
  5. "os"
  6. "strings"
  7. "sync/atomic"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/security"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "github.com/seaweedfs/seaweedfs/weed/stats"
  13. )
  14. func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
  15. start := time.Now()
  16. origin := r.Header.Get("Origin")
  17. if origin != "" {
  18. if fs.option.AllowedOrigins == nil || len(fs.option.AllowedOrigins) == 0 || fs.option.AllowedOrigins[0] == "*" {
  19. origin = "*"
  20. } else {
  21. originFound := false
  22. for _, allowedOrigin := range fs.option.AllowedOrigins {
  23. if origin == allowedOrigin {
  24. originFound = true
  25. }
  26. }
  27. if !originFound {
  28. writeJsonError(w, r, http.StatusForbidden, errors.New("origin not allowed"))
  29. return
  30. }
  31. }
  32. w.Header().Set("Access-Control-Allow-Origin", origin)
  33. w.Header().Set("Access-Control-Expose-Headers", "*")
  34. w.Header().Set("Access-Control-Allow-Headers", "*")
  35. w.Header().Set("Access-Control-Allow-Credentials", "true")
  36. w.Header().Set("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
  37. }
  38. if r.Method == "OPTIONS" {
  39. OptionsHandler(w, r, false)
  40. return
  41. }
  42. // proxy to volume servers
  43. var fileId string
  44. if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") {
  45. fileId = r.RequestURI[len("/?proxyChunkId="):]
  46. }
  47. if fileId != "" {
  48. stats.FilerRequestCounter.WithLabelValues(stats.ChunkProxy).Inc()
  49. fs.proxyToVolumeServer(w, r, fileId)
  50. stats.FilerRequestHistogram.WithLabelValues(stats.ChunkProxy).Observe(time.Since(start).Seconds())
  51. return
  52. }
  53. isReadHttpCall := r.Method == "GET" || r.Method == "HEAD"
  54. if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) {
  55. writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
  56. return
  57. }
  58. stats.FilerRequestCounter.WithLabelValues(r.Method).Inc()
  59. defer func() {
  60. stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds())
  61. }()
  62. w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
  63. switch r.Method {
  64. case "GET":
  65. fs.GetOrHeadHandler(w, r)
  66. case "HEAD":
  67. fs.GetOrHeadHandler(w, r)
  68. case "DELETE":
  69. if _, ok := r.URL.Query()["tagging"]; ok {
  70. fs.DeleteTaggingHandler(w, r)
  71. } else {
  72. fs.DeleteHandler(w, r)
  73. }
  74. case "POST", "PUT":
  75. // wait until in flight data is less than the limit
  76. contentLength := getContentLength(r)
  77. fs.inFlightDataLimitCond.L.Lock()
  78. inFlightDataSize := atomic.LoadInt64(&fs.inFlightDataSize)
  79. for fs.option.ConcurrentUploadLimit != 0 && inFlightDataSize > fs.option.ConcurrentUploadLimit {
  80. glog.V(4).Infof("wait because inflight data %d > %d", inFlightDataSize, fs.option.ConcurrentUploadLimit)
  81. fs.inFlightDataLimitCond.Wait()
  82. inFlightDataSize = atomic.LoadInt64(&fs.inFlightDataSize)
  83. }
  84. fs.inFlightDataLimitCond.L.Unlock()
  85. atomic.AddInt64(&fs.inFlightDataSize, contentLength)
  86. defer func() {
  87. atomic.AddInt64(&fs.inFlightDataSize, -contentLength)
  88. fs.inFlightDataLimitCond.Signal()
  89. }()
  90. if r.Method == "PUT" {
  91. if _, ok := r.URL.Query()["tagging"]; ok {
  92. fs.PutTaggingHandler(w, r)
  93. } else {
  94. fs.PostHandler(w, r, contentLength)
  95. }
  96. } else { // method == "POST"
  97. fs.PostHandler(w, r, contentLength)
  98. }
  99. }
  100. }
  101. func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) {
  102. start := time.Now()
  103. os.Stdout.WriteString("Request: " + r.Method + " " + r.URL.String() + "\n")
  104. origin := r.Header.Get("Origin")
  105. if origin != "" {
  106. if fs.option.AllowedOrigins == nil || len(fs.option.AllowedOrigins) == 0 || fs.option.AllowedOrigins[0] == "*" {
  107. origin = "*"
  108. } else {
  109. originFound := false
  110. for _, allowedOrigin := range fs.option.AllowedOrigins {
  111. if origin == allowedOrigin {
  112. originFound = true
  113. }
  114. }
  115. if !originFound {
  116. writeJsonError(w, r, http.StatusForbidden, errors.New("origin not allowed"))
  117. return
  118. }
  119. }
  120. w.Header().Set("Access-Control-Allow-Origin", origin)
  121. w.Header().Set("Access-Control-Allow-Headers", "OPTIONS, GET, HEAD")
  122. w.Header().Set("Access-Control-Allow-Credentials", "true")
  123. }
  124. stats.FilerRequestCounter.WithLabelValues(r.Method).Inc()
  125. defer func() {
  126. stats.FilerRequestHistogram.WithLabelValues(r.Method).Observe(time.Since(start).Seconds())
  127. }()
  128. // We handle OPTIONS first because it never should be authenticated
  129. if r.Method == "OPTIONS" {
  130. OptionsHandler(w, r, true)
  131. return
  132. }
  133. if !fs.maybeCheckJwtAuthorization(r, false) {
  134. writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
  135. return
  136. }
  137. w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
  138. switch r.Method {
  139. case "GET":
  140. fs.GetOrHeadHandler(w, r)
  141. case "HEAD":
  142. fs.GetOrHeadHandler(w, r)
  143. }
  144. }
  // OptionsHandler sets the CORS preflight response headers on w.
  //
  // A read-only endpoint advertises "GET, OPTIONS" as allowed methods. A
  // read-write endpoint advertises "PUT, POST, GET, DELETE, OPTIONS" and also
  // exposes all response headers. Both variants allow all request headers and
  // credentials. The request r is not inspected, and neither a status code nor
  // a body is written here.
  func OptionsHandler(w http.ResponseWriter, r *http.Request, isReadOnly bool) {
  	header := w.Header()

  	allowedMethods := "PUT, POST, GET, DELETE, OPTIONS"
  	if isReadOnly {
  		allowedMethods = "GET, OPTIONS"
  	}
  	header.Set("Access-Control-Allow-Methods", allowedMethods)

  	if !isReadOnly {
  		header.Set("Access-Control-Expose-Headers", "*")
  	}

  	header.Set("Access-Control-Allow-Headers", "*")
  	header.Set("Access-Control-Allow-Credentials", "true")
  }
  155. // maybeCheckJwtAuthorization returns true if access should be granted, false if it should be denied
  156. func (fs *FilerServer) maybeCheckJwtAuthorization(r *http.Request, isWrite bool) bool {
  157. var signingKey security.SigningKey
  158. if isWrite {
  159. if len(fs.filerGuard.SigningKey) == 0 {
  160. return true
  161. } else {
  162. signingKey = fs.filerGuard.SigningKey
  163. }
  164. } else {
  165. if len(fs.filerGuard.ReadSigningKey) == 0 {
  166. return true
  167. } else {
  168. signingKey = fs.filerGuard.ReadSigningKey
  169. }
  170. }
  171. tokenStr := security.GetJwt(r)
  172. if tokenStr == "" {
  173. glog.V(1).Infof("missing jwt from %s", r.RemoteAddr)
  174. return false
  175. }
  176. token, err := security.DecodeJwt(signingKey, tokenStr, &security.SeaweedFilerClaims{})
  177. if err != nil {
  178. glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err)
  179. return false
  180. }
  181. if !token.Valid {
  182. glog.V(1).Infof("jwt invalid from %s: %v", r.RemoteAddr, tokenStr)
  183. return false
  184. } else {
  185. return true
  186. }
  187. }
  188. func (fs *FilerServer) filerHealthzHandler(w http.ResponseWriter, r *http.Request) {
  189. w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
  190. w.WriteHeader(http.StatusOK)
  191. }