file_reader.cpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. #include "file_reader.h"
  2. #include "transaction.h"
  3. #include "transaction_pinger.h"
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/common/retry_request.h>
  7. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  8. #include <yt/cpp/mapreduce/interface/config.h>
  9. #include <yt/cpp/mapreduce/interface/raw_client.h>
  10. #include <yt/cpp/mapreduce/interface/tvm.h>
  11. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  12. #include <yt/cpp/mapreduce/io/helpers.h>
  13. namespace NYT {
  14. namespace NDetail {
  15. using ::ToString;
  16. ////////////////////////////////////////////////////////////////////////////////
  17. static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) {
  18. if (options.Length_) {
  19. return options.Offset_ + *options.Length_;
  20. } else {
  21. return Nothing();
  22. }
  23. }
  24. ////////////////////////////////////////////////////////////////////////////////
  25. TStreamReaderBase::TStreamReaderBase(
  26. const IRawClientPtr& rawClient,
  27. IClientRetryPolicyPtr clientRetryPolicy,
  28. ITransactionPingerPtr transactionPinger,
  29. const TClientContext& context,
  30. const TTransactionId& transactionId)
  31. : RawClient_(rawClient)
  32. , ClientRetryPolicy_(std::move(clientRetryPolicy))
  33. , ReadTransaction_(std::make_unique<TPingableTransaction>(
  34. RawClient_,
  35. ClientRetryPolicy_,
  36. context,
  37. transactionId,
  38. transactionPinger->GetChildTxPinger(),
  39. TStartTransactionOptions()))
  40. { }
  41. TStreamReaderBase::~TStreamReaderBase() = default;
  42. TYPath TStreamReaderBase::Snapshot(const TYPath& path)
  43. {
  44. return NYT::Snapshot(RawClient_, ClientRetryPolicy_, ReadTransaction_->GetId(), path);
  45. }
  46. size_t TStreamReaderBase::DoRead(void* buf, size_t len)
  47. {
  48. if (len == 0) {
  49. return 0;
  50. }
  51. return RequestWithRetry<size_t>(
  52. ClientRetryPolicy_->CreatePolicyForReaderRequest(),
  53. [this, &buf, len] (TMutationId /*mutationId*/) {
  54. try {
  55. if (!Input_) {
  56. Input_ = Request(ReadTransaction_->GetId(), CurrentOffset_);
  57. }
  58. const size_t read = Input_->Read(buf, len);
  59. CurrentOffset_ += read;
  60. return read;
  61. } catch (...) {
  62. Input_ = nullptr;
  63. throw;
  64. }
  65. });
  66. }
  67. ////////////////////////////////////////////////////////////////////////////////
  68. TFileReader::TFileReader(
  69. const TRichYPath& path,
  70. const IRawClientPtr& rawClient,
  71. IClientRetryPolicyPtr clientRetryPolicy,
  72. ITransactionPingerPtr transactionPinger,
  73. const TClientContext& context,
  74. const TTransactionId& transactionId,
  75. const TFileReaderOptions& options)
  76. : TStreamReaderBase(rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId)
  77. , StartOffset_(options.Offset_)
  78. , EndOffset_(GetEndOffset(options))
  79. , Options_(options)
  80. , Path_(path)
  81. {
  82. Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_);
  83. }
  84. std::unique_ptr<IInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes)
  85. {
  86. const ui64 currentOffset = StartOffset_ + readBytes;
  87. if (EndOffset_) {
  88. Y_ABORT_UNLESS(*EndOffset_ >= currentOffset);
  89. Options_.Length(*EndOffset_ - currentOffset);
  90. }
  91. Options_.Offset(currentOffset);
  92. return RawClient_->ReadFile(transactionId, Path_, Options_);
  93. }
  94. ////////////////////////////////////////////////////////////////////////////////
  95. TBlobTableReader::TBlobTableReader(
  96. const TYPath& path,
  97. const TKey& key,
  98. const IRawClientPtr& rawClient,
  99. IClientRetryPolicyPtr retryPolicy,
  100. ITransactionPingerPtr transactionPinger,
  101. const TClientContext& context,
  102. const TTransactionId& transactionId,
  103. const TBlobTableReaderOptions& options)
  104. : TStreamReaderBase(rawClient, std::move(retryPolicy), std::move(transactionPinger), context, transactionId)
  105. , StartOffset_(options.Offset_)
  106. , Key_(key)
  107. , Options_(options)
  108. {
  109. Path_ = TStreamReaderBase::Snapshot(path);
  110. }
  111. std::unique_ptr<IInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes)
  112. {
  113. const i64 currentOffset = StartOffset_ + readBytes;
  114. const i64 startPartIndex = currentOffset / Options_.PartSize_;
  115. const i64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex;
  116. Options_.Offset(skipBytes);
  117. Options_.StartPartIndex(startPartIndex);
  118. return RawClient_->ReadBlobTable(transactionId, Path_, Key_, Options_);
  119. }
  120. ////////////////////////////////////////////////////////////////////////////////
  121. } // namespace NDetail
  122. } // namespace NYT