filer_remote_gateway.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package command
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/pb"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  10. "github.com/seaweedfs/seaweedfs/weed/security"
  11. "github.com/seaweedfs/seaweedfs/weed/util"
  12. "google.golang.org/grpc"
  13. "os"
  14. "time"
  15. )
  16. type RemoteGatewayOptions struct {
  17. filerAddress *string
  18. grpcDialOption grpc.DialOption
  19. readChunkFromFiler *bool
  20. timeAgo *time.Duration
  21. createBucketAt *string
  22. createBucketRandomSuffix *bool
  23. include *string
  24. exclude *string
  25. mappings *remote_pb.RemoteStorageMapping
  26. remoteConfs map[string]*remote_pb.RemoteConf
  27. bucketsDir string
  28. clientId int32
  29. clientEpoch int32
  30. }
  31. var _ = filer_pb.FilerClient(&RemoteGatewayOptions{})
  32. func (option *RemoteGatewayOptions) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
  33. return pb.WithFilerClient(streamingMode, option.clientId, pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  34. return fn(client)
  35. })
  36. }
  37. func (option *RemoteGatewayOptions) AdjustedUrl(location *filer_pb.Location) string {
  38. return location.Url
  39. }
  40. func (option *RemoteGatewayOptions) GetDataCenter() string {
  41. return ""
  42. }
  43. var (
  44. remoteGatewayOptions RemoteGatewayOptions
  45. )
  46. func init() {
  47. cmdFilerRemoteGateway.Run = runFilerRemoteGateway // break init cycle
  48. remoteGatewayOptions.filerAddress = cmdFilerRemoteGateway.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
  49. remoteGatewayOptions.createBucketAt = cmdFilerRemoteGateway.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
  50. remoteGatewayOptions.createBucketRandomSuffix = cmdFilerRemoteGateway.Flag.Bool("createBucketWithRandomSuffix", true, "add randomized suffix to bucket name to avoid conflicts")
  51. remoteGatewayOptions.readChunkFromFiler = cmdFilerRemoteGateway.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
  52. remoteGatewayOptions.timeAgo = cmdFilerRemoteGateway.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
  53. remoteGatewayOptions.include = cmdFilerRemoteGateway.Flag.String("include", "", "pattens of new bucket names, e.g., s3*")
  54. remoteGatewayOptions.exclude = cmdFilerRemoteGateway.Flag.String("exclude", "", "pattens of new bucket names, e.g., local*")
  55. remoteGatewayOptions.clientId = util.RandomInt32()
  56. }
  57. var cmdFilerRemoteGateway = &Command{
  58. UsageLine: "filer.remote.gateway",
  59. Short: "resumable continuously write back bucket creation, deletion, and other local updates to remote object store",
  60. Long: `resumable continuously write back bucket creation, deletion, and other local updates to remote object store
  61. filer.remote.gateway listens on filer local buckets update events.
  62. If any bucket is created, deleted, or updated, it will mirror the changes to remote object store.
  63. weed filer.remote.gateway -createBucketAt=cloud1
  64. `,
  65. }
  66. func runFilerRemoteGateway(cmd *Command, args []string) bool {
  67. util.LoadSecurityConfiguration()
  68. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  69. remoteGatewayOptions.grpcDialOption = grpcDialOption
  70. filerAddress := pb.ServerAddress(*remoteGatewayOptions.filerAddress)
  71. filerSource := &source.FilerSource{}
  72. filerSource.DoInitialize(
  73. filerAddress.ToHttpAddress(),
  74. filerAddress.ToGrpcAddress(),
  75. "/", // does not matter
  76. *remoteGatewayOptions.readChunkFromFiler,
  77. )
  78. remoteGatewayOptions.bucketsDir = "/buckets"
  79. // check buckets again
  80. remoteGatewayOptions.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
  81. resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
  82. if err != nil {
  83. return err
  84. }
  85. remoteGatewayOptions.bucketsDir = resp.DirBuckets
  86. return nil
  87. })
  88. // read filer remote storage mount mappings
  89. if detectErr := remoteGatewayOptions.collectRemoteStorageConf(); detectErr != nil {
  90. fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr)
  91. return true
  92. }
  93. // synchronize /buckets folder
  94. fmt.Printf("synchronize buckets in %s ...\n", remoteGatewayOptions.bucketsDir)
  95. util.RetryUntil("filer.remote.sync buckets", func() error {
  96. return remoteGatewayOptions.followBucketUpdatesAndUploadToRemote(filerSource)
  97. }, func(err error) bool {
  98. if err != nil {
  99. glog.Errorf("synchronize %s: %v", remoteGatewayOptions.bucketsDir, err)
  100. }
  101. return true
  102. })
  103. return true
  104. }