filer_remote_sync.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package command
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/glog"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/replication/source"
  8. "github.com/chrislusf/seaweedfs/weed/security"
  9. "github.com/chrislusf/seaweedfs/weed/util"
  10. "google.golang.org/grpc"
  11. "time"
  12. )
  13. type RemoteSyncOptions struct {
  14. filerAddress *string
  15. grpcDialOption grpc.DialOption
  16. readChunkFromFiler *bool
  17. timeAgo *time.Duration
  18. dir *string
  19. clientId int32
  20. }
  21. var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
  22. func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  23. return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  24. return fn(client)
  25. })
  26. }
  27. func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
  28. return location.Url
  29. }
  30. var (
  31. remoteSyncOptions RemoteSyncOptions
  32. )
  33. func init() {
  34. cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
  35. remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
  36. remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer")
  37. remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
  38. remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now, skipping previous metadata changes. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  39. remoteSyncOptions.clientId = util.RandomInt32()
  40. }
  // cmdFilerRemoteSynchronize describes the "filer.remote.sync" command: its
  // usage line and its short and long help texts. The Run field is not set
  // here; init assigns it to avoid an initialization cycle.
  var cmdFilerRemoteSynchronize = &Command{
  	UsageLine: "filer.remote.sync",
  	Short:     "resumable continuously write back updates to remote storage",
  	Long: `resumable continuously write back updates to remote storage
filer.remote.sync listens on filer update events.
If any mounted remote file is updated, it will fetch the updated content,
and write to the remote storage.
weed filer.remote.sync -dir=/mount/s3_on_cloud
The metadata sync starting time is determined with the following priority order:
1. specified by timeAgo
2. last sync timestamp for this directory
3. directory creation time
`,
  }
  55. func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
  56. util.LoadConfiguration("security", false)
  57. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  58. remoteSyncOptions.grpcDialOption = grpcDialOption
  59. dir := *remoteSyncOptions.dir
  60. filerAddress := pb.ServerAddress(*remoteSyncOptions.filerAddress)
  61. filerSource := &source.FilerSource{}
  62. filerSource.DoInitialize(
  63. filerAddress.ToHttpAddress(),
  64. filerAddress.ToGrpcAddress(),
  65. "/", // does not matter
  66. *remoteSyncOptions.readChunkFromFiler,
  67. )
  68. if dir != "" {
  69. fmt.Printf("synchronize %s to remote storage...\n", dir)
  70. util.RetryForever("filer.remote.sync "+dir, func() error {
  71. return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
  72. }, func(err error) bool {
  73. if err != nil {
  74. glog.Errorf("synchronize %s: %v", dir, err)
  75. }
  76. return true
  77. })
  78. return true
  79. }
  80. return true
  81. }