filer_sync_jobs.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package command
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/glog"
  4. "github.com/seaweedfs/seaweedfs/weed/pb"
  5. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  6. "github.com/seaweedfs/seaweedfs/weed/util"
  7. "sync"
  8. )
  9. type MetadataProcessor struct {
  10. activeJobs map[int64]*filer_pb.SubscribeMetadataResponse
  11. activeJobsLock sync.Mutex
  12. activeJobsCond *sync.Cond
  13. concurrencyLimit int
  14. fn pb.ProcessMetadataFunc
  15. processedTsWatermark int64
  16. }
  17. func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int) *MetadataProcessor {
  18. t := &MetadataProcessor{
  19. fn: fn,
  20. activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse),
  21. concurrencyLimit: concurrency,
  22. }
  23. t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
  24. return t
  25. }
  26. func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) {
  27. if filer_pb.IsEmpty(resp) {
  28. return
  29. }
  30. t.activeJobsLock.Lock()
  31. defer t.activeJobsLock.Unlock()
  32. for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) {
  33. t.activeJobsCond.Wait()
  34. }
  35. t.activeJobs[resp.TsNs] = resp
  36. go func() {
  37. if err := util.Retry("metadata processor", func() error {
  38. return t.fn(resp)
  39. }); err != nil {
  40. glog.Errorf("process %v: %v", resp, err)
  41. }
  42. t.activeJobsLock.Lock()
  43. defer t.activeJobsLock.Unlock()
  44. delete(t.activeJobs, resp.TsNs)
  45. // if is the oldest job, write down the watermark
  46. isOldest := true
  47. for t := range t.activeJobs {
  48. if resp.TsNs > t {
  49. isOldest = false
  50. break
  51. }
  52. }
  53. if isOldest {
  54. t.processedTsWatermark = resp.TsNs
  55. }
  56. t.activeJobsCond.Signal()
  57. }()
  58. }
  59. func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
  60. for _, r := range t.activeJobs {
  61. if shouldWaitFor(resp, r) {
  62. return true
  63. }
  64. }
  65. return false
  66. }
  67. // a is one possible job to schedule
  68. // b is one existing active job
  69. func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool {
  70. aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a)
  71. bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b)
  72. if pairShouldWaitFor(aPath, bPath, aIsDirectory, bIsDirectory) {
  73. return true
  74. }
  75. if aNewPath != "" {
  76. if pairShouldWaitFor(aNewPath, bPath, aIsDirectory, bIsDirectory) {
  77. return true
  78. }
  79. }
  80. if bNewPath != "" {
  81. if pairShouldWaitFor(aPath, bNewPath, aIsDirectory, bIsDirectory) {
  82. return true
  83. }
  84. }
  85. if aNewPath != "" && bNewPath != "" {
  86. if pairShouldWaitFor(aNewPath, bNewPath, aIsDirectory, bIsDirectory) {
  87. return true
  88. }
  89. }
  90. return false
  91. }
  92. func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool {
  93. if bIsDirectory {
  94. if aIsDirectory {
  95. return aPath.IsUnder(bPath) || bPath.IsUnder(aPath)
  96. } else {
  97. return aPath.IsUnder(bPath)
  98. }
  99. } else {
  100. if aIsDirectory {
  101. return bPath.IsUnder(aPath)
  102. } else {
  103. return aPath == bPath
  104. }
  105. }
  106. }
  107. func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) {
  108. oldEntry := resp.EventNotification.OldEntry
  109. newEntry := resp.EventNotification.NewEntry
  110. // create
  111. if filer_pb.IsCreate(resp) {
  112. path = util.FullPath(resp.Directory).Child(newEntry.Name)
  113. isDirectory = newEntry.IsDirectory
  114. return
  115. }
  116. if filer_pb.IsDelete(resp) {
  117. path = util.FullPath(resp.Directory).Child(oldEntry.Name)
  118. isDirectory = oldEntry.IsDirectory
  119. return
  120. }
  121. if filer_pb.IsUpdate(resp) {
  122. path = util.FullPath(resp.Directory).Child(newEntry.Name)
  123. isDirectory = newEntry.IsDirectory
  124. return
  125. }
  126. // renaming
  127. path = util.FullPath(resp.Directory).Child(oldEntry.Name)
  128. isDirectory = oldEntry.IsDirectory
  129. newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name)
  130. return
  131. }