123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- /**
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * SPDX-License-Identifier: Apache-2.0.
- */
- #include <aws/io/retry_strategy.h>
- #include <aws/io/event_loop.h>
- #include <aws/io/logging.h>
- #include <aws/common/clock.h>
- #include <aws/common/device_random.h>
- #include <aws/common/mutex.h>
- #include <aws/common/task_scheduler.h>
- #include <inttypes.h>
- struct exponential_backoff_strategy {
- struct aws_retry_strategy base;
- struct aws_exponential_backoff_retry_options config;
- struct aws_shutdown_callback_options shutdown_options;
- };
- struct exponential_backoff_retry_token {
- struct aws_retry_token base;
- struct aws_atomic_var current_retry_count;
- struct aws_atomic_var last_backoff;
- size_t max_retries;
- uint64_t backoff_scale_factor_ns;
- enum aws_exponential_backoff_jitter_mode jitter_mode;
- /* Let's not make this worse by constantly moving across threads if we can help it */
- struct aws_event_loop *bound_loop;
- uint64_t (*generate_random)(void);
- aws_generate_random_fn *generate_random_impl;
- void *generate_random_user_data;
- struct aws_task retry_task;
- struct {
- struct aws_mutex mutex;
- aws_retry_strategy_on_retry_token_acquired_fn *acquired_fn;
- aws_retry_strategy_on_retry_ready_fn *retry_ready_fn;
- void *user_data;
- } thread_data;
- };
- static void s_exponential_retry_destroy(struct aws_retry_strategy *retry_strategy) {
- if (retry_strategy) {
- struct exponential_backoff_strategy *exponential_strategy = retry_strategy->impl;
- struct aws_event_loop_group *el_group = exponential_strategy->config.el_group;
- aws_simple_completion_callback *completion_callback =
- exponential_strategy->shutdown_options.shutdown_callback_fn;
- void *completion_user_data = exponential_strategy->shutdown_options.shutdown_callback_user_data;
- aws_mem_release(retry_strategy->allocator, exponential_strategy);
- if (completion_callback != NULL) {
- completion_callback(completion_user_data);
- }
- aws_ref_count_release(&el_group->ref_count);
- }
- }
- static void s_exponential_retry_task(struct aws_task *task, void *arg, enum aws_task_status status) {
- (void)task;
- int error_code = AWS_ERROR_IO_OPERATION_CANCELLED;
- if (status == AWS_TASK_STATUS_RUN_READY) {
- error_code = AWS_OP_SUCCESS;
- }
- struct exponential_backoff_retry_token *backoff_retry_token = arg;
- aws_retry_strategy_on_retry_token_acquired_fn *acquired_fn = NULL;
- aws_retry_strategy_on_retry_ready_fn *retry_ready_fn = NULL;
- void *user_data = NULL;
- { /***** BEGIN CRITICAL SECTION *********/
- AWS_FATAL_ASSERT(
- !aws_mutex_lock(&backoff_retry_token->thread_data.mutex) && "Retry token mutex acquisition failed");
- acquired_fn = backoff_retry_token->thread_data.acquired_fn;
- retry_ready_fn = backoff_retry_token->thread_data.retry_ready_fn;
- user_data = backoff_retry_token->thread_data.user_data;
- backoff_retry_token->thread_data.user_data = NULL;
- backoff_retry_token->thread_data.retry_ready_fn = NULL;
- backoff_retry_token->thread_data.acquired_fn = NULL;
- AWS_FATAL_ASSERT(
- !aws_mutex_unlock(&backoff_retry_token->thread_data.mutex) && "Retry token mutex release failed");
- } /**** END CRITICAL SECTION ***********/
- aws_retry_token_acquire(&backoff_retry_token->base);
- if (acquired_fn) {
- AWS_LOGF_DEBUG(
- AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
- "id=%p: Vending retry_token %p",
- (void *)backoff_retry_token->base.retry_strategy,
- (void *)&backoff_retry_token->base);
- acquired_fn(backoff_retry_token->base.retry_strategy, error_code, &backoff_retry_token->base, user_data);
- } else if (retry_ready_fn) {
- AWS_LOGF_DEBUG(
- AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
- "id=%p: Invoking retry_ready for token %p",
- (void *)backoff_retry_token->base.retry_strategy,
- (void *)&backoff_retry_token->base);
- retry_ready_fn(&backoff_retry_token->base, error_code, user_data);
- /* it's acquired before being scheduled for retry */
- aws_retry_token_release(&backoff_retry_token->base);
- }
- aws_retry_token_release(&backoff_retry_token->base);
- }
- static int s_exponential_retry_acquire_token(
- struct aws_retry_strategy *retry_strategy,
- const struct aws_byte_cursor *partition_id,
- aws_retry_strategy_on_retry_token_acquired_fn *on_acquired,
- void *user_data,
- uint64_t timeout_ms) {
- (void)partition_id;
- /* no resource contention here so no timeouts. */
- (void)timeout_ms;
- struct exponential_backoff_retry_token *backoff_retry_token =
- aws_mem_calloc(retry_strategy->allocator, 1, sizeof(struct exponential_backoff_retry_token));
- if (!backoff_retry_token) {
- return AWS_OP_ERR;
- }
- AWS_LOGF_DEBUG(
- AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
- "id=%p: Initializing retry token %p",
- (void *)retry_strategy,
- (void *)&backoff_retry_token->base);
- backoff_retry_token->base.allocator = retry_strategy->allocator;
- backoff_retry_token->base.retry_strategy = retry_strategy;
- aws_atomic_init_int(&backoff_retry_token->base.ref_count, 1u);
- aws_retry_strategy_acquire(retry_strategy);
- backoff_retry_token->base.impl = backoff_retry_token;
- struct exponential_backoff_strategy *exponential_backoff_strategy = retry_strategy->impl;
- backoff_retry_token->bound_loop = aws_event_loop_group_get_next_loop(exponential_backoff_strategy->config.el_group);
- backoff_retry_token->max_retries = exponential_backoff_strategy->config.max_retries;
- backoff_retry_token->backoff_scale_factor_ns = aws_timestamp_convert(
- exponential_backoff_strategy->config.backoff_scale_factor_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
- backoff_retry_token->jitter_mode = exponential_backoff_strategy->config.jitter_mode;
- backoff_retry_token->generate_random = exponential_backoff_strategy->config.generate_random;
- backoff_retry_token->generate_random_impl = exponential_backoff_strategy->config.generate_random_impl;
- backoff_retry_token->generate_random_user_data = exponential_backoff_strategy->config.generate_random_user_data;
- aws_atomic_init_int(&backoff_retry_token->current_retry_count, 0);
- aws_atomic_init_int(&backoff_retry_token->last_backoff, 0);
- backoff_retry_token->thread_data.acquired_fn = on_acquired;
- backoff_retry_token->thread_data.user_data = user_data;
- AWS_FATAL_ASSERT(
- !aws_mutex_init(&backoff_retry_token->thread_data.mutex) && "Retry strategy mutex initialization failed");
- aws_task_init(
- &backoff_retry_token->retry_task,
- s_exponential_retry_task,
- backoff_retry_token,
- "aws_exponential_backoff_retry_task");
- aws_event_loop_schedule_task_now(backoff_retry_token->bound_loop, &backoff_retry_token->retry_task);
- return AWS_OP_SUCCESS;
- }
- static inline uint64_t s_random_in_range(uint64_t from, uint64_t to, struct exponential_backoff_retry_token *token) {
- uint64_t max = aws_max_u64(from, to);
- uint64_t min = aws_min_u64(from, to);
- uint64_t diff = max - min;
- if (!diff) {
- return 0;
- }
- uint64_t random;
- if (token->generate_random_impl) {
- random = token->generate_random_impl(token->generate_random_user_data);
- } else {
- random = token->generate_random();
- }
- return min + random % (diff);
- }
- typedef uint64_t(compute_backoff_fn)(struct exponential_backoff_retry_token *token);
- static uint64_t s_compute_no_jitter(struct exponential_backoff_retry_token *token) {
- uint64_t retry_count = aws_min_u64(aws_atomic_load_int(&token->current_retry_count), 63);
- return aws_mul_u64_saturating((uint64_t)1 << retry_count, token->backoff_scale_factor_ns);
- }
- static uint64_t s_compute_full_jitter(struct exponential_backoff_retry_token *token) {
- uint64_t non_jittered = s_compute_no_jitter(token);
- return s_random_in_range(0, non_jittered, token);
- }
- static uint64_t s_compute_deccorelated_jitter(struct exponential_backoff_retry_token *token) {
- size_t last_backoff_val = aws_atomic_load_int(&token->last_backoff);
- if (!last_backoff_val) {
- return s_compute_full_jitter(token);
- }
- return s_random_in_range(token->backoff_scale_factor_ns, aws_mul_u64_saturating(last_backoff_val, 3), token);
- }
- static compute_backoff_fn *s_backoff_compute_table[] = {
- [AWS_EXPONENTIAL_BACKOFF_JITTER_DEFAULT] = s_compute_full_jitter,
- [AWS_EXPONENTIAL_BACKOFF_JITTER_NONE] = s_compute_no_jitter,
- [AWS_EXPONENTIAL_BACKOFF_JITTER_FULL] = s_compute_full_jitter,
- [AWS_EXPONENTIAL_BACKOFF_JITTER_DECORRELATED] = s_compute_deccorelated_jitter,
- };
- static int s_exponential_retry_schedule_retry(
- struct aws_retry_token *token,
- enum aws_retry_error_type error_type,
- aws_retry_strategy_on_retry_ready_fn *retry_ready,
- void *user_data) {
- struct exponential_backoff_retry_token *backoff_retry_token = token->impl;
- AWS_LOGF_DEBUG(
- AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
- "id=%p: Attempting retry on token %p with error type %d",
- (void *)backoff_retry_token->base.retry_strategy,
- (void *)token,
- error_type);
- uint64_t schedule_at = 0;
- /* AWS_RETRY_ERROR_TYPE_CLIENT_ERROR does not count against your retry budget since you were responding to an
- * improperly crafted request. */
- if (error_type != AWS_RETRY_ERROR_TYPE_CLIENT_ERROR) {
- size_t retry_count = aws_atomic_load_int(&backoff_retry_token->current_retry_count);
- if (retry_count >= backoff_retry_token->max_retries) {
- AWS_LOGF_WARN(
- AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
- "id=%p: token %p has exhausted allowed retries. Retry count %zu max retries %zu",
- (void *)backoff_retry_token->base.retry_strategy,
- (void *)token,
- backoff_retry_token->max_retries,
- retry_count);
- return aws_raise_error(AWS_IO_MAX_RETRIES_EXCEEDED);
- }
- uint64_t backoff = s_backoff_compute_table[backoff_retry_token->jitter_mode](backoff_retry_token);
- uint64_t current_time = 0;
- aws_event_loop_current_clock_time(backoff_retry_token->bound_loop, ¤t_time);
- schedule_at = backoff + current_time;
- aws_atomic_init_int(&backoff_retry_token->last_backoff, (size_t)backoff);
- aws_atomic_fetch_add(&backoff_retry_token->current_retry_count, 1u);
- AWS_LOGF_DEBUG(
- AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
- "id=%p: Computed backoff value of %" PRIu64 "ns on token %p",
- (void *)backoff_retry_token->base.retry_strategy,
- backoff,
- (void *)token);
- }
- bool already_scheduled = false;
- { /***** BEGIN CRITICAL SECTION *********/
- AWS_FATAL_ASSERT(
- !aws_mutex_lock(&backoff_retry_token->thread_data.mutex) && "Retry token mutex acquisition failed");
- if (backoff_retry_token->thread_data.user_data) {
- already_scheduled = true;
- } else {
- backoff_retry_token->thread_data.retry_ready_fn = retry_ready;
- backoff_retry_token->thread_data.user_data = user_data;
- /* acquire to hold until the task runs. */
- aws_retry_token_acquire(token);
- aws_task_init(
- &backoff_retry_token->retry_task,
- s_exponential_retry_task,
- backoff_retry_token,
- "aws_exponential_backoff_retry_task");
- }
- AWS_FATAL_ASSERT(
- !aws_mutex_unlock(&backoff_retry_token->thread_data.mutex) && "Retry token mutex release failed");
- } /**** END CRITICAL SECTION ***********/
- if (already_scheduled) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
- "id=%p: retry token %p is already scheduled.",
- (void *)backoff_retry_token->base.retry_strategy,
- (void *)token);
- return aws_raise_error(AWS_ERROR_INVALID_STATE);
- }
- aws_event_loop_schedule_task_future(backoff_retry_token->bound_loop, &backoff_retry_token->retry_task, schedule_at);
- return AWS_OP_SUCCESS;
- }
- static int s_exponential_backoff_record_success(struct aws_retry_token *token) {
- /* we don't do book keeping in this mode. */
- (void)token;
- return AWS_OP_SUCCESS;
- }
- static void s_exponential_backoff_release_token(struct aws_retry_token *token) {
- if (token) {
- aws_retry_strategy_release(token->retry_strategy);
- struct exponential_backoff_retry_token *backoff_retry_token = token->impl;
- aws_mutex_clean_up(&backoff_retry_token->thread_data.mutex);
- aws_mem_release(token->allocator, backoff_retry_token);
- }
- }
- static struct aws_retry_strategy_vtable s_exponential_retry_vtable = {
- .destroy = s_exponential_retry_destroy,
- .acquire_token = s_exponential_retry_acquire_token,
- .schedule_retry = s_exponential_retry_schedule_retry,
- .record_success = s_exponential_backoff_record_success,
- .release_token = s_exponential_backoff_release_token,
- };
- static uint64_t s_default_gen_rand(void *user_data) {
- (void)user_data;
- uint64_t res = 0;
- aws_device_random_u64(&res);
- return res;
- }
- struct aws_retry_strategy *aws_retry_strategy_new_exponential_backoff(
- struct aws_allocator *allocator,
- const struct aws_exponential_backoff_retry_options *config) {
- AWS_PRECONDITION(config);
- AWS_PRECONDITION(config->el_group);
- AWS_PRECONDITION(config->jitter_mode <= AWS_EXPONENTIAL_BACKOFF_JITTER_DECORRELATED);
- AWS_PRECONDITION(config->max_retries);
- if (config->max_retries > 63 || !config->el_group ||
- config->jitter_mode > AWS_EXPONENTIAL_BACKOFF_JITTER_DECORRELATED) {
- aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- return NULL;
- }
- struct exponential_backoff_strategy *exponential_backoff_strategy =
- aws_mem_calloc(allocator, 1, sizeof(struct exponential_backoff_strategy));
- if (!exponential_backoff_strategy) {
- return NULL;
- }
- AWS_LOGF_INFO(
- AWS_LS_IO_EXPONENTIAL_BACKOFF_RETRY_STRATEGY,
- "id=%p: Initializing exponential backoff retry strategy with scale factor: %" PRIu32
- " jitter mode: %d and max retries %zu",
- (void *)&exponential_backoff_strategy->base,
- config->backoff_scale_factor_ms,
- config->jitter_mode,
- config->max_retries);
- exponential_backoff_strategy->base.allocator = allocator;
- exponential_backoff_strategy->base.impl = exponential_backoff_strategy;
- exponential_backoff_strategy->base.vtable = &s_exponential_retry_vtable;
- aws_atomic_init_int(&exponential_backoff_strategy->base.ref_count, 1);
- exponential_backoff_strategy->config = *config;
- exponential_backoff_strategy->config.el_group =
- aws_ref_count_acquire(&exponential_backoff_strategy->config.el_group->ref_count);
- if (!exponential_backoff_strategy->config.generate_random &&
- !exponential_backoff_strategy->config.generate_random_impl) {
- exponential_backoff_strategy->config.generate_random_impl = s_default_gen_rand;
- }
- if (!exponential_backoff_strategy->config.max_retries) {
- exponential_backoff_strategy->config.max_retries = 5;
- }
- if (!exponential_backoff_strategy->config.backoff_scale_factor_ms) {
- exponential_backoff_strategy->config.backoff_scale_factor_ms = 25;
- }
- if (config->shutdown_options) {
- exponential_backoff_strategy->shutdown_options = *config->shutdown_options;
- }
- return &exponential_backoff_strategy->base;
- }
|