command_fs_meta_load.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package shell
  2. import (
  3. "flag"
  4. "fmt"
  5. "io"
  6. "os"
  7. "strings"
  8. "sync"
  9. "time"
  10. "google.golang.org/protobuf/proto"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. )
  14. func init() {
  15. Commands = append(Commands, &commandFsMetaLoad{})
  16. }
  // commandFsMetaLoad implements the "fs.meta.load" shell command.
  type commandFsMetaLoad struct {
  	// dirPrefix points at the value of the -dirPrefix flag. It is assigned
  	// each time Do parses its arguments and is nil before the first call.
  	dirPrefix *string
  }
  20. func (c *commandFsMetaLoad) Name() string {
  21. return "fs.meta.load"
  22. }
  23. func (c *commandFsMetaLoad) Help() string {
  24. return `load saved filer meta data to restore the directory and file structure
  25. fs.meta.load <filer_host>-<port>-<time>.meta
  26. fs.meta.load -v=false <filer_host>-<port>-<time>.meta // skip printing out the verbose output
  27. fs.meta.load -concurrency=1 <filer_host>-<port>-<time>.meta // number of parallel meta load to filer
  28. fs.meta.load -dirPrefix=/buckets/important <filer_host>.meta // load any dirs with prefix "important"
  29. `
  30. }
  31. func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  32. if len(args) == 0 {
  33. fmt.Fprintf(writer, "missing a metadata file\n")
  34. return nil
  35. }
  36. fileName := args[len(args)-1]
  37. metaLoadCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
  38. c.dirPrefix = metaLoadCommand.String("dirPrefix", "", "load entries only with directories matching prefix")
  39. concurrency := metaLoadCommand.Int("concurrency", 1, "number of parallel meta load to filer")
  40. verbose := metaLoadCommand.Bool("v", true, "verbose mode")
  41. if err = metaLoadCommand.Parse(args[0 : len(args)-1]); err != nil {
  42. return nil
  43. }
  44. dst, err := os.OpenFile(fileName, os.O_RDONLY, 0644)
  45. if err != nil {
  46. return nil
  47. }
  48. defer dst.Close()
  49. var dirCount, fileCount uint64
  50. lastLogTime := time.Now()
  51. err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  52. sizeBuf := make([]byte, 4)
  53. waitChan := make(chan struct{}, *concurrency)
  54. defer close(waitChan)
  55. var wg sync.WaitGroup
  56. for {
  57. if n, err := dst.Read(sizeBuf); n != 4 {
  58. if err == io.EOF {
  59. return nil
  60. }
  61. return err
  62. }
  63. size := util.BytesToUint32(sizeBuf)
  64. data := make([]byte, int(size))
  65. if n, err := dst.Read(data); n != len(data) {
  66. return err
  67. }
  68. fullEntry := &filer_pb.FullEntry{}
  69. if err = proto.Unmarshal(data, fullEntry); err != nil {
  70. return err
  71. }
  72. // check collection name pattern
  73. entryFullName := string(util.FullPath(fullEntry.Dir).Child(fullEntry.Entry.Name))
  74. if *c.dirPrefix != "" {
  75. if !strings.HasPrefix(fullEntry.Dir, *c.dirPrefix) {
  76. if *verbose {
  77. fmt.Fprintf(writer, "not match dir prefix %s\n", entryFullName)
  78. }
  79. continue
  80. }
  81. }
  82. if *verbose || lastLogTime.Add(time.Second).Before(time.Now()) {
  83. if !*verbose {
  84. lastLogTime = time.Now()
  85. }
  86. fmt.Fprintf(writer, "load %s\n", entryFullName)
  87. }
  88. fullEntry.Entry.Name = strings.ReplaceAll(fullEntry.Entry.Name, "/", "x")
  89. if fullEntry.Entry.IsDirectory {
  90. wg.Wait()
  91. if errEntry := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  92. Directory: fullEntry.Dir,
  93. Entry: fullEntry.Entry,
  94. }); errEntry != nil {
  95. return errEntry
  96. }
  97. dirCount++
  98. } else {
  99. wg.Add(1)
  100. waitChan <- struct{}{}
  101. go func(entry *filer_pb.FullEntry) {
  102. if errEntry := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
  103. Directory: entry.Dir,
  104. Entry: entry.Entry,
  105. }); errEntry != nil {
  106. err = errEntry
  107. }
  108. defer wg.Done()
  109. <-waitChan
  110. }(fullEntry)
  111. if err != nil {
  112. return err
  113. }
  114. fileCount++
  115. }
  116. }
  117. })
  118. if err == nil {
  119. fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount)
  120. fmt.Fprintf(writer, "\n%s is loaded.\n", fileName)
  121. }
  122. return err
  123. }