traverse_bfs.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package remote_storage
  2. import (
  3. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  4. "github.com/seaweedfs/seaweedfs/weed/util"
  5. "sync"
  6. "time"
  7. )
  // ListDirectoryFunc is the directory-listing callback consumed by TraverseBfs.
  // It receives the directory to list (parentDir) and a VisitFunc, and returns
  // an error. The traversal code in this file relies on the implementation
  // invoking visitFn for the entries found under parentDir and propagating any
  // error visitFn returns — NOTE(review): that contract is assumed from how
  // processOneDirectory uses it; confirm against the concrete implementations.
  8. type ListDirectoryFunc func(parentDir util.FullPath, visitFn VisitFunc) error
  9. func TraverseBfs(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn VisitFunc) (err error) {
  10. K := 5
  11. var dirQueueWg sync.WaitGroup
  12. dirQueue := util.NewQueue()
  13. dirQueueWg.Add(1)
  14. dirQueue.Enqueue(parentPath)
  15. var isTerminating bool
  16. for i := 0; i < K; i++ {
  17. go func() {
  18. for {
  19. if isTerminating {
  20. break
  21. }
  22. t := dirQueue.Dequeue()
  23. if t == nil {
  24. time.Sleep(329 * time.Millisecond)
  25. continue
  26. }
  27. dir := t.(util.FullPath)
  28. processErr := processOneDirectory(listDirFn, dir, visitFn, dirQueue, &dirQueueWg)
  29. if processErr != nil {
  30. err = processErr
  31. }
  32. dirQueueWg.Done()
  33. }
  34. }()
  35. }
  36. dirQueueWg.Wait()
  37. isTerminating = true
  38. return
  39. }
  40. func processOneDirectory(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn VisitFunc, dirQueue *util.Queue, dirQueueWg *sync.WaitGroup) error {
  41. return listDirFn(parentPath, func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
  42. if err := visitFn(dir, name, isDirectory, remoteEntry); err != nil {
  43. return err
  44. }
  45. if !isDirectory {
  46. return nil
  47. }
  48. dirQueueWg.Add(1)
  49. dirQueue.Enqueue(parentPath.Child(name))
  50. return nil
  51. })
  52. }