filer_sync.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. package command
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/pb"
  8. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  9. "github.com/chrislusf/seaweedfs/weed/replication"
  10. "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
  11. "github.com/chrislusf/seaweedfs/weed/replication/source"
  12. "github.com/chrislusf/seaweedfs/weed/security"
  13. "github.com/chrislusf/seaweedfs/weed/util"
  14. "github.com/chrislusf/seaweedfs/weed/util/grace"
  15. "google.golang.org/grpc"
  16. "io"
  17. "strings"
  18. "time"
  19. )
  // SyncOptions collects the command line flags of the filer.sync command.
// Every field is a pointer because it is filled in by the flag package when
// the flag is registered in init(); the pointed-to value is only meaningful
// after the command line has been parsed.
type SyncOptions struct {
	// one directional follow from A to B when true
	isActivePassive *bool

	// addresses of the two filers being synchronized
	filerA, filerB *string

	// directory to sync on each filer
	aPath, bPath *string

	// settings applied when writing to filer A / filer B
	aReplication, bReplication *string
	aCollection, bCollection   *string
	aTtlSec, bTtlSec           *int
	aDiskType, bDiskType       *string

	// print out the files received by filer A / filer B
	aDebug, bDebug *bool

	// read and write file chunks through the filer instead of volume servers
	aProxyByFiler, bProxyByFiler *bool
}
  39. var (
  40. syncOptions SyncOptions
  41. syncCpuProfile *string
  42. syncMemProfile *string
  43. )
  44. func init() {
  45. cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
  46. syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
  47. syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
  48. syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
  49. syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
  50. syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
  51. syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
  52. syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
  53. syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
  54. syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
  55. syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
  56. syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
  57. syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd] hard drive or solid state drive on filer A")
  58. syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd] hard drive or solid state drive on filer B")
  59. syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
  60. syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
  61. syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
  62. syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
  63. syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
  64. syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
  65. }
  66. var cmdFilerSynchronize = &Command{
  67. UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
  68. Short: "resumeable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
  69. Long: `resumeable continuous synchronization for file changes between two active-active or active-passive filers
  70. filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
  71. and write to the other destination. Different from filer.replicate:
  72. * filer.sync only works between two filers.
  73. * filer.sync does not need any special message queue setup.
  74. * filer.sync supports both active-active and active-passive modes.
  75. If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
  76. A fresh sync will start from the earliest metadata logs.
  77. `,
  78. }
  79. func runFilerSynchronize(cmd *Command, args []string) bool {
  80. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  81. grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
  82. go func() {
  83. for {
  84. err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, *syncOptions.filerB,
  85. *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug)
  86. if err != nil {
  87. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
  88. time.Sleep(1747 * time.Millisecond)
  89. }
  90. }
  91. }()
  92. if !*syncOptions.isActivePassive {
  93. go func() {
  94. for {
  95. err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, *syncOptions.filerA,
  96. *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug)
  97. if err != nil {
  98. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
  99. time.Sleep(2147 * time.Millisecond)
  100. }
  101. }
  102. }()
  103. }
  104. select {}
  105. return true
  106. }
  107. func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string,
  108. replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error {
  109. // read source filer signature
  110. sourceFilerSignature, sourceErr := replication.ReadFilerSignature(grpcDialOption, sourceFiler)
  111. if sourceErr != nil {
  112. return sourceErr
  113. }
  114. // read target filer signature
  115. targetFilerSignature, targetErr := replication.ReadFilerSignature(grpcDialOption, targetFiler)
  116. if targetErr != nil {
  117. return targetErr
  118. }
  119. // if first time, start from now
  120. // if has previously synced, resume from that point of time
  121. sourceFilerOffsetTsNs, err := readSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature)
  122. if err != nil {
  123. return err
  124. }
  125. glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
  126. // create filer sink
  127. filerSource := &source.FilerSource{}
  128. filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler)
  129. filerSink := &filersink.FilerSink{}
  130. filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
  131. filerSink.SetSourceFiler(filerSource)
  132. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  133. message := resp.EventNotification
  134. var sourceOldKey, sourceNewKey util.FullPath
  135. if message.OldEntry != nil {
  136. sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
  137. }
  138. if message.NewEntry != nil {
  139. sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
  140. }
  141. for _, sig := range message.Signatures {
  142. if sig == targetFilerSignature && targetFilerSignature != 0 {
  143. fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
  144. return nil
  145. }
  146. }
  147. if debug {
  148. fmt.Printf("%s check %s change %s,%s sig %v, target sig: %v\n", targetFiler, sourceFiler, sourceOldKey, sourceNewKey, message.Signatures, targetFilerSignature)
  149. }
  150. if !strings.HasPrefix(resp.Directory, sourcePath) {
  151. return nil
  152. }
  153. // handle deletions
  154. if message.OldEntry != nil && message.NewEntry == nil {
  155. if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
  156. return nil
  157. }
  158. key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  159. return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  160. }
  161. // handle new entries
  162. if message.OldEntry == nil && message.NewEntry != nil {
  163. if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
  164. return nil
  165. }
  166. key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
  167. return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
  168. }
  169. // this is something special?
  170. if message.OldEntry == nil && message.NewEntry == nil {
  171. return nil
  172. }
  173. // handle updates
  174. if strings.HasPrefix(string(sourceOldKey), sourcePath) {
  175. // old key is in the watched directory
  176. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  177. // new key is also in the watched directory
  178. oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  179. message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
  180. foundExisting, err := filerSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
  181. if foundExisting {
  182. return err
  183. }
  184. // not able to find old entry
  185. if err = filerSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
  186. return fmt.Errorf("delete old entry %v: %v", oldKey, err)
  187. }
  188. // create the new entry
  189. newKey := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
  190. return filerSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
  191. } else {
  192. // new key is outside of the watched directory
  193. key := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  194. return filerSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  195. }
  196. } else {
  197. // old key is outside of the watched directory
  198. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  199. // new key is in the watched directory
  200. key := util.Join(targetPath, string(sourceNewKey)[len(sourcePath):])
  201. return filerSink.CreateEntry(key, message.NewEntry, message.Signatures)
  202. } else {
  203. // new key is also outside of the watched directory
  204. // skip
  205. }
  206. }
  207. return nil
  208. }
  209. return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  210. ctx, cancel := context.WithCancel(context.Background())
  211. defer cancel()
  212. stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
  213. ClientName: "syncTo_" + targetFiler,
  214. PathPrefix: sourcePath,
  215. SinceNs: sourceFilerOffsetTsNs,
  216. Signature: targetFilerSignature,
  217. })
  218. if err != nil {
  219. return fmt.Errorf("listen: %v", err)
  220. }
  221. var counter int64
  222. var lastWriteTime time.Time
  223. for {
  224. resp, listenErr := stream.Recv()
  225. if listenErr == io.EOF {
  226. return nil
  227. }
  228. if listenErr != nil {
  229. return listenErr
  230. }
  231. if err := processEventFn(resp); err != nil {
  232. return err
  233. }
  234. counter++
  235. if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
  236. glog.V(0).Infof("sync %s => %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
  237. counter = 0
  238. lastWriteTime = time.Now()
  239. if err := writeSyncOffset(grpcDialOption, targetFiler, sourceFilerSignature, resp.TsNs); err != nil {
  240. return err
  241. }
  242. }
  243. }
  244. })
  245. }
  // SyncKeyPrefix is the prefix of the key under which a sync offset is stored
