filer_replication.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package command
  2. import (
  3. "context"
  4. "strings"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/replication"
  7. "github.com/seaweedfs/seaweedfs/weed/replication/sink"
  8. "github.com/seaweedfs/seaweedfs/weed/replication/sub"
  9. "github.com/seaweedfs/seaweedfs/weed/util"
  10. )
  11. func init() {
  12. cmdFilerReplicate.Run = runFilerReplicate // break init cycle
  13. }
  14. var cmdFilerReplicate = &Command{
  15. UsageLine: "filer.replicate",
  16. Short: "replicate file changes to another destination",
  17. Long: `replicate file changes to another destination
  18. filer.replicate listens on filer notifications. If any file is updated, it will fetch the updated content,
  19. and write to the other destination.
  20. Run "weed scaffold -config=replication" to generate a replication.toml file and customize the parameters.
  21. `,
  22. }
  23. func runFilerReplicate(cmd *Command, args []string) bool {
  24. util.LoadConfiguration("security", false)
  25. util.LoadConfiguration("replication", true)
  26. util.LoadConfiguration("notification", true)
  27. config := util.GetViper()
  28. var notificationInput sub.NotificationInput
  29. validateOneEnabledInput(config)
  30. for _, input := range sub.NotificationInputs {
  31. if config.GetBool("notification." + input.GetName() + ".enabled") {
  32. if err := input.Initialize(config, "notification."+input.GetName()+"."); err != nil {
  33. glog.Fatalf("Failed to initialize notification input for %s: %+v",
  34. input.GetName(), err)
  35. }
  36. glog.V(0).Infof("Configure notification input to %s", input.GetName())
  37. notificationInput = input
  38. break
  39. }
  40. }
  41. if notificationInput == nil {
  42. println("No notification is defined in notification.toml file.")
  43. println("Please follow 'weed scaffold -config=notification' to see example notification configurations.")
  44. return true
  45. }
  46. // avoid recursive replication
  47. if config.GetBool("notification.source.filer.enabled") && config.GetBool("notification.sink.filer.enabled") {
  48. if config.GetString("source.filer.grpcAddress") == config.GetString("sink.filer.grpcAddress") {
  49. fromDir := config.GetString("source.filer.directory")
  50. toDir := config.GetString("sink.filer.directory")
  51. if strings.HasPrefix(toDir, fromDir) {
  52. glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
  53. }
  54. }
  55. }
  56. dataSink := findSink(config)
  57. if dataSink == nil {
  58. println("no data sink configured in replication.toml:")
  59. for _, sk := range sink.Sinks {
  60. println(" " + sk.GetName())
  61. }
  62. return true
  63. }
  64. replicator := replication.NewReplicator(config, "source.filer.", dataSink)
  65. for {
  66. key, m, onSuccessFn, onFailureFn, err := notificationInput.ReceiveMessage()
  67. if err != nil {
  68. glog.Errorf("receive %s: %+v", key, err)
  69. if onFailureFn != nil {
  70. onFailureFn()
  71. }
  72. continue
  73. }
  74. if key == "" {
  75. // long poll received no messages
  76. if onSuccessFn != nil {
  77. onSuccessFn()
  78. }
  79. continue
  80. }
  81. if m.OldEntry != nil && m.NewEntry == nil {
  82. glog.V(1).Infof("delete: %s", key)
  83. } else if m.OldEntry == nil && m.NewEntry != nil {
  84. glog.V(1).Infof("add: %s", key)
  85. } else {
  86. glog.V(1).Infof("modify: %s", key)
  87. }
  88. if err = replicator.Replicate(context.Background(), key, m); err != nil {
  89. glog.Errorf("replicate %s: %+v", key, err)
  90. if onFailureFn != nil {
  91. onFailureFn()
  92. }
  93. } else {
  94. glog.V(1).Infof("replicated %s", key)
  95. if onSuccessFn != nil {
  96. onSuccessFn()
  97. }
  98. }
  99. }
  100. }
  101. func findSink(config *util.ViperProxy) sink.ReplicationSink {
  102. var dataSink sink.ReplicationSink
  103. for _, sk := range sink.Sinks {
  104. if config.GetBool("sink." + sk.GetName() + ".enabled") {
  105. if err := sk.Initialize(config, "sink."+sk.GetName()+"."); err != nil {
  106. glog.Fatalf("Failed to initialize sink for %s: %+v",
  107. sk.GetName(), err)
  108. }
  109. glog.V(0).Infof("Configure sink to %s", sk.GetName())
  110. dataSink = sk
  111. break
  112. }
  113. }
  114. return dataSink
  115. }
  116. func validateOneEnabledInput(config *util.ViperProxy) {
  117. enabledInput := ""
  118. for _, input := range sub.NotificationInputs {
  119. if config.GetBool("notification." + input.GetName() + ".enabled") {
  120. if enabledInput == "" {
  121. enabledInput = input.GetName()
  122. } else {
  123. glog.Fatalf("Notification input is enabled for both %s and %s", enabledInput, input.GetName())
  124. }
  125. }
  126. }
  127. }