filer_server_handlers_write_autochunk.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package weed_server
  2. import (
  3. "context"
  4. "io"
  5. "net/http"
  6. "path"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "github.com/chrislusf/seaweedfs/weed/filer2"
  11. "github.com/chrislusf/seaweedfs/weed/glog"
  12. "github.com/chrislusf/seaweedfs/weed/operation"
  13. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  14. "github.com/chrislusf/seaweedfs/weed/security"
  15. "github.com/chrislusf/seaweedfs/weed/stats"
  16. )
  17. func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
  18. replication string, collection string, dataCenter string, ttlSec int32, ttlString string) bool {
  19. if r.Method != "POST" {
  20. glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
  21. return false
  22. }
  23. // autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
  24. query := r.URL.Query()
  25. parsedMaxMB, _ := strconv.ParseInt(query.Get("maxMB"), 10, 32)
  26. maxMB := int32(parsedMaxMB)
  27. if maxMB <= 0 && fs.option.MaxMB > 0 {
  28. maxMB = int32(fs.option.MaxMB)
  29. }
  30. if maxMB <= 0 {
  31. glog.V(4).Infoln("AutoChunking not enabled")
  32. return false
  33. }
  34. glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)")
  35. chunkSize := 1024 * 1024 * maxMB
  36. contentLength := int64(0)
  37. if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 {
  38. contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64)
  39. if contentLength <= int64(chunkSize) {
  40. glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.")
  41. return false
  42. }
  43. }
  44. if contentLength <= 0 {
  45. glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.")
  46. return false
  47. }
  48. reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString)
  49. if err != nil {
  50. writeJsonError(w, r, http.StatusInternalServerError, err)
  51. } else if reply != nil {
  52. writeJsonQuiet(w, r, http.StatusCreated, reply)
  53. }
  54. return true
  55. }
  56. func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
  57. contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string) (filerResult *FilerPostResult, replyerr error) {
  58. stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
  59. start := time.Now()
  60. defer func() {
  61. stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
  62. }()
  63. multipartReader, multipartReaderErr := r.MultipartReader()
  64. if multipartReaderErr != nil {
  65. return nil, multipartReaderErr
  66. }
  67. part1, part1Err := multipartReader.NextPart()
  68. if part1Err != nil {
  69. return nil, part1Err
  70. }
  71. fileName := part1.FileName()
  72. if fileName != "" {
  73. fileName = path.Base(fileName)
  74. }
  75. contentType := part1.Header.Get("Content-Type")
  76. var fileChunks []*filer_pb.FileChunk
  77. chunkOffset := int64(0)
  78. for chunkOffset < contentLength {
  79. limitedReader := io.LimitReader(part1, int64(chunkSize))
  80. // assign one file id for one chunk
  81. fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString)
  82. if assignErr != nil {
  83. return nil, assignErr
  84. }
  85. // upload the chunk to the volume server
  86. uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
  87. if uploadErr != nil {
  88. return nil, uploadErr
  89. }
  90. // if last chunk exhausted the reader exactly at the border
  91. if uploadResult.Size == 0 {
  92. break
  93. }
  94. // Save to chunk manifest structure
  95. fileChunks = append(fileChunks,
  96. &filer_pb.FileChunk{
  97. FileId: fileId,
  98. Offset: chunkOffset,
  99. Size: uint64(uploadResult.Size),
  100. Mtime: time.Now().UnixNano(),
  101. ETag: uploadResult.ETag,
  102. CipherKey: uploadResult.CipherKey,
  103. IsGzipped: uploadResult.Gzip > 0,
  104. },
  105. )
  106. glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
  107. // reset variables for the next chunk
  108. chunkOffset = chunkOffset + int64(uploadResult.Size)
  109. // if last chunk was not at full chunk size, but already exhausted the reader
  110. if int64(uploadResult.Size) < int64(chunkSize) {
  111. break
  112. }
  113. }
  114. path := r.URL.Path
  115. if strings.HasSuffix(path, "/") {
  116. if fileName != "" {
  117. path += fileName
  118. }
  119. }
  120. glog.V(4).Infoln("saving", path)
  121. entry := &filer2.Entry{
  122. FullPath: filer2.FullPath(path),
  123. Attr: filer2.Attr{
  124. Mtime: time.Now(),
  125. Crtime: time.Now(),
  126. Mode: 0660,
  127. Uid: OS_UID,
  128. Gid: OS_GID,
  129. Replication: replication,
  130. Collection: collection,
  131. TtlSec: ttlSec,
  132. Mime: contentType,
  133. },
  134. Chunks: fileChunks,
  135. }
  136. filerResult = &FilerPostResult{
  137. Name: fileName,
  138. Size: chunkOffset,
  139. }
  140. if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil {
  141. fs.filer.DeleteChunks(entry.Chunks)
  142. replyerr = dbErr
  143. filerResult.Error = dbErr.Error()
  144. glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
  145. return
  146. }
  147. return
  148. }
  149. func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) {
  150. stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc()
  151. start := time.Now()
  152. defer func() {
  153. stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds())
  154. }()
  155. return operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
  156. }