// in a filer's key-value store; the filer signature bytes are appended to it.
const SyncKeyPrefix = "sync."
  249. func readSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32) (lastOffsetTsNs int64, readErr error) {
  250. readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  251. syncKey := []byte(SyncKeyPrefix + "____")
  252. util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
  253. resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
  254. if err != nil {
  255. return err
  256. }
  257. if len(resp.Error) != 0 {
  258. return errors.New(resp.Error)
  259. }
  260. if len(resp.Value) < 8 {
  261. return nil
  262. }
  263. lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
  264. return nil
  265. })
  266. return
  267. }
  268. func writeSyncOffset(grpcDialOption grpc.DialOption, filer string, filerSignature int32, offsetTsNs int64) error {
  269. return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  270. syncKey := []byte(SyncKeyPrefix + "____")
  271. util.Uint32toBytes(syncKey[len(SyncKeyPrefix):len(SyncKeyPrefix)+4], uint32(filerSignature))
  272. valueBuf := make([]byte, 8)
  273. util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
  274. resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
  275. Key: syncKey,
  276. Value: valueBuf,
  277. })
  278. if err != nil {
  279. return err
  280. }
  281. if len(resp.Error) != 0 {
  282. return errors.New(resp.Error)
  283. }
  284. return nil
  285. })
  286. }