filer_grpc_server_traverse_meta.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package weed_server
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/filer"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "github.com/viant/ptrie"
  10. )
  11. func (fs *FilerServer) TraverseBfsMetadata(req *filer_pb.TraverseBfsMetadataRequest, stream filer_pb.SeaweedFiler_TraverseBfsMetadataServer) error {
  12. glog.V(0).Infof("TraverseBfsMetadata %v", req)
  13. excludedTrie := ptrie.New[bool]()
  14. for _, excluded := range req.ExcludedPrefixes {
  15. excludedTrie.Put([]byte(excluded), true)
  16. }
  17. ctx := stream.Context()
  18. queue := util.NewQueue[*filer.Entry]()
  19. dirEntry, err := fs.filer.FindEntry(ctx, util.FullPath(req.Directory))
  20. if err != nil {
  21. return fmt.Errorf("find dir %s: %v", req.Directory, err)
  22. }
  23. queue.Enqueue(dirEntry)
  24. for item := queue.Dequeue(); item != nil; item = queue.Dequeue() {
  25. if excludedTrie.MatchPrefix([]byte(item.FullPath), func(key []byte, value bool) bool {
  26. return true
  27. }) {
  28. // println("excluded", item.FullPath)
  29. continue
  30. }
  31. parent, _ := item.FullPath.DirAndName()
  32. if err := stream.Send(&filer_pb.TraverseBfsMetadataResponse{
  33. Directory: parent,
  34. Entry: item.ToProtoEntry(),
  35. }); err != nil {
  36. return fmt.Errorf("send traverse bfs metadata response: %v", err)
  37. }
  38. if !item.IsDirectory() {
  39. continue
  40. }
  41. if err := fs.iterateDirectory(ctx, item.FullPath, func(entry *filer.Entry) error {
  42. queue.Enqueue(entry)
  43. return nil
  44. }); err != nil {
  45. return err
  46. }
  47. }
  48. return nil
  49. }
  50. func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPath, fn func(entry *filer.Entry) error) (err error) {
  51. var lastFileName string
  52. var listErr error
  53. for {
  54. var hasEntries bool
  55. lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool {
  56. hasEntries = true
  57. if fnErr := fn(entry); fnErr != nil {
  58. err = fnErr
  59. return false
  60. }
  61. return true
  62. })
  63. if listErr != nil {
  64. return listErr
  65. }
  66. if err != nil {
  67. return err
  68. }
  69. if !hasEntries {
  70. return nil
  71. }
  72. }
  73. }