file_reader.cpp 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. #include "file_reader.h"
  2. #include "transaction.h"
  3. #include "transaction_pinger.h"
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  7. #include <yt/cpp/mapreduce/interface/config.h>
  8. #include <yt/cpp/mapreduce/interface/raw_client.h>
  9. #include <yt/cpp/mapreduce/interface/tvm.h>
  10. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  11. #include <yt/cpp/mapreduce/io/helpers.h>
  12. #include <yt/cpp/mapreduce/http/helpers.h>
  13. #include <yt/cpp/mapreduce/http/http.h>
  14. #include <yt/cpp/mapreduce/http/http_client.h>
  15. #include <yt/cpp/mapreduce/http/retry_request.h>
  16. namespace NYT {
  17. namespace NDetail {
  18. using ::ToString;
  19. ////////////////////////////////////////////////////////////////////////////////
  20. static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) {
  21. if (options.Length_) {
  22. return options.Offset_ + *options.Length_;
  23. } else {
  24. return Nothing();
  25. }
  26. }
  27. ////////////////////////////////////////////////////////////////////////////////
  28. TStreamReaderBase::TStreamReaderBase(
  29. const IRawClientPtr& rawClient,
  30. IClientRetryPolicyPtr clientRetryPolicy,
  31. ITransactionPingerPtr transactionPinger,
  32. const TClientContext& context,
  33. const TTransactionId& transactionId)
  34. : RawClient_(rawClient)
  35. , ClientRetryPolicy_(std::move(clientRetryPolicy))
  36. , ReadTransaction_(std::make_unique<TPingableTransaction>(
  37. RawClient_,
  38. ClientRetryPolicy_,
  39. context,
  40. transactionId,
  41. transactionPinger->GetChildTxPinger(),
  42. TStartTransactionOptions()))
  43. { }
  44. TStreamReaderBase::~TStreamReaderBase() = default;
  45. TYPath TStreamReaderBase::Snapshot(const TYPath& path)
  46. {
  47. return NYT::Snapshot(RawClient_, ClientRetryPolicy_, ReadTransaction_->GetId(), path);
  48. }
  49. size_t TStreamReaderBase::DoRead(void* buf, size_t len)
  50. {
  51. if (len == 0) {
  52. return 0;
  53. }
  54. return RequestWithRetry<size_t>(
  55. ClientRetryPolicy_->CreatePolicyForReaderRequest(),
  56. [this, &buf, len] (TMutationId /*mutationId*/) {
  57. try {
  58. if (!Input_) {
  59. Input_ = Request(ReadTransaction_->GetId(), CurrentOffset_);
  60. }
  61. const size_t read = Input_->Read(buf, len);
  62. CurrentOffset_ += read;
  63. return read;
  64. } catch (...) {
  65. Input_ = nullptr;
  66. throw;
  67. }
  68. });
  69. }
  70. ////////////////////////////////////////////////////////////////////////////////
  71. TFileReader::TFileReader(
  72. const TRichYPath& path,
  73. const IRawClientPtr& rawClient,
  74. IClientRetryPolicyPtr clientRetryPolicy,
  75. ITransactionPingerPtr transactionPinger,
  76. const TClientContext& context,
  77. const TTransactionId& transactionId,
  78. const TFileReaderOptions& options)
  79. : TStreamReaderBase(rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId)
  80. , StartOffset_(options.Offset_)
  81. , EndOffset_(GetEndOffset(options))
  82. , Options_(options)
  83. , Path_(path)
  84. {
  85. Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_);
  86. }
  87. std::unique_ptr<IInputStream> TFileReader::Request(const TTransactionId& transactionId, ui64 readBytes)
  88. {
  89. const ui64 currentOffset = StartOffset_ + readBytes;
  90. if (EndOffset_) {
  91. Y_ABORT_UNLESS(*EndOffset_ >= currentOffset);
  92. Options_.Length(*EndOffset_ - currentOffset);
  93. }
  94. Options_.Offset(currentOffset);
  95. return RawClient_->ReadFile(transactionId, Path_, Options_);
  96. }
  97. ////////////////////////////////////////////////////////////////////////////////
  98. TBlobTableReader::TBlobTableReader(
  99. const TYPath& path,
  100. const TKey& key,
  101. const IRawClientPtr& rawClient,
  102. IClientRetryPolicyPtr retryPolicy,
  103. ITransactionPingerPtr transactionPinger,
  104. const TClientContext& context,
  105. const TTransactionId& transactionId,
  106. const TBlobTableReaderOptions& options)
  107. : TStreamReaderBase(rawClient, std::move(retryPolicy), std::move(transactionPinger), context, transactionId)
  108. , StartOffset_(options.Offset_)
  109. , Key_(key)
  110. , Options_(options)
  111. {
  112. Path_ = TStreamReaderBase::Snapshot(path);
  113. }
  114. std::unique_ptr<IInputStream> TBlobTableReader::Request(const TTransactionId& transactionId, ui64 readBytes)
  115. {
  116. const i64 currentOffset = StartOffset_ + readBytes;
  117. const i64 startPartIndex = currentOffset / Options_.PartSize_;
  118. const i64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex;
  119. Options_.Offset(skipBytes);
  120. Options_.StartPartIndex(startPartIndex);
  121. return RawClient_->ReadBlobTable(transactionId, Path_, Key_, Options_);
  122. }
  123. ////////////////////////////////////////////////////////////////////////////////
  124. } // namespace NDetail
  125. } // namespace NYT