filer_client_bfs.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package filer_pb
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/chrislusf/seaweedfs/weed/util"
  7. )
  8. func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
  9. K := 5
  10. var jobQueueWg sync.WaitGroup
  11. queue := util.NewQueue()
  12. jobQueueWg.Add(1)
  13. queue.Enqueue(parentPath)
  14. var isTerminating bool
  15. for i := 0; i < K; i++ {
  16. go func() {
  17. for {
  18. if isTerminating {
  19. break
  20. }
  21. t := queue.Dequeue()
  22. if t == nil {
  23. time.Sleep(329 * time.Millisecond)
  24. continue
  25. }
  26. dir := t.(util.FullPath)
  27. processErr := processOneDirectory(filerClient, dir, queue, &jobQueueWg, fn)
  28. if processErr != nil {
  29. err = processErr
  30. }
  31. jobQueueWg.Done()
  32. }
  33. }()
  34. }
  35. jobQueueWg.Wait()
  36. isTerminating = true
  37. return
  38. }
  39. func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
  40. return ReadDirAllEntries(filerClient, parentPath, "", func(entry *Entry, isLast bool) error {
  41. fn(parentPath, entry)
  42. if entry.IsDirectory {
  43. subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name)
  44. if parentPath == "/" {
  45. subDir = "/" + entry.Name
  46. }
  47. jobQueueWg.Add(1)
  48. queue.Enqueue(util.FullPath(subDir))
  49. }
  50. return nil
  51. })
  52. }