volume_server_handlers.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. package weed_server
  2. import (
  3. "fmt"
  4. "net/http"
  5. "strconv"
  6. "strings"
  7. "sync/atomic"
  8. "time"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/seaweedfs/seaweedfs/weed/glog"
  11. "github.com/seaweedfs/seaweedfs/weed/security"
  12. "github.com/seaweedfs/seaweedfs/weed/stats"
  13. )
/*
If the volume server is started with a separate public port, the public port
will be more "secure".

The public port currently only supports reads.

Later, writes on the public port can have one of 3 security settings:
 1. not secured
 2. secured by white list
 3. secured by JWT (JSON Web Token)
*/
  24. func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) {
  25. inFlightGauge := stats.VolumeServerInFlightRequestsGauge.WithLabelValues(r.Method)
  26. inFlightGauge.Inc()
  27. defer inFlightGauge.Dec()
  28. statusRecorder := stats.NewStatusResponseWriter(w)
  29. w = statusRecorder
  30. w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
  31. if r.Header.Get("Origin") != "" {
  32. w.Header().Set("Access-Control-Allow-Origin", "*")
  33. w.Header().Set("Access-Control-Allow-Credentials", "true")
  34. }
  35. start := time.Now()
  36. requestMethod := r.Method
  37. defer func(start time.Time, method *string, statusRecorder *stats.StatusRecorder) {
  38. stats.VolumeServerRequestCounter.WithLabelValues(*method, strconv.Itoa(statusRecorder.Status)).Inc()
  39. stats.VolumeServerRequestHistogram.WithLabelValues(*method).Observe(time.Since(start).Seconds())
  40. }(start, &requestMethod, statusRecorder)
  41. switch r.Method {
  42. case http.MethodGet, http.MethodHead:
  43. stats.ReadRequest()
  44. vs.inFlightDownloadDataLimitCond.L.Lock()
  45. inFlightDownloadSize := atomic.LoadInt64(&vs.inFlightDownloadDataSize)
  46. for vs.concurrentDownloadLimit != 0 && inFlightDownloadSize > vs.concurrentDownloadLimit {
  47. select {
  48. case <-r.Context().Done():
  49. glog.V(4).Infof("request cancelled from %s: %v", r.RemoteAddr, r.Context().Err())
  50. w.WriteHeader(util.HttpStatusCancelled)
  51. vs.inFlightDownloadDataLimitCond.L.Unlock()
  52. return
  53. default:
  54. glog.V(4).Infof("wait because inflight download data %d > %d", inFlightDownloadSize, vs.concurrentDownloadLimit)
  55. vs.inFlightDownloadDataLimitCond.Wait()
  56. }
  57. inFlightDownloadSize = atomic.LoadInt64(&vs.inFlightDownloadDataSize)
  58. }
  59. vs.inFlightDownloadDataLimitCond.L.Unlock()
  60. vs.GetOrHeadHandler(w, r)
  61. case http.MethodDelete:
  62. stats.DeleteRequest()
  63. vs.guard.WhiteList(vs.DeleteHandler)(w, r)
  64. case http.MethodPut, http.MethodPost:
  65. contentLength := getContentLength(r)
  66. // exclude the replication from the concurrentUploadLimitMB
  67. if r.URL.Query().Get("type") != "replicate" && vs.concurrentUploadLimit != 0 {
  68. startTime := time.Now()
  69. vs.inFlightUploadDataLimitCond.L.Lock()
  70. inFlightUploadDataSize := atomic.LoadInt64(&vs.inFlightUploadDataSize)
  71. for inFlightUploadDataSize > vs.concurrentUploadLimit {
  72. //wait timeout check
  73. if startTime.Add(vs.inflightUploadDataTimeout).Before(time.Now()) {
  74. vs.inFlightUploadDataLimitCond.L.Unlock()
  75. err := fmt.Errorf("reject because inflight upload data %d > %d, and wait timeout", inFlightUploadDataSize, vs.concurrentUploadLimit)
  76. glog.V(1).Infof("too many requests: %v", err)
  77. writeJsonError(w, r, http.StatusTooManyRequests, err)
  78. return
  79. }
  80. glog.V(4).Infof("wait because inflight upload data %d > %d", inFlightUploadDataSize, vs.concurrentUploadLimit)
  81. vs.inFlightUploadDataLimitCond.Wait()
  82. inFlightUploadDataSize = atomic.LoadInt64(&vs.inFlightUploadDataSize)
  83. }
  84. vs.inFlightUploadDataLimitCond.L.Unlock()
  85. }
  86. atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength)
  87. defer func() {
  88. atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength)
  89. if vs.concurrentUploadLimit != 0 {
  90. vs.inFlightUploadDataLimitCond.Signal()
  91. }
  92. }()
  93. // processes uploads
  94. stats.WriteRequest()
  95. vs.guard.WhiteList(vs.PostHandler)(w, r)
  96. case http.MethodOptions:
  97. stats.ReadRequest()
  98. w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
  99. w.Header().Add("Access-Control-Allow-Headers", "*")
  100. default:
  101. requestMethod = "INVALID"
  102. writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("unsupported method %s", r.Method))
  103. }
  104. }
  105. func getContentLength(r *http.Request) int64 {
  106. contentLength := r.Header.Get("Content-Length")
  107. if contentLength != "" {
  108. length, err := strconv.ParseInt(contentLength, 10, 64)
  109. if err != nil {
  110. return 0
  111. }
  112. return length
  113. }
  114. return 0
  115. }
  116. func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
  117. statusRecorder := stats.NewStatusResponseWriter(w)
  118. w = statusRecorder
  119. w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
  120. if r.Header.Get("Origin") != "" {
  121. w.Header().Set("Access-Control-Allow-Origin", "*")
  122. w.Header().Set("Access-Control-Allow-Credentials", "true")
  123. }
  124. start := time.Now()
  125. requestMethod := r.Method
  126. defer func(start time.Time, method *string, statusRecorder *stats.StatusRecorder) {
  127. stats.VolumeServerRequestCounter.WithLabelValues(*method, strconv.Itoa(statusRecorder.Status)).Inc()
  128. stats.VolumeServerRequestHistogram.WithLabelValues(*method).Observe(time.Since(start).Seconds())
  129. }(start, &requestMethod, statusRecorder)
  130. switch r.Method {
  131. case http.MethodGet, http.MethodHead:
  132. stats.ReadRequest()
  133. vs.inFlightDownloadDataLimitCond.L.Lock()
  134. inFlightDownloadSize := atomic.LoadInt64(&vs.inFlightDownloadDataSize)
  135. for vs.concurrentDownloadLimit != 0 && inFlightDownloadSize > vs.concurrentDownloadLimit {
  136. glog.V(4).Infof("wait because inflight download data %d > %d", inFlightDownloadSize, vs.concurrentDownloadLimit)
  137. vs.inFlightDownloadDataLimitCond.Wait()
  138. inFlightDownloadSize = atomic.LoadInt64(&vs.inFlightDownloadDataSize)
  139. }
  140. vs.inFlightDownloadDataLimitCond.L.Unlock()
  141. vs.GetOrHeadHandler(w, r)
  142. case http.MethodOptions:
  143. stats.ReadRequest()
  144. w.Header().Add("Access-Control-Allow-Methods", "GET, OPTIONS")
  145. w.Header().Add("Access-Control-Allow-Headers", "*")
  146. }
  147. }
  148. func (vs *VolumeServer) maybeCheckJwtAuthorization(r *http.Request, vid, fid string, isWrite bool) bool {
  149. var signingKey security.SigningKey
  150. if isWrite {
  151. if len(vs.guard.SigningKey) == 0 {
  152. return true
  153. } else {
  154. signingKey = vs.guard.SigningKey
  155. }
  156. } else {
  157. if len(vs.guard.ReadSigningKey) == 0 {
  158. return true
  159. } else {
  160. signingKey = vs.guard.ReadSigningKey
  161. }
  162. }
  163. tokenStr := security.GetJwt(r)
  164. if tokenStr == "" {
  165. glog.V(1).Infof("missing jwt from %s", r.RemoteAddr)
  166. return false
  167. }
  168. token, err := security.DecodeJwt(signingKey, tokenStr, &security.SeaweedFileIdClaims{})
  169. if err != nil {
  170. glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err)
  171. return false
  172. }
  173. if !token.Valid {
  174. glog.V(1).Infof("jwt invalid from %s: %v", r.RemoteAddr, tokenStr)
  175. return false
  176. }
  177. if sc, ok := token.Claims.(*security.SeaweedFileIdClaims); ok {
  178. if sepIndex := strings.LastIndex(fid, "_"); sepIndex > 0 {
  179. fid = fid[:sepIndex]
  180. }
  181. return sc.Fid == vid+","+fid
  182. }
  183. glog.V(1).Infof("unexpected jwt from %s: %v", r.RemoteAddr, tokenStr)
  184. return false
  185. }