123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434 |
- /**
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * SPDX-License-Identifier: Apache-2.0.
- */
- #include <aws/io/socket_channel_handler.h>
- #include <aws/common/error.h>
- #include <aws/common/task_scheduler.h>
- #include <aws/io/event_loop.h>
- #include <aws/io/logging.h>
- #include <aws/io/socket.h>
- #include <aws/io/statistics.h>
- #ifdef _MSC_VER
- # pragma warning(disable : 4204) /* non-constant aggregate initializer */
- #endif
- struct socket_handler {
- struct aws_socket *socket;
- struct aws_channel_slot *slot;
- size_t max_rw_size;
- struct aws_channel_task read_task_storage;
- struct aws_channel_task shutdown_task_storage;
- struct aws_crt_statistics_socket stats;
- int shutdown_err_code;
- bool shutdown_in_progress;
- };
- static int s_socket_process_read_message(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- struct aws_io_message *message) {
- (void)handler;
- (void)slot;
- (void)message;
- AWS_LOGF_FATAL(
- AWS_LS_IO_SOCKET_HANDLER,
- "id=%p: process_read_message called on "
- "socket handler. This should never happen",
- (void *)handler);
- /*since a socket handler will ALWAYS be the first handler in a channel,
- * this should NEVER happen, if it does it's a programmer error.*/
- AWS_ASSERT(0);
- return aws_raise_error(AWS_IO_CHANNEL_ERROR_ERROR_CANT_ACCEPT_INPUT);
- }
- /* invoked by the socket when a write has completed or failed. */
- static void s_on_socket_write_complete(
- struct aws_socket *socket,
- int error_code,
- size_t amount_written,
- void *user_data) {
- if (user_data) {
- struct aws_io_message *message = user_data;
- struct aws_channel *channel = message->owning_channel;
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET_HANDLER,
- "static: write of size %llu, completed on channel %p",
- (unsigned long long)amount_written,
- (void *)channel);
- if (message->on_completion) {
- message->on_completion(channel, message, error_code, message->user_data);
- }
- if (socket && socket->handler) {
- struct socket_handler *socket_handler = socket->handler->impl;
- socket_handler->stats.bytes_written += amount_written;
- }
- aws_mem_release(message->allocator, message);
- if (error_code) {
- aws_channel_shutdown(channel, error_code);
- }
- }
- }
- static int s_socket_process_write_message(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- struct aws_io_message *message) {
- (void)slot;
- struct socket_handler *socket_handler = handler->impl;
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET_HANDLER,
- "id=%p: writing message of size %llu",
- (void *)handler,
- (unsigned long long)message->message_data.len);
- if (!aws_socket_is_open(socket_handler->socket)) {
- return aws_raise_error(AWS_IO_SOCKET_CLOSED);
- }
- struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&message->message_data);
- if (aws_socket_write(socket_handler->socket, &cursor, s_on_socket_write_complete, message)) {
- return AWS_OP_ERR;
- }
- return AWS_OP_SUCCESS;
- }
- static void s_read_task(struct aws_channel_task *task, void *arg, aws_task_status status);
- static void s_on_readable_notification(struct aws_socket *socket, int error_code, void *user_data);
- /* Ok this next function is VERY important for how back pressure works. Here's what it's supposed to be doing:
- *
- * See how much data downstream is willing to accept.
- * See how much we're actually willing to read per event loop tick (usually 16 kb).
- * Take the minimum of those two.
- * Try and read as much as possible up to the calculated max read.
- * If we didn't read up to the max_read, we go back to waiting on the event loop to tell us we can read more.
- * If we did read up to the max_read, we stop reading immediately and wait for either for a window update,
- * or schedule a task to enforce fairness for other sockets in the event loop if we read up to the max
- * read per event loop tick.
- */
- static void s_do_read(struct socket_handler *socket_handler) {
- size_t downstream_window = aws_channel_slot_downstream_read_window(socket_handler->slot);
- size_t max_to_read =
- downstream_window > socket_handler->max_rw_size ? socket_handler->max_rw_size : downstream_window;
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET_HANDLER,
- "id=%p: invoking read. Downstream window %llu, max_to_read %llu",
- (void *)socket_handler->slot->handler,
- (unsigned long long)downstream_window,
- (unsigned long long)max_to_read);
- if (max_to_read == 0) {
- return;
- }
- size_t total_read = 0;
- size_t read = 0;
- while (total_read < max_to_read && !socket_handler->shutdown_in_progress) {
- size_t iter_max_read = max_to_read - total_read;
- struct aws_io_message *message = aws_channel_acquire_message_from_pool(
- socket_handler->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, iter_max_read);
- if (!message) {
- break;
- }
- if (aws_socket_read(socket_handler->socket, &message->message_data, &read)) {
- aws_mem_release(message->allocator, message);
- break;
- }
- total_read += read;
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET_HANDLER,
- "id=%p: read %llu from socket",
- (void *)socket_handler->slot->handler,
- (unsigned long long)read);
- if (aws_channel_slot_send_message(socket_handler->slot, message, AWS_CHANNEL_DIR_READ)) {
- aws_mem_release(message->allocator, message);
- break;
- }
- }
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET_HANDLER,
- "id=%p: total read on this tick %llu",
- (void *)&socket_handler->slot->handler,
- (unsigned long long)total_read);
- socket_handler->stats.bytes_read += total_read;
- /* resubscribe as long as there's no error, just return if we're in a would block scenario. */
- if (total_read < max_to_read) {
- int last_error = aws_last_error();
- if (last_error != AWS_IO_READ_WOULD_BLOCK && !socket_handler->shutdown_in_progress) {
- aws_channel_shutdown(socket_handler->slot->channel, last_error);
- }
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET_HANDLER,
- "id=%p: out of data to read on socket. "
- "Waiting on event-loop notification.",
- (void *)socket_handler->slot->handler);
- return;
- }
- /* in this case, everything was fine, but there's still pending reads. We need to schedule a task to do the read
- * again. */
- if (!socket_handler->shutdown_in_progress && total_read == socket_handler->max_rw_size &&
- !socket_handler->read_task_storage.task_fn) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET_HANDLER,
- "id=%p: more data is pending read, but we've exceeded "
- "the max read on this tick. Scheduling a task to read on next tick.",
- (void *)socket_handler->slot->handler);
- aws_channel_task_init(
- &socket_handler->read_task_storage, s_read_task, socket_handler, "socket_handler_re_read");
- aws_channel_schedule_task_now(socket_handler->slot->channel, &socket_handler->read_task_storage);
- }
- }
- /* the socket is either readable or errored out. If it's readable, kick off s_do_read() to do its thing.
- * If an error, start the channel shutdown process. */
- static void s_on_readable_notification(struct aws_socket *socket, int error_code, void *user_data) {
- (void)socket;
- struct socket_handler *socket_handler = user_data;
- AWS_LOGF_TRACE(AWS_LS_IO_SOCKET_HANDLER, "id=%p: socket is now readable", (void *)socket_handler->slot->handler);
- /* read regardless so we can pick up data that was sent prior to the close. For example, peer sends a TLS ALERT
- * then immediately closes the socket. On some platforms, we'll never see the readable flag. So we want to make
- * sure we read the ALERT, otherwise, we'll end up telling the user that the channel shutdown because of a socket
- * closure, when in reality it was a TLS error */
- s_do_read(socket_handler);
- if (error_code && !socket_handler->shutdown_in_progress) {
- aws_channel_shutdown(socket_handler->slot->channel, error_code);
- }
- }
- /* Either the result of a context switch (for fairness in the event loop), or a window update. */
- static void s_read_task(struct aws_channel_task *task, void *arg, aws_task_status status) {
- task->task_fn = NULL;
- task->arg = NULL;
- if (status == AWS_TASK_STATUS_RUN_READY) {
- struct socket_handler *socket_handler = arg;
- s_do_read(socket_handler);
- }
- }
- static int s_socket_increment_read_window(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- size_t size) {
- (void)size;
- struct socket_handler *socket_handler = handler->impl;
- if (!socket_handler->shutdown_in_progress && !socket_handler->read_task_storage.task_fn) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET_HANDLER,
- "id=%p: increment read window message received, scheduling"
- " task for another read operation.",
- (void *)handler);
- aws_channel_task_init(
- &socket_handler->read_task_storage, s_read_task, socket_handler, "socket_handler_read_on_window_increment");
- aws_channel_schedule_task_now(slot->channel, &socket_handler->read_task_storage);
- }
- return AWS_OP_SUCCESS;
- }
- static void s_close_task(struct aws_channel_task *task, void *arg, aws_task_status status) {
- (void)task;
- (void)status;
- struct aws_channel_handler *handler = arg;
- struct socket_handler *socket_handler = handler->impl;
- /*
- * Run this unconditionally regardless of status, otherwise channel will not
- * finish shutting down properly
- */
- /* this only happens in write direction. */
- /* we also don't care about the free_scarce_resource_immediately
- * code since we're always the last one in the shutdown sequence. */
- aws_channel_slot_on_handler_shutdown_complete(
- socket_handler->slot, AWS_CHANNEL_DIR_WRITE, socket_handler->shutdown_err_code, false);
- }
- static int s_socket_shutdown(
- struct aws_channel_handler *handler,
- struct aws_channel_slot *slot,
- enum aws_channel_direction dir,
- int error_code,
- bool free_scarce_resource_immediately) {
- struct socket_handler *socket_handler = (struct socket_handler *)handler->impl;
- socket_handler->shutdown_in_progress = true;
- if (dir == AWS_CHANNEL_DIR_READ) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET_HANDLER,
- "id=%p: shutting down read direction with error_code %d",
- (void *)handler,
- error_code);
- if (free_scarce_resource_immediately && aws_socket_is_open(socket_handler->socket)) {
- if (aws_socket_close(socket_handler->socket)) {
- return AWS_OP_ERR;
- }
- }
- return aws_channel_slot_on_handler_shutdown_complete(slot, dir, error_code, free_scarce_resource_immediately);
- }
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET_HANDLER,
- "id=%p: shutting down write direction with error_code %d",
- (void *)handler,
- error_code);
- if (aws_socket_is_open(socket_handler->socket)) {
- aws_socket_close(socket_handler->socket);
- }
- /* Schedule a task to complete the shutdown, in case a do_read task is currently pending.
- * It's OK to delay the shutdown, even when free_scarce_resources_immediately is true,
- * because the socket has been closed: mitigating the risk that the socket is still being abused by
- * a hostile peer. */
- aws_channel_task_init(&socket_handler->shutdown_task_storage, s_close_task, handler, "socket_handler_close");
- socket_handler->shutdown_err_code = error_code;
- aws_channel_schedule_task_now(slot->channel, &socket_handler->shutdown_task_storage);
- return AWS_OP_SUCCESS;
- }
- static size_t s_message_overhead(struct aws_channel_handler *handler) {
- (void)handler;
- return 0;
- }
- static size_t s_socket_initial_window_size(struct aws_channel_handler *handler) {
- (void)handler;
- return SIZE_MAX;
- }
- static void s_socket_destroy(struct aws_channel_handler *handler) {
- if (handler != NULL) {
- struct socket_handler *socket_handler = (struct socket_handler *)handler->impl;
- if (socket_handler != NULL) {
- aws_crt_statistics_socket_cleanup(&socket_handler->stats);
- }
- aws_mem_release(handler->alloc, handler);
- }
- }
- static void s_reset_statistics(struct aws_channel_handler *handler) {
- struct socket_handler *socket_handler = (struct socket_handler *)handler->impl;
- aws_crt_statistics_socket_reset(&socket_handler->stats);
- }
- static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_array_list *stats_list) {
- struct socket_handler *socket_handler = (struct socket_handler *)handler->impl;
- void *stats_base = &socket_handler->stats;
- aws_array_list_push_back(stats_list, &stats_base);
- }
- static void s_trigger_read(struct aws_channel_handler *handler) {
- struct socket_handler *socket_handler = (struct socket_handler *)handler->impl;
- s_do_read(socket_handler);
- }
- static struct aws_channel_handler_vtable s_vtable = {
- .process_read_message = s_socket_process_read_message,
- .destroy = s_socket_destroy,
- .process_write_message = s_socket_process_write_message,
- .initial_window_size = s_socket_initial_window_size,
- .increment_read_window = s_socket_increment_read_window,
- .shutdown = s_socket_shutdown,
- .message_overhead = s_message_overhead,
- .reset_statistics = s_reset_statistics,
- .gather_statistics = s_gather_statistics,
- .trigger_read = s_trigger_read,
- };
- struct aws_channel_handler *aws_socket_handler_new(
- struct aws_allocator *allocator,
- struct aws_socket *socket,
- struct aws_channel_slot *slot,
- size_t max_read_size) {
- /* make sure something has assigned this socket to an event loop, in client mode this will already have occurred.
- In server mode, someone should have assigned it before calling us.*/
- AWS_ASSERT(aws_socket_get_event_loop(socket));
- struct aws_channel_handler *handler = NULL;
- struct socket_handler *impl = NULL;
- if (!aws_mem_acquire_many(
- allocator, 2, &handler, sizeof(struct aws_channel_handler), &impl, sizeof(struct socket_handler))) {
- return NULL;
- }
- impl->socket = socket;
- impl->slot = slot;
- impl->max_rw_size = max_read_size;
- AWS_ZERO_STRUCT(impl->read_task_storage);
- AWS_ZERO_STRUCT(impl->shutdown_task_storage);
- impl->shutdown_in_progress = false;
- if (aws_crt_statistics_socket_init(&impl->stats)) {
- goto cleanup_handler;
- }
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET_HANDLER,
- "id=%p: Socket handler created with max_read_size of %llu",
- (void *)handler,
- (unsigned long long)max_read_size);
- handler->alloc = allocator;
- handler->impl = impl;
- handler->vtable = &s_vtable;
- handler->slot = slot;
- if (aws_socket_subscribe_to_readable_events(socket, s_on_readable_notification, impl)) {
- goto cleanup_handler;
- }
- socket->handler = handler;
- return handler;
- cleanup_handler:
- aws_mem_release(allocator, handler);
- return NULL;
- }
- const struct aws_socket *aws_socket_handler_get_socket(const struct aws_channel_handler *handler) {
- AWS_PRECONDITION(handler);
- const struct socket_handler *socket_handler = handler->impl;
- return socket_handler->socket;
- }
|