filer_copy.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "path/filepath"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "time"
  15. "github.com/chrislusf/seaweedfs/weed/operation"
  16. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  17. "github.com/chrislusf/seaweedfs/weed/security"
  18. "github.com/chrislusf/seaweedfs/weed/util"
  19. "github.com/chrislusf/seaweedfs/weed/wdclient"
  20. "github.com/spf13/viper"
  21. "google.golang.org/grpc"
  22. )
  23. var (
  24. copy CopyOptions
  25. waitGroup sync.WaitGroup
  26. )
  27. type CopyOptions struct {
  28. include *string
  29. replication *string
  30. collection *string
  31. ttl *string
  32. maxMB *int
  33. masterClient *wdclient.MasterClient
  34. concurrenctFiles *int
  35. concurrenctChunks *int
  36. compressionLevel *int
  37. grpcDialOption grpc.DialOption
  38. masters []string
  39. }
  40. func init() {
  41. cmdCopy.Run = runCopy // break init cycle
  42. cmdCopy.IsDebug = cmdCopy.Flag.Bool("debug", false, "verbose debug information")
  43. copy.include = cmdCopy.Flag.String("include", "", "pattens of files to copy, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
  44. copy.replication = cmdCopy.Flag.String("replication", "", "replication type")
  45. copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name")
  46. copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
  47. copy.maxMB = cmdCopy.Flag.Int("maxMB", 32, "split files larger than the limit")
  48. copy.concurrenctFiles = cmdCopy.Flag.Int("c", 8, "concurrent file copy goroutines")
  49. copy.concurrenctChunks = cmdCopy.Flag.Int("concurrentChunks", 8, "concurrent chunk copy goroutines for each file")
  50. copy.compressionLevel = cmdCopy.Flag.Int("compressionLevel", 9, "local file compression level 1 ~ 9")
  51. }
  52. var cmdCopy = &Command{
  53. UsageLine: "filer.copy file_or_dir1 [file_or_dir2 file_or_dir3] http://localhost:8888/path/to/a/folder/",
  54. Short: "copy one or a list of files to a filer folder",
  55. Long: `copy one or a list of files, or batch copy one whole folder recursively, to a filer folder
  56. It can copy one or a list of files or folders.
  57. If copying a whole folder recursively:
  58. All files under the folder and subfolders will be copyed.
  59. Optional parameter "-include" allows you to specify the file name patterns.
  60. If "maxMB" is set to a positive number, files larger than it would be split into chunks.
  61. `,
  62. }
  63. func runCopy(cmd *Command, args []string) bool {
  64. util.LoadConfiguration("security", false)
  65. if len(args) <= 1 {
  66. return false
  67. }
  68. filerDestination := args[len(args)-1]
  69. fileOrDirs := args[0 : len(args)-1]
  70. filerUrl, err := url.Parse(filerDestination)
  71. if err != nil {
  72. fmt.Printf("The last argument should be a URL on filer: %v\n", err)
  73. return false
  74. }
  75. urlPath := filerUrl.Path
  76. if !strings.HasSuffix(urlPath, "/") {
  77. fmt.Printf("The last argument should be a folder and end with \"/\": %v\n", err)
  78. return false
  79. }
  80. if filerUrl.Port() == "" {
  81. fmt.Printf("The filer port should be specified.\n")
  82. return false
  83. }
  84. filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
  85. if parseErr != nil {
  86. fmt.Printf("The filer port parse error: %v\n", parseErr)
  87. return false
  88. }
  89. filerGrpcPort := filerPort + 10000
  90. filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
  91. copy.grpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "client")
  92. ctx := context.Background()
  93. masters, collection, replication, maxMB, err := readFilerConfiguration(ctx, copy.grpcDialOption, filerGrpcAddress)
  94. if err != nil {
  95. fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err)
  96. return false
  97. }
  98. if *copy.collection == "" {
  99. *copy.collection = collection
  100. }
  101. if *copy.replication == "" {
  102. *copy.replication = replication
  103. }
  104. if *copy.maxMB == 0 {
  105. *copy.maxMB = int(maxMB)
  106. }
  107. copy.masters = masters
  108. copy.masterClient = wdclient.NewMasterClient(ctx, copy.grpcDialOption, "client", copy.masters)
  109. go copy.masterClient.KeepConnectedToMaster()
  110. copy.masterClient.WaitUntilConnected()
  111. if *cmdCopy.IsDebug {
  112. util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
  113. }
  114. fileCopyTaskChan := make(chan FileCopyTask, *copy.concurrenctFiles)
  115. go func() {
  116. defer close(fileCopyTaskChan)
  117. for _, fileOrDir := range fileOrDirs {
  118. if err := genFileCopyTask(fileOrDir, urlPath, fileCopyTaskChan); err != nil {
  119. fmt.Fprintf(os.Stderr, "gen file list error: %v\n", err)
  120. break
  121. }
  122. }
  123. }()
  124. for i := 0; i < *copy.concurrenctFiles; i++ {
  125. waitGroup.Add(1)
  126. go func() {
  127. defer waitGroup.Done()
  128. worker := FileCopyWorker{
  129. options: &copy,
  130. filerHost: filerUrl.Host,
  131. filerGrpcAddress: filerGrpcAddress,
  132. }
  133. if err := worker.copyFiles(ctx, fileCopyTaskChan); err != nil {
  134. fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
  135. return
  136. }
  137. }()
  138. }
  139. waitGroup.Wait()
  140. return true
  141. }
  142. func readFilerConfiguration(ctx context.Context, grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, maxMB uint32, err error) {
  143. err = withFilerClient(ctx, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  144. resp, err := client.GetFilerConfiguration(ctx, &filer_pb.GetFilerConfigurationRequest{})
  145. if err != nil {
  146. return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
  147. }
  148. masters, collection, replication, maxMB = resp.Masters, resp.Collection, resp.Replication, resp.MaxMb
  149. return nil
  150. })
  151. return
  152. }
  153. func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan FileCopyTask) error {
  154. fi, err := os.Stat(fileOrDir)
  155. if err != nil {
  156. fmt.Fprintf(os.Stderr, "Failed to get stat for file %s: %v\n", fileOrDir, err)
  157. return nil
  158. }
  159. mode := fi.Mode()
  160. if mode.IsDir() {
  161. files, _ := ioutil.ReadDir(fileOrDir)
  162. for _, subFileOrDir := range files {
  163. if err = genFileCopyTask(fileOrDir+"/"+subFileOrDir.Name(), destPath+fi.Name()+"/", fileCopyTaskChan); err != nil {
  164. return err
  165. }
  166. }
  167. return nil
  168. }
  169. uid, gid := util.GetFileUidGid(fi)
  170. fileCopyTaskChan <- FileCopyTask{
  171. sourceLocation: fileOrDir,
  172. destinationUrlPath: destPath,
  173. fileSize: fi.Size(),
  174. fileMode: fi.Mode(),
  175. uid: uid,
  176. gid: gid,
  177. }
  178. return nil
  179. }
  180. type FileCopyWorker struct {
  181. options *CopyOptions
  182. filerHost string
  183. filerGrpcAddress string
  184. }
  185. func (worker *FileCopyWorker) copyFiles(ctx context.Context, fileCopyTaskChan chan FileCopyTask) error {
  186. for task := range fileCopyTaskChan {
  187. if err := worker.doEachCopy(ctx, task); err != nil {
  188. return err
  189. }
  190. }
  191. return nil
  192. }
  193. type FileCopyTask struct {
  194. sourceLocation string
  195. destinationUrlPath string
  196. fileSize int64
  197. fileMode os.FileMode
  198. uid uint32
  199. gid uint32
  200. }
  201. func (worker *FileCopyWorker) doEachCopy(ctx context.Context, task FileCopyTask) error {
  202. f, err := os.Open(task.sourceLocation)
  203. if err != nil {
  204. fmt.Printf("Failed to open file %s: %v\n", task.sourceLocation, err)
  205. if _, ok := err.(*os.PathError); ok {
  206. fmt.Printf("skipping %s\n", task.sourceLocation)
  207. return nil
  208. }
  209. return err
  210. }
  211. defer f.Close()
  212. // this is a regular file
  213. if *worker.options.include != "" {
  214. if ok, _ := filepath.Match(*worker.options.include, filepath.Base(task.sourceLocation)); !ok {
  215. return nil
  216. }
  217. }
  218. // find the chunk count
  219. chunkSize := int64(*worker.options.maxMB * 1024 * 1024)
  220. chunkCount := 1
  221. if chunkSize > 0 && task.fileSize > chunkSize {
  222. chunkCount = int(task.fileSize/chunkSize) + 1
  223. }
  224. if chunkCount == 1 {
  225. return worker.uploadFileAsOne(ctx, task, f)
  226. }
  227. return worker.uploadFileInChunks(ctx, task, f, chunkCount, chunkSize)
  228. }
  229. func (worker *FileCopyWorker) uploadFileAsOne(ctx context.Context, task FileCopyTask, f *os.File) error {
  230. // upload the file content
  231. fileName := filepath.Base(f.Name())
  232. mimeType := detectMimeType(f)
  233. var chunks []*filer_pb.FileChunk
  234. if task.fileSize > 0 {
  235. // assign a volume
  236. assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
  237. Count: 1,
  238. Replication: *worker.options.replication,
  239. Collection: *worker.options.collection,
  240. Ttl: *worker.options.ttl,
  241. })
  242. if err != nil {
  243. fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
  244. }
  245. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  246. uploadResult, err := operation.UploadWithLocalCompressionLevel(targetUrl, fileName, f, false, mimeType, nil, assignResult.Auth, *worker.options.compressionLevel)
  247. if err != nil {
  248. return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  249. }
  250. if uploadResult.Error != "" {
  251. return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  252. }
  253. fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
  254. chunks = append(chunks, &filer_pb.FileChunk{
  255. FileId: assignResult.Fid,
  256. Offset: 0,
  257. Size: uint64(uploadResult.Size),
  258. Mtime: time.Now().UnixNano(),
  259. ETag: uploadResult.ETag,
  260. })
  261. fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
  262. }
  263. if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  264. request := &filer_pb.CreateEntryRequest{
  265. Directory: task.destinationUrlPath,
  266. Entry: &filer_pb.Entry{
  267. Name: fileName,
  268. Attributes: &filer_pb.FuseAttributes{
  269. Crtime: time.Now().Unix(),
  270. Mtime: time.Now().Unix(),
  271. Gid: task.gid,
  272. Uid: task.uid,
  273. FileSize: uint64(task.fileSize),
  274. FileMode: uint32(task.fileMode),
  275. Mime: mimeType,
  276. Replication: *worker.options.replication,
  277. Collection: *worker.options.collection,
  278. TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
  279. },
  280. Chunks: chunks,
  281. },
  282. }
  283. if _, err := client.CreateEntry(ctx, request); err != nil {
  284. return fmt.Errorf("update fh: %v", err)
  285. }
  286. return nil
  287. }); err != nil {
  288. return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
  289. }
  290. return nil
  291. }
  292. func (worker *FileCopyWorker) uploadFileInChunks(ctx context.Context, task FileCopyTask, f *os.File, chunkCount int, chunkSize int64) error {
  293. fileName := filepath.Base(f.Name())
  294. mimeType := detectMimeType(f)
  295. chunksChan := make(chan *filer_pb.FileChunk, chunkCount)
  296. concurrentChunks := make(chan struct{}, *worker.options.concurrenctChunks)
  297. var wg sync.WaitGroup
  298. var uploadError error
  299. fmt.Printf("uploading %s in %d chunks ...\n", fileName, chunkCount)
  300. for i := int64(0); i < int64(chunkCount) && uploadError == nil; i++ {
  301. wg.Add(1)
  302. concurrentChunks <- struct{}{}
  303. go func(i int64) {
  304. defer func() {
  305. wg.Done()
  306. <-concurrentChunks
  307. }()
  308. // assign a volume
  309. assignResult, err := operation.Assign(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, &operation.VolumeAssignRequest{
  310. Count: 1,
  311. Replication: *worker.options.replication,
  312. Collection: *worker.options.collection,
  313. Ttl: *worker.options.ttl,
  314. })
  315. if err != nil {
  316. fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
  317. }
  318. targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
  319. uploadResult, err := operation.Upload(targetUrl,
  320. fileName+"-"+strconv.FormatInt(i+1, 10),
  321. io.NewSectionReader(f, i*chunkSize, chunkSize),
  322. false, "application/octet-stream", nil, assignResult.Auth)
  323. if err != nil {
  324. uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
  325. return
  326. }
  327. if uploadResult.Error != "" {
  328. uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
  329. return
  330. }
  331. chunksChan <- &filer_pb.FileChunk{
  332. FileId: assignResult.Fid,
  333. Offset: i * chunkSize,
  334. Size: uint64(uploadResult.Size),
  335. Mtime: time.Now().UnixNano(),
  336. ETag: uploadResult.ETag,
  337. }
  338. fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
  339. }(i)
  340. }
  341. wg.Wait()
  342. close(chunksChan)
  343. var chunks []*filer_pb.FileChunk
  344. for chunk := range chunksChan {
  345. chunks = append(chunks, chunk)
  346. }
  347. if uploadError != nil {
  348. var fileIds []string
  349. for _, chunk := range chunks {
  350. fileIds = append(fileIds, chunk.FileId)
  351. }
  352. operation.DeleteFiles(worker.options.masterClient.GetMaster(), worker.options.grpcDialOption, fileIds)
  353. return uploadError
  354. }
  355. if err := withFilerClient(ctx, worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  356. request := &filer_pb.CreateEntryRequest{
  357. Directory: task.destinationUrlPath,
  358. Entry: &filer_pb.Entry{
  359. Name: fileName,
  360. Attributes: &filer_pb.FuseAttributes{
  361. Crtime: time.Now().Unix(),
  362. Mtime: time.Now().Unix(),
  363. Gid: task.gid,
  364. Uid: task.uid,
  365. FileSize: uint64(task.fileSize),
  366. FileMode: uint32(task.fileMode),
  367. Mime: mimeType,
  368. Replication: *worker.options.replication,
  369. Collection: *worker.options.collection,
  370. TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
  371. },
  372. Chunks: chunks,
  373. },
  374. }
  375. if _, err := client.CreateEntry(ctx, request); err != nil {
  376. return fmt.Errorf("update fh: %v", err)
  377. }
  378. return nil
  379. }); err != nil {
  380. return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
  381. }
  382. fmt.Printf("copied %s => http://%s%s%s\n", fileName, worker.filerHost, task.destinationUrlPath, fileName)
  383. return nil
  384. }
  385. func detectMimeType(f *os.File) string {
  386. head := make([]byte, 512)
  387. f.Seek(0, io.SeekStart)
  388. n, err := f.Read(head)
  389. if err == io.EOF {
  390. return ""
  391. }
  392. if err != nil {
  393. fmt.Printf("read head of %v: %v\n", f.Name(), err)
  394. return "application/octet-stream"
  395. }
  396. f.Seek(0, io.SeekStart)
  397. mimeType := http.DetectContentType(head[:n])
  398. return mimeType
  399. }
  400. func withFilerClient(ctx context.Context, filerAddress string, grpcDialOption grpc.DialOption, fn func(filer_pb.SeaweedFilerClient) error) error {
  401. return util.WithCachedGrpcClient(ctx, func(clientConn *grpc.ClientConn) error {
  402. client := filer_pb.NewSeaweedFilerClient(clientConn)
  403. return fn(client)
  404. }, filerAddress, grpcDialOption)
  405. }