123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- package command
- import (
- "context"
- "strings"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/replication"
- "github.com/seaweedfs/seaweedfs/weed/replication/sink"
- "github.com/seaweedfs/seaweedfs/weed/replication/sub"
- "github.com/seaweedfs/seaweedfs/weed/util"
- )
- func init() {
- cmdFilerReplicate.Run = runFilerReplicate // break init cycle
- }
- var cmdFilerReplicate = &Command{
- UsageLine: "filer.replicate",
- Short: "replicate file changes to another destination",
- Long: `replicate file changes to another destination
- filer.replicate listens on filer notifications. If any file is updated, it will fetch the updated content,
- and write to the other destination.
- Run "weed scaffold -config=replication" to generate a replication.toml file and customize the parameters.
- `,
- }
- func runFilerReplicate(cmd *Command, args []string) bool {
- util.LoadSecurityConfiguration()
- util.LoadConfiguration("replication", true)
- util.LoadConfiguration("notification", true)
- config := util.GetViper()
- var notificationInput sub.NotificationInput
- validateOneEnabledInput(config)
- for _, input := range sub.NotificationInputs {
- if config.GetBool("notification." + input.GetName() + ".enabled") {
- if err := input.Initialize(config, "notification."+input.GetName()+"."); err != nil {
- glog.Fatalf("Failed to initialize notification input for %s: %+v",
- input.GetName(), err)
- }
- glog.V(0).Infof("Configure notification input to %s", input.GetName())
- notificationInput = input
- break
- }
- }
- if notificationInput == nil {
- println("No notification is defined in notification.toml file.")
- println("Please follow 'weed scaffold -config=notification' to see example notification configurations.")
- return true
- }
- // avoid recursive replication
- if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") {
- if config.GetString("source.filer.grpcAddress") == config.GetString("sink.filer.grpcAddress") {
- fromDir := config.GetString("source.filer.directory")
- toDir := config.GetString("sink.filer.directory")
- if strings.HasPrefix(toDir, fromDir) {
- glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
- }
- }
- }
- dataSink := findSink(config)
- if dataSink == nil {
- println("no data sink configured in replication.toml:")
- for _, sk := range sink.Sinks {
- println(" " + sk.GetName())
- }
- return true
- }
- replicator := replication.NewReplicator(config, "source.filer.", dataSink)
- for {
- key, m, onSuccessFn, onFailureFn, err := notificationInput.ReceiveMessage()
- if err != nil {
- glog.Errorf("receive %s: %+v", key, err)
- if onFailureFn != nil {
- onFailureFn()
- }
- continue
- }
- if key == "" {
- // long poll received no messages
- if onSuccessFn != nil {
- onSuccessFn()
- }
- continue
- }
- if m.OldEntry != nil && m.NewEntry == nil {
- glog.V(1).Infof("delete: %s", key)
- } else if m.OldEntry == nil && m.NewEntry != nil {
- glog.V(1).Infof("add: %s", key)
- } else {
- glog.V(1).Infof("modify: %s", key)
- }
- if err = replicator.Replicate(context.Background(), key, m); err != nil {
- glog.Errorf("replicate %s: %+v", key, err)
- if onFailureFn != nil {
- onFailureFn()
- }
- } else {
- glog.V(1).Infof("replicated %s", key)
- if onSuccessFn != nil {
- onSuccessFn()
- }
- }
- }
- }
- func findSink(config *util.ViperProxy) sink.ReplicationSink {
- var dataSink sink.ReplicationSink
- for _, sk := range sink.Sinks {
- if config.GetBool("sink." + sk.GetName() + ".enabled") {
- if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
- glog.Fatalf("Failed to initialize sink for %s: %+v",
- sk.GetName(), err)
- }
- glog.V(0).Infof("Configure sink to %s", sk.GetName())
- dataSink = sk
- break
- }
- }
- return dataSink
- }
- func validateOneEnabledInput(config *util.ViperProxy) {
- enabledInput := ""
- for _, input := range sub.NotificationInputs {
- if config.GetBool("notification." + input.GetName() + ".enabled") {
- if enabledInput == "" {
- enabledInput = input.GetName()
- } else {
- glog.Fatalf("Notification input is enabled for both %s and %s", enabledInput, input.GetName())
- }
- }
- }
- }
|