file_reader.cpp 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. #include "file_reader.h"
  2. #include "transaction.h"
  3. #include "transaction_pinger.h"
  4. #include <yt/cpp/mapreduce/common/helpers.h>
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  7. #include <yt/cpp/mapreduce/interface/config.h>
  8. #include <yt/cpp/mapreduce/interface/tvm.h>
  9. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  10. #include <yt/cpp/mapreduce/io/helpers.h>
  11. #include <yt/cpp/mapreduce/http/helpers.h>
  12. #include <yt/cpp/mapreduce/http/http.h>
  13. #include <yt/cpp/mapreduce/http/http_client.h>
  14. #include <yt/cpp/mapreduce/http/retry_request.h>
  15. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  16. namespace NYT {
  17. namespace NDetail {
  18. using ::ToString;
  19. ////////////////////////////////////////////////////////////////////////////////
  20. static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) {
  21. if (options.Length_) {
  22. return options.Offset_.GetOrElse(0) + *options.Length_;
  23. } else {
  24. return Nothing();
  25. }
  26. }
  27. ////////////////////////////////////////////////////////////////////////////////
  28. TStreamReaderBase::TStreamReaderBase(
  29. IClientRetryPolicyPtr clientRetryPolicy,
  30. ITransactionPingerPtr transactionPinger,
  31. const TClientContext& context,
  32. const TTransactionId& transactionId)
  33. : Context_(context)
  34. , ClientRetryPolicy_(std::move(clientRetryPolicy))
  35. , ReadTransaction_(MakeHolder<TPingableTransaction>(
  36. ClientRetryPolicy_,
  37. context,
  38. transactionId,
  39. transactionPinger->GetChildTxPinger(),
  40. TStartTransactionOptions()))
  41. { }
  42. TStreamReaderBase::~TStreamReaderBase() = default;
  43. TYPath TStreamReaderBase::Snapshot(const TYPath& path)
  44. {
  45. return NYT::Snapshot(ClientRetryPolicy_, Context_, ReadTransaction_->GetId(), path);
  46. }
  47. TString TStreamReaderBase::GetActiveRequestId() const
  48. {
  49. if (Response_) {
  50. return Response_->GetRequestId();;
  51. } else {
  52. return "<no-active-request>";
  53. }
  54. }
  55. size_t TStreamReaderBase::DoRead(void* buf, size_t len)
  56. {
  57. const int retryCount = Context_.Config->ReadRetryCount;
  58. for (int attempt = 1; attempt <= retryCount; ++attempt) {
  59. try {
  60. if (!Input_) {
  61. Response_ = Request(Context_, ReadTransaction_->GetId(), CurrentOffset_);
  62. Input_ = Response_->GetResponseStream();
  63. }
  64. if (len == 0) {
  65. return 0;
  66. }
  67. const size_t read = Input_->Read(buf, len);
  68. CurrentOffset_ += read;
  69. return read;
  70. } catch (TErrorResponse& e) {
  71. YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)",
  72. GetActiveRequestId(),
  73. e.what(),
  74. attempt,
  75. retryCount);
  76. if (!IsRetriable(e) || attempt == retryCount) {
  77. throw;
  78. }
  79. NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
  80. } catch (std::exception& e) {
  81. YT_LOG_ERROR("RSP %v - failed: %v (attempt %v of %v)",
  82. GetActiveRequestId(),
  83. e.what(),
  84. attempt,
  85. retryCount);
  86. // Invalidate connection.
  87. Response_.reset();
  88. if (attempt == retryCount) {
  89. throw;
  90. }
  91. NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, Context_.Config));
  92. }
  93. Input_ = nullptr;
  94. }
  95. Y_UNREACHABLE(); // we should either return or throw from loop above
  96. }
  97. ////////////////////////////////////////////////////////////////////////////////
  98. TFileReader::TFileReader(
  99. const TRichYPath& path,
  100. IClientRetryPolicyPtr clientRetryPolicy,
  101. ITransactionPingerPtr transactionPinger,
  102. const TClientContext& context,
  103. const TTransactionId& transactionId,
  104. const TFileReaderOptions& options)
  105. : TStreamReaderBase(std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId)
  106. , FileReaderOptions_(options)
  107. , Path_(path)
  108. , StartOffset_(FileReaderOptions_.Offset_.GetOrElse(0))
  109. , EndOffset_(GetEndOffset(FileReaderOptions_))
  110. {
  111. Path_.Path_ = TStreamReaderBase::Snapshot(Path_.Path_);
  112. }
  113. NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes)
  114. {
  115. const ui64 currentOffset = StartOffset_ + readBytes;
  116. TString hostName = GetProxyForHeavyRequest(context);
  117. THttpHeader header("GET", GetReadFileCommand(context.Config->ApiVersion));
  118. if (context.ServiceTicketAuth) {
  119. header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
  120. } else {
  121. header.SetToken(context.Token);
  122. }
  123. if (context.ImpersonationUser) {
  124. header.SetImpersonationUser(*context.ImpersonationUser);
  125. }
  126. header.AddTransactionId(transactionId);
  127. header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
  128. if (EndOffset_) {
  129. Y_VERIFY(*EndOffset_ >= currentOffset);
  130. FileReaderOptions_.Length(*EndOffset_ - currentOffset);
  131. }
  132. FileReaderOptions_.Offset(currentOffset);
  133. header.MergeParameters(FormIORequestParameters(Path_, FileReaderOptions_));
  134. header.SetResponseCompression(ToString(context.Config->AcceptEncoding));
  135. auto requestId = CreateGuidAsString();
  136. NHttpClient::IHttpResponsePtr response;
  137. try {
  138. response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
  139. } catch (const std::exception& ex) {
  140. LogRequestError(requestId, header, ex.what(), "");
  141. throw;
  142. }
  143. YT_LOG_DEBUG("RSP %v - file stream",
  144. requestId);
  145. return response;
  146. }
  147. ////////////////////////////////////////////////////////////////////////////////
  148. TBlobTableReader::TBlobTableReader(
  149. const TYPath& path,
  150. const TKey& key,
  151. IClientRetryPolicyPtr retryPolicy,
  152. ITransactionPingerPtr transactionPinger,
  153. const TClientContext& context,
  154. const TTransactionId& transactionId,
  155. const TBlobTableReaderOptions& options)
  156. : TStreamReaderBase(std::move(retryPolicy), std::move(transactionPinger), context, transactionId)
  157. , Key_(key)
  158. , Options_(options)
  159. {
  160. Path_ = TStreamReaderBase::Snapshot(path);
  161. }
  162. NHttpClient::IHttpResponsePtr TBlobTableReader::Request(const TClientContext& context, const TTransactionId& transactionId, ui64 readBytes)
  163. {
  164. TString hostName = GetProxyForHeavyRequest(context);
  165. THttpHeader header("GET", "read_blob_table");
  166. if (context.ServiceTicketAuth) {
  167. header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
  168. } else {
  169. header.SetToken(context.Token);
  170. }
  171. if (context.ImpersonationUser) {
  172. header.SetImpersonationUser(*context.ImpersonationUser);
  173. }
  174. header.AddTransactionId(transactionId);
  175. header.SetOutputFormat(TMaybe<TFormat>()); // Binary format
  176. const ui64 currentOffset = Options_.Offset_ + readBytes;
  177. const i64 startPartIndex = currentOffset / Options_.PartSize_;
  178. const ui64 skipBytes = currentOffset - Options_.PartSize_ * startPartIndex;
  179. auto lowerLimitKey = Key_;
  180. lowerLimitKey.Parts_.push_back(startPartIndex);
  181. auto upperLimitKey = Key_;
  182. upperLimitKey.Parts_.push_back(std::numeric_limits<i64>::max());
  183. TNode params = PathToParamNode(TRichYPath(Path_).AddRange(TReadRange()
  184. .LowerLimit(TReadLimit().Key(lowerLimitKey))
  185. .UpperLimit(TReadLimit().Key(upperLimitKey))));
  186. params["start_part_index"] = TNode(startPartIndex);
  187. params["offset"] = skipBytes;
  188. if (Options_.PartIndexColumnName_) {
  189. params["part_index_column_name"] = *Options_.PartIndexColumnName_;
  190. }
  191. if (Options_.DataColumnName_) {
  192. params["data_column_name"] = *Options_.DataColumnName_;
  193. }
  194. params["part_size"] = Options_.PartSize_;
  195. header.MergeParameters(params);
  196. header.SetResponseCompression(ToString(context.Config->AcceptEncoding));
  197. auto requestId = CreateGuidAsString();
  198. NHttpClient::IHttpResponsePtr response;
  199. try {
  200. response = context.HttpClient->Request(GetFullUrl(hostName, context, header), requestId, header);
  201. } catch (const std::exception& ex) {
  202. LogRequestError(requestId, header, ex.what(), "");
  203. throw;
  204. }
  205. YT_LOG_DEBUG("RSP %v - blob table stream",
  206. requestId);
  207. return response;
  208. }
  209. ////////////////////////////////////////////////////////////////////////////////
  210. } // namespace NDetail
  211. } // namespace NYT