filer_client_bfs.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package filer_pb
  2. import (
  3. "fmt"
  4. "sync"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/util"
  7. )
  8. func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
  9. K := 5
  10. var jobQueueWg sync.WaitGroup
  11. queue := util.NewQueue()
  12. jobQueueWg.Add(1)
  13. queue.Enqueue(parentPath)
  14. terminates := make([]chan bool, K)
  15. for i := 0; i < K; i++ {
  16. terminates[i] = make(chan bool)
  17. go func(j int) {
  18. for {
  19. select {
  20. case <-terminates[j]:
  21. return
  22. default:
  23. t := queue.Dequeue()
  24. if t == nil {
  25. time.Sleep(329 * time.Millisecond)
  26. continue
  27. }
  28. dir := t.(util.FullPath)
  29. processErr := processOneDirectory(filerClient, dir, queue, &jobQueueWg, fn)
  30. if processErr != nil {
  31. err = processErr
  32. }
  33. jobQueueWg.Done()
  34. }
  35. }
  36. }(i)
  37. }
  38. jobQueueWg.Wait()
  39. for i := 0; i < K; i++ {
  40. close(terminates[i])
  41. }
  42. return
  43. }
  44. func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
  45. return ReadDirAllEntries(filerClient, parentPath, "", func(entry *Entry, isLast bool) error {
  46. fn(parentPath, entry)
  47. if entry.IsDirectory {
  48. subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name)
  49. if parentPath == "/" {
  50. subDir = "/" + entry.Name
  51. }
  52. jobQueueWg.Add(1)
  53. queue.Enqueue(util.FullPath(subDir))
  54. }
  55. return nil
  56. })
  57. }