filer_remote_sync.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package command
  2. import (
  3. "fmt"
  4. "github.com/seaweedfs/seaweedfs/weed/glog"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  7. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  8. "github.com/seaweedfs/seaweedfs/weed/security"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. "google.golang.org/grpc"
  11. "time"
  12. )
  13. type RemoteSyncOptions struct {
  14. filerAddress *string
  15. storageClass *string
  16. grpcDialOption grpc.DialOption
  17. readChunkFromFiler *bool
  18. timeAgo *time.Duration
  19. dir *string
  20. clientId int32
  21. clientEpoch int32
  22. }
  23. var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
  24. func (option *RemoteSyncOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  25. return pb.WithFilerClient(streamingMode, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  26. return fn(client)
  27. })
  28. }
  29. func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
  30. return location.Url
  31. }
  32. func (option *RemoteSyncOptions) GetDataCenter() string {
  33. return ""
  34. }
  35. var (
  36. remoteSyncOptions RemoteSyncOptions
  37. )
  38. func init() {
  39. cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
  40. remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
  41. remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer")
  42. remoteSyncOptions.storageClass = cmdFilerRemoteSynchronize.Flag.String("storageClass", "", "override amz storage class, empty to delete")
  43. remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
  44. remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now, skipping previous metadata changes. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  45. remoteSyncOptions.clientId = util.RandomInt32()
  46. }
  47. var cmdFilerRemoteSynchronize = &Command{
  48. UsageLine: "filer.remote.sync",
  49. Short: "resumable continuously write back updates to remote storage",
  50. Long: `resumable continuously write back updates to remote storage
  51. filer.remote.sync listens on filer update events.
  52. If any mounted remote file is updated, it will fetch the updated content,
  53. and write to the remote storage.
  54. weed filer.remote.sync -dir=/mount/s3_on_cloud
  55. The metadata sync starting time is determined with the following priority order:
  56. 1. specified by timeAgo
  57. 2. last sync timestamp for this directory
  58. 3. directory creation time
  59. `,
  60. }
  61. func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
  62. util.LoadConfiguration("security", false)
  63. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  64. remoteSyncOptions.grpcDialOption = grpcDialOption
  65. dir := *remoteSyncOptions.dir
  66. filerAddress := pb.ServerAddress(*remoteSyncOptions.filerAddress)
  67. filerSource := &source.FilerSource{}
  68. filerSource.DoInitialize(
  69. filerAddress.ToHttpAddress(),
  70. filerAddress.ToGrpcAddress(),
  71. "/", // does not matter
  72. *remoteSyncOptions.readChunkFromFiler,
  73. )
  74. if dir != "" {
  75. fmt.Printf("synchronize %s to remote storage...\n", dir)
  76. util.RetryForever("filer.remote.sync "+dir, func() error {
  77. return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
  78. }, func(err error) bool {
  79. if err != nil {
  80. glog.Errorf("synchronize %s: %v", dir, err)
  81. }
  82. return true
  83. })
  84. return true
  85. }
  86. return true
  87. }