AdaptiveRetryStrategy.cpp 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/core/client/AdaptiveRetryStrategy.h>
  6. #include <aws/core/client/AWSError.h>
  7. #include <aws/core/client/CoreErrors.h>
  8. #include <aws/core/utils/memory/stl/AWSSet.h>
  9. #include <cmath>
  10. #include <thread>
  11. using namespace Aws::Utils::Threading;
  12. namespace Aws
  13. {
  14. namespace Client
  15. {
  16. static const double MIN_FILL_RATE = 0.5;
  17. static const double MIN_CAPACITY = 1;
  18. static const double SMOOTH = 0.8;
  19. static const double BETA = 0.7;
  20. static const double SCALE_CONSTANT = 0.4;
  21. // A static list containing all service exception names classified as throttled.
  22. static const char* THROTTLING_EXCEPTIONS[] {
  23. "Throttling", "ThrottlingException", "ThrottledException", "RequestThrottledException",
  24. "TooManyRequestsException", "ProvisionedThroughputExceededException", "TransactionInProgressException",
  25. "RequestLimitExceeded", "BandwidthLimitExceeded", "LimitExceededException", "RequestThrottled",
  26. "SlowDown", "PriorRequestNotComplete", "EC2ThrottledException"};
  27. static const size_t THROTTLING_EXCEPTIONS_SZ = sizeof(THROTTLING_EXCEPTIONS) / sizeof(THROTTLING_EXCEPTIONS[0]);
  28. // C-tor for unit testing
  29. RetryTokenBucket::RetryTokenBucket(double fillRate, double maxCapacity, double currentCapacity,
  30. const Aws::Utils::DateTime& lastTimestamp, double measuredTxRate, double lastTxRateBucket,
  31. size_t requestCount, bool enabled, double lastMaxRate, const Aws::Utils::DateTime& lastThrottleTime)
  32. :
  33. m_fillRate(fillRate), m_maxCapacity(maxCapacity), m_currentCapacity(currentCapacity),
  34. m_lastTimestamp(lastTimestamp), m_measuredTxRate(measuredTxRate),
  35. m_lastTxRateBucket(lastTxRateBucket), m_requestCount(requestCount), m_enabled(enabled),
  36. m_lastMaxRate(lastMaxRate), m_lastThrottleTime(lastThrottleTime)
  37. {}
  38. bool RetryTokenBucket::Acquire(size_t amount, bool fastFail)
  39. {
  40. std::lock_guard<std::recursive_mutex> locker(m_mutex);
  41. if (!m_enabled)
  42. {
  43. return true;
  44. }
  45. Refill();
  46. bool notEnough = amount > m_currentCapacity;
  47. if (notEnough && fastFail) {
  48. return false;
  49. }
  50. // If all the tokens couldn't be acquired immediately, wait enough
  51. // time to fill the remainder.
  52. if (notEnough) {
  53. std::chrono::duration<double> waitTime((amount - m_currentCapacity) / m_fillRate);
  54. std::this_thread::sleep_for(waitTime);
  55. Refill();
  56. }
  57. m_currentCapacity -= amount;
  58. return true;
  59. }
  60. void RetryTokenBucket::Refill(const Aws::Utils::DateTime& now)
  61. {
  62. std::lock_guard<std::recursive_mutex> locker(m_mutex);
  63. if (0 == m_lastTimestamp.Millis()) {
  64. m_lastTimestamp = now;
  65. return;
  66. }
  67. double fillAmount = (std::abs(now.Millis() - m_lastTimestamp.Millis()))/1000.0 * m_fillRate;
  68. m_currentCapacity = (std::min)(m_maxCapacity, m_currentCapacity + fillAmount);
  69. m_lastTimestamp = now;
  70. }
  71. void RetryTokenBucket::UpdateRate(double newRps, const Aws::Utils::DateTime& now)
  72. {
  73. std::lock_guard<std::recursive_mutex> locker(m_mutex);
  74. Refill(now);
  75. m_fillRate = (std::max)(newRps, MIN_FILL_RATE);
  76. m_maxCapacity = (std::max)(newRps, MIN_CAPACITY);
  77. m_currentCapacity = (std::min)(m_currentCapacity, m_maxCapacity);
  78. }
  79. void RetryTokenBucket::UpdateMeasuredRate(const Aws::Utils::DateTime& now)
  80. {
  81. std::lock_guard<std::recursive_mutex> locker(m_mutex);
  82. double t = now.Millis() / 1000.0;
  83. double timeBucket = floor(t * 2.0) / 2.0;
  84. m_requestCount += 1;
  85. if (timeBucket > m_lastTxRateBucket) {
  86. double currentRate = m_requestCount / (timeBucket - m_lastTxRateBucket);
  87. m_measuredTxRate = (currentRate * SMOOTH) + (m_measuredTxRate * (1 - SMOOTH));
  88. m_requestCount = 0;
  89. m_lastTxRateBucket = timeBucket;
  90. }
  91. }
  92. void RetryTokenBucket::UpdateClientSendingRate(bool isThrottlingResponse, const Aws::Utils::DateTime& now)
  93. {
  94. std::lock_guard<std::recursive_mutex> locker(m_mutex);
  95. UpdateMeasuredRate(now);
  96. double calculatedRate = 0.0;
  97. if (isThrottlingResponse)
  98. {
  99. double rateToUse = m_measuredTxRate;
  100. if (m_enabled)
  101. rateToUse = (std::min)(rateToUse, m_fillRate);
  102. m_lastMaxRate = rateToUse;
  103. m_lastThrottleTime = now;
  104. calculatedRate = CUBICThrottle(rateToUse);
  105. Enable();
  106. }
  107. else
  108. {
  109. double timeWindow = CalculateTimeWindow();
  110. calculatedRate = CUBICSuccess(now, timeWindow);
  111. }
  112. double newRate = (std::min)(calculatedRate, 2.0 * m_measuredTxRate);
  113. UpdateRate(newRate, now);
  114. }
  115. void RetryTokenBucket::Enable()
  116. {
  117. std::lock_guard<std::recursive_mutex> locker(m_mutex);
  118. m_enabled = true;
  119. }
  120. double RetryTokenBucket::CalculateTimeWindow() const
  121. {
  122. return pow(((m_lastMaxRate * (1.0 - BETA)) / SCALE_CONSTANT), (1.0 / 3));
  123. }
  124. double RetryTokenBucket::CUBICSuccess(const Aws::Utils::DateTime& timestamp, const double timeWindow) const
  125. {
  126. double dt = (timestamp.Millis() - m_lastThrottleTime.Millis()) / 1000.0;
  127. double calculatedRate = SCALE_CONSTANT * pow(dt - timeWindow, 3.0) + m_lastMaxRate;
  128. return calculatedRate;
  129. }
  130. double RetryTokenBucket::CUBICThrottle(const double rateToUse) const
  131. {
  132. double calculatedRate = rateToUse * BETA;
  133. return calculatedRate;
  134. }
  135. AdaptiveRetryStrategy::AdaptiveRetryStrategy(long maxAttempts) :
  136. StandardRetryStrategy(maxAttempts)
  137. {}
  138. AdaptiveRetryStrategy::AdaptiveRetryStrategy(std::shared_ptr<RetryQuotaContainer> retryQuotaContainer, long maxAttempts) :
  139. StandardRetryStrategy(retryQuotaContainer, maxAttempts)
  140. {}
  141. bool AdaptiveRetryStrategy::HasSendToken()
  142. {
  143. return m_retryTokenBucket.Acquire(1, m_fastFail);
  144. }
  145. void AdaptiveRetryStrategy::RequestBookkeeping(const HttpResponseOutcome& httpResponseOutcome)
  146. {
  147. if (httpResponseOutcome.IsSuccess())
  148. {
  149. m_retryQuotaContainer->ReleaseRetryQuota(Aws::Client::NO_RETRY_INCREMENT);
  150. m_retryTokenBucket.UpdateClientSendingRate(false);
  151. }
  152. else
  153. {
  154. m_retryTokenBucket.UpdateClientSendingRate(IsThrottlingResponse(httpResponseOutcome));
  155. }
  156. }
  157. void AdaptiveRetryStrategy::RequestBookkeeping(const HttpResponseOutcome& httpResponseOutcome, const AWSError<CoreErrors>& lastError)
  158. {
  159. if (httpResponseOutcome.IsSuccess())
  160. {
  161. m_retryQuotaContainer->ReleaseRetryQuota(lastError);
  162. m_retryTokenBucket.UpdateClientSendingRate(false);
  163. }
  164. else
  165. {
  166. m_retryTokenBucket.UpdateClientSendingRate(IsThrottlingResponse(httpResponseOutcome));
  167. }
  168. }
  169. bool AdaptiveRetryStrategy::IsThrottlingResponse(const HttpResponseOutcome& httpResponseOutcome)
  170. {
  171. if(httpResponseOutcome.IsSuccess())
  172. return false;
  173. const AWSError<CoreErrors>& error = httpResponseOutcome.GetError();
  174. const Aws::Client::CoreErrors enumValue = error.GetErrorType();
  175. switch(enumValue)
  176. {
  177. case Aws::Client::CoreErrors::THROTTLING:
  178. case Aws::Client::CoreErrors::SLOW_DOWN:
  179. return true;
  180. default:
  181. break;
  182. }
  183. if(std::find(THROTTLING_EXCEPTIONS,
  184. THROTTLING_EXCEPTIONS + THROTTLING_EXCEPTIONS_SZ, error.GetExceptionName()) != THROTTLING_EXCEPTIONS + THROTTLING_EXCEPTIONS_SZ)
  185. {
  186. return true;
  187. }
  188. return false;
  189. }
  190. }
  191. }