stream_read_volume.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package main
  2. import (
  3. "context"
  4. "errors"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "github.com/seaweedfs/seaweedfs/weed/operation"
  9. "github.com/seaweedfs/seaweedfs/weed/pb"
  10. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  11. "github.com/seaweedfs/seaweedfs/weed/security"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. "google.golang.org/grpc"
  14. )
  15. var (
  16. volumeServer = flag.String("volumeServer", "localhost:8080", "a volume server")
  17. volumeId = flag.Int("volumeId", -1, "a volume id to stream read")
  18. grpcDialOption grpc.DialOption
  19. )
  20. func main() {
  21. flag.Parse()
  22. util.LoadConfiguration("security", false)
  23. grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
  24. vid := uint32(*volumeId)
  25. eachNeedleFunc := func(resp *volume_server_pb.ReadAllNeedlesResponse) error {
  26. fmt.Printf("%d,%x%08x %d %v %d %x\n", resp.VolumeId, resp.NeedleId, resp.Cookie, len(resp.NeedleBlob), resp.NeedleBlobCompressed, resp.LastModified, resp.Crc)
  27. return nil
  28. }
  29. err := operation.WithVolumeServerClient(true, pb.ServerAddress(*volumeServer), grpcDialOption, func(vs volume_server_pb.VolumeServerClient) error {
  30. ctx, cancel := context.WithCancel(context.Background())
  31. defer cancel()
  32. copyFileClient, err := vs.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{
  33. VolumeIds: []uint32{vid},
  34. })
  35. if err != nil {
  36. return err
  37. }
  38. for {
  39. resp, err := copyFileClient.Recv()
  40. if errors.Is(err, io.EOF) {
  41. break
  42. }
  43. if err != nil {
  44. return err
  45. }
  46. if err = eachNeedleFunc(resp); err != nil {
  47. return err
  48. }
  49. }
  50. return nil
  51. })
  52. if err != nil {
  53. fmt.Printf("read %s: %v\n", *volumeServer, err)
  54. }
  55. }