filer_sync.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package command
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/replication"
  10. "github.com/chrislusf/seaweedfs/weed/replication/sink"
  11. "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
  12. "github.com/chrislusf/seaweedfs/weed/replication/source"
  13. "github.com/chrislusf/seaweedfs/weed/security"
  14. "github.com/chrislusf/seaweedfs/weed/util"
  15. "github.com/chrislusf/seaweedfs/weed/util/grace"
  16. "google.golang.org/grpc"
  17. "strings"
  18. "time"
  19. )
// SyncOptions holds the settings of the filer.sync command.
// Fields starting with "a" apply to filer A and those starting with "b" to
// filer B. The pointer fields are bound to command-line flags in init.
type SyncOptions struct {
	isActivePassive *bool   // -isActivePassive: one directional follow from A to B if true
	filerA          *string // -a: filer A in one SeaweedFS cluster
	filerB          *string // -b: filer B in the other SeaweedFS cluster
	aPath           *string // -a.path: directory to sync on filer A
	bPath           *string // -b.path: directory to sync on filer B
	aReplication    *string // -a.replication: replication on filer A
	bReplication    *string // -b.replication: replication on filer B
	aCollection     *string // -a.collection: collection on filer A
	bCollection     *string // -b.collection: collection on filer B
	aTtlSec         *int    // -a.ttlSec: ttl in seconds on filer A
	bTtlSec         *int    // -b.ttlSec: ttl in seconds on filer B
	aDiskType       *string // -a.disk: disk type or tag on filer A
	bDiskType       *string // -b.disk: disk type or tag on filer B
	aDebug          *bool   // -a.debug: print out files received by filer A
	bDebug          *bool   // -b.debug: print out files received by filer B
	aProxyByFiler   *bool   // -a.filerProxy: read and write chunks via filer A instead of volume servers
	bProxyByFiler   *bool   // -b.filerProxy: read and write chunks via filer B instead of volume servers
	clientId        int32   // assigned from util.RandomInt32 in init; handed to pb.FollowMetadata
}
var (
	// syncOptions holds the flag values of the filer.sync command; it is populated in init.
	syncOptions SyncOptions
	// syncCpuProfile is bound to the -cpuprofile flag (cpu profile output file).
	syncCpuProfile *string
	// syncMemProfile is bound to the -memprofile flag (memory profile output file).
	syncMemProfile *string
)
  45. func init() {
  46. cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
  47. syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
  48. syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
  49. syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
  50. syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
  51. syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
  52. syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
  53. syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
  54. syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
  55. syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
  56. syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
  57. syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
  58. syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
  59. syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
  60. syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
  61. syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
  62. syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
  63. syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
  64. syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
  65. syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
  66. syncOptions.clientId = util.RandomInt32()
  67. }
