filer_sync.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. package command
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/replication"
  10. "github.com/seaweedfs/seaweedfs/weed/replication/sink"
  11. "github.com/seaweedfs/seaweedfs/weed/replication/sink/filersink"
  12. "github.com/seaweedfs/seaweedfs/weed/replication/source"
  13. "github.com/seaweedfs/seaweedfs/weed/security"
  14. statsCollect "github.com/seaweedfs/seaweedfs/weed/stats"
  15. "github.com/seaweedfs/seaweedfs/weed/util"
  16. "github.com/seaweedfs/seaweedfs/weed/util/grace"
  17. "google.golang.org/grpc"
  18. "os"
  19. "regexp"
  20. "strings"
  21. "sync/atomic"
  22. "time"
  23. )
  24. type SyncOptions struct {
  25. isActivePassive *bool
  26. filerA *string
  27. filerB *string
  28. aPath *string
  29. aExcludePaths *string
  30. bPath *string
  31. bExcludePaths *string
  32. aReplication *string
  33. bReplication *string
  34. aCollection *string
  35. bCollection *string
  36. aTtlSec *int
  37. bTtlSec *int
  38. aDiskType *string
  39. bDiskType *string
  40. aDebug *bool
  41. bDebug *bool
  42. aFromTsMs *int64
  43. bFromTsMs *int64
  44. aProxyByFiler *bool
  45. bProxyByFiler *bool
  46. metricsHttpIp *string
  47. metricsHttpPort *int
  48. concurrency *int
  49. aDoDeleteFiles *bool
  50. bDoDeleteFiles *bool
  51. clientId int32
  52. clientEpoch atomic.Int32
  53. }
  54. const (
  55. SyncKeyPrefix = "sync."
  56. DefaultConcurrencyLimit = 32
  57. )
  58. var (
  59. syncOptions SyncOptions
  60. syncCpuProfile *string
  61. syncMemProfile *string
  62. )
  63. func init() {
  64. cmdFilerSynchronize.Run = runFilerSynchronize // break init cycle
  65. syncOptions.isActivePassive = cmdFilerSynchronize.Flag.Bool("isActivePassive", false, "one directional follow from A to B if true")
  66. syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster")
  67. syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster")
  68. syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A")
  69. syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A")
  70. syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B")
  71. syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B")
  72. syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A")
  73. syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B")
  74. syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A")
  75. syncOptions.bCollection = cmdFilerSynchronize.Flag.String("b.collection", "", "collection on filer B")
  76. syncOptions.aTtlSec = cmdFilerSynchronize.Flag.Int("a.ttlSec", 0, "ttl in seconds on filer A")
  77. syncOptions.bTtlSec = cmdFilerSynchronize.Flag.Int("b.ttlSec", 0, "ttl in seconds on filer B")
  78. syncOptions.aDiskType = cmdFilerSynchronize.Flag.String("a.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer A")
  79. syncOptions.bDiskType = cmdFilerSynchronize.Flag.String("b.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag on filer B")
  80. syncOptions.aProxyByFiler = cmdFilerSynchronize.Flag.Bool("a.filerProxy", false, "read and write file chunks by filer A instead of volume servers")
  81. syncOptions.bProxyByFiler = cmdFilerSynchronize.Flag.Bool("b.filerProxy", false, "read and write file chunks by filer B instead of volume servers")
  82. syncOptions.aDebug = cmdFilerSynchronize.Flag.Bool("a.debug", false, "debug mode to print out filer A received files")
  83. syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
  84. syncOptions.aFromTsMs = cmdFilerSynchronize.Flag.Int64("a.fromTsMs", 0, "synchronization from timestamp on filer A. The unit is millisecond")
  85. syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
  86. syncOptions.concurrency = cmdFilerSynchronize.Flag.Int("concurrency", DefaultConcurrencyLimit, "The maximum number of files that will be synced concurrently.")
  87. syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
  88. syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
  89. syncOptions.metricsHttpIp = cmdFilerSynchronize.Flag.String("metricsIp", "", "metrics listen ip")
  90. syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
  91. syncOptions.aDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("a.doDeleteFiles", true, "delete and update files when synchronizing on filer A")
  92. syncOptions.bDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("b.doDeleteFiles", true, "delete and update files when synchronizing on filer B")
  93. syncOptions.clientId = util.RandomInt32()
  94. }
