backup.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package command
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "github.com/seaweedfs/seaweedfs/weed/security"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "github.com/seaweedfs/seaweedfs/weed/operation"
  10. "github.com/seaweedfs/seaweedfs/weed/storage"
  11. )
  12. var (
  13. s BackupOptions
  14. )
  15. type BackupOptions struct {
  16. master *string
  17. collection *string
  18. dir *string
  19. volumeId *int
  20. ttl *string
  21. replication *string
  22. }
  23. func init() {
  24. cmdBackup.Run = runBackup // break init cycle
  25. s.master = cmdBackup.Flag.String("server", "localhost:9333", "SeaweedFS master location")
  26. s.collection = cmdBackup.Flag.String("collection", "", "collection name")
  27. s.dir = cmdBackup.Flag.String("dir", ".", "directory to store volume data files")
  28. s.volumeId = cmdBackup.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.")
  29. s.ttl = cmdBackup.Flag.String("ttl", "", `backup volume's time to live, format:
  30. 3m: 3 minutes
  31. 4h: 4 hours
  32. 5d: 5 days
  33. 6w: 6 weeks
  34. 7M: 7 months
  35. 8y: 8 years
  36. default is the same with origin`)
  37. s.replication = cmdBackup.Flag.String("replication", "", "backup volume's replication, default is the same with origin")
  38. }
  39. var cmdBackup = &Command{
  40. UsageLine: "backup -dir=. -volumeId=234 -server=localhost:9333",
  41. Short: "incrementally backup a volume to local folder",
  42. Long: `Incrementally backup volume data.
  43. It is expected that you use this inside a script, to loop through
  44. all possible volume ids that needs to be backup to local folder.
  45. The volume id does not need to exist locally or even remotely.
  46. This will help to backup future new volumes.
  47. Usually backing up is just copying the .dat (and .idx) files.
  48. But it's tricky to incrementally copy the differences.
  49. The complexity comes when there are multiple addition, deletion and compaction.
  50. This tool will handle them correctly and efficiently, avoiding unnecessary data transportation.
  51. `,
  52. }
  53. func runBackup(cmd *Command, args []string) bool {
  54. util.LoadConfiguration("security", false)
  55. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  56. if *s.volumeId == -1 {
  57. return false
  58. }
  59. vid := needle.VolumeId(*s.volumeId)
  60. // find volume location, replication, ttl info
  61. lookup, err := operation.LookupVolumeId(func() pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String())
  62. if err != nil {
  63. fmt.Printf("Error looking up volume %d: %v\n", vid, err)
  64. return true
  65. }
  66. volumeServer := lookup.Locations[0].ServerAddress()
  67. stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
  68. if err != nil {
  69. fmt.Printf("Error get volume %d status: %v\n", vid, err)
  70. return true
  71. }
  72. var ttl *needle.TTL
  73. if *s.ttl != "" {
  74. ttl, err = needle.ReadTTL(*s.ttl)
  75. if err != nil {
  76. fmt.Printf("Error generate volume %d ttl %s: %v\n", vid, *s.ttl, err)
  77. return true
  78. }
  79. } else {
  80. ttl, err = needle.ReadTTL(stats.Ttl)
  81. if err != nil {
  82. fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err)
  83. return true
  84. }
  85. }
  86. var replication *super_block.ReplicaPlacement
  87. if *s.replication != "" {
  88. replication, err = super_block.NewReplicaPlacementFromString(*s.replication)
  89. if err != nil {
  90. fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err)
  91. return true
  92. }
  93. } else {
  94. replication, err = super_block.NewReplicaPlacementFromString(stats.Replication)
  95. if err != nil {
  96. fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
  97. return true
  98. }
  99. }
  100. v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
  101. if err != nil {
  102. fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
  103. return true
  104. }
  105. if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
  106. if err = v.Compact2(0, 0, nil); err != nil {
  107. fmt.Printf("Compact Volume before synchronizing %v\n", err)
  108. return true
  109. }
  110. if err = v.CommitCompact(); err != nil {
  111. fmt.Printf("Commit Compact before synchronizing %v\n", err)
  112. return true
  113. }
  114. v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision)
  115. v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
  116. }
  117. datSize, _, _ := v.FileStat()
  118. if datSize > stats.TailOffset {
  119. // remove the old data
  120. v.Destroy()
  121. // recreate an empty volume
  122. v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
  123. if err != nil {
  124. fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
  125. return true
  126. }
  127. }
  128. defer v.Close()
  129. if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil {
  130. fmt.Printf("Error synchronizing volume %d: %v\n", vid, err)
  131. return true
  132. }
  133. return true
  134. }