// file_reader.h (2.7 KB)
  1. #pragma once
  2. #include <yt/cpp/mapreduce/common/fwd.h>
  3. #include <yt/cpp/mapreduce/interface/io.h>
  4. #include <yt/cpp/mapreduce/http/context.h>
  5. #include <yt/cpp/mapreduce/http/requests.h>
  6. class IInputStream;
  7. namespace NYT {
  8. class THttpRequest;
  9. class TPingableTransaction;
  10. namespace NDetail {
  11. ////////////////////////////////////////////////////////////////////////////////
  12. class TStreamReaderBase
  13. : public IFileReader
  14. {
  15. public:
  16. TStreamReaderBase(
  17. IClientRetryPolicyPtr clientRetryPolicy,
  18. ITransactionPingerPtr transactionPinger,
  19. const TClientContext& context,
  20. const TTransactionId& transactionId);
  21. ~TStreamReaderBase();
  22. protected:
  23. TYPath Snapshot(const TYPath& path);
  24. protected:
  25. const TClientContext Context_;
  26. private:
  27. size_t DoRead(void* buf, size_t len) override;
  28. virtual NHttpClient::IHttpResponsePtr Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) = 0;
  29. TString GetActiveRequestId() const;
  30. private:
  31. const IClientRetryPolicyPtr ClientRetryPolicy_;
  32. TFileReaderOptions FileReaderOptions_;
  33. NHttpClient::IHttpResponsePtr Response_;
  34. IInputStream* Input_ = nullptr;
  35. THolder<TPingableTransaction> ReadTransaction_;
  36. ui64 CurrentOffset_ = 0;
  37. };
  38. ////////////////////////////////////////////////////////////////////////////////
  39. class TFileReader
  40. : public TStreamReaderBase
  41. {
  42. public:
  43. TFileReader(
  44. const TRichYPath& path,
  45. IClientRetryPolicyPtr clientRetryPolicy,
  46. ITransactionPingerPtr transactionPinger,
  47. const TClientContext& context,
  48. const TTransactionId& transactionId,
  49. const TFileReaderOptions& options = TFileReaderOptions());
  50. private:
  51. NHttpClient::IHttpResponsePtr Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) override;
  52. private:
  53. TFileReaderOptions FileReaderOptions_;
  54. TRichYPath Path_;
  55. const ui64 StartOffset_;
  56. const TMaybe<ui64> EndOffset_;
  57. };
  58. ////////////////////////////////////////////////////////////////////////////////
  59. class TBlobTableReader
  60. : public TStreamReaderBase
  61. {
  62. public:
  63. TBlobTableReader(
  64. const TYPath& path,
  65. const TKey& key,
  66. IClientRetryPolicyPtr clientRetryPolicy,
  67. ITransactionPingerPtr transactionPinger,
  68. const TClientContext& context,
  69. const TTransactionId& transactionId,
  70. const TBlobTableReaderOptions& options);
  71. private:
  72. NHttpClient::IHttpResponsePtr Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes) override;
  73. private:
  74. const TKey Key_;
  75. const TBlobTableReaderOptions Options_;
  76. TYPath Path_;
  77. };
  78. } // namespace NDetail
  79. } // namespace NYT