retry_request.cpp 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. #include "retry_request.h"
  2. #include "context.h"
  3. #include "helpers.h"
  4. #include "http_client.h"
  5. #include "requests.h"
  6. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  7. #include <yt/cpp/mapreduce/common/retry_lib.h>
  8. #include <yt/cpp/mapreduce/interface/config.h>
  9. #include <yt/cpp/mapreduce/interface/tvm.h>
  10. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  11. #include <library/cpp/yson/node/node_io.h>
  12. namespace NYT {
  13. namespace NDetail {
  14. ///////////////////////////////////////////////////////////////////////////////
  15. static TResponseInfo Request(
  16. const TClientContext& context,
  17. THttpHeader& header,
  18. TMaybe<TStringBuf> body,
  19. const TString& requestId,
  20. const TRequestConfig& config)
  21. {
  22. TString hostName;
  23. if (config.IsHeavy) {
  24. hostName = GetProxyForHeavyRequest(context);
  25. } else {
  26. hostName = context.ServerName;
  27. }
  28. UpdateHeaderForProxyIfNeed(hostName, context, header);
  29. auto url = GetFullUrlForProxy(hostName, context, header);
  30. auto response = context.HttpClient->Request(url, requestId, config.HttpConfig, header, body);
  31. TResponseInfo result;
  32. result.RequestId = requestId;
  33. result.Response = response->GetResponse();
  34. result.HttpCode = response->GetStatusCode();
  35. return result;
  36. }
  37. TResponseInfo RequestWithoutRetry(
  38. const TClientContext& context,
  39. THttpHeader& header,
  40. TMaybe<TStringBuf> body,
  41. const TRequestConfig& config)
  42. {
  43. if (context.ServiceTicketAuth) {
  44. header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
  45. } else {
  46. header.SetToken(context.Token);
  47. }
  48. if (context.ImpersonationUser) {
  49. header.SetImpersonationUser(*context.ImpersonationUser);
  50. }
  51. if (header.HasMutationId()) {
  52. header.RemoveParameter("retry");
  53. header.AddMutationId();
  54. }
  55. auto requestId = CreateGuidAsString();
  56. return Request(context, header, body, requestId, config);
  57. }
  58. TResponseInfo RetryRequestWithPolicy(
  59. IRequestRetryPolicyPtr retryPolicy,
  60. const TClientContext& context,
  61. THttpHeader& header,
  62. TMaybe<TStringBuf> body,
  63. const TRequestConfig& config)
  64. {
  65. if (context.ServiceTicketAuth) {
  66. header.SetServiceTicket(context.ServiceTicketAuth->Ptr->IssueServiceTicket());
  67. } else {
  68. header.SetToken(context.Token);
  69. }
  70. UpdateHeaderForProxyIfNeed(context.ServerName, context, header);
  71. if (context.ImpersonationUser) {
  72. header.SetImpersonationUser(*context.ImpersonationUser);
  73. }
  74. bool useMutationId = header.HasMutationId();
  75. bool retryWithSameMutationId = false;
  76. if (!retryPolicy) {
  77. retryPolicy = CreateDefaultRequestRetryPolicy(context.Config);
  78. }
  79. while (true) {
  80. auto requestId = CreateGuidAsString();
  81. try {
  82. retryPolicy->NotifyNewAttempt();
  83. if (useMutationId) {
  84. if (retryWithSameMutationId) {
  85. header.AddParameter("retry", true, /* overwrite = */ true);
  86. } else {
  87. header.RemoveParameter("retry");
  88. header.AddMutationId();
  89. }
  90. }
  91. return Request(context, header, body, requestId, config);
  92. } catch (const TErrorResponse& e) {
  93. LogRequestError(requestId, header, e.GetError().GetMessage(), retryPolicy->GetAttemptDescription());
  94. retryWithSameMutationId = e.IsTransportError();
  95. if (!IsRetriable(e)) {
  96. throw;
  97. }
  98. auto maybeRetryTimeout = retryPolicy->OnRetriableError(e);
  99. if (maybeRetryTimeout) {
  100. TWaitProxy::Get()->Sleep(*maybeRetryTimeout);
  101. } else {
  102. throw;
  103. }
  104. } catch (const std::exception& e) {
  105. LogRequestError(requestId, header, e.what(), retryPolicy->GetAttemptDescription());
  106. retryWithSameMutationId = true;
  107. if (!IsRetriable(e)) {
  108. throw;
  109. }
  110. auto maybeRetryTimeout = retryPolicy->OnGenericError(e);
  111. if (maybeRetryTimeout) {
  112. TWaitProxy::Get()->Sleep(*maybeRetryTimeout);
  113. } else {
  114. throw;
  115. }
  116. }
  117. }
  118. Y_ABORT("Retries must have either succeeded or thrown an exception");
  119. }
  120. ///////////////////////////////////////////////////////////////////////////////
  121. } // namespace NDetail
  122. } // namespace NYT