exponential_backoff_retry_strategy.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/io/retry_strategy.h>
  6. #include <aws/io/event_loop.h>
  7. #include <aws/io/logging.h>
  8. #include <aws/common/clock.h>
  9. #include <aws/common/device_random.h>
  10. #include <aws/common/mutex.h>
  11. #include <aws/common/task_scheduler.h>
  12. #include <inttypes.h>
  13. struct exponential_backoff_strategy {
  14. struct aws_retry_strategy base;
  15. struct aws_exponential_backoff_retry_options config;
  16. struct aws_shutdown_callback_options shutdown_options;
  17. };
  18. struct exponential_backoff_retry_token {
  19. struct aws_retry_token base;
  20. struct aws_atomic_var current_retry_count;
  21. struct aws_atomic_var last_backoff;
  22. size_t max_retries;
  23. uint64_t backoff_scale_factor_ns;
  24. enum aws_exponential_backoff_jitter_mode jitter_mode;
  25. /* Let's not make this worse by constantly moving across threads if we can help it */
  26. struct aws_event_loop *bound_loop;
  27. uint64_t (*generate_random)(void);
  28. aws_generate_random_fn *generate_random_impl;
  29. void *generate_random_user_data;
  30. struct aws_task retry_task;
  31. struct {
  32. struct aws_mutex mutex;
  33. aws_retry_strategy_on_retry_token_acquired_fn *acquired_fn;
  34. aws_retry_strategy_on_retry_ready_fn *retry_ready_fn;
  35. void *user_data;
  36. } thread_data;
  37. };
  38. static void s_exponential_retry_destroy(struct aws_retry_strategy *retry_strategy) {
  39. if (retry_strategy) {
  40. struct exponential_backoff_strategy *exponential_strategy = retry_strategy->impl;
  41. struct aws_event_loop_group *el_group = exponential_strategy->config.el_group;
  42. aws_simple_completion_callback *completion_callback =
  43. exponential_strategy->shutdown_options.shutdown_callback_fn;
  44. void *completion_user_data = exponential_strategy->shutdown_options.shutdown_callback_user_data;
  45. aws_mem_release(retry_strategy->allocator, exponential_strategy);
  46. if (completion_callback != NULL) {
  47. completion_callback(completion_user_data);
  48. }
  49. aws_ref_count_release(&el_group->ref_count);
  50. }
  51. }
  52. static void s_exponential_retry_task(struct aws_task *task, void *arg, enum aws_task_status status) {
  53. (void)task;
  54. int error_code = AWS_ERROR_IO_OPERATION_CANCELLED;
  55. if (status == AWS_TASK_STATUS_RUN_READY) {
  56. error_code = AWS_OP_SUCCESS;
  57. }
  58. struct exponential_backoff_retry_token *backoff_retry_token = arg;
  59. aws_retry_strategy_on_retry_token_acquired_fn *acquired_fn = NULL;
  60. aws_retry_strategy_on_retry_ready_fn *retry_ready_fn = NULL;
  61. void *user_data = NULL;
  62. { /***** BEGIN CRITICAL SECTION *********/
  63. AWS_FATAL_ASSERT(
  64. !aws_mutex_lock(&backoff_retry_token->thread_data.mutex) && "Retry token mutex acquisition failed");
  65. acquired_fn = backoff_retry_token->thread_data.acquired_fn;
  66. retry_ready_fn = backoff_retry_token->thread_data.retry_ready_fn;
  67. user_data = backoff_retry_token->thread_data.user_data;
  68. backoff_retry_token->thread_data.user_data = NULL;
  69. backoff_retry_token->thread_data.retry_ready_fn = NULL;
  70. backoff_retry_token->thread_data.acquired_fn = NULL;
  71. AWS_FATAL_ASSERT(
  72. !aws_mutex_unlock(&backoff_retry_token->thread_data.mutex) && "Retry token mutex release failed");
  73. } /**** END CRITICAL SECTION ***********/
  74. aws_retry_token_acquire(&backoff_retry_token->base);
  75. if (acquired_fn) {
  76. AWS_LOGF_DEBUG(
  77. AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
  78. "id=%p: Vending retry_token %p",
  79. (void *)backoff_retry_token->base.retry_strategy,
  80. (void *)&backoff_retry_token->base);
  81. acquired_fn(backoff_retry_token->base.retry_strategy, error_code, &backoff_retry_token->base, user_data);
  82. } else if (retry_ready_fn) {
  83. AWS_LOGF_DEBUG(
  84. AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
  85. "id=%p: Invoking retry_ready for token %p",
  86. (void *)backoff_retry_token->base.retry_strategy,
  87. (void *)&backoff_retry_token->base);
  88. retry_ready_fn(&backoff_retry_token->base, error_code, user_data);
  89. /* it's acquired before being scheduled for retry */
  90. aws_retry_token_release(&backoff_retry_token->base);
  91. }
  92. aws_retry_token_release(&backoff_retry_token->base);
  93. }
  94. static int s_exponential_retry_acquire_token(
  95. struct aws_retry_strategy *retry_strategy,
  96. const struct aws_byte_cursor *partition_id,
  97. aws_retry_strategy_on_retry_token_acquired_fn *on_acquired,
  98. void *user_data,
  99. uint64_t timeout_ms) {
  100. (void)partition_id;
  101. /* no resource contention here so no timeouts. */
  102. (void)timeout_ms;
  103. struct exponential_backoff_retry_token *backoff_retry_token =
  104. aws_mem_calloc(retry_strategy->allocator, 1, sizeof(struct exponential_backoff_retry_token));
  105. if (!backoff_retry_token) {
  106. return AWS_OP_ERR;
  107. }
  108. AWS_LOGF_DEBUG(
  109. AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
  110. "id=%p: Initializing retry token %p",
  111. (void *)retry_strategy,
  112. (void *)&backoff_retry_token->base);
  113. backoff_retry_token->base.allocator = retry_strategy->allocator;
  114. backoff_retry_token->base.retry_strategy = retry_strategy;
  115. aws_atomic_init_int(&backoff_retry_token->base.ref_count, 1u);
  116. aws_retry_strategy_acquire(retry_strategy);
  117. backoff_retry_token->base.impl = backoff_retry_token;
  118. struct exponential_backoff_strategy *exponential_backoff_strategy = retry_strategy->impl;
  119. backoff_retry_token->bound_loop = aws_event_loop_group_get_next_loop(exponential_backoff_strategy->config.el_group);
  120. backoff_retry_token->max_retries = exponential_backoff_strategy->config.max_retries;
  121. backoff_retry_token->backoff_scale_factor_ns = aws_timestamp_convert(
  122. exponential_backoff_strategy->config.backoff_scale_factor_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
  123. backoff_retry_token->jitter_mode = exponential_backoff_strategy->config.jitter_mode;
  124. backoff_retry_token->generate_random = exponential_backoff_strategy->config.generate_random;
  125. backoff_retry_token->generate_random_impl = exponential_backoff_strategy->config.generate_random_impl;
  126. backoff_retry_token->generate_random_user_data = exponential_backoff_strategy->config.generate_random_user_data;
  127. aws_atomic_init_int(&backoff_retry_token->current_retry_count, 0);
  128. aws_atomic_init_int(&backoff_retry_token->last_backoff, 0);
  129. backoff_retry_token->thread_data.acquired_fn = on_acquired;
  130. backoff_retry_token->thread_data.user_data = user_data;
  131. AWS_FATAL_ASSERT(
  132. !aws_mutex_init(&backoff_retry_token->thread_data.mutex) && "Retry strategy mutex initialization failed");
  133. aws_task_init(
  134. &backoff_retry_token->retry_task,
  135. s_exponential_retry_task,
  136. backoff_retry_token,
  137. "aws_exponential_backoff_retry_task");
  138. aws_event_loop_schedule_task_now(backoff_retry_token->bound_loop, &backoff_retry_token->retry_task);
  139. return AWS_OP_SUCCESS;
  140. }
  141. static inline uint64_t s_random_in_range(uint64_t from, uint64_t to, struct exponential_backoff_retry_token *token) {
  142. uint64_t max = aws_max_u64(from, to);
  143. uint64_t min = aws_min_u64(from, to);
  144. uint64_t diff = max - min;
  145. if (!diff) {
  146. return 0;
  147. }
  148. uint64_t random;
  149. if (token->generate_random_impl) {
  150. random = token->generate_random_impl(token->generate_random_user_data);
  151. } else {
  152. random = token->generate_random();
  153. }
  154. return min + random % (diff);
  155. }
  156. typedef uint64_t(compute_backoff_fn)(struct exponential_backoff_retry_token *token);
  157. static uint64_t s_compute_no_jitter(struct exponential_backoff_retry_token *token) {
  158. uint64_t retry_count = aws_min_u64(aws_atomic_load_int(&token->current_retry_count), 63);
  159. return aws_mul_u64_saturating((uint64_t)1 << retry_count, token->backoff_scale_factor_ns);
  160. }
  161. static uint64_t s_compute_full_jitter(struct exponential_backoff_retry_token *token) {
  162. uint64_t non_jittered = s_compute_no_jitter(token);
  163. return s_random_in_range(0, non_jittered, token);
  164. }
  165. static uint64_t s_compute_deccorelated_jitter(struct exponential_backoff_retry_token *token) {
  166. size_t last_backoff_val = aws_atomic_load_int(&token->last_backoff);
  167. if (!last_backoff_val) {
  168. return s_compute_full_jitter(token);
  169. }
  170. return s_random_in_range(token->backoff_scale_factor_ns, aws_mul_u64_saturating(last_backoff_val, 3), token);
  171. }
  172. static compute_backoff_fn *s_backoff_compute_table[] = {
  173. [AWS_EXPONENTIAL_BACKOFF_JITTER_DEFAULT] = s_compute_full_jitter,
  174. [AWS_EXPONENTIAL_BACKOFF_JITTER_NONE] = s_compute_no_jitter,
  175. [AWS_EXPONENTIAL_BACKOFF_JITTER_FULL] = s_compute_full_jitter,
  176. [AWS_EXPONENTIAL_BACKOFF_JITTER_DECORRELATED] = s_compute_deccorelated_jitter,
  177. };
  178. static int s_exponential_retry_schedule_retry(
  179. struct aws_retry_token *token,
  180. enum aws_retry_error_type error_type,
  181. aws_retry_strategy_on_retry_ready_fn *retry_ready,
  182. void *user_data) {
  183. struct exponential_backoff_retry_token *backoff_retry_token = token->impl;
  184. AWS_LOGF_DEBUG(
  185. AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
  186. "id=%p: Attempting retry on token %p with error type %d",
  187. (void *)backoff_retry_token->base.retry_strategy,
  188. (void *)token,
  189. error_type);
  190. uint64_t schedule_at = 0;
  191. /* AWS_RETRY_ERROR_TYPE_CLIENT_ERROR does not count against your retry budget since you were responding to an
  192. * improperly crafted request. */
  193. if (error_type != AWS_RETRY_ERROR_TYPE_CLIENT_ERROR) {
  194. size_t retry_count = aws_atomic_load_int(&backoff_retry_token->current_retry_count);
  195. if (retry_count >= backoff_retry_token->max_retries) {
  196. AWS_LOGF_WARN(
  197. AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
  198. "id=%p: token %p has exhausted allowed retries. Retry count %zu max retries %zu",
  199. (void *)backoff_retry_token->base.retry_strategy,
  200. (void *)token,
  201. backoff_retry_token->max_retries,
  202. retry_count);
  203. return aws_raise_error(AWS_IO_MAX_RETRIES_EXCEEDED);
  204. }
  205. uint64_t backoff = s_backoff_compute_table[backoff_retry_token->jitter_mode](backoff_retry_token);
  206. uint64_t current_time = 0;
  207. aws_event_loop_current_clock_time(backoff_retry_token->bound_loop, &current_time);
  208. schedule_at = backoff + current_time;
  209. aws_atomic_init_int(&backoff_retry_token->last_backoff, (size_t)backoff);
  210. aws_atomic_fetch_add(&backoff_retry_token->current_retry_count, 1u);
  211. AWS_LOGF_DEBUG(
  212. AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
  213. "id=%p: Computed backoff value of %" PRIu64 "ns on token %p",
  214. (void *)backoff_retry_token->base.retry_strategy,
  215. backoff,
  216. (void *)token);
  217. }
  218. bool already_scheduled = false;
  219. { /***** BEGIN CRITICAL SECTION *********/
  220. AWS_FATAL_ASSERT(
  221. !aws_mutex_lock(&backoff_retry_token->thread_data.mutex) && "Retry token mutex acquisition failed");
  222. if (backoff_retry_token->thread_data.user_data) {
  223. already_scheduled = true;
  224. } else {
  225. backoff_retry_token->thread_data.retry_ready_fn = retry_ready;
  226. backoff_retry_token->thread_data.user_data = user_data;
  227. /* acquire to hold until the task runs. */
  228. aws_retry_token_acquire(token);
  229. aws_task_init(
  230. &backoff_retry_token->retry_task,
  231. s_exponential_retry_task,
  232. backoff_retry_token,
  233. "aws_exponential_backoff_retry_task");
  234. }
  235. AWS_FATAL_ASSERT(
  236. !aws_mutex_unlock(&backoff_retry_token->thread_data.mutex) && "Retry token mutex release failed");
  237. } /**** END CRITICAL SECTION ***********/
  238. if (already_scheduled) {
  239. AWS_LOGF_ERROR(
  240. AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
  241. "id=%p: retry token %p is already scheduled.",
  242. (void *)backoff_retry_token->base.retry_strategy,
  243. (void *)token);
  244. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  245. }
  246. aws_event_loop_schedule_task_future(backoff_retry_token->bound_loop, &backoff_retry_token->retry_task, schedule_at);
  247. return AWS_OP_SUCCESS;
  248. }
  249. static int s_exponential_backoff_record_success(struct aws_retry_token *token) {
  250. /* we don't do book keeping in this mode. */
  251. (void)token;
  252. return AWS_OP_SUCCESS;
  253. }
  254. static void s_exponential_backoff_release_token(struct aws_retry_token *token) {
  255. if (token) {
  256. aws_retry_strategy_release(token->retry_strategy);
  257. struct exponential_backoff_retry_token *backoff_retry_token = token->impl;
  258. aws_mutex_clean_up(&backoff_retry_token->thread_data.mutex);
  259. aws_mem_release(token->allocator, backoff_retry_token);
  260. }
  261. }
  262. static struct aws_retry_strategy_vtable s_exponential_retry_vtable = {
  263. .destroy = s_exponential_retry_destroy,
  264. .acquire_token = s_exponential_retry_acquire_token,
  265. .schedule_retry = s_exponential_retry_schedule_retry,
  266. .record_success = s_exponential_backoff_record_success,
  267. .release_token = s_exponential_backoff_release_token,
  268. };
  269. static uint64_t s_default_gen_rand(void *user_data) {
  270. (void)user_data;
  271. uint64_t res = 0;
  272. aws_device_random_u64(&res);
  273. return res;
  274. }
  275. struct aws_retry_strategy *aws_retry_strategy_new_exponential_backoff(
  276. struct aws_allocator *allocator,
  277. const struct aws_exponential_backoff_retry_options *config) {
  278. AWS_PRECONDITION(config);
  279. AWS_PRECONDITION(config->el_group);
  280. AWS_PRECONDITION(config->jitter_mode <= AWS_EXPONENTIAL_BACKOFF_JITTER_DECORRELATED);
  281. AWS_PRECONDITION(config->max_retries);
  282. if (config->max_retries > 63 || !config->el_group ||
  283. config->jitter_mode > AWS_EXPONENTIAL_BACKOFF_JITTER_DECORRELATED) {
  284. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  285. return NULL;
  286. }
  287. struct exponential_backoff_strategy *exponential_backoff_strategy =
  288. aws_mem_calloc(allocator, 1, sizeof(struct exponential_backoff_strategy));
  289. if (!exponential_backoff_strategy) {
  290. return NULL;
  291. }
  292. AWS_LOGF_INFO(
  293. AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
  294. "id=%p: Initializing exponential backoff retry strategy with scale factor: %" PRIu32
  295. " jitter mode: %d and max retries %zu",
  296. (void *)&exponential_backoff_strategy->base,
  297. config->backoff_scale_factor_ms,
  298. config->jitter_mode,
  299. config->max_retries);
  300. exponential_backoff_strategy->base.allocator = allocator;
  301. exponential_backoff_strategy->base.impl = exponential_backoff_strategy;
  302. exponential_backoff_strategy->base.vtable = &s_exponential_retry_vtable;
  303. aws_atomic_init_int(&exponential_backoff_strategy->base.ref_count, 1);
  304. exponential_backoff_strategy->config = *config;
  305. exponential_backoff_strategy->config.el_group =
  306. aws_ref_count_acquire(&exponential_backoff_strategy->config.el_group->ref_count);
  307. if (!exponential_backoff_strategy->config.generate_random &&
  308. !exponential_backoff_strategy->config.generate_random_impl) {
  309. exponential_backoff_strategy->config.generate_random_impl = s_default_gen_rand;
  310. }
  311. if (!exponential_backoff_strategy->config.max_retries) {
  312. exponential_backoff_strategy->config.max_retries = 5;
  313. }
  314. if (!exponential_backoff_strategy->config.backoff_scale_factor_ms) {
  315. exponential_backoff_strategy->config.backoff_scale_factor_ms = 25;
  316. }
  317. if (config->shutdown_options) {
  318. exponential_backoff_strategy->shutdown_options = *config->shutdown_options;
  319. }
  320. return &exponential_backoff_strategy->base;
  321. }