client_reader.cpp 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. #include "client_reader.h"
  2. #include "structured_table_formats.h"
  3. #include "transaction.h"
  4. #include "transaction_pinger.h"
  5. #include <yt/cpp/mapreduce/common/helpers.h>
  6. #include <yt/cpp/mapreduce/common/retry_lib.h>
  7. #include <yt/cpp/mapreduce/common/retry_request.h>
  8. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  9. #include <yt/cpp/mapreduce/interface/config.h>
  10. #include <yt/cpp/mapreduce/interface/raw_client.h>
  11. #include <yt/cpp/mapreduce/interface/tvm.h>
  12. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  13. #include <yt/cpp/mapreduce/io/helpers.h>
  14. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  15. #include <library/cpp/yson/node/serialize.h>
  16. #include <util/random/random.h>
  17. #include <util/stream/file.h>
  18. #include <util/stream/str.h>
  19. #include <util/string/builder.h>
  20. #include <util/string/cast.h>
  21. namespace NYT {
  22. using ::ToString;
  23. ////////////////////////////////////////////////////////////////////////////////
  24. TClientReader::TClientReader(
  25. const TRichYPath& path,
  26. const IRawClientPtr& rawClient,
  27. IClientRetryPolicyPtr clientRetryPolicy,
  28. ITransactionPingerPtr transactionPinger,
  29. const TClientContext& context,
  30. const TTransactionId& transactionId,
  31. const TFormat& format,
  32. const TTableReaderOptions& options,
  33. bool useFormatFromTableAttributes)
  34. : Path_(path)
  35. , RawClient_(rawClient)
  36. , ClientRetryPolicy_(std::move(clientRetryPolicy))
  37. , Context_(context)
  38. , ParentTransactionId_(transactionId)
  39. , Format_(format)
  40. , Options_(options)
  41. , ReadTransaction_(nullptr)
  42. {
  43. if (options.CreateTransaction_) {
  44. Y_ABORT_UNLESS(transactionPinger, "Internal error: transactionPinger is null");
  45. ReadTransaction_ = std::make_unique<TPingableTransaction>(
  46. RawClient_,
  47. ClientRetryPolicy_,
  48. Context_,
  49. transactionId,
  50. transactionPinger->GetChildTxPinger(),
  51. TStartTransactionOptions());
  52. Path_.Path(Snapshot(
  53. RawClient_,
  54. ClientRetryPolicy_,
  55. ReadTransaction_->GetId(),
  56. path.Path_));
  57. }
  58. if (useFormatFromTableAttributes) {
  59. auto transactionId2 = ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_;
  60. auto newFormat = GetTableFormat(ClientRetryPolicy_, RawClient_, transactionId2, Path_);
  61. if (newFormat) {
  62. Format_->Config = *newFormat;
  63. }
  64. }
  65. TransformYPath();
  66. CreateRequest();
  67. }
  68. bool TClientReader::Retry(
  69. const TMaybe<ui32>& rangeIndex,
  70. const TMaybe<ui64>& rowIndex,
  71. const std::exception_ptr& error)
  72. {
  73. if (CurrentRequestRetryPolicy_) {
  74. TMaybe<TDuration> backoffDuration;
  75. try {
  76. std::rethrow_exception(error);
  77. } catch (const TErrorResponse& ex) {
  78. if (!IsRetriable(ex)) {
  79. throw;
  80. }
  81. backoffDuration = CurrentRequestRetryPolicy_->OnRetriableError(ex);
  82. } catch (const std::exception& ex) {
  83. if (!IsRetriable(ex)) {
  84. throw;
  85. }
  86. backoffDuration = CurrentRequestRetryPolicy_->OnGenericError(ex);
  87. } catch (...) {
  88. }
  89. if (!backoffDuration) {
  90. return false;
  91. }
  92. NDetail::TWaitProxy::Get()->Sleep(*backoffDuration);
  93. }
  94. try {
  95. CreateRequest(rangeIndex, rowIndex);
  96. return true;
  97. } catch (const std::exception& ex) {
  98. YT_LOG_ERROR("Client reader retry failed: %v",
  99. ex.what());
  100. return false;
  101. }
  102. }
  103. void TClientReader::ResetRetries()
  104. {
  105. CurrentRequestRetryPolicy_ = nullptr;
  106. }
  107. size_t TClientReader::DoRead(void* buf, size_t len)
  108. {
  109. return Input_->Read(buf, len);
  110. }
  111. void TClientReader::TransformYPath()
  112. {
  113. for (auto& range : Path_.MutableRangesView()) {
  114. auto& exact = range.Exact_;
  115. if (IsTrivial(exact)) {
  116. continue;
  117. }
  118. if (exact.RowIndex_) {
  119. range.LowerLimit(TReadLimit().RowIndex(*exact.RowIndex_));
  120. range.UpperLimit(TReadLimit().RowIndex(*exact.RowIndex_ + 1));
  121. exact.RowIndex_.Clear();
  122. } else if (exact.Key_) {
  123. range.LowerLimit(TReadLimit().Key(*exact.Key_));
  124. auto lastPart = TNode::CreateEntity();
  125. lastPart.Attributes() = TNode()("type", "max");
  126. exact.Key_->Parts_.push_back(lastPart);
  127. range.UpperLimit(TReadLimit().Key(*exact.Key_));
  128. exact.Key_.Clear();
  129. }
  130. }
  131. }
  132. void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex)
  133. {
  134. if (!CurrentRequestRetryPolicy_) {
  135. CurrentRequestRetryPolicy_ = ClientRetryPolicy_->CreatePolicyForGenericRequest();
  136. }
  137. auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_);
  138. if (rowIndex.Defined()) {
  139. auto& ranges = Path_.MutableRanges();
  140. if (ranges.Empty()) {
  141. ranges.ConstructInPlace(TVector{TReadRange()});
  142. } else {
  143. if (rangeIndex.GetOrElse(0) >= ranges->size()) {
  144. ythrow yexception()
  145. << "range index " << rangeIndex.GetOrElse(0)
  146. << " is out of range, input range count is " << ranges->size();
  147. }
  148. ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0));
  149. }
  150. ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex));
  151. }
  152. Input_ = NDetail::RequestWithRetry<std::unique_ptr<IInputStream>>(
  153. CurrentRequestRetryPolicy_,
  154. [this, &transactionId] (TMutationId /*mutationId*/) {
  155. return RawClient_->ReadTable(transactionId, Path_, Format_, Options_);
  156. });
  157. }
  158. ////////////////////////////////////////////////////////////////////////////////
  159. } // namespace NYT