retry_heavy_write_request.cpp 6.6 KB


  1. #include "retry_heavy_write_request.h"
  2. #include "transaction.h"
  3. #include "transaction_pinger.h"
  4. #include <yt/cpp/mapreduce/common/retry_lib.h>
  5. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  6. #include <yt/cpp/mapreduce/interface/config.h>
  7. #include <yt/cpp/mapreduce/interface/raw_client.h>
  8. #include <yt/cpp/mapreduce/interface/tvm.h>
  9. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  10. #include <yt/cpp/mapreduce/http/helpers.h>
  11. #include <yt/cpp/mapreduce/http/http_client.h>
  12. #include <yt/cpp/mapreduce/http/requests.h>
  13. #include <yt/cpp/mapreduce/http/retry_request.h>
  14. #include <util/stream/null.h>
  15. namespace NYT {
  16. using ::ToString;
  17. ////////////////////////////////////////////////////////////////////////////////
  18. void RetryHeavyWriteRequest(
  19. const IRawClientPtr& rawClient,
  20. const IClientRetryPolicyPtr& clientRetryPolicy,
  21. const ITransactionPingerPtr& transactionPinger,
  22. const TClientContext& context,
  23. const TTransactionId& parentId,
  24. THttpHeader& header,
  25. std::function<std::unique_ptr<IInputStream>()> streamMaker)
  26. {
  27. int retryCount = context.Config->RetryCount;
  28. if (context.ServiceTicketAuth) {
  29. header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
  30. } else {
  31. header.SetToken(context.Token);
  32. }
  33. if (context.ImpersonationUser) {
  34. header.SetImpersonationUser(*context.ImpersonationUser);
  35. }
  36. for (int attempt = 0; attempt < retryCount; ++attempt) {
  37. TPingableTransaction attemptTx(rawClient, clientRetryPolicy, context, parentId, transactionPinger->GetChildTxPinger(), TStartTransactionOptions());
  38. auto input = streamMaker();
  39. TString requestId;
  40. try {
  41. auto hostName = GetProxyForHeavyRequest(context);
  42. requestId = CreateGuidAsString();
  43. UpdateHeaderForProxyIfNeed(hostName, context, header);
  44. header.AddTransactionId(attemptTx.GetId(), /* overwrite = */ true);
  45. header.SetRequestCompression(ToString(context.Config->ContentEncoding));
  46. auto request = context.HttpClient->StartRequest(
  47. GetFullUrlForProxy(hostName, context, header),
  48. requestId,
  49. header);
  50. TransferData(input.get(), request->GetStream());
  51. request->Finish()->GetResponse();
  52. } catch (TErrorResponse& e) {
  53. YT_LOG_ERROR("RSP %v - attempt %v failed",
  54. requestId,
  55. attempt);
  56. if (!IsRetriable(e) || attempt + 1 == retryCount) {
  57. throw;
  58. }
  59. NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, context.Config));
  60. continue;
  61. } catch (std::exception& e) {
  62. YT_LOG_ERROR("RSP %v - %v - attempt %v failed",
  63. requestId,
  64. e.what(),
  65. attempt);
  66. if (attempt + 1 == retryCount) {
  67. throw;
  68. }
  69. NDetail::TWaitProxy::Get()->Sleep(GetBackoffDuration(e, context.Config));
  70. continue;
  71. }
  72. attemptTx.Commit();
  73. return;
  74. }
  75. }
  76. ////////////////////////////////////////////////////////////////////////////////
  77. THeavyRequestRetrier::THeavyRequestRetrier(TParameters parameters)
  78. : Parameters_(std::move(parameters))
  79. , RequestRetryPolicy_(Parameters_.ClientRetryPolicy->CreatePolicyForGenericRequest())
  80. , StreamFactory_([] {
  81. return std::make_unique<TNullInput>();
  82. })
  83. {
  84. Retry([] { });
  85. }
  86. THeavyRequestRetrier::~THeavyRequestRetrier() = default;
  87. void THeavyRequestRetrier::Update(THeavyRequestRetrier::TStreamFactory streamFactory)
  88. {
  89. StreamFactory_ = streamFactory;
  90. Retry([this] {
  91. auto stream = StreamFactory_();
  92. stream->Skip(Attempt_->Offset);
  93. auto transfered = stream->ReadAll(*Attempt_->Request->GetStream());
  94. Attempt_->Offset += transfered;
  95. });
  96. }
  97. void THeavyRequestRetrier::Finish()
  98. {
  99. Retry([this] {
  100. Attempt_->Request->Finish()->GetResponse();
  101. Attempt_->Transaction->Commit();
  102. Attempt_.reset();
  103. });
  104. }
  105. void THeavyRequestRetrier::Retry(const std::function<void()> &function)
  106. {
  107. while (true) {
  108. try {
  109. if (!Attempt_) {
  110. TryStartAttempt();
  111. }
  112. function();
  113. return;
  114. } catch (const std::exception& ex) {
  115. YT_LOG_ERROR("RSP %v - attempt %v failed",
  116. Attempt_->RequestId,
  117. RequestRetryPolicy_->GetAttemptDescription());
  118. Attempt_.reset();
  119. TMaybe<TDuration> backoffDuration;
  120. if (const auto *errorResponse = dynamic_cast<const TErrorResponse *>(&ex)) {
  121. if (!IsRetriable(*errorResponse)) {
  122. throw;
  123. }
  124. backoffDuration = RequestRetryPolicy_->OnRetriableError(*errorResponse);
  125. } else {
  126. if (!IsRetriable(ex)) {
  127. throw;
  128. }
  129. backoffDuration = RequestRetryPolicy_->OnGenericError(ex);
  130. }
  131. if (!backoffDuration) {
  132. throw;
  133. }
  134. NDetail::TWaitProxy::Get()->Sleep(*backoffDuration);
  135. }
  136. }
  137. }
  138. void THeavyRequestRetrier::TryStartAttempt()
  139. {
  140. Attempt_ = std::make_unique<TAttempt>();
  141. Attempt_->Transaction = std::make_unique<TPingableTransaction>(
  142. Parameters_.RawClientPtr,
  143. Parameters_.ClientRetryPolicy, Parameters_.Context,
  144. Parameters_.TransactionId,
  145. Parameters_.TransactionPinger->GetChildTxPinger(),
  146. TStartTransactionOptions());
  147. auto header = Parameters_.Header;
  148. if (Parameters_.Context.ServiceTicketAuth) {
  149. header.SetServiceTicket(Parameters_.Context.ServiceTicketAuth->Ptr->IssueServiceTicket());
  150. } else {
  151. header.SetToken(Parameters_.Context.Token);
  152. }
  153. if (Parameters_.Context.ImpersonationUser) {
  154. header.SetImpersonationUser(*Parameters_.Context.ImpersonationUser);
  155. }
  156. auto hostName = GetProxyForHeavyRequest(Parameters_.Context);
  157. Attempt_->RequestId = CreateGuidAsString();
  158. UpdateHeaderForProxyIfNeed(hostName, Parameters_.Context, header);
  159. header.AddTransactionId(Attempt_->Transaction->GetId(), /* overwrite = */ true);
  160. header.SetRequestCompression(ToString(Parameters_.Context.Config->ContentEncoding));
  161. Attempt_->Request = Parameters_.Context.HttpClient->StartRequest(
  162. GetFullUrlForProxy(hostName, Parameters_.Context, header),
  163. Attempt_->RequestId, header);
  164. auto stream = StreamFactory_();
  165. stream->ReadAll(*Attempt_->Request->GetStream());
  166. }
  167. ////////////////////////////////////////////////////////////////////////////////
  168. } // namespace NYT