// cmdFilerSynchronize describes the filer.sync command: its usage line and
// its short and long help texts. Its Run function is attached in init rather
// than here to break an initialization cycle.
var cmdFilerSynchronize = &Command{
UsageLine: "filer.sync -a=<oneFilerHost>:<oneFilerPort> -b=<otherFilerHost>:<otherFilerPort>",
Short: "resumable continuous synchronization between two active-active or active-passive SeaweedFS clusters",
Long: `resumable continuous synchronization for file changes between two active-active or active-passive filers
filer.sync listens on filer notifications. If any file is updated, it will fetch the updated content,
and write to the other destination. Different from filer.replicate:
* filer.sync only works between two filers.
* filer.sync does not need any special message queue setup.
* filer.sync supports both active-active and active-passive modes.
If restarted, the synchronization will resume from the previous checkpoints, persisted every minute.
A fresh sync will start from the earliest metadata logs.
`,
}
  108. func runFilerSynchronize(cmd *Command, args []string) bool {
  109. util.LoadConfiguration("security", false)
  110. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
  111. grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
  112. filerA := pb.ServerAddress(*syncOptions.filerA)
  113. filerB := pb.ServerAddress(*syncOptions.filerB)
  114. // start filer.sync metrics server
  115. go statsCollect.StartMetricsServer(*syncOptions.metricsHttpIp, *syncOptions.metricsHttpPort)
  116. // read a filer signature
  117. aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA)
  118. if aFilerErr != nil {
  119. glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
  120. return true
  121. }
  122. // read b filer signature
  123. bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB)
  124. if bFilerErr != nil {
  125. glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
  126. return true
  127. }
  128. go func() {
  129. // a->b
  130. // set synchronization start timestamp to offset
  131. initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs, getSignaturePrefixByPath(*syncOptions.aPath))
  132. if initOffsetError != nil {
  133. glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
  134. os.Exit(2)
  135. }
  136. for {
  137. syncOptions.clientEpoch.Add(1)
  138. err := doSubscribeFilerMetaChanges(
  139. syncOptions.clientId,
  140. syncOptions.clientEpoch.Load(),
  141. grpcDialOption,
  142. filerA,
  143. *syncOptions.aPath,
  144. util.StringSplit(*syncOptions.aExcludePaths, ","),
  145. *syncOptions.aProxyByFiler,
  146. filerB,
  147. *syncOptions.bPath,
  148. *syncOptions.bReplication,
  149. *syncOptions.bCollection,
  150. *syncOptions.bTtlSec,
  151. *syncOptions.bProxyByFiler,
  152. *syncOptions.bDiskType,
  153. *syncOptions.bDebug,
  154. *syncOptions.concurrency,
  155. *syncOptions.bDoDeleteFiles,
  156. aFilerSignature,
  157. bFilerSignature)
  158. if err != nil {
  159. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
  160. time.Sleep(1747 * time.Millisecond)
  161. }
  162. }
  163. }()
  164. if !*syncOptions.isActivePassive {
  165. // b->a
  166. // set synchronization start timestamp to offset
  167. initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs, getSignaturePrefixByPath(*syncOptions.bPath))
  168. if initOffsetError != nil {
  169. glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
  170. os.Exit(2)
  171. }
  172. go func() {
  173. for {
  174. syncOptions.clientEpoch.Add(1)
  175. err := doSubscribeFilerMetaChanges(
  176. syncOptions.clientId,
  177. syncOptions.clientEpoch.Load(),
  178. grpcDialOption,
  179. filerB,
  180. *syncOptions.bPath,
  181. util.StringSplit(*syncOptions.bExcludePaths, ","),
  182. *syncOptions.bProxyByFiler,
  183. filerA,
  184. *syncOptions.aPath,
  185. *syncOptions.aReplication,
  186. *syncOptions.aCollection,
  187. *syncOptions.aTtlSec,
  188. *syncOptions.aProxyByFiler,
  189. *syncOptions.aDiskType,
  190. *syncOptions.aDebug,
  191. *syncOptions.concurrency,
  192. *syncOptions.aDoDeleteFiles,
  193. bFilerSignature,
  194. aFilerSignature)
  195. if err != nil {
  196. glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
  197. time.Sleep(2147 * time.Millisecond)
  198. }
  199. }
  200. }()
  201. }
  202. select {}
  203. return true
  204. }
  205. // initOffsetFromTsMs Initialize offset
  206. func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, sourceFilerSignature int32, fromTsMs int64, signaturePrefix string) error {
  207. if fromTsMs <= 0 {
  208. return nil
  209. }
  210. // convert to nanosecond
  211. fromTsNs := fromTsMs * 1000_000
  212. // If not successful, exit the program.
  213. setOffsetErr := setOffset(grpcDialOption, targetFiler, signaturePrefix, sourceFilerSignature, fromTsNs)
  214. if setOffsetErr != nil {
  215. return setOffsetErr
  216. }
  217. glog.Infof("setOffset from timestamp ms success! start offset: %d from %s to %s", fromTsNs, *syncOptions.filerA, *syncOptions.filerB)
  218. return nil
  219. }
  220. func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
  221. replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32) error {
  222. // if first time, start from now
  223. // if has previously synced, resume from that point of time
  224. sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
  225. if err != nil {
  226. return err
  227. }
  228. glog.V(0).Infof("start sync %s(%d) => %s(%d) from %v(%d)", sourceFiler, sourceFilerSignature, targetFiler, targetFilerSignature, time.Unix(0, sourceFilerOffsetTsNs), sourceFilerOffsetTsNs)
  229. // create filer sink
  230. filerSource := &source.FilerSource{}
  231. filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
  232. filerSink := &filersink.FilerSink{}
  233. filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
  234. filerSink.SetSourceFiler(filerSource)
  235. persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, nil, filerSink, doDeleteFiles, debug)
  236. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  237. message := resp.EventNotification
  238. for _, sig := range message.Signatures {
  239. if sig == targetFilerSignature && targetFilerSignature != 0 {
  240. fmt.Printf("%s skipping %s change to %v\n", targetFiler, sourceFiler, message)
  241. return nil
  242. }
  243. }
  244. return persistEventFn(resp)
  245. }
  246. if concurrency < 0 || concurrency > 1024 {
  247. glog.Warningf("invalid concurrency value, using default: %d", DefaultConcurrencyLimit)
  248. concurrency = DefaultConcurrencyLimit
  249. }
  250. processor := NewMetadataProcessor(processEventFn, concurrency, sourceFilerOffsetTsNs)
  251. var lastLogTsNs = time.Now().UnixNano()
  252. var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
  253. processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
  254. processor.AddSyncJob(resp)
  255. return nil
  256. }, 3*time.Second, func(counter int64, lastTsNs int64) error {
  257. offsetTsNs := processor.processedTsWatermark.Load()
  258. if offsetTsNs == 0 {
  259. return nil
  260. }
  261. // use processor.processedTsWatermark instead of the lastTsNs from the most recent job
  262. now := time.Now().UnixNano()
  263. glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
  264. lastLogTsNs = now
  265. // collect synchronous offset
  266. statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs))
  267. return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs)
  268. })
  269. metadataFollowOption := &pb.MetadataFollowOption{
  270. ClientName: clientName,
  271. ClientId: clientId,
  272. ClientEpoch: clientEpoch,
  273. SelfSignature: targetFilerSignature,
  274. PathPrefix: sourcePath,
  275. AdditionalPathPrefixes: nil,
  276. DirectoriesToWatch: nil,
  277. StartTsNs: sourceFilerOffsetTsNs,
  278. StopTsNs: 0,
  279. EventErrorType: pb.RetryForeverOnError,
  280. }
  281. return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
  282. }
  283. // When each business is distinguished according to path, and offsets need to be maintained separately.
  284. func getSignaturePrefixByPath(path string) string {
  285. // compatible historical version
  286. if path == "/" {
  287. return SyncKeyPrefix
  288. } else {
  289. return SyncKeyPrefix + path
  290. }
  291. }
  292. func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
  293. readErr = pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  294. syncKey := []byte(signaturePrefix + "____")
  295. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  296. resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: syncKey})
  297. if err != nil {
  298. return err
  299. }
  300. if len(resp.Error) != 0 {
  301. return errors.New(resp.Error)
  302. }
  303. if len(resp.Value) < 8 {
  304. return nil
  305. }
  306. lastOffsetTsNs = int64(util.BytesToUint64(resp.Value))
  307. return nil
  308. })
  309. return
  310. }
  311. func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
  312. return pb.WithFilerClient(false, signature, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
  313. syncKey := []byte(signaturePrefix + "____")
  314. util.Uint32toBytes(syncKey[len(signaturePrefix):len(signaturePrefix)+4], uint32(signature))
  315. valueBuf := make([]byte, 8)
  316. util.Uint64toBytes(valueBuf, uint64(offsetTsNs))
  317. resp, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
  318. Key: syncKey,
  319. Value: valueBuf,
  320. })
  321. if err != nil {
  322. return err
  323. }
  324. if len(resp.Error) != 0 {
  325. return errors.New(resp.Error)
  326. }
  327. return nil
  328. })
  329. }
  330. func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, reExcludeFileName *regexp.Regexp, dataSink sink.ReplicationSink, doDeleteFiles bool, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error {
  331. // process function
  332. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  333. message := resp.EventNotification
  334. var sourceOldKey, sourceNewKey util.FullPath
  335. if message.OldEntry != nil {
  336. sourceOldKey = util.FullPath(resp.Directory).Child(message.OldEntry.Name)
  337. }
  338. if message.NewEntry != nil {
  339. sourceNewKey = util.FullPath(message.NewParentPath).Child(message.NewEntry.Name)
  340. }
  341. if debug {
  342. glog.V(0).Infof("received %v", resp)
  343. }
  344. if isMultipartUploadDir(resp.Directory + "/") {
  345. return nil
  346. }
  347. if !strings.HasPrefix(resp.Directory, sourcePath) {
  348. return nil
  349. }
  350. for _, excludePath := range excludePaths {
  351. if strings.HasPrefix(resp.Directory, excludePath) {
  352. return nil
  353. }
  354. }
  355. if reExcludeFileName != nil && reExcludeFileName.MatchString(message.NewEntry.Name) {
  356. return nil
  357. }
  358. if dataSink.IsIncremental() {
  359. doDeleteFiles = false
  360. }
  361. // handle deletions
  362. if filer_pb.IsDelete(resp) {
  363. if !doDeleteFiles {
  364. return nil
  365. }
  366. if !strings.HasPrefix(string(sourceOldKey), sourcePath) {
  367. return nil
  368. }
  369. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  370. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  371. }
  372. // handle new entries
  373. if filer_pb.IsCreate(resp) {
  374. if !strings.HasPrefix(string(sourceNewKey), sourcePath) {
  375. return nil
  376. }
  377. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  378. if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
  379. return fmt.Errorf("create entry1 : %v", err)
  380. } else {
  381. return nil
  382. }
  383. }
  384. // this is something special?
  385. if filer_pb.IsEmpty(resp) {
  386. return nil
  387. }
  388. // handle updates
  389. if strings.HasPrefix(string(sourceOldKey), sourcePath) {
  390. // old key is in the watched directory
  391. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  392. // new key is also in the watched directory
  393. if doDeleteFiles {
  394. oldKey := util.Join(targetPath, string(sourceOldKey)[len(sourcePath):])
  395. message.NewParentPath = util.Join(targetPath, message.NewParentPath[len(sourcePath):])
  396. foundExisting, err := dataSink.UpdateEntry(string(oldKey), message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
  397. if foundExisting {
  398. return err
  399. }
  400. // not able to find old entry
  401. if err = dataSink.DeleteEntry(string(oldKey), message.OldEntry.IsDirectory, false, message.Signatures); err != nil {
  402. return fmt.Errorf("delete old entry %v: %v", oldKey, err)
  403. }
  404. }
  405. // create the new entry
  406. newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  407. if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil {
  408. return fmt.Errorf("create entry2 : %v", err)
  409. } else {
  410. return nil
  411. }
  412. } else {
  413. // new key is outside of the watched directory
  414. if doDeleteFiles {
  415. key := buildKey(dataSink, message, targetPath, sourceOldKey, sourcePath)
  416. return dataSink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
  417. }
  418. }
  419. } else {
  420. // old key is outside of the watched directory
  421. if strings.HasPrefix(string(sourceNewKey), sourcePath) {
  422. // new key is in the watched directory
  423. key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
  424. if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
  425. return fmt.Errorf("create entry3 : %v", err)
  426. } else {
  427. return nil
  428. }
  429. } else {
  430. // new key is also outside of the watched directory
  431. // skip
  432. }
  433. }
  434. return nil
  435. }
  436. return processEventFn
  437. }
  438. func buildKey(dataSink sink.ReplicationSink, message *filer_pb.EventNotification, targetPath string, sourceKey util.FullPath, sourcePath string) (key string) {
  439. if !dataSink.IsIncremental() {
  440. key = util.Join(targetPath, string(sourceKey)[len(sourcePath):])
  441. } else {
  442. var mTime int64
  443. if message.NewEntry != nil {
  444. mTime = message.NewEntry.Attributes.Mtime
  445. } else if message.OldEntry != nil {
  446. mTime = message.OldEntry.Attributes.Mtime
  447. }
  448. dateKey := time.Unix(mTime, 0).Format("2006-01-02")
  449. key = util.Join(targetPath, dateKey, string(sourceKey)[len(sourcePath):])
  450. }
  451. return escapeKey(key)
  452. }