12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196 |
- /**
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * SPDX-License-Identifier: Apache-2.0.
- */
- #include <aws/io/channel.h>
- #include <aws/common/atomics.h>
- #include <aws/common/clock.h>
- #include <aws/common/mutex.h>
- #include <aws/io/event_loop.h>
- #include <aws/io/logging.h>
- #include <aws/io/message_pool.h>
- #include <aws/io/statistics.h>
- #ifdef _MSC_VER
- # pragma warning(disable : 4204) /* non-constant aggregate initializer */
- #endif
- static size_t s_message_pool_key = 0; /* Address of variable serves as key in hash table */
- enum {
- KB_16 = 16 * 1024,
- };
- size_t g_aws_channel_max_fragment_size = KB_16;
- #define INITIAL_STATISTIC_LIST_SIZE 5
- enum aws_channel_state {
- AWS_CHANNEL_SETTING_UP,
- AWS_CHANNEL_ACTIVE,
- AWS_CHANNEL_SHUTTING_DOWN,
- AWS_CHANNEL_SHUT_DOWN,
- };
- struct aws_shutdown_notification_task {
- struct aws_task task;
- int error_code;
- struct aws_channel_slot *slot;
- bool shutdown_immediately;
- };
- struct shutdown_task {
- struct aws_channel_task task;
- struct aws_channel *channel;
- int error_code;
- bool shutdown_immediately;
- };
- struct aws_channel {
- struct aws_allocator *alloc;
- struct aws_event_loop *loop;
- struct aws_channel_slot *first;
- struct aws_message_pool *msg_pool;
- enum aws_channel_state channel_state;
- struct aws_shutdown_notification_task shutdown_notify_task;
- aws_channel_on_shutdown_completed_fn *on_shutdown_completed;
- void *shutdown_user_data;
- struct aws_atomic_var refcount;
- struct aws_task deletion_task;
- struct aws_task statistics_task;
- struct aws_crt_statistics_handler *statistics_handler;
- uint64_t statistics_interval_start_time_ms;
- struct aws_array_list statistic_list;
- struct {
- struct aws_linked_list list;
- } channel_thread_tasks;
- struct {
- struct aws_mutex lock;
- struct aws_linked_list list;
- struct aws_task scheduling_task;
- struct shutdown_task shutdown_task;
- bool is_channel_shut_down;
- } cross_thread_tasks;
- size_t window_update_batch_emit_threshold;
- struct aws_channel_task window_update_task;
- bool read_back_pressure_enabled;
- bool window_update_scheduled;
- };
- struct channel_setup_args {
- struct aws_allocator *alloc;
- struct aws_channel *channel;
- aws_channel_on_setup_completed_fn *on_setup_completed;
- void *user_data;
- struct aws_task task;
- };
- static void s_on_msg_pool_removed(struct aws_event_loop_local_object *object) {
- struct aws_message_pool *msg_pool = object->object;
- AWS_LOGF_TRACE(
- AWS_LS_IO_CHANNEL,
- "static: message pool %p has been purged "
- "from the event-loop: likely because of shutdown",
- (void *)msg_pool);
- struct aws_allocator *alloc = msg_pool->alloc;
- aws_message_pool_clean_up(msg_pool);
- aws_mem_release(alloc, msg_pool);
- aws_mem_release(alloc, object);
- }
- static void s_on_channel_setup_complete(struct aws_task *task, void *arg, enum aws_task_status task_status) {
- (void)task;
- struct channel_setup_args *setup_args = arg;
- struct aws_message_pool *message_pool = NULL;
- struct aws_event_loop_local_object *local_object = NULL;
- AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: setup complete, notifying caller.", (void *)setup_args->channel);
- if (task_status == AWS_TASK_STATUS_RUN_READY) {
- struct aws_event_loop_local_object stack_obj;
- AWS_ZERO_STRUCT(stack_obj);
- local_object = &stack_obj;
- if (aws_event_loop_fetch_local_object(setup_args->channel->loop, &s_message_pool_key, local_object)) {
- local_object = aws_mem_calloc(setup_args->alloc, 1, sizeof(struct aws_event_loop_local_object));
- if (!local_object) {
- goto cleanup_setup_args;
- }
- message_pool = aws_mem_acquire(setup_args->alloc, sizeof(struct aws_message_pool));
- if (!message_pool) {
- goto cleanup_local_obj;
- }
- AWS_LOGF_DEBUG(
- AWS_LS_IO_CHANNEL,
- "id=%p: no message pool is currently stored in the event-loop "
- "local storage, adding %p with max message size %zu, "
- "message count 4, with 4 small blocks of 128 bytes.",
- (void *)setup_args->channel,
- (void *)message_pool,
- g_aws_channel_max_fragment_size);
- struct aws_message_pool_creation_args creation_args = {
- .application_data_msg_data_size = g_aws_channel_max_fragment_size,
- .application_data_msg_count = 4,
- .small_block_msg_count = 4,
- .small_block_msg_data_size = 128,
- };
- if (aws_message_pool_init(message_pool, setup_args->alloc, &creation_args)) {
- goto cleanup_msg_pool_mem;
- }
- local_object->key = &s_message_pool_key;
- local_object->object = message_pool;
- local_object->on_object_removed = s_on_msg_pool_removed;
- if (aws_event_loop_put_local_object(setup_args->channel->loop, local_object)) {
- goto cleanup_msg_pool;
- }
- } else {
- message_pool = local_object->object;
- AWS_LOGF_DEBUG(
- AWS_LS_IO_CHANNEL,
- "id=%p: message pool %p found in event-loop local storage: using it.",
- (void *)setup_args->channel,
- (void *)message_pool);
- }
- setup_args->channel->msg_pool = message_pool;
- setup_args->channel->channel_state = AWS_CHANNEL_ACTIVE;
- setup_args->on_setup_completed(setup_args->channel, AWS_OP_SUCCESS, setup_args->user_data);
- aws_channel_release_hold(setup_args->channel);
- aws_mem_release(setup_args->alloc, setup_args);
- return;
- }
- goto cleanup_setup_args;
- cleanup_msg_pool:
- aws_message_pool_clean_up(message_pool);
- cleanup_msg_pool_mem:
- aws_mem_release(setup_args->alloc, message_pool);
- cleanup_local_obj:
- aws_mem_release(setup_args->alloc, local_object);
- cleanup_setup_args:
- setup_args->on_setup_completed(setup_args->channel, AWS_OP_ERR, setup_args->user_data);
- aws_channel_release_hold(setup_args->channel);
- aws_mem_release(setup_args->alloc, setup_args);
- }
- static void s_schedule_cross_thread_tasks(struct aws_task *task, void *arg, enum aws_task_status status);
- static void s_destroy_partially_constructed_channel(struct aws_channel *channel) {
- if (channel == NULL) {
- return;
- }
- aws_array_list_clean_up(&channel->statistic_list);
- aws_mem_release(channel->alloc, channel);
- }
- struct aws_channel *aws_channel_new(struct aws_allocator *alloc, const struct aws_channel_options *creation_args) {
- AWS_PRECONDITION(creation_args);
- AWS_PRECONDITION(creation_args->event_loop);
- AWS_PRECONDITION(creation_args->on_setup_completed);
- struct aws_channel *channel = aws_mem_calloc(alloc, 1, sizeof(struct aws_channel));
- if (!channel) {
- return NULL;
- }
- AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: Beginning creation and setup of new channel.", (void *)channel);
- channel->alloc = alloc;
- channel->loop = creation_args->event_loop;
- channel->on_shutdown_completed = creation_args->on_shutdown_completed;
- channel->shutdown_user_data = creation_args->shutdown_user_data;
- if (aws_array_list_init_dynamic(
- &channel->statistic_list, alloc, INITIAL_STATISTIC_LIST_SIZE, sizeof(struct aws_crt_statistics_base *))) {
- goto on_error;
- }
- /* Start refcount at 2:
- * 1 for self-reference, released from aws_channel_destroy()
- * 1 for the setup task, released when task executes */
- aws_atomic_init_int(&channel->refcount, 2);
- struct channel_setup_args *setup_args = aws_mem_calloc(alloc, 1, sizeof(struct channel_setup_args));
- if (!setup_args) {
- goto on_error;
- }
- channel->channel_state = AWS_CHANNEL_SETTING_UP;
- aws_linked_list_init(&channel->channel_thread_tasks.list);
- aws_linked_list_init(&channel->cross_thread_tasks.list);
- channel->cross_thread_tasks.lock = (struct aws_mutex)AWS_MUTEX_INIT;
- if (creation_args->enable_read_back_pressure) {
- channel->read_back_pressure_enabled = true;
- /* we probably only need room for one fragment, but let's avoid potential deadlocks
- * on things like tls that need extra head-room. */
- channel->window_update_batch_emit_threshold = g_aws_channel_max_fragment_size * 2;
- }
- aws_task_init(
- &channel->cross_thread_tasks.scheduling_task,
- s_schedule_cross_thread_tasks,
- channel,
- "schedule_cross_thread_tasks");
- setup_args->alloc = alloc;
- setup_args->channel = channel;
- setup_args->on_setup_completed = creation_args->on_setup_completed;
- setup_args->user_data = creation_args->setup_user_data;
- aws_task_init(&setup_args->task, s_on_channel_setup_complete, setup_args, "on_channel_setup_complete");
- aws_event_loop_schedule_task_now(creation_args->event_loop, &setup_args->task);
- return channel;
- on_error:
- s_destroy_partially_constructed_channel(channel);
- return NULL;
- }
- static void s_cleanup_slot(struct aws_channel_slot *slot) {
- if (slot) {
- if (slot->handler) {
- aws_channel_handler_destroy(slot->handler);
- }
- aws_mem_release(slot->alloc, slot);
- }
- }
- void aws_channel_destroy(struct aws_channel *channel) {
- AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: destroying channel.", (void *)channel);
- aws_channel_release_hold(channel);
- }
- static void s_final_channel_deletion_task(struct aws_task *task, void *arg, enum aws_task_status status) {
- (void)task;
- (void)status;
- struct aws_channel *channel = arg;
- struct aws_channel_slot *current = channel->first;
- if (!current || !current->handler) {
- /* Allow channels with no valid slots to skip shutdown process */
- channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
- }
- AWS_ASSERT(channel->channel_state == AWS_CHANNEL_SHUT_DOWN);
- while (current) {
- struct aws_channel_slot *tmp = current->adj_right;
- s_cleanup_slot(current);
- current = tmp;
- }
- aws_array_list_clean_up(&channel->statistic_list);
- aws_channel_set_statistics_handler(channel, NULL);
- aws_mem_release(channel->alloc, channel);
- }
- void aws_channel_acquire_hold(struct aws_channel *channel) {
- size_t prev_refcount = aws_atomic_fetch_add(&channel->refcount, 1);
- AWS_ASSERT(prev_refcount != 0);
- (void)prev_refcount;
- }
- void aws_channel_release_hold(struct aws_channel *channel) {
- size_t prev_refcount = aws_atomic_fetch_sub(&channel->refcount, 1);
- AWS_ASSERT(prev_refcount != 0);
- if (prev_refcount == 1) {
- /* Refcount is now 0, finish cleaning up channel memory. */
- if (aws_channel_thread_is_callers_thread(channel)) {
- s_final_channel_deletion_task(NULL, channel, AWS_TASK_STATUS_RUN_READY);
- } else {
- aws_task_init(&channel->deletion_task, s_final_channel_deletion_task, channel, "final_channel_deletion");
- aws_event_loop_schedule_task_now(channel->loop, &channel->deletion_task);
- }
- }
- }
- struct channel_shutdown_task_args {
- struct aws_channel *channel;
- struct aws_allocator *alloc;
- int error_code;
- struct aws_task task;
- };
- static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately);
- static void s_on_shutdown_completion_task(struct aws_task *task, void *arg, enum aws_task_status status);
- static void s_shutdown_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
- (void)task;
- (void)status;
- struct shutdown_task *shutdown_task = arg;
- struct aws_channel *channel = shutdown_task->channel;
- int error_code = shutdown_task->error_code;
- bool shutdown_immediately = shutdown_task->shutdown_immediately;
- if (channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
- AWS_LOGF_DEBUG(AWS_LS_IO_CHANNEL, "id=%p: beginning shutdown process", (void *)channel);
- struct aws_channel_slot *slot = channel->first;
- channel->channel_state = AWS_CHANNEL_SHUTTING_DOWN;
- if (slot) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_CHANNEL,
- "id=%p: shutting down slot %p (the first one) in the read direction",
- (void *)channel,
- (void *)slot);
- aws_channel_slot_shutdown(slot, AWS_CHANNEL_DIR_READ, error_code, shutdown_immediately);
- return;
- }
- channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
- AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: shutdown completed", (void *)channel);
- aws_mutex_lock(&channel->cross_thread_tasks.lock);
- channel->cross_thread_tasks.is_channel_shut_down = true;
- aws_mutex_unlock(&channel->cross_thread_tasks.lock);
- if (channel->on_shutdown_completed) {
- channel->shutdown_notify_task.task.fn = s_on_shutdown_completion_task;
- channel->shutdown_notify_task.task.arg = channel;
- channel->shutdown_notify_task.error_code = error_code;
- aws_event_loop_schedule_task_now(channel->loop, &channel->shutdown_notify_task.task);
- }
- }
- }
- static int s_channel_shutdown(struct aws_channel *channel, int error_code, bool shutdown_immediately) {
- bool need_to_schedule = true;
- aws_mutex_lock(&channel->cross_thread_tasks.lock);
- if (channel->cross_thread_tasks.shutdown_task.task.task_fn) {
- need_to_schedule = false;
- AWS_LOGF_DEBUG(
- AWS_LS_IO_CHANNEL, "id=%p: Channel shutdown is already pending, not scheduling another.", (void *)channel);
- } else {
- aws_channel_task_init(
- &channel->cross_thread_tasks.shutdown_task.task,
- s_shutdown_task,
- &channel->cross_thread_tasks.shutdown_task,
- "channel_shutdown");
- channel->cross_thread_tasks.shutdown_task.shutdown_immediately = shutdown_immediately;
- channel->cross_thread_tasks.shutdown_task.channel = channel;
- channel->cross_thread_tasks.shutdown_task.error_code = error_code;
- }
- aws_mutex_unlock(&channel->cross_thread_tasks.lock);
- if (need_to_schedule) {
- AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: channel shutdown task is scheduled", (void *)channel);
- aws_channel_schedule_task_now(channel, &channel->cross_thread_tasks.shutdown_task.task);
- }
- return AWS_OP_SUCCESS;
- }
- int aws_channel_shutdown(struct aws_channel *channel, int error_code) {
- return s_channel_shutdown(channel, error_code, false);
- }
- struct aws_io_message *aws_channel_acquire_message_from_pool(
- struct aws_channel *channel,
- enum aws_io_message_type message_type,
- size_t size_hint) {
- struct aws_io_message *message = aws_message_pool_acquire(channel->msg_pool, message_type, size_hint);
- if (AWS_LIKELY(message)) {
- message->owning_channel = channel;
- AWS_LOGF_TRACE(
- AWS_LS_IO_CHANNEL,
- "id=%p: acquired message %p of capacity %zu from pool %p. Requested size was %zu",
- (void *)channel,
- (void *)message,
- message->message_data.capacity,
- (void *)channel->msg_pool,
- size_hint);
- }
- return message;
- }
- struct aws_channel_slot *aws_channel_slot_new(struct aws_channel *channel) {
- struct aws_channel_slot *new_slot = aws_mem_calloc(channel->alloc, 1, sizeof(struct aws_channel_slot));
- if (!new_slot) {
- return NULL;
- }
- AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: creating new slot %p.", (void *)channel, (void *)new_slot);
- new_slot->alloc = channel->alloc;
- new_slot->channel = channel;
- if (!channel->first) {
- channel->first = new_slot;
- }
- return new_slot;
- }
- int aws_channel_current_clock_time(struct aws_channel *channel, uint64_t *time_nanos) {
- return aws_event_loop_current_clock_time(channel->loop, time_nanos);
- }
- int aws_channel_fetch_local_object(
- struct aws_channel *channel,
- const void *key,
- struct aws_event_loop_local_object *obj) {
- return aws_event_loop_fetch_local_object(channel->loop, (void *)key, obj);
- }
- int aws_channel_put_local_object(
- struct aws_channel *channel,
- const void *key,
- const struct aws_event_loop_local_object *obj) {
- (void)key;
- return aws_event_loop_put_local_object(channel->loop, (struct aws_event_loop_local_object *)obj);
- }
- int aws_channel_remove_local_object(
- struct aws_channel *channel,
- const void *key,
- struct aws_event_loop_local_object *removed_obj) {
- return aws_event_loop_remove_local_object(channel->loop, (void *)key, removed_obj);
- }
- static void s_channel_task_run(struct aws_task *task, void *arg, enum aws_task_status status) {
- struct aws_channel_task *channel_task = AWS_CONTAINER_OF(task, struct aws_channel_task, wrapper_task);
- struct aws_channel *channel = arg;
- /* Any task that runs after shutdown completes is considered canceled */
- if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
- status = AWS_TASK_STATUS_CANCELED;
- }
- aws_linked_list_remove(&channel_task->node);
- channel_task->task_fn(channel_task, channel_task->arg, status);
- }
- static void s_schedule_cross_thread_tasks(struct aws_task *task, void *arg, enum aws_task_status status) {
- (void)task;
- struct aws_channel *channel = arg;
- struct aws_linked_list cross_thread_task_list;
- aws_linked_list_init(&cross_thread_task_list);
- /* Grab contents of cross-thread task list while we have the lock */
- aws_mutex_lock(&channel->cross_thread_tasks.lock);
- aws_linked_list_swap_contents(&channel->cross_thread_tasks.list, &cross_thread_task_list);
- aws_mutex_unlock(&channel->cross_thread_tasks.lock);
- /* If the channel has shut down since the cross-thread tasks were scheduled, run tasks immediately as canceled */
- if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
- status = AWS_TASK_STATUS_CANCELED;
- }
- while (!aws_linked_list_empty(&cross_thread_task_list)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&cross_thread_task_list);
- struct aws_channel_task *channel_task = AWS_CONTAINER_OF(node, struct aws_channel_task, node);
- if ((channel_task->wrapper_task.timestamp == 0) || (status == AWS_TASK_STATUS_CANCELED)) {
- /* Run "now" tasks, and canceled tasks, immediately */
- channel_task->task_fn(channel_task, channel_task->arg, status);
- } else {
- /* "Future" tasks are scheduled with the event-loop. */
- aws_linked_list_push_back(&channel->channel_thread_tasks.list, &channel_task->node);
- aws_event_loop_schedule_task_future(
- channel->loop, &channel_task->wrapper_task, channel_task->wrapper_task.timestamp);
- }
- }
- }
- void aws_channel_task_init(
- struct aws_channel_task *channel_task,
- aws_channel_task_fn *task_fn,
- void *arg,
- const char *type_tag) {
- AWS_ZERO_STRUCT(*channel_task);
- channel_task->task_fn = task_fn;
- channel_task->arg = arg;
- channel_task->type_tag = type_tag;
- }
- static void s_register_pending_task_in_event_loop(
- struct aws_channel *channel,
- struct aws_channel_task *channel_task,
- uint64_t run_at_nanos) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_CHANNEL,
- "id=%p: scheduling task with wrapper task id %p.",
- (void *)channel,
- (void *)&channel_task->wrapper_task);
- /* If channel is shut down, run task immediately as canceled */
- if (channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
- AWS_LOGF_DEBUG(
- AWS_LS_IO_CHANNEL,
- "id=%p: Running %s channel task immediately as canceled due to shut down channel",
- (void *)channel,
- channel_task->type_tag);
- channel_task->task_fn(channel_task, channel_task->arg, AWS_TASK_STATUS_CANCELED);
- return;
- }
- aws_linked_list_push_back(&channel->channel_thread_tasks.list, &channel_task->node);
- if (run_at_nanos == 0) {
- aws_event_loop_schedule_task_now(channel->loop, &channel_task->wrapper_task);
- } else {
- aws_event_loop_schedule_task_future(
- channel->loop, &channel_task->wrapper_task, channel_task->wrapper_task.timestamp);
- }
- }
- static void s_register_pending_task_cross_thread(struct aws_channel *channel, struct aws_channel_task *channel_task) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_CHANNEL,
- "id=%p: scheduling task with wrapper task id %p from "
- "outside the event-loop thread.",
- (void *)channel,
- (void *)&channel_task->wrapper_task);
- /* Outside event-loop thread... */
- bool should_cancel_task = false;
- /* Begin Critical Section */
- aws_mutex_lock(&channel->cross_thread_tasks.lock);
- if (channel->cross_thread_tasks.is_channel_shut_down) {
- should_cancel_task = true; /* run task outside critical section to avoid deadlock */
- } else {
- bool list_was_empty = aws_linked_list_empty(&channel->cross_thread_tasks.list);
- aws_linked_list_push_back(&channel->cross_thread_tasks.list, &channel_task->node);
- if (list_was_empty) {
- aws_event_loop_schedule_task_now(channel->loop, &channel->cross_thread_tasks.scheduling_task);
- }
- }
- aws_mutex_unlock(&channel->cross_thread_tasks.lock);
- /* End Critical Section */
- if (should_cancel_task) {
- channel_task->task_fn(channel_task, channel_task->arg, AWS_TASK_STATUS_CANCELED);
- }
- }
- static void s_reset_pending_channel_task(
- struct aws_channel *channel,
- struct aws_channel_task *channel_task,
- uint64_t run_at_nanos) {
- /* Reset every property on channel task other than user's fn & arg.*/
- aws_task_init(&channel_task->wrapper_task, s_channel_task_run, channel, channel_task->type_tag);
- channel_task->wrapper_task.timestamp = run_at_nanos;
- aws_linked_list_node_reset(&channel_task->node);
- }
- /* Common functionality for scheduling "now" and "future" tasks.
- * For "now" tasks, pass 0 for `run_at_nanos` */
- static void s_register_pending_task(
- struct aws_channel *channel,
- struct aws_channel_task *channel_task,
- uint64_t run_at_nanos) {
- s_reset_pending_channel_task(channel, channel_task, run_at_nanos);
- if (aws_channel_thread_is_callers_thread(channel)) {
- s_register_pending_task_in_event_loop(channel, channel_task, run_at_nanos);
- } else {
- s_register_pending_task_cross_thread(channel, channel_task);
- }
- }
- void aws_channel_schedule_task_now(struct aws_channel *channel, struct aws_channel_task *task) {
- s_register_pending_task(channel, task, 0);
- }
- void aws_channel_schedule_task_now_serialized(struct aws_channel *channel, struct aws_channel_task *task) {
- s_reset_pending_channel_task(channel, task, 0);
- s_register_pending_task_cross_thread(channel, task);
- }
- void aws_channel_schedule_task_future(
- struct aws_channel *channel,
- struct aws_channel_task *task,
- uint64_t run_at_nanos) {
- s_register_pending_task(channel, task, run_at_nanos);
- }
- bool aws_channel_thread_is_callers_thread(struct aws_channel *channel) {
- return aws_event_loop_thread_is_callers_thread(channel->loop);
- }
- static void s_update_channel_slot_message_overheads(struct aws_channel *channel) {
- size_t overhead = 0;
- struct aws_channel_slot *slot_iter = channel->first;
- while (slot_iter) {
- slot_iter->upstream_message_overhead = overhead;
- if (slot_iter->handler) {
- overhead += slot_iter->handler->vtable->message_overhead(slot_iter->handler);
- }
- slot_iter = slot_iter->adj_right;
- }
- }
- int aws_channel_slot_set_handler(struct aws_channel_slot *slot, struct aws_channel_handler *handler) {
- slot->handler = handler;
- slot->handler->slot = slot;
- s_update_channel_slot_message_overheads(slot->channel);
- return aws_channel_slot_increment_read_window(slot, slot->handler->vtable->initial_window_size(handler));
- }
- int aws_channel_slot_remove(struct aws_channel_slot *slot) {
- if (slot->adj_right) {
- slot->adj_right->adj_left = slot->adj_left;
- if (slot == slot->channel->first) {
- slot->channel->first = slot->adj_right;
- }
- }
- if (slot->adj_left) {
- slot->adj_left->adj_right = slot->adj_right;
- }
- if (slot == slot->channel->first) {
- slot->channel->first = NULL;
- }
- s_update_channel_slot_message_overheads(slot->channel);
- s_cleanup_slot(slot);
- return AWS_OP_SUCCESS;
- }
- int aws_channel_slot_replace(struct aws_channel_slot *remove, struct aws_channel_slot *new_slot) {
- new_slot->adj_left = remove->adj_left;
- if (remove->adj_left) {
- remove->adj_left->adj_right = new_slot;
- }
- new_slot->adj_right = remove->adj_right;
- if (remove->adj_right) {
- remove->adj_right->adj_left = new_slot;
- }
- if (remove == remove->channel->first) {
- remove->channel->first = new_slot;
- }
- s_update_channel_slot_message_overheads(remove->channel);
- s_cleanup_slot(remove);
- return AWS_OP_SUCCESS;
- }
- int aws_channel_slot_insert_right(struct aws_channel_slot *slot, struct aws_channel_slot *to_add) {
- to_add->adj_right = slot->adj_right;
- if (slot->adj_right) {
- slot->adj_right->adj_left = to_add;
- }
- slot->adj_right = to_add;
- to_add->adj_left = slot;
- return AWS_OP_SUCCESS;
- }
- int aws_channel_slot_insert_end(struct aws_channel *channel, struct aws_channel_slot *to_add) {
- /* It's actually impossible there's not a first if the user went through the aws_channel_slot_new() function.
- * But also check that a user didn't call insert_end if it's the first slot in the channel since first would already
- * have been set. */
- if (AWS_LIKELY(channel->first && channel->first != to_add)) {
- struct aws_channel_slot *cur = channel->first;
- while (cur->adj_right) {
- cur = cur->adj_right;
- }
- return aws_channel_slot_insert_right(cur, to_add);
- }
- AWS_ASSERT(0);
- return AWS_OP_ERR;
- }
- int aws_channel_slot_insert_left(struct aws_channel_slot *slot, struct aws_channel_slot *to_add) {
- to_add->adj_left = slot->adj_left;
- if (slot->adj_left) {
- slot->adj_left->adj_right = to_add;
- }
- slot->adj_left = to_add;
- to_add->adj_right = slot;
- if (slot == slot->channel->first) {
- slot->channel->first = to_add;
- }
- return AWS_OP_SUCCESS;
- }
- int aws_channel_slot_send_message(
- struct aws_channel_slot *slot,
- struct aws_io_message *message,
- enum aws_channel_direction dir) {
- if (dir == AWS_CHANNEL_DIR_READ) {
- AWS_ASSERT(slot->adj_right);
- AWS_ASSERT(slot->adj_right->handler);
- if (!slot->channel->read_back_pressure_enabled || slot->adj_right->window_size >= message->message_data.len) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_CHANNEL,
- "id=%p: sending read message of size %zu, "
- "from slot %p to slot %p with handler %p.",
- (void *)slot->channel,
- message->message_data.len,
- (void *)slot,
- (void *)slot->adj_right,
- (void *)slot->adj_right->handler);
- slot->adj_right->window_size -= message->message_data.len;
- return aws_channel_handler_process_read_message(slot->adj_right->handler, slot->adj_right, message);
- }
- AWS_LOGF_ERROR(
- AWS_LS_IO_CHANNEL,
- "id=%p: sending message of size %zu, "
- "from slot %p to slot %p with handler %p, but this would exceed the channel's "
- "read window, this is always a programming error.",
- (void *)slot->channel,
- message->message_data.len,
- (void *)slot,
- (void *)slot->adj_right,
- (void *)slot->adj_right->handler);
- return aws_raise_error(AWS_IO_CHANNEL_READ_WOULD_EXCEED_WINDOW);
- }
- AWS_ASSERT(slot->adj_left);
- AWS_ASSERT(slot->adj_left->handler);
- AWS_LOGF_TRACE(
- AWS_LS_IO_CHANNEL,
- "id=%p: sending write message of size %zu, "
- "from slot %p to slot %p with handler %p.",
- (void *)slot->channel,
- message->message_data.len,
- (void *)slot,
- (void *)slot->adj_left,
- (void *)slot->adj_left->handler);
- return aws_channel_handler_process_write_message(slot->adj_left->handler, slot->adj_left, message);
- }
- struct aws_io_message *aws_channel_slot_acquire_max_message_for_write(struct aws_channel_slot *slot) {
- AWS_PRECONDITION(slot);
- AWS_PRECONDITION(slot->channel);
- AWS_PRECONDITION(aws_channel_thread_is_callers_thread(slot->channel));
- const size_t overhead = aws_channel_slot_upstream_message_overhead(slot);
- if (overhead >= g_aws_channel_max_fragment_size) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_CHANNEL, "id=%p: Upstream overhead exceeds channel's max message size.", (void *)slot->channel);
- aws_raise_error(AWS_ERROR_INVALID_STATE);
- return NULL;
- }
- const size_t size_hint = g_aws_channel_max_fragment_size - overhead;
- return aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, size_hint);
- }
- static void s_window_update_task(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) {
- (void)channel_task;
- struct aws_channel *channel = arg;
- channel->window_update_scheduled = false;
- if (status == AWS_TASK_STATUS_RUN_READY && channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
- /* get the right-most slot to start the updates. */
- struct aws_channel_slot *slot = channel->first;
- while (slot->adj_right) {
- slot = slot->adj_right;
- }
- while (slot->adj_left) {
- struct aws_channel_slot *upstream_slot = slot->adj_left;
- if (upstream_slot->handler) {
- slot->window_size = aws_add_size_saturating(slot->window_size, slot->current_window_update_batch_size);
- size_t update_size = slot->current_window_update_batch_size;
- slot->current_window_update_batch_size = 0;
- if (aws_channel_handler_increment_read_window(upstream_slot->handler, upstream_slot, update_size)) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_CHANNEL,
- "channel %p: channel update task failed with status %d",
- (void *)slot->channel,
- aws_last_error());
- aws_channel_shutdown(channel, aws_last_error());
- return;
- }
- }
- slot = slot->adj_left;
- }
- }
- }
- int aws_channel_slot_increment_read_window(struct aws_channel_slot *slot, size_t window) {
- if (slot->channel->read_back_pressure_enabled && slot->channel->channel_state < AWS_CHANNEL_SHUTTING_DOWN) {
- slot->current_window_update_batch_size =
- aws_add_size_saturating(slot->current_window_update_batch_size, window);
- if (!slot->channel->window_update_scheduled &&
- slot->window_size <= slot->channel->window_update_batch_emit_threshold) {
- slot->channel->window_update_scheduled = true;
- aws_channel_task_init(
- &slot->channel->window_update_task, s_window_update_task, slot->channel, "window update task");
- aws_channel_schedule_task_now(slot->channel, &slot->channel->window_update_task);
- }
- }
- return AWS_OP_SUCCESS;
- }
- int aws_channel_slot_shutdown(
- struct aws_channel_slot *slot,
- enum aws_channel_direction dir,
- int err_code,
- bool free_scarce_resources_immediately) {
- AWS_ASSERT(slot->handler);
- AWS_LOGF_TRACE(
- AWS_LS_IO_CHANNEL,
- "id=%p: shutting down slot %p, with handler %p "
- "in %s direction with error code %d",
- (void *)slot->channel,
- (void *)slot,
- (void *)slot->handler,
- (dir == AWS_CHANNEL_DIR_READ) ? "read" : "write",
- err_code);
- return aws_channel_handler_shutdown(slot->handler, slot, dir, err_code, free_scarce_resources_immediately);
- }
- static void s_on_shutdown_completion_task(struct aws_task *task, void *arg, enum aws_task_status status) {
- (void)status;
- struct aws_shutdown_notification_task *shutdown_notify = (struct aws_shutdown_notification_task *)task;
- struct aws_channel *channel = arg;
- AWS_ASSERT(channel->channel_state == AWS_CHANNEL_SHUT_DOWN);
- /* Cancel tasks that have been scheduled with the event loop */
- while (!aws_linked_list_empty(&channel->channel_thread_tasks.list)) {
- struct aws_linked_list_node *node = aws_linked_list_front(&channel->channel_thread_tasks.list);
- struct aws_channel_task *channel_task = AWS_CONTAINER_OF(node, struct aws_channel_task, node);
- AWS_LOGF_DEBUG(
- AWS_LS_IO_CHANNEL,
- "id=%p: during shutdown, canceling task %p",
- (void *)channel,
- (void *)&channel_task->wrapper_task);
- /* The task will remove itself from the list when it's canceled */
- aws_event_loop_cancel_task(channel->loop, &channel_task->wrapper_task);
- }
- /* Cancel off-thread tasks, which haven't made it to the event-loop thread yet */
- aws_mutex_lock(&channel->cross_thread_tasks.lock);
- bool cancel_cross_thread_tasks = !aws_linked_list_empty(&channel->cross_thread_tasks.list);
- aws_mutex_unlock(&channel->cross_thread_tasks.lock);
- if (cancel_cross_thread_tasks) {
- aws_event_loop_cancel_task(channel->loop, &channel->cross_thread_tasks.scheduling_task);
- }
- AWS_ASSERT(aws_linked_list_empty(&channel->channel_thread_tasks.list));
- AWS_ASSERT(aws_linked_list_empty(&channel->cross_thread_tasks.list));
- channel->on_shutdown_completed(channel, shutdown_notify->error_code, channel->shutdown_user_data);
- }
- static void s_run_shutdown_write_direction(struct aws_task *task, void *arg, enum aws_task_status status) {
- (void)arg;
- (void)status;
- struct aws_shutdown_notification_task *shutdown_notify = (struct aws_shutdown_notification_task *)task;
- task->fn = NULL;
- task->arg = NULL;
- struct aws_channel_slot *slot = shutdown_notify->slot;
- aws_channel_handler_shutdown(
- slot->handler, slot, AWS_CHANNEL_DIR_WRITE, shutdown_notify->error_code, shutdown_notify->shutdown_immediately);
- }
- int aws_channel_slot_on_handler_shutdown_complete(
- struct aws_channel_slot *slot,
- enum aws_channel_direction dir,
- int err_code,
- bool free_scarce_resources_immediately) {
- AWS_LOGF_DEBUG(
- AWS_LS_IO_CHANNEL,
- "id=%p: handler %p shutdown in %s dir completed.",
- (void *)slot->channel,
- (void *)slot->handler,
- (dir == AWS_CHANNEL_DIR_READ) ? "read" : "write");
- if (slot->channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
- return AWS_OP_SUCCESS;
- }
- if (dir == AWS_CHANNEL_DIR_READ) {
- if (slot->adj_right && slot->adj_right->handler) {
- return aws_channel_handler_shutdown(
- slot->adj_right->handler, slot->adj_right, dir, err_code, free_scarce_resources_immediately);
- }
- /* break the shutdown sequence so we don't have handlers having to deal with their memory disappearing out from
- * under them during a shutdown process. */
- slot->channel->shutdown_notify_task.slot = slot;
- slot->channel->shutdown_notify_task.shutdown_immediately = free_scarce_resources_immediately;
- slot->channel->shutdown_notify_task.error_code = err_code;
- slot->channel->shutdown_notify_task.task.fn = s_run_shutdown_write_direction;
- slot->channel->shutdown_notify_task.task.arg = NULL;
- aws_event_loop_schedule_task_now(slot->channel->loop, &slot->channel->shutdown_notify_task.task);
- return AWS_OP_SUCCESS;
- }
- if (slot->adj_left && slot->adj_left->handler) {
- return aws_channel_handler_shutdown(
- slot->adj_left->handler, slot->adj_left, dir, err_code, free_scarce_resources_immediately);
- }
- if (slot->channel->first == slot) {
- slot->channel->channel_state = AWS_CHANNEL_SHUT_DOWN;
- aws_mutex_lock(&slot->channel->cross_thread_tasks.lock);
- slot->channel->cross_thread_tasks.is_channel_shut_down = true;
- aws_mutex_unlock(&slot->channel->cross_thread_tasks.lock);
- if (slot->channel->on_shutdown_completed) {
- slot->channel->shutdown_notify_task.task.fn = s_on_shutdown_completion_task;
- slot->channel->shutdown_notify_task.task.arg = slot->channel;
- slot->channel->shutdown_notify_task.error_code = err_code;
- aws_event_loop_schedule_task_now(slot->channel->loop, &slot->channel->shutdown_notify_task.task);
- }
- }
- return AWS_OP_SUCCESS;
- }
- size_t aws_channel_slot_downstream_read_window(struct aws_channel_slot *slot) {
- AWS_ASSERT(slot->adj_right);
- return slot->channel->read_back_pressure_enabled ? slot->adj_right->window_size : SIZE_MAX;
- }
- size_t aws_channel_slot_upstream_message_overhead(struct aws_channel_slot *slot) {
- return slot->upstream_message_overhead;
- }
- void aws_channel_handler_destroy(struct aws_channel_handler *handler) {
- AWS_ASSERT(handler->vtable && handler->vtable->destroy);
- handler->vtable->destroy(handler);
- }
- int aws_channel_handler_process_read_message(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- struct aws_io_message *message) {
- AWS_ASSERT(handler->vtable && handler->vtable->process_read_message);
- return handler->vtable->process_read_message(handler, slot, message);
- }
- int aws_channel_handler_process_write_message(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- struct aws_io_message *message) {
- AWS_ASSERT(handler->vtable && handler->vtable->process_write_message);
- return handler->vtable->process_write_message(handler, slot, message);
- }
- int aws_channel_handler_increment_read_window(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- size_t size) {
- AWS_ASSERT(handler->vtable && handler->vtable->increment_read_window);
- return handler->vtable->increment_read_window(handler, slot, size);
- }
- int aws_channel_handler_shutdown(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- enum aws_channel_direction dir,
- int error_code,
- bool free_scarce_resources_immediately) {
- AWS_ASSERT(handler->vtable && handler->vtable->shutdown);
- return handler->vtable->shutdown(handler, slot, dir, error_code, free_scarce_resources_immediately);
- }
- size_t aws_channel_handler_initial_window_size(struct aws_channel_handler *handler) {
- AWS_ASSERT(handler->vtable && handler->vtable->initial_window_size);
- return handler->vtable->initial_window_size(handler);
- }
- struct aws_channel_slot *aws_channel_get_first_slot(struct aws_channel *channel) {
- return channel->first;
- }
- static void s_reset_statistics(struct aws_channel *channel) {
- AWS_FATAL_ASSERT(aws_channel_thread_is_callers_thread(channel));
- struct aws_channel_slot *current_slot = channel->first;
- while (current_slot) {
- struct aws_channel_handler *handler = current_slot->handler;
- if (handler != NULL && handler->vtable->reset_statistics != NULL) {
- handler->vtable->reset_statistics(handler);
- }
- current_slot = current_slot->adj_right;
- }
- }
- static void s_channel_gather_statistics_task(struct aws_task *task, void *arg, enum aws_task_status status) {
- if (status != AWS_TASK_STATUS_RUN_READY) {
- return;
- }
- struct aws_channel *channel = arg;
- if (channel->statistics_handler == NULL) {
- return;
- }
- if (channel->channel_state == AWS_CHANNEL_SHUTTING_DOWN || channel->channel_state == AWS_CHANNEL_SHUT_DOWN) {
- return;
- }
- uint64_t now_ns = 0;
- if (aws_channel_current_clock_time(channel, &now_ns)) {
- return;
- }
- uint64_t now_ms = aws_timestamp_convert(now_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
- struct aws_array_list *statistics_list = &channel->statistic_list;
- aws_array_list_clear(statistics_list);
- struct aws_channel_slot *current_slot = channel->first;
- while (current_slot) {
- struct aws_channel_handler *handler = current_slot->handler;
- if (handler != NULL && handler->vtable->gather_statistics != NULL) {
- handler->vtable->gather_statistics(handler, statistics_list);
- }
- current_slot = current_slot->adj_right;
- }
- struct aws_crt_statistics_sample_interval sample_interval = {
- .begin_time_ms = channel->statistics_interval_start_time_ms, .end_time_ms = now_ms};
- aws_crt_statistics_handler_process_statistics(
- channel->statistics_handler, &sample_interval, statistics_list, channel);
- s_reset_statistics(channel);
- uint64_t reschedule_interval_ns = aws_timestamp_convert(
- aws_crt_statistics_handler_get_report_interval_ms(channel->statistics_handler),
- AWS_TIMESTAMP_MILLIS,
- AWS_TIMESTAMP_NANOS,
- NULL);
- aws_event_loop_schedule_task_future(channel->loop, task, now_ns + reschedule_interval_ns);
- channel->statistics_interval_start_time_ms = now_ms;
- }
- int aws_channel_set_statistics_handler(struct aws_channel *channel, struct aws_crt_statistics_handler *handler) {
- AWS_FATAL_ASSERT(aws_channel_thread_is_callers_thread(channel));
- if (channel->statistics_handler) {
- aws_crt_statistics_handler_destroy(channel->statistics_handler);
- aws_event_loop_cancel_task(channel->loop, &channel->statistics_task);
- channel->statistics_handler = NULL;
- }
- if (handler != NULL) {
- aws_task_init(&channel->statistics_task, s_channel_gather_statistics_task, channel, "gather_statistics");
- uint64_t now_ns = 0;
- if (aws_channel_current_clock_time(channel, &now_ns)) {
- return AWS_OP_ERR;
- }
- uint64_t report_time_ns = now_ns + aws_timestamp_convert(
- aws_crt_statistics_handler_get_report_interval_ms(handler),
- AWS_TIMESTAMP_MILLIS,
- AWS_TIMESTAMP_NANOS,
- NULL);
- channel->statistics_interval_start_time_ms =
- aws_timestamp_convert(now_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
- s_reset_statistics(channel);
- aws_event_loop_schedule_task_future(channel->loop, &channel->statistics_task, report_time_ns);
- }
- channel->statistics_handler = handler;
- return AWS_OP_SUCCESS;
- }
- struct aws_event_loop *aws_channel_get_event_loop(struct aws_channel *channel) {
- return channel->loop;
- }
- int aws_channel_trigger_read(struct aws_channel *channel) {
- if (channel == NULL) {
- return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- }
- if (!aws_channel_thread_is_callers_thread(channel)) {
- return aws_raise_error(AWS_ERROR_INVALID_STATE);
- }
- struct aws_channel_slot *slot = channel->first;
- if (slot == NULL) {
- return aws_raise_error(AWS_ERROR_INVALID_STATE);
- }
- struct aws_channel_handler *handler = slot->handler;
- if (handler == NULL) {
- return aws_raise_error(AWS_ERROR_INVALID_STATE);
- }
- if (handler->vtable->trigger_read != NULL) {
- handler->vtable->trigger_read(handler);
- }
- return AWS_OP_SUCCESS;
- }
|