client_reader.cpp 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. #include "client_reader.h"
  2. #include "structured_table_formats.h"
  3. #include "transaction.h"
  4. #include "transaction_pinger.h"
  5. #include <yt/cpp/mapreduce/common/helpers.h>
  6. #include <yt/cpp/mapreduce/common/retry_lib.h>
  7. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  8. #include <yt/cpp/mapreduce/interface/config.h>
  9. #include <yt/cpp/mapreduce/interface/tvm.h>
  10. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  11. #include <yt/cpp/mapreduce/io/helpers.h>
  12. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  13. #include <yt/cpp/mapreduce/http/helpers.h>
  14. #include <yt/cpp/mapreduce/http/requests.h>
  15. #include <yt/cpp/mapreduce/http/retry_request.h>
  16. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  17. #include <library/cpp/yson/node/serialize.h>
  18. #include <util/random/random.h>
  19. #include <util/stream/file.h>
  20. #include <util/stream/str.h>
  21. #include <util/string/builder.h>
  22. #include <util/string/cast.h>
  23. namespace NYT {
  24. using ::ToString;
  25. ////////////////////////////////////////////////////////////////////////////////
  26. TClientReader::TClientReader(
  27. const TRichYPath& path,
  28. IClientRetryPolicyPtr clientRetryPolicy,
  29. ITransactionPingerPtr transactionPinger,
  30. const TClientContext& context,
  31. const TTransactionId& transactionId,
  32. const TFormat& format,
  33. const TTableReaderOptions& options,
  34. bool useFormatFromTableAttributes)
  35. : Path_(path)
  36. , ClientRetryPolicy_(std::move(clientRetryPolicy))
  37. , Context_(context)
  38. , ParentTransactionId_(transactionId)
  39. , Format_(format)
  40. , Options_(options)
  41. , ReadTransaction_(nullptr)
  42. {
  43. if (options.CreateTransaction_) {
  44. Y_ABORT_UNLESS(transactionPinger, "Internal error: transactionPinger is null");
  45. ReadTransaction_ = MakeHolder<TPingableTransaction>(
  46. ClientRetryPolicy_,
  47. Context_,
  48. transactionId,
  49. transactionPinger->GetChildTxPinger(),
  50. TStartTransactionOptions());
  51. Path_.Path(Snapshot(
  52. ClientRetryPolicy_,
  53. Context_,
  54. ReadTransaction_->GetId(),
  55. path.Path_));
  56. }
  57. if (useFormatFromTableAttributes) {
  58. auto transactionId2 = ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_;
  59. auto newFormat = GetTableFormat(ClientRetryPolicy_, Context_, transactionId2, Path_);
  60. if (newFormat) {
  61. Format_->Config = *newFormat;
  62. }
  63. }
  64. TransformYPath();
  65. CreateRequest();
  66. }
  67. bool TClientReader::Retry(
  68. const TMaybe<ui32>& rangeIndex,
  69. const TMaybe<ui64>& rowIndex)
  70. {
  71. if (CurrentRequestRetryPolicy_) {
  72. // TODO we should pass actual exception in Retry function
  73. yexception genericError;
  74. auto backoff = CurrentRequestRetryPolicy_->OnGenericError(genericError);
  75. if (!backoff) {
  76. return false;
  77. }
  78. }
  79. try {
  80. CreateRequest(rangeIndex, rowIndex);
  81. return true;
  82. } catch (const std::exception& ex) {
  83. YT_LOG_ERROR("Client reader retry failed: %v",
  84. ex.what());
  85. return false;
  86. }
  87. }
  88. void TClientReader::ResetRetries()
  89. {
  90. CurrentRequestRetryPolicy_ = nullptr;
  91. }
  92. size_t TClientReader::DoRead(void* buf, size_t len)
  93. {
  94. return Input_->Read(buf, len);
  95. }
  96. void TClientReader::TransformYPath()
  97. {
  98. for (auto& range : Path_.MutableRangesView()) {
  99. auto& exact = range.Exact_;
  100. if (IsTrivial(exact)) {
  101. continue;
  102. }
  103. if (exact.RowIndex_) {
  104. range.LowerLimit(TReadLimit().RowIndex(*exact.RowIndex_));
  105. range.UpperLimit(TReadLimit().RowIndex(*exact.RowIndex_ + 1));
  106. exact.RowIndex_.Clear();
  107. } else if (exact.Key_) {
  108. range.LowerLimit(TReadLimit().Key(*exact.Key_));
  109. auto lastPart = TNode::CreateEntity();
  110. lastPart.Attributes() = TNode()("type", "max");
  111. exact.Key_->Parts_.push_back(lastPart);
  112. range.UpperLimit(TReadLimit().Key(*exact.Key_));
  113. exact.Key_.Clear();
  114. }
  115. }
  116. }
  117. void TClientReader::CreateRequest(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex)
  118. {
  119. if (!CurrentRequestRetryPolicy_) {
  120. CurrentRequestRetryPolicy_ = ClientRetryPolicy_->CreatePolicyForGenericRequest();
  121. }
  122. bool areRangesUpdated = false;
  123. while (true) {
  124. CurrentRequestRetryPolicy_->NotifyNewAttempt();
  125. THttpHeader header("GET", GetReadTableCommand(Context_.Config->ApiVersion));
  126. if (Context_.ServiceTicketAuth) {
  127. header.SetServiceTicket(Context_.ServiceTicketAuth->Ptr->IssueServiceTicket());
  128. } else {
  129. header.SetToken(Context_.Token);
  130. }
  131. if (Context_.ImpersonationUser) {
  132. header.SetImpersonationUser(*Context_.ImpersonationUser);
  133. }
  134. auto transactionId = (ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_);
  135. header.AddTransactionId(transactionId);
  136. const auto& controlAttributes = Options_.ControlAttributes_;
  137. header.AddParameter("control_attributes", TNode()
  138. ("enable_row_index", controlAttributes.EnableRowIndex_)
  139. ("enable_range_index", controlAttributes.EnableRangeIndex_));
  140. header.SetOutputFormat(Format_);
  141. header.SetResponseCompression(ToString(Context_.Config->AcceptEncoding));
  142. if (rowIndex.Defined() && !areRangesUpdated) {
  143. auto& ranges = Path_.MutableRanges();
  144. if (ranges.Empty()) {
  145. ranges.ConstructInPlace(TVector{TReadRange()});
  146. } else {
  147. if (rangeIndex.GetOrElse(0) >= ranges->size()) {
  148. ythrow yexception()
  149. << "range index " << rangeIndex.GetOrElse(0)
  150. << " is out of range, input range count is " << ranges->size();
  151. }
  152. ranges->erase(ranges->begin(), ranges->begin() + rangeIndex.GetOrElse(0));
  153. }
  154. ranges->begin()->LowerLimit(TReadLimit().RowIndex(*rowIndex));
  155. areRangesUpdated = true;
  156. }
  157. header.MergeParameters(FormIORequestParameters(Path_, Options_));
  158. auto requestId = CreateGuidAsString();
  159. try {
  160. const auto proxyName = GetProxyForHeavyRequest(Context_);
  161. UpdateHeaderForProxyIfNeed(proxyName, Context_, header);
  162. Response_ = Context_.HttpClient->Request(GetFullUrlForProxy(proxyName, Context_, header), requestId, header);
  163. Input_ = Response_->GetResponseStream();
  164. YT_LOG_DEBUG("RSP %v - table stream (RequestId: %v, RangeIndex: %v, RowIndex: %v)", requestId, rangeIndex, rowIndex);
  165. return;
  166. } catch (const TErrorResponse& e) {
  167. LogRequestError(
  168. requestId,
  169. header,
  170. e.what(),
  171. CurrentRequestRetryPolicy_->GetAttemptDescription());
  172. if (!IsRetriable(e)) {
  173. throw;
  174. }
  175. auto backoff = CurrentRequestRetryPolicy_->OnRetriableError(e);
  176. if (!backoff) {
  177. throw;
  178. }
  179. NDetail::TWaitProxy::Get()->Sleep(*backoff);
  180. } catch (const std::exception& e) {
  181. LogRequestError(
  182. requestId,
  183. header,
  184. e.what(),
  185. CurrentRequestRetryPolicy_->GetAttemptDescription());
  186. Response_.reset();
  187. Input_ = nullptr;
  188. auto backoff = CurrentRequestRetryPolicy_->OnGenericError(e);
  189. if (!backoff) {
  190. throw;
  191. }
  192. NDetail::TWaitProxy::Get()->Sleep(*backoff);
  193. }
  194. }
  195. }
  196. ////////////////////////////////////////////////////////////////////////////////
  197. } // namespace NYT