// cmdFilerSynchronize describes the "filer.sync" command: its usage line and
// its short and long help texts. Its Run function and flags are attached in
// init rather than in this literal.
//
// NOTE(review): the Long text says checkpoints are "persisted every minute",
// while doSubscribeFilerMetaChanges passes 3*time.Second to pb.AddOffsetFunc
// — confirm which interval is correct.
var cmdFilerSynchronize = &Command{
	UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
	Short:     "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
	Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers
filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the other destination. Different from filer.replicate:
* filer.sync only works between two filers.
* filer.sync does not need any special message queue setup.
* filer.sync supports both active-active and active-passive modes.
If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
A fresh sync will start from the earliest metadata logs.
`,
}
  81. func runFilerSynchronize(cmd *Command, args []string) bool {
  82. util.LoadConfiguration("security", false)
  83. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  84. grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
  85. filerA := pb.ServerAddress(*syncOptions.filerA)
  86. filerB := pb.ServerAddress(*syncOptions.filerB)
  87. go func() {
  88. for {
  89. err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
  90. *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug)
  91. if err != nil {
  92. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
  93. time.Sleep(1747 * time.Millisecond)
  94. }
  95. }
  96. }()
  97. if !*syncOptions.isActivePassive {
  98. go func() {
  99. for {
  100. err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
  101. *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug)
  102. if err != nil {
  103. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
  104. time.Sleep(2147 * time.Millisecond)
  105. }
  106. }
  107. }()
  108. }
  109. select {}
  110. return true
  111. }
  112. func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
  113. replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error {
  114. // read source filer signature
  115. sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
  116. if sourceErr != nil {
  117. return sourceErr
  118. }
  119. // read target filer signature
  120. targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler)
  121. if targetErr != nil {
  122. return targetErr
  123. }
  124. // if first time, start from now
  125. // if has previously synced, resume from that point of time
  126. sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature)
  127. if err != nil {
  128. return err
  129. }
  130. glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
  131. // create filer sink
  132. filerSource := &source.FilerSource{}
  133. filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
  134. filerSink := &filersink.FilerSink{}
  135. filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
  136. filerSink.SetSourceFiler(filerSource)
  137. persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
  138. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  139. message := resp.EventNotification
  140. for _, sig := range message.Signatures {
  141. if sig == targetFilerSignature && targetFilerSignature != 0 {
  142. fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
  143. return nil
  144. }
  145. }
  146. return persistEventFn(resp)
  147. }
  148. var lastLogTsNs = time.Now().Nanosecond()
  149. processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
  150. now := time.Now().Nanosecond()
  151. glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
  152. lastLogTsNs = now
  153. return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
  154. })
  155. return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId,
  156. sourcePath, nil, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
  157. }
const (
	// SyncKeyPrefix is the key prefix under which doSubscribeFilerMetaChanges
	// stores and reads sync offsets via setOffset and getOffset.
	SyncKeyPrefix = "sync."
)
  161. func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
  162. readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  163. syncKey := []byte(signaturePrefix + "____")
  164. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  165. resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
  166. if err != nil {
  167. return err
  168. }
  169. if len(resp.Error) != 0 {
  170. return errors.New(resp.Error)
  171. }
  172. if len(resp.Value) < 8 {
  173. return nil
  174. }
  175. lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
  176. return nil
  177. })
  178. return
  179. }
  180. func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
  181. return pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  182. syncKey := []byte(signaturePrefix + "____")
  183. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  184. valueBuf := make([]byte, 8)
  185. util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
  186. resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
  187. Key: syncKey,
  188. Value: valueBuf,
  189. })
  190. if err != nil {
  191. return err
  192. }
  193. if len(resp.Error) != 0 {
  194. return errors.New(resp.Error)
  195. }
  196. return nil
  197. })
  198. }
  199. func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
  200. // process function
  201. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  202. message := resp.EventNotification
  203. var sourceOldKey, sourceNewKey util.FullPath
  204. if message.OldEntry != nil {
  205. sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
  206. }
  207. if message.NewEntry != nil {
  208. sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
  209. }
  210. if debug {
  211. glog.V(0).Infof("received %v", resp)
  212. }
  213. if !strings.HasPrefix(resp.Directory, sourcePath) {
  214. return nil
  215. }
  216. // handle deletions
  217. if filer_pb.IsDelete(resp) {
  218. if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
  219. return nil
  220. }
  221. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  222. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  223. }
  224. // handle new entries
  225. if filer_pb.IsCreate(resp) {
  226. if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
  227. return nil
  228. }
  229. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  230. return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
  231. }
  232. // this is something special?
  233. if filer_pb.IsEmpty(resp) {
  234. return nil
  235. }
  236. // handle updates
  237. if strings.HasPrefix(string(sourceOldKey), sourcePath) {
  238. // old key is in the watched directory
  239. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  240. // new key is also in the watched directory
  241. if !dataSink.IsIncremental() {
  242. oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  243. message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
  244. foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
  245. if foundExisting {
  246. return err
  247. }
  248. // not able to find old entry
  249. if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
  250. return fmt.Errorf("delete old entry %v: %v", oldKey, err)
  251. }
  252. }
  253. // create the new entry
  254. newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  255. return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
  256. } else {
  257. // new key is outside of the watched directory
  258. if !dataSink.IsIncremental() {
  259. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  260. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  261. }
  262. }
  263. } else {
  264. // old key is outside of the watched directory
  265. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  266. // new key is in the watched directory
  267. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  268. return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
  269. } else {
  270. // new key is also outside of the watched directory
  271. // skip
  272. }
  273. }
  274. return nil
  275. }
  276. return processEventFn
  277. }
  278. func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) {
  279. if !dataSink.IsIncremental() {
  280. key = util.Join(targetPath, string(sourceKey)[len(sourcePath):])
  281. } else {
  282. var mTime int64
  283. if message.NewEntry != nil {
  284. mTime = message.NewEntry.Attributes.Mtime
  285. } else if message.OldEntry != nil {
  286. mTime = message.OldEntry.Attributes.Mtime
  287. }
  288. dateKey := time.Unix(mTime, 0).Format("2006-01-02")
  289. key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
  290. }
  291. return escapeKey(key)
  292. }