stream_read_volume.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "github.com/seaweedfs/seaweedfs/weed/operation"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/security"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. "google.golang.org/grpc"
  14. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  15. )
  16. var (
  17. volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server")
  18. volumeId = flag.Int("volumeId", -1, "a volume id to stream read")
  19. grpcDialOption grpc.DialOption
  20. )
  21. func main() {
  22. flag.Parse()
  23. util_http.InitGlobalHttpClient()
  24. util.LoadSecurityConfiguration()
  25. grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  26. vid := uint32(*volumeId)
  27. eachNeedleFunc := func(resp *volume_server_pb.ReadAllNeedlesResponse) error {
  28. fmt.Printf("%d,%x%08x %d %v %d %x\n", resp.VolumeId, resp.NeedleId, resp.Cookie, len(resp.NeedleBlob), resp.NeedleBlobCompressed, resp.LastModified, resp.Crc)
  29. return nil
  30. }
  31. err := operation.WithVolumeServerClient(true, pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
  32. ctx, cancel := context.WithCancel(context.Background())
  33. defer cancel()
  34. copyFileClient, err := vs.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{
  35. VolumeIds: []uint32{vid},
  36. })
  37. if err != nil {
  38. return err
  39. }
  40. for {
  41. resp, err := copyFileClient.Recv()
  42. if errors.Is(err, io.EOF) {
  43. break
  44. }
  45. if err != nil {
  46. return err
  47. }
  48. if err = eachNeedleFunc(resp); err != nil {
  49. return err
  50. }
  51. }
  52. return nil
  53. })
  54. if err != nil {
  55. fmt.Printf("read %s: %v\n", *volumeServer, err)
  56. }
  57. }