// weed/command/backup.go
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/security"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/super_block"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "github.com/seaweedfs/seaweedfs/weed/operation"
  11. "github.com/seaweedfs/seaweedfs/weed/storage"
  12. )
  13. var (
  14. s BackupOptions
  15. )
  16. type BackupOptions struct {
  17. master *string
  18. collection *string
  19. dir *string
  20. volumeId *int
  21. ttl *string
  22. replication *string
  23. }
// init wires up the backup command: it assigns the Run function
// (done here rather than at cmdBackup's declaration to break the
// init cycle between cmdBackup and runBackup) and registers the
// command's flags, storing the parsed values in the package-level s.
func init() {
	cmdBackup.Run = runBackup // break init cycle
	s.master = cmdBackup.Flag.String("server", "localhost:9333", "SeaweedFS master location")
	s.collection = cmdBackup.Flag.String("collection", "", "collection name")
	s.dir = cmdBackup.Flag.String("dir", ".", "directory to store volume data files")
	s.volumeId = cmdBackup.Flag.Int("volumeId", -1, "a volume id. The volume .dat and .idx files should already exist in the dir.")
	// The default "" means: inherit the TTL from the origin volume.
	s.ttl = cmdBackup.Flag.String("ttl", "", `backup volume's time to live, format:
3m: 3 minutes
4h: 4 hours
5d: 5 days
6w: 6 weeks
7M: 7 months
8y: 8 years
default is the same with origin`)
	// The default "" means: inherit the replication from the origin volume.
	s.replication = cmdBackup.Flag.String("replication", "", "backup volume's replication, default is the same with origin")
}
// cmdBackup describes the "weed backup" subcommand: its usage line,
// one-line summary, and long help text. The Run function is attached
// in init to avoid an initialization cycle.
var cmdBackup = &Command{
	UsageLine: "backup -dir=. -volumeId=234 -server=localhost:9333",
	Short:     "incrementally backup a volume to local folder",
	Long: `Incrementally backup volume data.

	It is expected that you use this inside a script, to loop through
	all possible volume ids that needs to be backup to local folder.

	The volume id does not need to exist locally or even remotely.
	This will help to backup future new volumes.

	Usually backing up is just copying the .dat (and .idx) files.
	But it's tricky to incrementally copy the differences.

	The complexity comes when there are multiple addition, deletion and compaction.
	This tool will handle them correctly and efficiently, avoiding unnecessary data transportation.
  `,
}
  54. func runBackup(cmd *Command, args []string) bool {
  55. util.LoadSecurityConfiguration()
  56. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  57. if *s.volumeId == -1 {
  58. return false
  59. }
  60. vid := needle.VolumeId(*s.volumeId)
  61. // find volume location, replication, ttl info
  62. lookup, err := operation.LookupVolumeId(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String())
  63. if err != nil {
  64. fmt.Printf("Error looking up volume %d: %v\n", vid, err)
  65. return true
  66. }
  67. volumeServer := lookup.Locations[0].ServerAddress()
  68. stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
  69. if err != nil {
  70. fmt.Printf("Error get volume %d status: %v\n", vid, err)
  71. return true
  72. }
  73. var ttl *needle.TTL
  74. if *s.ttl != "" {
  75. ttl, err = needle.ReadTTL(*s.ttl)
  76. if err != nil {
  77. fmt.Printf("Error generate volume %d ttl %s: %v\n", vid, *s.ttl, err)
  78. return true
  79. }
  80. } else {
  81. ttl, err = needle.ReadTTL(stats.Ttl)
  82. if err != nil {
  83. fmt.Printf("Error get volume %d ttl %s: %v\n", vid, stats.Ttl, err)
  84. return true
  85. }
  86. }
  87. var replication *super_block.ReplicaPlacement
  88. if *s.replication != "" {
  89. replication, err = super_block.NewReplicaPlacementFromString(*s.replication)
  90. if err != nil {
  91. fmt.Printf("Error generate volume %d replication %s : %v\n", vid, *s.replication, err)
  92. return true
  93. }
  94. } else {
  95. replication, err = super_block.NewReplicaPlacementFromString(stats.Replication)
  96. if err != nil {
  97. fmt.Printf("Error get volume %d replication %s : %v\n", vid, stats.Replication, err)
  98. return true
  99. }
  100. }
  101. v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0)
  102. if err != nil {
  103. fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
  104. return true
  105. }
  106. if v.SuperBlock.CompactionRevision < uint16(stats.CompactRevision) {
  107. if err = v.Compact2(0, 0, nil); err != nil {
  108. fmt.Printf("Compact Volume before synchronizing %v\n", err)
  109. return true
  110. }
  111. if err = v.CommitCompact(); err != nil {
  112. fmt.Printf("Commit Compact before synchronizing %v\n", err)
  113. return true
  114. }
  115. v.SuperBlock.CompactionRevision = uint16(stats.CompactRevision)
  116. v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
  117. }
  118. datSize, _, _ := v.FileStat()
  119. if datSize > stats.TailOffset {
  120. // remove the old data
  121. if err := v.Destroy(false); err != nil {
  122. fmt.Printf("Error destroying volume: %v\n", err)
  123. }
  124. // recreate an empty volume
  125. v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0)
  126. if err != nil {
  127. fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
  128. return true
  129. }
  130. }
  131. defer v.Close()
  132. if err := v.IncrementalBackup(volumeServer, grpcDialOption); err != nil {
  133. fmt.Printf("Error synchronizing volume %d: %v\n", vid, err)
  134. return true
  135. }
  136. return true
  137. }