123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- package command
- import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "sync"
- "sync/atomic"
- )
- type MetadataProcessor struct {
- activeJobs map[int64]*filer_pb.SubscribeMetadataResponse
- activeJobsLock sync.Mutex
- activeJobsCond *sync.Cond
- concurrencyLimit int
- fn pb.ProcessMetadataFunc
- processedTsWatermark atomic.Int64
- }
- func NewMetadataProcessor(fn pb.ProcessMetadataFunc, concurrency int, offsetTsNs int64) *MetadataProcessor {
- t := &MetadataProcessor{
- fn: fn,
- activeJobs: make(map[int64]*filer_pb.SubscribeMetadataResponse),
- concurrencyLimit: concurrency,
- }
- t.processedTsWatermark.Store(offsetTsNs)
- t.activeJobsCond = sync.NewCond(&t.activeJobsLock)
- return t
- }
- func (t *MetadataProcessor) AddSyncJob(resp *filer_pb.SubscribeMetadataResponse) {
- if filer_pb.IsEmpty(resp) {
- return
- }
- t.activeJobsLock.Lock()
- defer t.activeJobsLock.Unlock()
- for len(t.activeJobs) >= t.concurrencyLimit || t.conflictsWith(resp) {
- t.activeJobsCond.Wait()
- }
- t.activeJobs[resp.TsNs] = resp
- go func() {
- if err := util.Retry("metadata processor", func() error {
- return t.fn(resp)
- }); err != nil {
- glog.Errorf("process %v: %v", resp, err)
- }
- t.activeJobsLock.Lock()
- defer t.activeJobsLock.Unlock()
- delete(t.activeJobs, resp.TsNs)
- // if is the oldest job, write down the watermark
- isOldest := true
- for t := range t.activeJobs {
- if resp.TsNs > t {
- isOldest = false
- break
- }
- }
- if isOldest {
- t.processedTsWatermark.Store(resp.TsNs)
- }
- t.activeJobsCond.Signal()
- }()
- }
- func (t *MetadataProcessor) conflictsWith(resp *filer_pb.SubscribeMetadataResponse) bool {
- for _, r := range t.activeJobs {
- if shouldWaitFor(resp, r) {
- return true
- }
- }
- return false
- }
- // a is one possible job to schedule
- // b is one existing active job
- func shouldWaitFor(a *filer_pb.SubscribeMetadataResponse, b *filer_pb.SubscribeMetadataResponse) bool {
- aPath, aNewPath, aIsDirectory := extractPathsFromMetadata(a)
- bPath, bNewPath, bIsDirectory := extractPathsFromMetadata(b)
- if pairShouldWaitFor(aPath, bPath, aIsDirectory, bIsDirectory) {
- return true
- }
- if aNewPath != "" {
- if pairShouldWaitFor(aNewPath, bPath, aIsDirectory, bIsDirectory) {
- return true
- }
- }
- if bNewPath != "" {
- if pairShouldWaitFor(aPath, bNewPath, aIsDirectory, bIsDirectory) {
- return true
- }
- }
- if aNewPath != "" && bNewPath != "" {
- if pairShouldWaitFor(aNewPath, bNewPath, aIsDirectory, bIsDirectory) {
- return true
- }
- }
- return false
- }
- func pairShouldWaitFor(aPath, bPath util.FullPath, aIsDirectory, bIsDirectory bool) bool {
- if bIsDirectory {
- if aIsDirectory {
- return aPath.IsUnder(bPath) || bPath.IsUnder(aPath)
- } else {
- return aPath.IsUnder(bPath)
- }
- } else {
- if aIsDirectory {
- return bPath.IsUnder(aPath)
- } else {
- return aPath == bPath
- }
- }
- }
- func extractPathsFromMetadata(resp *filer_pb.SubscribeMetadataResponse) (path, newPath util.FullPath, isDirectory bool) {
- oldEntry := resp.EventNotification.OldEntry
- newEntry := resp.EventNotification.NewEntry
- // create
- if filer_pb.IsCreate(resp) {
- path = util.FullPath(resp.Directory).Child(newEntry.Name)
- isDirectory = newEntry.IsDirectory
- return
- }
- if filer_pb.IsDelete(resp) {
- path = util.FullPath(resp.Directory).Child(oldEntry.Name)
- isDirectory = oldEntry.IsDirectory
- return
- }
- if filer_pb.IsUpdate(resp) {
- path = util.FullPath(resp.Directory).Child(newEntry.Name)
- isDirectory = newEntry.IsDirectory
- return
- }
- // renaming
- path = util.FullPath(resp.Directory).Child(oldEntry.Name)
- isDirectory = oldEntry.IsDirectory
- newPath = util.FullPath(resp.EventNotification.NewParentPath).Child(newEntry.Name)
- return
- }
|