command_fs_du.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package shell
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/filer2"
  6. "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "google.golang.org/grpc"
  9. "io"
  10. )
  11. func init() {
  12. Commands = append(Commands, &commandFsDu{})
  13. }
  // commandFsDu is the receiver type for the "fs.du" shell command. It has no
  // fields; everything the command needs is passed to its methods.
  type commandFsDu struct{}
  16. func (c *commandFsDu) Name() string {
  17. return "fs.du"
  18. }
  19. func (c *commandFsDu) Help() string {
  20. return `show disk usage
  21. fs.du http://<filer_server>:<port>/dir
  22. fs.du http://<filer_server>:<port>/dir/file_name
  23. fs.du http://<filer_server>:<port>/dir/file_prefix
  24. `
  25. }
  26. func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
  27. filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args))
  28. if err != nil {
  29. return err
  30. }
  31. ctx := context.Background()
  32. if commandEnv.isDirectory(ctx, filerServer, filerPort, path) {
  33. path = path + "/"
  34. }
  35. dir, name := filer2.FullPath(path).DirAndName()
  36. return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
  37. _, _, err = paginateDirectory(ctx, writer, client, dir, name, 1000)
  38. return err
  39. })
  40. }
  41. func paginateDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, dir, name string, paginateSize int) (blockCount uint64, byteCount uint64, err error) {
  42. paginatedCount := -1
  43. startFromFileName := ""
  44. for paginatedCount == -1 || paginatedCount == paginateSize {
  45. resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{
  46. Directory: dir,
  47. Prefix: name,
  48. StartFromFileName: startFromFileName,
  49. InclusiveStartFrom: false,
  50. Limit: uint32(paginateSize),
  51. })
  52. if listErr != nil {
  53. err = listErr
  54. return
  55. }
  56. paginatedCount = len(resp.Entries)
  57. for _, entry := range resp.Entries {
  58. if entry.IsDirectory {
  59. subDir := fmt.Sprintf("%s/%s", dir, entry.Name)
  60. if dir == "/" {
  61. subDir = "/" + entry.Name
  62. }
  63. numBlock, numByte, err := paginateDirectory(ctx, writer, client, subDir, "", paginateSize)
  64. if err == nil {
  65. blockCount += numBlock
  66. byteCount += numByte
  67. }
  68. } else {
  69. blockCount += uint64(len(entry.Chunks))
  70. byteCount += filer2.TotalSize(entry.Chunks)
  71. }
  72. startFromFileName = entry.Name
  73. if name != "" && !entry.IsDirectory {
  74. fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", blockCount, byteCount, dir, name)
  75. }
  76. }
  77. }
  78. if name == "" {
  79. fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s\n", blockCount, byteCount, dir)
  80. }
  81. return
  82. }
  83. func (env *CommandEnv) withFilerClient(ctx context.Context, filerServer string, filerPort int64, fn func(filer_pb.SeaweedFilerClient) error) error {
  84. filerGrpcAddress := fmt.Sprintf("%s:%d", filerServer, filerPort+10000)
  85. return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
  86. client := filer_pb.NewSeaweedFilerClient(grpcConnection)
  87. return fn(client)
  88. }, filerGrpcAddress, env.option.GrpcDialOption)
  89. }