filer_server_handlers.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package weed_server
  2. import (
  3. "errors"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/security"
  6. "github.com/chrislusf/seaweedfs/weed/util"
  7. "net/http"
  8. "strings"
  9. "sync/atomic"
  10. "time"
  11. "github.com/chrislusf/seaweedfs/weed/stats"
  12. )
  13. func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
  14. start := time.Now()
  15. if r.Method == "OPTIONS" {
  16. stats.FilerRequestCounter.WithLabelValues("options").Inc()
  17. OptionsHandler(w, r, false)
  18. stats.FilerRequestHistogram.WithLabelValues("options").Observe(time.Since(start).Seconds())
  19. return
  20. }
  21. isReadHttpCall := r.Method == "GET" || r.Method == "HEAD"
  22. if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) {
  23. writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
  24. return
  25. }
  26. // proxy to volume servers
  27. var fileId string
  28. if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") {
  29. fileId = r.RequestURI[len("/?proxyChunkId="):]
  30. }
  31. if fileId != "" {
  32. stats.FilerRequestCounter.WithLabelValues("proxy").Inc()
  33. fs.proxyToVolumeServer(w, r, fileId)
  34. stats.FilerRequestHistogram.WithLabelValues("proxy").Observe(time.Since(start).Seconds())
  35. return
  36. }
  37. w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
  38. if r.Header.Get("Origin") != "" {
  39. w.Header().Set("Access-Control-Allow-Origin", "*")
  40. w.Header().Set("Access-Control-Allow-Credentials", "true")
  41. }
  42. switch r.Method {
  43. case "GET":
  44. stats.FilerRequestCounter.WithLabelValues("get").Inc()
  45. fs.GetOrHeadHandler(w, r)
  46. stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
  47. case "HEAD":
  48. stats.FilerRequestCounter.WithLabelValues("head").Inc()
  49. fs.GetOrHeadHandler(w, r)
  50. stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
  51. case "DELETE":
  52. stats.FilerRequestCounter.WithLabelValues("delete").Inc()
  53. if _, ok := r.URL.Query()["tagging"]; ok {
  54. fs.DeleteTaggingHandler(w, r)
  55. } else {
  56. fs.DeleteHandler(w, r)
  57. }
  58. stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
  59. case "POST", "PUT":
  60. // wait until in flight data is less than the limit
  61. contentLength := getContentLength(r)
  62. fs.inFlightDataLimitCond.L.Lock()
  63. for fs.option.ConcurrentUploadLimit != 0 && atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit {
  64. glog.V(4).Infof("wait because inflight data %d > %d", fs.inFlightDataSize, fs.option.ConcurrentUploadLimit)
  65. fs.inFlightDataLimitCond.Wait()
  66. }
  67. fs.inFlightDataLimitCond.L.Unlock()
  68. atomic.AddInt64(&fs.inFlightDataSize, contentLength)
  69. defer func() {
  70. atomic.AddInt64(&fs.inFlightDataSize, -contentLength)
  71. fs.inFlightDataLimitCond.Signal()
  72. }()
  73. if r.Method == "PUT" {
  74. stats.FilerRequestCounter.WithLabelValues("put").Inc()
  75. if _, ok := r.URL.Query()["tagging"]; ok {
  76. fs.PutTaggingHandler(w, r)
  77. } else {
  78. fs.PostHandler(w, r, contentLength)
  79. }
  80. stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
  81. } else { // method == "POST"
  82. stats.FilerRequestCounter.WithLabelValues("post").Inc()
  83. fs.PostHandler(w, r, contentLength)
  84. stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
  85. }
  86. }
  87. }
  88. func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) {
  89. start := time.Now()
  90. // We handle OPTIONS first because it never should be authenticated
  91. if r.Method == "OPTIONS" {
  92. stats.FilerRequestCounter.WithLabelValues("options").Inc()
  93. OptionsHandler(w, r, true)
  94. stats.FilerRequestHistogram.WithLabelValues("options").Observe(time.Since(start).Seconds())
  95. return
  96. }
  97. if !fs.maybeCheckJwtAuthorization(r, false) {
  98. writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
  99. return
  100. }
  101. w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
  102. if r.Header.Get("Origin") != "" {
  103. w.Header().Set("Access-Control-Allow-Origin", "*")
  104. w.Header().Set("Access-Control-Allow-Credentials", "true")
  105. }
  106. switch r.Method {
  107. case "GET":
  108. stats.FilerRequestCounter.WithLabelValues("get").Inc()
  109. fs.GetOrHeadHandler(w, r)
  110. stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
  111. case "HEAD":
  112. stats.FilerRequestCounter.WithLabelValues("head").Inc()
  113. fs.GetOrHeadHandler(w, r)
  114. stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
  115. }
  116. }
  117. func OptionsHandler(w http.ResponseWriter, r *http.Request, isReadOnly bool) {
  118. if isReadOnly {
  119. w.Header().Add("Access-Control-Allow-Methods", "GET, OPTIONS")
  120. } else {
  121. w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
  122. }
  123. w.Header().Add("Access-Control-Allow-Headers", "*")
  124. }
  125. // maybeCheckJwtAuthorization returns true if access should be granted, false if it should be denied
  126. func (fs *FilerServer) maybeCheckJwtAuthorization(r *http.Request, isWrite bool) bool {
  127. var signingKey security.SigningKey
  128. if isWrite {
  129. if len(fs.filerGuard.SigningKey) == 0 {
  130. return true
  131. } else {
  132. signingKey = fs.filerGuard.SigningKey
  133. }
  134. } else {
  135. if len(fs.filerGuard.ReadSigningKey) == 0 {
  136. return true
  137. } else {
  138. signingKey = fs.filerGuard.ReadSigningKey
  139. }
  140. }
  141. tokenStr := security.GetJwt(r)
  142. if tokenStr == "" {
  143. glog.V(1).Infof("missing jwt from %s", r.RemoteAddr)
  144. return false
  145. }
  146. token, err := security.DecodeJwt(signingKey, tokenStr, &security.SeaweedFilerClaims{})
  147. if err != nil {
  148. glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err)
  149. return false
  150. }
  151. if !token.Valid {
  152. glog.V(1).Infof("jwt invalid from %s: %v", r.RemoteAddr, tokenStr)
  153. return false
  154. } else {
  155. return true
  156. }
  157. }