submit.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. package operation
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb"
  4. "io"
  5. "mime"
  6. "net/url"
  7. "os"
  8. "path"
  9. "strconv"
  10. "strings"
  11. "google.golang.org/grpc"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/security"
  14. )
  15. type FilePart struct {
  16. Reader io.Reader
  17. FileName string
  18. FileSize int64
  19. MimeType string
  20. ModTime int64 //in seconds
  21. Replication string
  22. Collection string
  23. DataCenter string
  24. Ttl string
  25. DiskType string
  26. Server string //this comes from assign result
  27. Fid string //this comes from assign result, but customizable
  28. Fsync bool
  29. }
  30. type SubmitResult struct {
  31. FileName string `json:"fileName,omitempty"`
  32. FileUrl string `json:"url,omitempty"`
  33. Fid string `json:"fid,omitempty"`
  34. Size uint32 `json:"size,omitempty"`
  35. Error string `json:"error,omitempty"`
  36. }
  37. type GetMasterFn func() pb.ServerAddress
  38. func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
  39. results := make([]SubmitResult, len(files))
  40. for index, file := range files {
  41. results[index].FileName = file.FileName
  42. }
  43. ar := &VolumeAssignRequest{
  44. Count: uint64(len(files)),
  45. Replication: replication,
  46. Collection: collection,
  47. DataCenter: dataCenter,
  48. Ttl: ttl,
  49. DiskType: diskType,
  50. }
  51. ret, err := Assign(masterFn, grpcDialOption, ar)
  52. if err != nil {
  53. for index := range files {
  54. results[index].Error = err.Error()
  55. }
  56. return results, err
  57. }
  58. for index, file := range files {
  59. file.Fid = ret.Fid
  60. if index > 0 {
  61. file.Fid = file.Fid + "_" + strconv.Itoa(index)
  62. }
  63. file.Server = ret.Url
  64. if usePublicUrl {
  65. file.Server = ret.PublicUrl
  66. }
  67. file.Replication = replication
  68. file.Collection = collection
  69. file.DataCenter = dataCenter
  70. file.Ttl = ttl
  71. file.DiskType = diskType
  72. results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
  73. if err != nil {
  74. results[index].Error = err.Error()
  75. }
  76. results[index].Fid = file.Fid
  77. results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
  78. }
  79. return results, nil
  80. }
  81. func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
  82. ret = make([]FilePart, len(fullPathFilenames))
  83. for index, file := range fullPathFilenames {
  84. if ret[index], err = newFilePart(file); err != nil {
  85. return
  86. }
  87. }
  88. return
  89. }
  90. func newFilePart(fullPathFilename string) (ret FilePart, err error) {
  91. fh, openErr := os.Open(fullPathFilename)
  92. if openErr != nil {
  93. glog.V(0).Info("Failed to open file: ", fullPathFilename)
  94. return ret, openErr
  95. }
  96. ret.Reader = fh
  97. fi, fiErr := fh.Stat()
  98. if fiErr != nil {
  99. glog.V(0).Info("Failed to stat file:", fullPathFilename)
  100. return ret, fiErr
  101. }
  102. ret.ModTime = fi.ModTime().UTC().Unix()
  103. ret.FileSize = fi.Size()
  104. ext := strings.ToLower(path.Ext(fullPathFilename))
  105. ret.FileName = fi.Name()
  106. if ext != "" {
  107. ret.MimeType = mime.TypeByExtension(ext)
  108. }
  109. return ret, nil
  110. }
  111. func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
  112. fileUrl := "http://" + fi.Server + "/" + fi.Fid
  113. if fi.ModTime != 0 {
  114. fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
  115. }
  116. if fi.Fsync {
  117. fileUrl += "?fsync=true"
  118. }
  119. if closer, ok := fi.Reader.(io.Closer); ok {
  120. defer closer.Close()
  121. }
  122. baseName := path.Base(fi.FileName)
  123. if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
  124. chunkSize := int64(maxMB * 1024 * 1024)
  125. chunks := fi.FileSize/chunkSize + 1
  126. cm := ChunkManifest{
  127. Name: baseName,
  128. Size: fi.FileSize,
  129. Mime: fi.MimeType,
  130. Chunks: make([]*ChunkInfo, 0, chunks),
  131. }
  132. var ret *AssignResult
  133. var id string
  134. if fi.DataCenter != "" {
  135. ar := &VolumeAssignRequest{
  136. Count: uint64(chunks),
  137. Replication: fi.Replication,
  138. Collection: fi.Collection,
  139. Ttl: fi.Ttl,
  140. DiskType: fi.DiskType,
  141. }
  142. ret, err = Assign(masterFn, grpcDialOption, ar)
  143. if err != nil {
  144. return
  145. }
  146. }
  147. for i := int64(0); i < chunks; i++ {
  148. if fi.DataCenter == "" {
  149. ar := &VolumeAssignRequest{
  150. Count: 1,
  151. Replication: fi.Replication,
  152. Collection: fi.Collection,
  153. Ttl: fi.Ttl,
  154. DiskType: fi.DiskType,
  155. }
  156. ret, err = Assign(masterFn, grpcDialOption, ar)
  157. if err != nil {
  158. // delete all uploaded chunks
  159. cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
  160. return
  161. }
  162. id = ret.Fid
  163. } else {
  164. id = ret.Fid
  165. if i > 0 {
  166. id += "_" + strconv.FormatInt(i, 10)
  167. }
  168. }
  169. fileUrl := "http://" + ret.Url + "/" + id
  170. if usePublicUrl {
  171. fileUrl = "http://" + ret.PublicUrl + "/" + id
  172. }
  173. count, e := upload_one_chunk(
  174. baseName+"-"+strconv.FormatInt(i+1, 10),
  175. io.LimitReader(fi.Reader, chunkSize),
  176. masterFn, fileUrl,
  177. ret.Auth)
  178. if e != nil {
  179. // delete all uploaded chunks
  180. cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
  181. return 0, e
  182. }
  183. cm.Chunks = append(cm.Chunks,
  184. &ChunkInfo{
  185. Offset: i * chunkSize,
  186. Size: int64(count),
  187. Fid: id,
  188. },
  189. )
  190. retSize += count
  191. }
  192. err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
  193. if err != nil {
  194. // delete all uploaded chunks
  195. cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
  196. }
  197. } else {
  198. uploadOption := &UploadOption{
  199. UploadUrl: fileUrl,
  200. Filename: baseName,
  201. Cipher: false,
  202. IsInputCompressed: false,
  203. MimeType: fi.MimeType,
  204. PairMap: nil,
  205. Jwt: jwt,
  206. }
  207. ret, e, _ := Upload(fi.Reader, uploadOption)
  208. if e != nil {
  209. return 0, e
  210. }
  211. return ret.Size, e
  212. }
  213. return
  214. }
  215. func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn,
  216. fileUrl string, jwt security.EncodedJwt,
  217. ) (size uint32, e error) {
  218. glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
  219. uploadOption := &UploadOption{
  220. UploadUrl: fileUrl,
  221. Filename: filename,
  222. Cipher: false,
  223. IsInputCompressed: false,
  224. MimeType: "",
  225. PairMap: nil,
  226. Jwt: jwt,
  227. }
  228. uploadResult, uploadError, _ := Upload(reader, uploadOption)
  229. if uploadError != nil {
  230. return 0, uploadError
  231. }
  232. return uploadResult.Size, nil
  233. }
  234. func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
  235. buf, e := manifest.Marshal()
  236. if e != nil {
  237. return e
  238. }
  239. glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
  240. u, _ := url.Parse(fileUrl)
  241. q := u.Query()
  242. q.Set("cm", "true")
  243. u.RawQuery = q.Encode()
  244. uploadOption := &UploadOption{
  245. UploadUrl: u.String(),
  246. Filename: manifest.Name,
  247. Cipher: false,
  248. IsInputCompressed: false,
  249. MimeType: "application/json",
  250. PairMap: nil,
  251. Jwt: jwt,
  252. }
  253. _, e = UploadData(buf, uploadOption)
  254. return e
  255. }