client_reader.cpp 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. #include "client_reader.h"
  2. #include "structured_table_formats.h"
  3. #include "transaction.h"
  4. #include "transaction_pinger.h"
  5. #include <yt/cpp/mapreduce/common/helpers.h>
  6. #include <yt/cpp/mapreduce/common/retry_lib.h>
  7. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  8. #include <yt/cpp/mapreduce/interface/config.h>
  9. #include <yt/cpp/mapreduce/interface/tvm.h>
  10. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  11. #include <yt/cpp/mapreduce/io/helpers.h>
  12. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  13. #include <yt/cpp/mapreduce/http/helpers.h>
  14. #include <yt/cpp/mapreduce/http/requests.h>
  15. #include <yt/cpp/mapreduce/http/retry_request.h>
  16. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  17. #include <library/cpp/yson/node/serialize.h>
  18. #include <util/random/random.h>
  19. #include <util/stream/file.h>
  20. #include <util/stream/str.h>
  21. #include <util/string/builder.h>
  22. #include <util/string/cast.h>
  23. namespace NYT {
  24. using ::ToString;
  25. ////////////////////////////////////////////////////////////////////////////////
  26. TClientReader::TClientReader(
  27. const TRichYPath& path,
  28. IClientRetryPolicyPtr clientRetryPolicy,
  29. ITransactionPingerPtr transactionPinger,
  30. const TClientContext& context,
  31. const TTransactionId& transactionId,
  32. const TFormat& format,
  33. const TTableReaderOptions& options,
  34. bool useFormatFromTableAttributes)
  35. : Path_(path)
  36. , ClientRetryPolicy_(std::move(clientRetryPolicy))
  37. , Context_(context)
  38. , ParentTransactionId_(transactionId)
  39. , Format_(format)
  40. , Options_(options)
  41. , ReadTransaction_(nullptr)
  42. {
  43. if (options.CreateTransaction_) {
  44. Y_ABORT_UNLESS(transactionPinger, "Internal error: transactionPinger is null");
  45. ReadTransaction_ = MakeHolder<TPingableTransaction>(
  46. ClientRetryPolicy_,
  47. Context_,
  48. transactionId,
  49. transactionPinger->GetChildTxPinger(),
  50. TStartTransactionOptions());
  51. Path_.Path(Snapshot(
  52. ClientRetryPolicy_,
  53. Context_,
  54. ReadTransaction_->GetId(),
  55. path.Path_));
  56. }
  57. if (useFormatFromTableAttributes) {
  58. auto transactionId2 = ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_;
  59. auto newFormat = GetTableFormat(ClientRetryPolicy_, Context_, transactionId2, Path_);
  60. if (newFormat) {
  61. Format_->Config = *newFormat;
  62. }
  63. }
  64. TransformYPath();
  65. CreateRequest();
  66. }
  67. bool TClientReader::Retry(
  68. const TMaybe<ui32>& rangeIndex,
  69. const TMaybe<ui64>& rowIndex,
  70. const std::exception_ptr& error)
  71. {
  72. if (CurrentRequestRetryPolicy_) {
  73. TMaybe<TDuration> backoffDuration;
  74. try {
  75. std::rethrow_exception(error);
  76. } catch (const TErrorResponse& ex) {
  77. if (!IsRetriable(ex)) {
  78. throw;
  79. }
  80. backoffDuration = CurrentRequestRetryPolicy_->OnRetriableError(ex);
  81. } catch (const std::exception& ex) {
  82. if (!IsRetriable(ex)) {
  83. throw;
  84. }
  85. backoffDuration = CurrentRequestRetryPolicy_->OnGenericError(ex);
  86. } catch (...) {
  87. }
  88. if (!backoffDuration) {
  89. return false;
  90. }
  91. NDetail::TWaitProxy::Get()->Sleep(*backoffDuration);
  92. }
  93. try {
  94. CreateRequest(rangeIndex, rowIndex);
  95. return true;
  96. } catch (const std::exception& ex) {
  97. YT_LOG_ERROR("Client reader retry failed: %v",
  98. ex.what());
  99. return false;
  100. }
  101. }
  102. void TClientReader::ResetRetries()
  103. {
  104. CurrentRequestRetryPolicy_ = nullptr;
  105. }
  106. size_t TClientReader::DoRead(void* buf, size_t len)
  107. {
  108. return Input_->Read(buf, len);
  109. }
  110. void TClientReader::TransformYPath()
  111. {
  112. for (auto& range : Path_.MutableRangesView()) {
  113. auto& exact = range.Exact_;
  114. if (IsTrivial(exact)) {
  115. continue;
  116. }
  117. if (exact.RowIndex_) {
  118. range.LowerLimit(TReadLimit().RowIndex(*exact.RowIndex_));
  119. range.UpperLimit(TReadLimit().RowIndex(*exact.RowIndex_ + 1));
  120. exact.RowIndex_.Clear();
  121. } else if (exact.Key_) {
  122. range.LowerLimit(TReadLimit().Key(*exact.Key_));
  123. auto lastPart = TNode::CreateEntity();
  124. lastPart.Attributes() = TNode()("type", "max");
  125. exact.Key_->Parts_.push_back(lastPart);
  126. range.UpperLimit(TReadLimit().Key(*exact.Key_));
  127. exact.Key_.Clear();
  128. }
  129. }
  130. }
  131. void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex)
  132. {
  133. if (!CurrentRequestRetryPolicy_) {
  134. CurrentRequestRetryPolicy_ = ClientRetryPolicy_->CreatePolicyForGenericRequest();
  135. }
  136. bool areRangesUpdated = false;
  137. while (true) {
  138. CurrentRequestRetryPolicy_->NotifyNewAttempt();
  139. THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion));
  140. if (Context_.ServiceTicketAuth) {
  141. header.SetServiceTicket(Context_.ServiceTicketAuth->Ptr->IssueServiceTicket());
  142. } else {
  143. header.SetToken(Context_.Token);
  144. }
  145. if (Context_.ImpersonationUser) {
  146. header.SetImpersonationUser(*Context_.ImpersonationUser);
  147. }
  148. auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_);
  149. header.AddTransactionId(transactionId);
  150. const auto& controlAttributes = Options_.ControlAttributes_;
  151. header.AddParameter("control_attributes", TNode()
  152. ("enable_row_index", controlAttributes.EnableRowIndex_)
  153. ("enable_range_index", controlAttributes.EnableRangeIndex_));
  154. header.SetOutputFormat(Format_);
  155. header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
  156. if (rowIndex.Defined() && !areRangesUpdated) {
  157. auto& ranges = Path_.MutableRanges();
  158. if (ranges.Empty()) {
  159. ranges.ConstructInPlace(TVector{TReadRange()});
  160. } else {
  161. if (rangeIndex.GetOrElse(0) >= ranges->size()) {
  162. ythrow yexception()
  163. << "range index " << rangeIndex.GetOrElse(0)
  164. << " is out of range, input range count is " << ranges->size();
  165. }
  166. ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0));
  167. }
  168. ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex));
  169. areRangesUpdated = true;
  170. }
  171. header.MergeParameters(FormIORequestParameters(Path_, Options_));
  172. auto requestId = CreateGuidAsString();
  173. try {
  174. const auto proxyName = GetProxyForHeavyRequest(Context_);
  175. UpdateHeaderForProxyIfNeed(proxyName, Context_, header);
  176. Response_ = Context_.HttpClient->Request(GetFullUrlForProxy(proxyName, Context_, header), requestId, header);
  177. Input_ = Response_->GetResponseStream();
  178. YT_LOG_DEBUG(
  179. "RSP %v - table stream (RangeIndex: %v, RowIndex: %v)",
  180. requestId,
  181. rangeIndex,
  182. rowIndex);
  183. return;
  184. } catch (const TErrorResponse& e) {
  185. LogRequestError(
  186. requestId,
  187. header,
  188. e.what(),
  189. CurrentRequestRetryPolicy_->GetAttemptDescription());
  190. if (!IsRetriable(e)) {
  191. throw;
  192. }
  193. auto backoff = CurrentRequestRetryPolicy_->OnRetriableError(e);
  194. if (!backoff) {
  195. throw;
  196. }
  197. NDetail::TWaitProxy::Get()->Sleep(*backoff);
  198. } catch (const std::exception& e) {
  199. LogRequestError(
  200. requestId,
  201. header,
  202. e.what(),
  203. CurrentRequestRetryPolicy_->GetAttemptDescription());
  204. Response_.reset();
  205. Input_ = nullptr;
  206. auto backoff = CurrentRequestRetryPolicy_->OnGenericError(e);
  207. if (!backoff) {
  208. throw;
  209. }
  210. NDetail::TWaitProxy::Get()->Sleep(*backoff);
  211. }
  212. }
  213. }
  214. ////////////////////////////////////////////////////////////////////////////////
  215. } // namespace NYT