retry_lib.cpp 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. #include "retry_lib.h"
  2. #include <yt/cpp/mapreduce/interface/config.h>
  3. #include <yt/cpp/mapreduce/interface/errors.h>
  4. #include <yt/cpp/mapreduce/interface/error_codes.h>
  5. #include <yt/cpp/mapreduce/interface/retry_policy.h>
  6. #include <util/string/builder.h>
  7. #include <util/generic/set.h>
  8. namespace NYT {
  9. ////////////////////////////////////////////////////////////////////////////////
  10. TAttemptLimitedRetryPolicy::TAttemptLimitedRetryPolicy(ui32 attemptLimit, const TConfigPtr& config)
  11. : Config_(config)
  12. , AttemptLimit_(attemptLimit)
  13. { }
  14. void TAttemptLimitedRetryPolicy::NotifyNewAttempt()
  15. {
  16. ++Attempt_;
  17. }
  18. TMaybe<TDuration> TAttemptLimitedRetryPolicy::OnGenericError(const std::exception& e)
  19. {
  20. if (IsAttemptLimitExceeded()) {
  21. return Nothing();
  22. }
  23. return GetBackoffDuration(e, Config_);
  24. }
  25. TMaybe<TDuration> TAttemptLimitedRetryPolicy::OnRetriableError(const TErrorResponse& e)
  26. {
  27. if (IsAttemptLimitExceeded()) {
  28. return Nothing();
  29. }
  30. return GetBackoffDuration(e, Config_);
  31. }
  32. void TAttemptLimitedRetryPolicy::OnIgnoredError(const TErrorResponse& /*e*/)
  33. {
  34. --Attempt_;
  35. }
  36. TString TAttemptLimitedRetryPolicy::GetAttemptDescription() const
  37. {
  38. return ::TStringBuilder() << "attempt " << Attempt_ << " of " << AttemptLimit_;
  39. }
  40. bool TAttemptLimitedRetryPolicy::IsAttemptLimitExceeded() const
  41. {
  42. return Attempt_ >= AttemptLimit_;
  43. }
  44. ////////////////////////////////////////////////////////////////////////////////
  45. class TTimeLimitedRetryPolicy
  46. : public IRequestRetryPolicy
  47. {
  48. public:
  49. TTimeLimitedRetryPolicy(IRequestRetryPolicyPtr retryPolicy, TDuration timeout)
  50. : RetryPolicy_(retryPolicy)
  51. , Deadline_(TInstant::Now() + timeout)
  52. , Timeout_(timeout)
  53. { }
  54. void NotifyNewAttempt() override
  55. {
  56. if (TInstant::Now() >= Deadline_) {
  57. ythrow TRequestRetriesTimeout() << "retry timeout exceeded (timeout: " << Timeout_ << ")";
  58. }
  59. RetryPolicy_->NotifyNewAttempt();
  60. }
  61. TMaybe<TDuration> OnGenericError(const std::exception& e) override
  62. {
  63. return RetryPolicy_->OnGenericError(e);
  64. }
  65. TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
  66. {
  67. return RetryPolicy_->OnRetriableError(e);
  68. }
  69. void OnIgnoredError(const TErrorResponse& e) override
  70. {
  71. return RetryPolicy_->OnIgnoredError(e);
  72. }
  73. TString GetAttemptDescription() const override
  74. {
  75. return RetryPolicy_->GetAttemptDescription();
  76. }
  77. private:
  78. const IRequestRetryPolicyPtr RetryPolicy_;
  79. const TInstant Deadline_;
  80. const TDuration Timeout_;
  81. };
  82. ////////////////////////////////////////////////////////////////////////////////
  83. class TDefaultClientRetryPolicy
  84. : public IClientRetryPolicy
  85. {
  86. public:
  87. explicit TDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config)
  88. : RetryConfigProvider_(std::move(retryConfigProvider))
  89. , Config_(config)
  90. { }
  91. IRequestRetryPolicyPtr CreatePolicyForGenericRequest() override
  92. {
  93. return Wrap(CreateDefaultRequestRetryPolicy(Config_));
  94. }
  95. IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() override
  96. {
  97. return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->StartOperationRetryCount), Config_));
  98. }
  99. IRequestRetryPolicyPtr Wrap(IRequestRetryPolicyPtr basePolicy)
  100. {
  101. auto config = RetryConfigProvider_->CreateRetryConfig();
  102. if (config.RetriesTimeLimit < TDuration::Max()) {
  103. return ::MakeIntrusive<TTimeLimitedRetryPolicy>(std::move(basePolicy), config.RetriesTimeLimit);
  104. }
  105. return basePolicy;
  106. }
  107. private:
  108. IRetryConfigProviderPtr RetryConfigProvider_;
  109. const TConfigPtr Config_;
  110. };
  111. class TDefaultRetryConfigProvider
  112. : public IRetryConfigProvider
  113. {
  114. public:
  115. TRetryConfig CreateRetryConfig() override
  116. {
  117. return {};
  118. }
  119. };
  120. ////////////////////////////////////////////////////////////////////////////////
  121. IRequestRetryPolicyPtr CreateDefaultRequestRetryPolicy(const TConfigPtr& config)
  122. {
  123. return MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(config->RetryCount), config);
  124. }
  125. IClientRetryPolicyPtr CreateDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config)
  126. {
  127. return MakeIntrusive<TDefaultClientRetryPolicy>(std::move(retryConfigProvider), config);
  128. }
  129. IRetryConfigProviderPtr CreateDefaultRetryConfigProvider()
  130. {
  131. return MakeIntrusive<TDefaultRetryConfigProvider>();
  132. }
  133. ////////////////////////////////////////////////////////////////////////////////
  134. static bool IsChunkError(int code)
  135. {
  136. return code / 100 == 7;
  137. }
  138. // Check whether:
  139. // 1) codes contain at least one chunk error AND
  140. // 2) codes don't contain non-retriable chunk errors.
  141. static bool IsRetriableChunkError(const TSet<int>& codes)
  142. {
  143. using namespace NClusterErrorCodes;
  144. auto isChunkError = false;
  145. for (auto code : codes) {
  146. switch (code) {
  147. case NChunkClient::SessionAlreadyExists:
  148. case NChunkClient::ChunkAlreadyExists:
  149. case NChunkClient::WindowError:
  150. case NChunkClient::BlockContentMismatch:
  151. case NChunkClient::InvalidBlockChecksum:
  152. case NChunkClient::BlockOutOfRange:
  153. case NChunkClient::MissingExtension:
  154. case NChunkClient::NoSuchBlock:
  155. case NChunkClient::NoSuchChunk:
  156. case NChunkClient::NoSuchChunkList:
  157. case NChunkClient::NoSuchChunkTree:
  158. case NChunkClient::NoSuchChunkView:
  159. case NChunkClient::NoSuchMedium:
  160. return false;
  161. default:
  162. isChunkError |= IsChunkError(code);
  163. break;
  164. }
  165. }
  166. return isChunkError;
  167. }
  168. static TMaybe<TDuration> TryGetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config)
  169. {
  170. int httpCode = errorResponse.GetHttpCode();
  171. if (httpCode / 100 != 4 && !errorResponse.IsFromTrailers()) {
  172. return config->RetryInterval;
  173. }
  174. auto allCodes = errorResponse.GetError().GetAllErrorCodes();
  175. using namespace NClusterErrorCodes;
  176. if (httpCode == 429
  177. || allCodes.count(NSecurityClient::RequestQueueSizeLimitExceeded)
  178. || allCodes.count(NRpc::RequestQueueSizeLimitExceeded))
  179. {
  180. // request rate limit exceeded
  181. return config->RateLimitExceededRetryInterval;
  182. }
  183. if (errorResponse.IsConcurrentOperationsLimitReached()) {
  184. // limit for the number of concurrent operations exceeded
  185. return config->StartOperationRetryInterval;
  186. }
  187. if (IsRetriableChunkError(allCodes)) {
  188. // chunk client errors
  189. return config->ChunkErrorsRetryInterval;
  190. }
  191. for (auto code : {
  192. NRpc::TransportError,
  193. NRpc::Unavailable,
  194. NApi::RetriableArchiveError,
  195. NSequoiaClient::SequoiaRetriableError,
  196. Canceled,
  197. }) {
  198. if (allCodes.contains(code)) {
  199. return config->RetryInterval;
  200. }
  201. }
  202. return Nothing();
  203. }
  204. TDuration GetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config)
  205. {
  206. return TryGetBackoffDuration(errorResponse, config).GetOrElse(config->RetryInterval);
  207. }
  208. bool IsRetriable(const TErrorResponse& errorResponse)
  209. {
  210. // Retriability of an error doesn't depend on config, so just use global one.
  211. return TryGetBackoffDuration(errorResponse, TConfig::Get()).Defined();
  212. }
  213. bool IsRetriable(const std::exception& ex)
  214. {
  215. if (dynamic_cast<const TRequestRetriesTimeout*>(&ex)) {
  216. return false;
  217. }
  218. return true;
  219. }
  220. TDuration GetBackoffDuration(const std::exception& /*error*/, const TConfigPtr& config)
  221. {
  222. return GetBackoffDuration(config);
  223. }
  224. TDuration GetBackoffDuration(const TConfigPtr& config)
  225. {
  226. return config->RetryInterval;
  227. }
  228. ////////////////////////////////////////////////////////////////////////////////
  229. } // namespace NYT