filer_sync_jobs.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package command
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/util"
  7. "sync"
  8. "sync/atomic"
  9. )
  10. type MetadataProcessor struct {
  11. activeJobs map[int64]*filer_pb.SubscribeMetadataResponse
  12. activeJobsLock sync.Mutex
  13. activeJobsCond *sync.Cond
  14. concurrencyLimit int
  15. fn pb.ProcessMetadataFunc
  16. processedTsWatermark atomic.Int64
  17. }
  18. func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor {
  19. t := &MetadataProcessor{
  20. fn: fn,
  21. activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse),
  22. concurrencyLimit: concurrency,
  23. }
  24. t.processedTsWatermark.Store(offsetTsNs)
  25. t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
  26. return t
  27. }
  28. func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) {
  29. if filer_pb.IsEmpty(resp) {
  30. return
  31. }
  32. t.activeJobsLock.Lock()
  33. defer t.activeJobsLock.Unlock()
  34. for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) {
  35. t.activeJobsCond.Wait()
  36. }
  37. t.activeJobs[resp.TsNs] = resp
  38. go func() {
  39. if err := util.Retry("metadata processor", func() error {
  40. return t.fn(resp)
  41. }); err != nil {
  42. glog.Errorf("process %v: %v", resp, err)
  43. }
  44. t.activeJobsLock.Lock()
  45. defer t.activeJobsLock.Unlock()
  46. delete(t.activeJobs, resp.TsNs)
  47. // if is the oldest job, write down the watermark
  48. isOldest := true
  49. for t := range t.activeJobs {
  50. if resp.TsNs > t {
  51. isOldest = false
  52. break
  53. }
  54. }
  55. if isOldest {
  56. t.processedTsWatermark.Store(resp.TsNs)
  57. }
  58. t.activeJobsCond.Signal()
  59. }()
  60. }
  61. func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
  62. for _, r := range t.activeJobs {
  63. if shouldWaitFor(resp, r) {
  64. return true
  65. }
  66. }
  67. return false
  68. }
  69. // a is one possible job to schedule
  70. // b is one existing active job
  71. func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool {
  72. aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a)
  73. bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b)
  74. if pairShouldWaitFor(aPath, bPath, aIsDirectory, bIsDirectory) {
  75. return true
  76. }
  77. if aNewPath != "" {
  78. if pairShouldWaitFor(aNewPath, bPath, aIsDirectory, bIsDirectory) {
  79. return true
  80. }
  81. }
  82. if bNewPath != "" {
  83. if pairShouldWaitFor(aPath, bNewPath, aIsDirectory, bIsDirectory) {
  84. return true
  85. }
  86. }
  87. if aNewPath != "" && bNewPath != "" {
  88. if pairShouldWaitFor(aNewPath, bNewPath, aIsDirectory, bIsDirectory) {
  89. return true
  90. }
  91. }
  92. return false
  93. }
  94. func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool {
  95. if bIsDirectory {
  96. if aIsDirectory {
  97. return aPath.IsUnder(bPath) || bPath.IsUnder(aPath)
  98. } else {
  99. return aPath.IsUnder(bPath)
  100. }
  101. } else {
  102. if aIsDirectory {
  103. return bPath.IsUnder(aPath)
  104. } else {
  105. return aPath == bPath
  106. }
  107. }
  108. }
  109. func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) {
  110. oldEntry := resp.EventNotification.OldEntry
  111. newEntry := resp.EventNotification.NewEntry
  112. // create
  113. if filer_pb.IsCreate(resp) {
  114. path = util.FullPath(resp.Directory).Child(newEntry.Name)
  115. isDirectory = newEntry.IsDirectory
  116. return
  117. }
  118. if filer_pb.IsDelete(resp) {
  119. path = util.FullPath(resp.Directory).Child(oldEntry.Name)
  120. isDirectory = oldEntry.IsDirectory
  121. return
  122. }
  123. if filer_pb.IsUpdate(resp) {
  124. path = util.FullPath(resp.Directory).Child(newEntry.Name)
  125. isDirectory = newEntry.IsDirectory
  126. return
  127. }
  128. // renaming
  129. path = util.FullPath(resp.Directory).Child(oldEntry.Name)
  130. isDirectory = oldEntry.IsDirectory
  131. newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name)
  132. return
  133. }