filer_backup.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package command
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  9. "github.com/seaweedfs/seaweedfs/weed/security"
  10. "github.com/seaweedfs/seaweedfs/weed/util"
  11. "github.com/seaweedfs/seaweedfs/weed/util/http"
  12. "google.golang.org/grpc"
  13. "regexp"
  14. "strings"
  15. "time"
  16. )
  17. type FilerBackupOptions struct {
  18. isActivePassive *bool
  19. filer *string
  20. path *string
  21. excludePaths *string
  22. excludeFileName *string
  23. debug *bool
  24. proxyByFiler *bool
  25. doDeleteFiles *bool
  26. disableErrorRetry *bool
  27. ignore404Error *bool
  28. timeAgo *time.Duration
  29. retentionDays *int
  30. }
  31. var (
  32. filerBackupOptions FilerBackupOptions
  33. )
  34. func init() {
  35. cmdFilerBackup.Run = runFilerBackup // break init cycle
  36. filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
  37. filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
  38. filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
  39. filerBackupOptions.excludeFileName = cmdFilerBackup.Flag.String("filerExcludeFileName", "", "exclude file names that match the regexp to sync on filer")
  40. filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
  41. filerBackupOptions.doDeleteFiles = cmdFilerBackup.Flag.Bool("doDeleteFiles", false, "delete files on the destination")
  42. filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
  43. filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  44. filerBackupOptions.retentionDays = cmdFilerBackup.Flag.Int("retentionDays", 0, "incremental backup retention days")
  45. filerBackupOptions.disableErrorRetry = cmdFilerBackup.Flag.Bool("disableErrorRetry", false, "disables errors retry, only logs will print")
  46. filerBackupOptions.ignore404Error = cmdFilerBackup.Flag.Bool("ignore404Error", true, "ignore 404 errors from filer")
  47. }
  48. var cmdFilerBackup = &Command{
  49. UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ",
  50. Short: "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml",
  51. Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml
  52. filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content,
  53. and write to the destination. This is to replace filer.replicate command since additional message queue is not needed.
  54. If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute.
  55. A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value.
  56. `,
  57. }
  58. func runFilerBackup(cmd *Command, args []string) bool {
  59. util.LoadSecurityConfiguration()
  60. util.LoadConfiguration("replication", true)
  61. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  62. clientId := util.RandomInt32()
  63. var clientEpoch int32
  64. for {
  65. clientEpoch++
  66. err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId, clientEpoch)
  67. if err != nil {
  68. glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
  69. time.Sleep(1747 * time.Millisecond)
  70. }
  71. }
  72. return true
  73. }
  74. const (
  75. BackupKeyPrefix = "backup."
  76. )
  77. func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32, clientEpoch int32) error {
  78. // find data sink
  79. dataSink := findSink(util.GetViper())
  80. if dataSink == nil {
  81. return fmt.Errorf("no data sink configured in replication.toml")
  82. }
  83. sourceFiler := pb.ServerAddress(*backupOption.filer)
  84. sourcePath := *backupOption.path
  85. excludePaths := util.StringSplit(*backupOption.excludePaths, ",")
  86. var reExcludeFileName *regexp.Regexp
  87. if *backupOption.excludeFileName != "" {
  88. var err error
  89. if reExcludeFileName, err = regexp.Compile(*backupOption.excludeFileName); err != nil {
  90. return fmt.Errorf("error compile regexp %v for exclude file name: %+v", *backupOption.excludeFileName, err)
  91. }
  92. }
  93. timeAgo := *backupOption.timeAgo
  94. targetPath := dataSink.GetSinkToDirectory()
  95. debug := *backupOption.debug
  96. // get start time for the data sink
  97. startFrom := time.Unix(0, 0)
  98. sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory())
  99. if timeAgo.Milliseconds() == 0 {
  100. lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId))
  101. if err != nil {
  102. glog.V(0).Infof("starting from %v", startFrom)
  103. } else {
  104. startFrom = time.Unix(0, lastOffsetTsNs)
  105. glog.V(0).Infof("resuming from %v", startFrom)
  106. }
  107. } else {
  108. startFrom = time.Now().Add(-timeAgo)
  109. glog.V(0).Infof("start time is set to %v", startFrom)
  110. }
  111. // create filer sink
  112. filerSource := &source.FilerSource{}
  113. filerSource.DoInitialize(
  114. sourceFiler.ToHttpAddress(),
  115. sourceFiler.ToGrpcAddress(),
  116. sourcePath,
  117. *backupOption.proxyByFiler)
  118. dataSink.SetSourceFiler(filerSource)
  119. var processEventFn func(*filer_pb.SubscribeMetadataResponse) error
  120. if *backupOption.ignore404Error {
  121. processEventFnGenerated := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
  122. processEventFn = func(resp *filer_pb.SubscribeMetadataResponse) error {
  123. err := processEventFnGenerated(resp)
  124. if err == nil {
  125. return nil
  126. }
  127. if errors.Is(err, http.ErrNotFound) {
  128. glog.V(0).Infof("got 404 error, ignore it: %s", err.Error())
  129. return nil
  130. }
  131. return err
  132. }
  133. } else {
  134. processEventFn = genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, dataSink, *backupOption.doDeleteFiles, debug)
  135. }
  136. processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
  137. glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
  138. return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
  139. })
  140. if dataSink.IsIncremental() && *filerBackupOptions.retentionDays > 0 {
  141. go func() {
  142. for {
  143. now := time.Now()
  144. time.Sleep(time.Hour * 24)
  145. key := util.Join(targetPath, now.Add(-1*time.Hour*24*time.Duration(*filerBackupOptions.retentionDays)).Format("2006-01-02"))
  146. _ = dataSink.DeleteEntry(util.Join(targetPath, key), true, true, nil)
  147. glog.V(0).Infof("incremental backup delete directory:%s", key)
  148. }
  149. }()
  150. }
  151. prefix := sourcePath
  152. if !strings.HasSuffix(prefix, "/") {
  153. prefix = prefix + "/"
  154. }
  155. eventErrorType := pb.RetryForeverOnError
  156. if *backupOption.disableErrorRetry {
  157. eventErrorType = pb.TrivialOnError
  158. }
  159. metadataFollowOption := &pb.MetadataFollowOption{
  160. ClientName: "backup_" + dataSink.GetName(),
  161. ClientId: clientId,
  162. ClientEpoch: clientEpoch,
  163. SelfSignature: 0,
  164. PathPrefix: prefix,
  165. AdditionalPathPrefixes: nil,
  166. DirectoriesToWatch: nil,
  167. StartTsNs: startFrom.UnixNano(),
  168. StopTsNs: 0,
  169. EventErrorType: eventErrorType,
  170. }
  171. return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
  172. }