volume_server_fasthttp_handlers_read.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. package weed_server
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "mime"
  10. "mime/multipart"
  11. "net/http"
  12. "net/url"
  13. "path"
  14. "strconv"
  15. "strings"
  16. "time"
  17. "github.com/valyala/fasthttp"
  18. "github.com/chrislusf/seaweedfs/weed/glog"
  19. "github.com/chrislusf/seaweedfs/weed/images"
  20. "github.com/chrislusf/seaweedfs/weed/operation"
  21. "github.com/chrislusf/seaweedfs/weed/stats"
  22. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  23. "github.com/chrislusf/seaweedfs/weed/util"
  24. )
  25. func (vs *VolumeServer) fastGetOrHeadHandler(ctx *fasthttp.RequestCtx) {
  26. stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
  27. start := time.Now()
  28. defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
  29. requestPath := string(ctx.Path())
  30. n := new(needle.Needle)
  31. vid, fid, filename, ext, _ := parseURLPath(requestPath)
  32. if !vs.maybeCheckJwtAuthorization(ctx, vid, fid, false) {
  33. writeJsonError(ctx, http.StatusUnauthorized, errors.New("wrong jwt"))
  34. return
  35. }
  36. volumeId, err := needle.NewVolumeId(vid)
  37. if err != nil {
  38. glog.V(2).Infof("parsing volumd id %s error: %v", err, requestPath)
  39. ctx.SetStatusCode(http.StatusBadRequest)
  40. return
  41. }
  42. err = n.ParsePath(fid)
  43. if err != nil {
  44. glog.V(2).Infof("parsing fid %s error: %v", err, requestPath)
  45. ctx.SetStatusCode(http.StatusBadRequest)
  46. return
  47. }
  48. // glog.V(4).Infoln("volume", volumeId, "reading", n)
  49. hasVolume := vs.store.HasVolume(volumeId)
  50. _, hasEcVolume := vs.store.FindEcVolume(volumeId)
  51. if !hasVolume && !hasEcVolume {
  52. if !vs.ReadRedirect {
  53. glog.V(2).Infoln("volume is not local:", err, requestPath)
  54. ctx.SetStatusCode(http.StatusNotFound)
  55. return
  56. }
  57. lookupResult, err := operation.Lookup(vs.GetMaster(), volumeId.String())
  58. glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
  59. if err == nil && len(lookupResult.Locations) > 0 {
  60. u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
  61. u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
  62. arg := url.Values{}
  63. if c := ctx.FormValue("collection"); c != nil {
  64. arg.Set("collection", string(c))
  65. }
  66. u.RawQuery = arg.Encode()
  67. ctx.Redirect(u.String(), http.StatusMovedPermanently)
  68. } else {
  69. glog.V(2).Infof("lookup %s error: %v", requestPath, err)
  70. ctx.SetStatusCode(http.StatusNotFound)
  71. }
  72. return
  73. }
  74. cookie := n.Cookie
  75. var count int
  76. if hasVolume {
  77. count, err = vs.store.ReadVolumeNeedle(volumeId, n)
  78. } else if hasEcVolume {
  79. count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
  80. }
  81. // glog.V(4).Infoln("read bytes", count, "error", err)
  82. if err != nil || count < 0 {
  83. glog.V(0).Infof("read %s isNormalVolume %v error: %v", requestPath, hasVolume, err)
  84. ctx.SetStatusCode(http.StatusNotFound)
  85. return
  86. }
  87. if n.Cookie != cookie {
  88. glog.V(0).Infof("request %s with cookie:%x expected:%x agent %s", requestPath, cookie, n.Cookie, string(ctx.UserAgent()))
  89. ctx.SetStatusCode(http.StatusNotFound)
  90. return
  91. }
  92. if n.LastModified != 0 {
  93. ctx.Response.Header.Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat))
  94. if ctx.Response.Header.Peek("If-Modified-Since") != nil {
  95. if t, parseError := time.Parse(http.TimeFormat, string(ctx.Response.Header.Peek("If-Modified-Since"))); parseError == nil {
  96. if t.Unix() >= int64(n.LastModified) {
  97. ctx.SetStatusCode(http.StatusNotModified)
  98. return
  99. }
  100. }
  101. }
  102. }
  103. if inm := ctx.Response.Header.Peek("If-None-Match"); inm != nil && string(inm) == "\""+n.Etag()+"\"" {
  104. ctx.SetStatusCode(http.StatusNotModified)
  105. return
  106. }
  107. eTagMd5 := ctx.Response.Header.Peek("ETag-MD5")
  108. if eTagMd5 != nil && string(eTagMd5) == "True" {
  109. fastSetEtag(ctx, n.MD5())
  110. } else {
  111. fastSetEtag(ctx, n.Etag())
  112. }
  113. if n.HasPairs() {
  114. pairMap := make(map[string]string)
  115. err = json.Unmarshal(n.Pairs, &pairMap)
  116. if err != nil {
  117. glog.V(0).Infoln("Unmarshal pairs error:", err)
  118. }
  119. for k, v := range pairMap {
  120. ctx.Response.Header.Set(k, v)
  121. }
  122. }
  123. if vs.fastTryHandleChunkedFile(n, filename, ctx) {
  124. return
  125. }
  126. if n.NameSize > 0 && filename == "" {
  127. filename = string(n.Name)
  128. if ext == "" {
  129. ext = path.Ext(filename)
  130. }
  131. }
  132. mtype := ""
  133. if n.MimeSize > 0 {
  134. mt := string(n.Mime)
  135. if !strings.HasPrefix(mt, "application/octet-stream") {
  136. mtype = mt
  137. }
  138. }
  139. if ext != ".gz" {
  140. if n.IsGzipped() {
  141. acceptEncoding := ctx.Request.Header.Peek("Accept-Encoding")
  142. if acceptEncoding != nil && strings.Contains(string(acceptEncoding), "gzip") {
  143. ctx.Response.Header.Set("Content-Encoding", "gzip")
  144. } else {
  145. if n.Data, err = util.UnGzipData(n.Data); err != nil {
  146. glog.V(0).Infoln("ungzip error:", err, requestPath)
  147. }
  148. }
  149. }
  150. }
  151. rs := fastConditionallyResizeImages(bytes.NewReader(n.Data), ext, ctx)
  152. if e := fastWriteResponseContent(filename, mtype, rs, ctx); e != nil {
  153. glog.V(2).Infoln("response write error:", e)
  154. }
  155. }
  156. func (vs *VolumeServer) fastTryHandleChunkedFile(n *needle.Needle, fileName string, ctx *fasthttp.RequestCtx) (processed bool) {
  157. if !n.IsChunkedManifest() || string(ctx.FormValue("cm")) == "false" {
  158. return false
  159. }
  160. chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped())
  161. if e != nil {
  162. glog.V(0).Infof("load chunked manifest (%s) error: %v", string(ctx.Path()), e)
  163. return false
  164. }
  165. if fileName == "" && chunkManifest.Name != "" {
  166. fileName = chunkManifest.Name
  167. }
  168. ext := path.Ext(fileName)
  169. mType := ""
  170. if chunkManifest.Mime != "" {
  171. mt := chunkManifest.Mime
  172. if !strings.HasPrefix(mt, "application/octet-stream") {
  173. mType = mt
  174. }
  175. }
  176. ctx.Response.Header.Set("X-File-Store", "chunked")
  177. chunkedFileReader := &operation.ChunkedFileReader{
  178. Manifest: chunkManifest,
  179. Master: vs.GetMaster(),
  180. }
  181. defer chunkedFileReader.Close()
  182. rs := fastConditionallyResizeImages(chunkedFileReader, ext, ctx)
  183. if e := fastWriteResponseContent(fileName, mType, rs, ctx); e != nil {
  184. glog.V(2).Infoln("response write error:", e)
  185. }
  186. return true
  187. }
  188. func fastConditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext string, ctx *fasthttp.RequestCtx) io.ReadSeeker {
  189. rs := originalDataReaderSeeker
  190. if len(ext) > 0 {
  191. ext = strings.ToLower(ext)
  192. }
  193. if ext == ".png" || ext == ".jpg" || ext == ".jpeg" || ext == ".gif" {
  194. width, height := 0, 0
  195. formWidth, formHeight := ctx.FormValue("width"), ctx.FormValue("height")
  196. if formWidth != nil {
  197. width, _ = strconv.Atoi(string(formWidth))
  198. }
  199. if formHeight != nil {
  200. height, _ = strconv.Atoi(string(formHeight))
  201. }
  202. formMode := ctx.FormValue("mode")
  203. rs, _, _ = images.Resized(ext, originalDataReaderSeeker, width, height, string(formMode))
  204. }
  205. return rs
  206. }
  207. func fastWriteResponseContent(filename, mimeType string, rs io.ReadSeeker, ctx *fasthttp.RequestCtx) error {
  208. totalSize, e := rs.Seek(0, 2)
  209. if mimeType == "" {
  210. if ext := path.Ext(filename); ext != "" {
  211. mimeType = mime.TypeByExtension(ext)
  212. }
  213. }
  214. if mimeType != "" {
  215. ctx.Response.Header.Set("Content-Type", mimeType)
  216. }
  217. if filename != "" {
  218. contentDisposition := "inline"
  219. dlFormValue := ctx.FormValue("dl")
  220. if dlFormValue != nil {
  221. if dl, _ := strconv.ParseBool(string(dlFormValue)); dl {
  222. contentDisposition = "attachment"
  223. }
  224. }
  225. ctx.Response.Header.Set("Content-Disposition", contentDisposition+`; filename="`+fileNameEscaper.Replace(filename)+`"`)
  226. }
  227. ctx.Response.Header.Set("Accept-Ranges", "bytes")
  228. if ctx.IsHead() {
  229. ctx.Response.Header.Set("Content-Length", strconv.FormatInt(totalSize, 10))
  230. return nil
  231. }
  232. rangeReq := ctx.FormValue("Range")
  233. if rangeReq == nil {
  234. ctx.Response.Header.Set("Content-Length", strconv.FormatInt(totalSize, 10))
  235. if _, e = rs.Seek(0, 0); e != nil {
  236. return e
  237. }
  238. _, e = io.Copy(ctx.Response.BodyWriter(), rs)
  239. return e
  240. }
  241. //the rest is dealing with partial content request
  242. //mostly copy from src/pkg/net/http/fs.go
  243. ranges, err := parseRange(string(rangeReq), totalSize)
  244. if err != nil {
  245. ctx.Response.SetStatusCode(http.StatusRequestedRangeNotSatisfiable)
  246. ctxError(ctx, err.Error(), http.StatusRequestedRangeNotSatisfiable)
  247. return nil
  248. }
  249. if sumRangesSize(ranges) > totalSize {
  250. // The total number of bytes in all the ranges
  251. // is larger than the size of the file by
  252. // itself, so this is probably an attack, or a
  253. // dumb client. Ignore the range request.
  254. return nil
  255. }
  256. if len(ranges) == 0 {
  257. return nil
  258. }
  259. if len(ranges) == 1 {
  260. // RFC 2616, Section 14.16:
  261. // "When an HTTP message includes the content of a single
  262. // range (for example, a response to a request for a
  263. // single range, or to a request for a set of ranges
  264. // that overlap without any holes), this content is
  265. // transmitted with a Content-Range header, and a
  266. // Content-Length header showing the number of bytes
  267. // actually transferred.
  268. // ...
  269. // A response to a request for a single range MUST NOT
  270. // be sent using the multipart/byteranges media type."
  271. ra := ranges[0]
  272. ctx.Response.Header.Set("Content-Length", strconv.FormatInt(ra.length, 10))
  273. ctx.Response.Header.Set("Content-Range", ra.contentRange(totalSize))
  274. ctx.Response.SetStatusCode(http.StatusPartialContent)
  275. if _, e = rs.Seek(ra.start, 0); e != nil {
  276. return e
  277. }
  278. _, e = io.CopyN(ctx.Response.BodyWriter(), rs, ra.length)
  279. return e
  280. }
  281. // process multiple ranges
  282. for _, ra := range ranges {
  283. if ra.start > totalSize {
  284. ctxError(ctx, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
  285. return nil
  286. }
  287. }
  288. sendSize := rangesMIMESize(ranges, mimeType, totalSize)
  289. pr, pw := io.Pipe()
  290. mw := multipart.NewWriter(pw)
  291. ctx.Response.Header.Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
  292. sendContent := pr
  293. defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
  294. go func() {
  295. for _, ra := range ranges {
  296. part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
  297. if e != nil {
  298. pw.CloseWithError(e)
  299. return
  300. }
  301. if _, e = rs.Seek(ra.start, 0); e != nil {
  302. pw.CloseWithError(e)
  303. return
  304. }
  305. if _, e = io.CopyN(part, rs, ra.length); e != nil {
  306. pw.CloseWithError(e)
  307. return
  308. }
  309. }
  310. mw.Close()
  311. pw.Close()
  312. }()
  313. if ctx.Response.Header.Peek("Content-Encoding") == nil {
  314. ctx.Response.Header.Set("Content-Length", strconv.FormatInt(sendSize, 10))
  315. }
  316. ctx.Response.Header.SetStatusCode(http.StatusPartialContent)
  317. _, e = io.CopyN(ctx.Response.BodyWriter(), sendContent, sendSize)
  318. return e
  319. }
  320. func fastSetEtag(ctx *fasthttp.RequestCtx, etag string) {
  321. if etag != "" {
  322. if strings.HasPrefix(etag, "\"") {
  323. ctx.Response.Header.Set("ETag", etag)
  324. } else {
  325. ctx.Response.Header.Set("ETag", "\""+etag+"\"")
  326. }
  327. }
  328. }
  329. func ctxError(ctx *fasthttp.RequestCtx, error string, code int) {
  330. ctx.Response.Header.Set("Content-Type", "text/plain; charset=utf-8")
  331. ctx.Response.Header.Set("X-Content-Type-Options", "nosniff")
  332. ctx.Response.SetStatusCode(code)
  333. fmt.Fprintln(ctx.Response.BodyWriter(), error)
  334. }