// filer_backup.go

package command

import (
	"context"
	"fmt"
	"io"
	"time"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/replication/source"
	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/util"

	"google.golang.org/grpc"
)
// FilerBackupOptions holds the command-line flag values for the
// "filer.backup" command. All fields are pointers registered with the
// command's FlagSet in init().
type FilerBackupOptions struct {
	isActivePassive *bool          // NOTE(review): not registered as a flag anywhere in this file — possibly unused; verify against other chunks
	filer           *string        // source filer address, e.g. "localhost:8888"
	path            *string        // directory on the source filer to sync
	debug           *bool          // when true, print out received files
	proxyByFiler    *bool          // read/write chunks through the filer instead of volume servers
	timeAgo         *time.Duration // how far back to start; 0 means resume from the persisted checkpoint
}
var (
	// filerBackupOptions is the package-level flag storage shared by
	// init() (which registers the flags) and runFilerBackup (which reads them).
	filerBackupOptions FilerBackupOptions
)
  26. func init() {
  27. cmdFilerBackup.Run = runFilerBackup // break init cycle
  28. filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
  29. filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
  30. filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
  31. filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
  32. filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  33. }
// cmdFilerBackup describes the "filer.backup" subcommand; its Run
// function is attached in init() to avoid an initialization cycle.
var cmdFilerBackup = &Command{
	UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ",
	Short:     "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml",
	Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml

	filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content,
	and write to the destination. This is to replace filer.replicate command since additional message queue is not needed.

	If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute.
	A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value.

`,
}
  44. func runFilerBackup(cmd *Command, args []string) bool {
  45. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  46. util.LoadConfiguration("security", false)
  47. util.LoadConfiguration("replication", true)
  48. for {
  49. err := doFilerBackup(grpcDialOption, &filerBackupOptions)
  50. if err != nil {
  51. glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
  52. time.Sleep(1747 * time.Millisecond)
  53. }
  54. }
  55. return true
  56. }
const (
	// BackupKeyPrefix namespaces the checkpoint offsets that
	// getOffset/setOffset persist on the source filer, keeping backup
	// checkpoints separate from other offset consumers.
	BackupKeyPrefix = "backup."
)
// doFilerBackup runs a single backup session: it locates the data sink
// configured in replication.toml, determines the starting timestamp
// (persisted checkpoint, or now minus -timeAgo), subscribes to the
// source filer's metadata stream from that point, and replays every
// event into the sink, persisting a new checkpoint roughly every 3
// seconds. It returns only when the stream ends or an error occurs;
// the caller (runFilerBackup) restarts it.
func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error {

	// find data sink
	config := util.GetViper()
	dataSink := findSink(config)
	if dataSink == nil {
		return fmt.Errorf("no data sink configured in replication.toml")
	}

	sourceFiler := *backupOption.filer
	sourcePath := *backupOption.path
	timeAgo := *backupOption.timeAgo
	targetPath := dataSink.GetSinkToDirectory()
	debug := *backupOption.debug

	// get start time for the data sink
	startFrom := time.Unix(0, 0)
	// sinkId keys the checkpoint by sink name + target directory, so
	// different sink configurations track independent offsets.
	sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory())
	if timeAgo.Milliseconds() == 0 {
		// No explicit -timeAgo: try to resume from the persisted
		// checkpoint; on lookup failure fall back to the epoch, i.e.
		// replay from the earliest metadata logs.
		lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId))
		if err != nil {
			glog.V(0).Infof("starting from %v", startFrom)
		} else {
			startFrom = time.Unix(0, lastOffsetTsNs)
			glog.V(0).Infof("resuming from %v", startFrom)
		}
	} else {
		// Explicit -timeAgo overrides any checkpoint.
		startFrom = time.Now().Add(-timeAgo)
		glog.V(0).Infof("start time is set to %v", startFrom)
	}

	// create filer sink
	filerSource := &source.FilerSource{}
	filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler)
	dataSink.SetSourceFiler(filerSource)

	// processEventFn applies one metadata event to the sink.
	processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)

	return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
			ClientName: "backup_" + dataSink.GetName(),
			PathPrefix: sourcePath,
			SinceNs:    startFrom.UnixNano(),
		})
		if err != nil {
			return fmt.Errorf("listen: %v", err)
		}

		var counter int64
		var lastWriteTime time.Time
		for {
			resp, listenErr := stream.Recv()

			// EOF is a clean end of stream, not an error.
			if listenErr == io.EOF {
				return nil
			}
			if listenErr != nil {
				return listenErr
			}

			if err := processEventFn(resp); err != nil {
				return fmt.Errorf("processEventFn: %v", err)
			}
			counter++
			// Throttled progress log + checkpoint: at most once every
			// 3 seconds, after the corresponding event has been
			// applied (so a crash never skips un-replayed events).
			// NOTE(review): the rate divides by a fixed 3 even though
			// the actual elapsed interval may be longer — approximate.
			if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
				glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
				counter = 0
				lastWriteTime = time.Now()
				if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
					return fmt.Errorf("setOffset: %v", err)
				}
			}
		}

	})
}
  127. }