retry_lib.cpp 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. #include "retry_lib.h"
  2. #include <yt/cpp/mapreduce/interface/config.h>
  3. #include <yt/cpp/mapreduce/interface/errors.h>
  4. #include <yt/cpp/mapreduce/interface/error_codes.h>
  5. #include <yt/cpp/mapreduce/interface/retry_policy.h>
  6. #include <util/string/builder.h>
  7. #include <util/generic/set.h>
  8. namespace NYT {
  9. ////////////////////////////////////////////////////////////////////////////////
  10. TAttemptLimitedRetryPolicy::TAttemptLimitedRetryPolicy(ui32 attemptLimit, const TConfigPtr& config)
  11. : Config_(config)
  12. , AttemptLimit_(attemptLimit)
  13. { }
  14. void TAttemptLimitedRetryPolicy::NotifyNewAttempt()
  15. {
  16. ++Attempt_;
  17. }
  18. TMaybe<TDuration> TAttemptLimitedRetryPolicy::OnGenericError(const std::exception& e)
  19. {
  20. if (IsAttemptLimitExceeded()) {
  21. return Nothing();
  22. }
  23. return GetBackoffDuration(e, Config_);
  24. }
  25. TMaybe<TDuration> TAttemptLimitedRetryPolicy::OnRetriableError(const TErrorResponse& e)
  26. {
  27. if (IsAttemptLimitExceeded()) {
  28. return Nothing();
  29. }
  30. return GetBackoffDuration(e, Config_);
  31. }
  32. void TAttemptLimitedRetryPolicy::OnIgnoredError(const TErrorResponse& /*e*/)
  33. {
  34. --Attempt_;
  35. }
  36. TString TAttemptLimitedRetryPolicy::GetAttemptDescription() const
  37. {
  38. return ::TStringBuilder() << "attempt " << Attempt_ << " of " << AttemptLimit_;
  39. }
  40. bool TAttemptLimitedRetryPolicy::IsAttemptLimitExceeded() const
  41. {
  42. return Attempt_ >= AttemptLimit_;
  43. }
  44. ////////////////////////////////////////////////////////////////////////////////
  45. class TTimeLimitedRetryPolicy
  46. : public IRequestRetryPolicy
  47. {
  48. public:
  49. TTimeLimitedRetryPolicy(IRequestRetryPolicyPtr retryPolicy, TDuration timeout)
  50. : RetryPolicy_(retryPolicy)
  51. , Deadline_(TInstant::Now() + timeout)
  52. , Timeout_(timeout)
  53. { }
  54. void NotifyNewAttempt() override
  55. {
  56. if (TInstant::Now() >= Deadline_) {
  57. ythrow TRequestRetriesTimeout() << "retry timeout exceeded (timeout: " << Timeout_ << ")";
  58. }
  59. RetryPolicy_->NotifyNewAttempt();
  60. }
  61. TMaybe<TDuration> OnGenericError(const std::exception& e) override
  62. {
  63. return RetryPolicy_->OnGenericError(e);
  64. }
  65. TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
  66. {
  67. return RetryPolicy_->OnRetriableError(e);
  68. }
  69. void OnIgnoredError(const TErrorResponse& e) override
  70. {
  71. return RetryPolicy_->OnIgnoredError(e);
  72. }
  73. TString GetAttemptDescription() const override
  74. {
  75. return RetryPolicy_->GetAttemptDescription();
  76. }
  77. private:
  78. const IRequestRetryPolicyPtr RetryPolicy_;
  79. const TInstant Deadline_;
  80. const TDuration Timeout_;
  81. };
  82. ////////////////////////////////////////////////////////////////////////////////
  83. class TDefaultClientRetryPolicy
  84. : public IClientRetryPolicy
  85. {
  86. public:
  87. explicit TDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config)
  88. : RetryConfigProvider_(std::move(retryConfigProvider))
  89. , Config_(config)
  90. { }
  91. IRequestRetryPolicyPtr CreatePolicyForGenericRequest() override
  92. {
  93. return Wrap(CreateDefaultRequestRetryPolicy(Config_));
  94. }
  95. IRequestRetryPolicyPtr CreatePolicyForStartOperationRequest() override
  96. {
  97. return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->StartOperationRetryCount), Config_));
  98. }
  99. IRequestRetryPolicyPtr CreatePolicyForReaderRequest() override
  100. {
  101. return Wrap(MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(Config_->ReadRetryCount), Config_));
  102. }
  103. IRequestRetryPolicyPtr Wrap(IRequestRetryPolicyPtr basePolicy)
  104. {
  105. auto config = RetryConfigProvider_->CreateRetryConfig();
  106. if (config.RetriesTimeLimit < TDuration::Max()) {
  107. return ::MakeIntrusive<TTimeLimitedRetryPolicy>(std::move(basePolicy), config.RetriesTimeLimit);
  108. }
  109. return basePolicy;
  110. }
  111. private:
  112. IRetryConfigProviderPtr RetryConfigProvider_;
  113. const TConfigPtr Config_;
  114. };
  115. class TDefaultRetryConfigProvider
  116. : public IRetryConfigProvider
  117. {
  118. public:
  119. TRetryConfig CreateRetryConfig() override
  120. {
  121. return {};
  122. }
  123. };
  124. ////////////////////////////////////////////////////////////////////////////////
  125. IRequestRetryPolicyPtr CreateDefaultRequestRetryPolicy(const TConfigPtr& config)
  126. {
  127. return MakeIntrusive<TAttemptLimitedRetryPolicy>(static_cast<ui32>(config->RetryCount), config);
  128. }
  129. IClientRetryPolicyPtr CreateDefaultClientRetryPolicy(IRetryConfigProviderPtr retryConfigProvider, const TConfigPtr& config)
  130. {
  131. return MakeIntrusive<TDefaultClientRetryPolicy>(std::move(retryConfigProvider), config);
  132. }
  133. IRetryConfigProviderPtr CreateDefaultRetryConfigProvider()
  134. {
  135. return MakeIntrusive<TDefaultRetryConfigProvider>();
  136. }
  137. ////////////////////////////////////////////////////////////////////////////////
  // Tells whether an error code lies in the 700..799 range, which this file
  // treats as the chunk error range.
  static bool IsChunkError(int code)
  {
      return code >= 700 && code <= 799;
  }
  142. // Check whether:
  143. // 1) codes contain at least one chunk error AND
  144. // 2) codes don't contain non-retriable chunk errors.
  145. static bool IsRetriableChunkError(const TSet<int>& codes)
  146. {
  147. using namespace NClusterErrorCodes;
  148. auto isChunkError = false;
  149. for (auto code : codes) {
  150. switch (code) {
  151. case NChunkClient::SessionAlreadyExists:
  152. case NChunkClient::ChunkAlreadyExists:
  153. case NChunkClient::WindowError:
  154. case NChunkClient::BlockContentMismatch:
  155. case NChunkClient::InvalidBlockChecksum:
  156. case NChunkClient::BlockOutOfRange:
  157. case NChunkClient::MissingExtension:
  158. case NChunkClient::NoSuchBlock:
  159. case NChunkClient::NoSuchChunk:
  160. case NChunkClient::NoSuchChunkList:
  161. case NChunkClient::NoSuchChunkTree:
  162. case NChunkClient::NoSuchChunkView:
  163. case NChunkClient::NoSuchMedium:
  164. return false;
  165. default:
  166. isChunkError |= IsChunkError(code);
  167. break;
  168. }
  169. }
  170. return isChunkError;
  171. }
  172. static TMaybe<TDuration> TryGetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config)
  173. {
  174. int httpCode = errorResponse.GetHttpCode();
  175. if (httpCode / 100 != 4 && !errorResponse.IsFromTrailers()) {
  176. return config->RetryInterval;
  177. }
  178. auto allCodes = errorResponse.GetError().GetAllErrorCodes();
  179. using namespace NClusterErrorCodes;
  180. if (httpCode == 429
  181. || allCodes.count(NSecurityClient::RequestQueueSizeLimitExceeded)
  182. || allCodes.count(NRpc::RequestQueueSizeLimitExceeded))
  183. {
  184. // request rate limit exceeded
  185. return config->RateLimitExceededRetryInterval;
  186. }
  187. if (errorResponse.IsConcurrentOperationsLimitReached()) {
  188. // limit for the number of concurrent operations exceeded
  189. return config->StartOperationRetryInterval;
  190. }
  191. if (IsRetriableChunkError(allCodes)) {
  192. // chunk client errors
  193. return config->ChunkErrorsRetryInterval;
  194. }
  195. for (auto code : {
  196. NRpc::TransportError,
  197. NRpc::Unavailable,
  198. NApi::RetriableArchiveError,
  199. NSequoiaClient::SequoiaRetriableError,
  200. Canceled,
  201. }) {
  202. if (allCodes.contains(code)) {
  203. return config->RetryInterval;
  204. }
  205. }
  206. return Nothing();
  207. }
  208. TDuration GetBackoffDuration(const TErrorResponse& errorResponse, const TConfigPtr& config)
  209. {
  210. return TryGetBackoffDuration(errorResponse, config).GetOrElse(config->RetryInterval);
  211. }
  212. bool IsRetriable(const TErrorResponse& errorResponse)
  213. {
  214. // Retriability of an error doesn't depend on config, so just use global one.
  215. return TryGetBackoffDuration(errorResponse, TConfig::Get()).Defined();
  216. }
  217. bool IsRetriable(const std::exception& ex)
  218. {
  219. if (dynamic_cast<const TRequestRetriesTimeout*>(&ex)) {
  220. return false;
  221. }
  222. return true;
  223. }
  224. TDuration GetBackoffDuration(const std::exception& /*error*/, const TConfigPtr& config)
  225. {
  226. return GetBackoffDuration(config);
  227. }
  228. TDuration GetBackoffDuration(const TConfigPtr& config)
  229. {
  230. return config->RetryInterval;
  231. }
  232. ////////////////////////////////////////////////////////////////////////////////
  233. } // namespace NYT