123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- /**
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * SPDX-License-Identifier: Apache-2.0.
- */
- #include <aws/http/private/h1_stream.h>
- #include <aws/http/private/h1_connection.h>
- #include <aws/http/private/h1_encoder.h>
- #include <aws/http/status_code.h>
- #include <aws/io/logging.h>
- #include <aws/io/stream.h>
- #include <inttypes.h>
- static void s_stream_destroy(struct aws_http_stream *stream_base) {
- struct aws_h1_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h1_stream, base);
- AWS_ASSERT(
- stream->synced_data.api_state != AWS_H1_STREAM_API_STATE_ACTIVE &&
- "Stream should be complete (or never-activated) when stream destroyed");
- AWS_ASSERT(
- aws_linked_list_empty(&stream->thread_data.pending_chunk_list) &&
- aws_linked_list_empty(&stream->synced_data.pending_chunk_list) &&
- "Chunks should be marked complete before stream destroyed");
- aws_h1_encoder_message_clean_up(&stream->encoder_message);
- aws_byte_buf_clean_up(&stream->incoming_storage_buf);
- aws_mem_release(stream->base.alloc, stream);
- }
- static struct aws_h1_connection *s_get_h1_connection(const struct aws_h1_stream *stream) {
- return AWS_CONTAINER_OF(stream->base.owning_connection, struct aws_h1_connection, base);
- }
/** Acquire the lock guarding synced_data. The stream has no lock of its own; it uses its connection's. */
static void s_stream_lock_synced_data(struct aws_h1_stream *stream) {
    struct aws_h1_connection *owner = s_get_h1_connection(stream);
    aws_h1_connection_lock_synced_data(owner);
}
/** Release the lock taken by s_stream_lock_synced_data() (i.e. the owning connection's lock). */
static void s_stream_unlock_synced_data(struct aws_h1_stream *stream) {
    struct aws_h1_connection *owner = s_get_h1_connection(stream);
    aws_h1_connection_unlock_synced_data(owner);
}
- static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
- (void)task;
- struct aws_h1_stream *stream = arg;
- struct aws_h1_connection *connection = s_get_h1_connection(stream);
- if (status != AWS_TASK_STATUS_RUN_READY) {
- goto done;
- }
- AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Running stream cross-thread work task.", (void *)&stream->base);
- /* BEGIN CRITICAL SECTION */
- s_stream_lock_synced_data(stream);
- stream->synced_data.is_cross_thread_work_task_scheduled = false;
- int api_state = stream->synced_data.api_state;
- bool found_chunks = !aws_linked_list_empty(&stream->synced_data.pending_chunk_list);
- aws_linked_list_move_all_back(&stream->thread_data.pending_chunk_list, &stream->synced_data.pending_chunk_list);
- stream->encoder_message.trailer = stream->synced_data.pending_trailer;
- stream->synced_data.pending_trailer = NULL;
- bool has_outgoing_response = stream->synced_data.has_outgoing_response;
- uint64_t pending_window_update = stream->synced_data.pending_window_update;
- stream->synced_data.pending_window_update = 0;
- s_stream_unlock_synced_data(stream);
- /* END CRITICAL SECTION */
- /* If we have any new outgoing data, prompt the connection to try and send it. */
- bool new_outgoing_data = found_chunks;
- /* If we JUST learned about having an outgoing response, that's a reason to try sending data */
- if (has_outgoing_response && !stream->thread_data.has_outgoing_response) {
- stream->thread_data.has_outgoing_response = true;
- new_outgoing_data = true;
- }
- if (new_outgoing_data && (api_state == AWS_H1_STREAM_API_STATE_ACTIVE)) {
- aws_h1_connection_try_write_outgoing_stream(connection);
- }
- /* Add to window size using saturated sum to prevent overflow.
- * Saturating is fine because it's a u64, the stream could never receive that much data. */
- stream->thread_data.stream_window =
- aws_add_u64_saturating(stream->thread_data.stream_window, pending_window_update);
- if ((pending_window_update > 0) && (api_state == AWS_H1_STREAM_API_STATE_ACTIVE)) {
- /* Now that stream window is larger, connection might have buffered
- * data to send, or might need to increment its own window */
- aws_h1_connection_try_process_read_messages(connection);
- }
- done:
- /* Release reference that kept stream alive until task ran */
- aws_http_stream_release(&stream->base);
- }
- /* Note the update in synced_data, and schedule the cross_thread_work_task if necessary */
- static void s_stream_update_window(struct aws_http_stream *stream_base, size_t increment_size) {
- if (increment_size == 0) {
- return;
- }
- if (!stream_base->owning_connection->stream_manual_window_management) {
- return;
- }
- struct aws_h1_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h1_stream, base);
- bool should_schedule_task = false;
- { /* BEGIN CRITICAL SECTION */
- s_stream_lock_synced_data(stream);
- /* Saturated sum. It's a u64. The stream could never receive that much data. */
- stream->synced_data.pending_window_update =
- aws_add_u64_saturating(stream->synced_data.pending_window_update, increment_size);
- /* Don't alert the connection unless the stream is active */
- if (stream->synced_data.api_state == AWS_H1_STREAM_API_STATE_ACTIVE) {
- if (!stream->synced_data.is_cross_thread_work_task_scheduled) {
- stream->synced_data.is_cross_thread_work_task_scheduled = true;
- should_schedule_task = true;
- }
- }
- s_stream_unlock_synced_data(stream);
- } /* END CRITICAL SECTION */
- if (should_schedule_task) {
- /* Keep stream alive until task completes */
- aws_atomic_fetch_add(&stream->base.refcount, 1);
- AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Scheduling stream cross-thread work task.", (void *)stream_base);
- aws_channel_schedule_task_now(
- stream->base.owning_connection->channel_slot->channel, &stream->cross_thread_work_task);
- }
- }
- static int s_stream_write_chunk(struct aws_http_stream *stream_base, const struct aws_http1_chunk_options *options) {
- AWS_PRECONDITION(stream_base);
- AWS_PRECONDITION(options);
- struct aws_h1_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h1_stream, base);
- if (options->chunk_data == NULL && options->chunk_data_size > 0) {
- AWS_LOGF_ERROR(
- AWS_LS_HTTP_STREAM, "id=%p: Chunk data cannot be NULL if data size is non-zero", (void *)stream_base);
- return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- }
- struct aws_h1_chunk *chunk = aws_h1_chunk_new(stream_base->alloc, options);
- if (AWS_UNLIKELY(NULL == chunk)) {
- AWS_LOGF_ERROR(
- AWS_LS_HTTP_STREAM,
- "id=%p: Failed to initialize streamed chunk, error %d (%s).",
- (void *)stream_base,
- aws_last_error(),
- aws_error_name(aws_last_error()));
- return AWS_OP_ERR;
- }
- int error_code = 0;
- bool should_schedule_task = false;
- { /* BEGIN CRITICAL SECTION */
- s_stream_lock_synced_data(stream);
- /* Can only add chunks while stream is active. */
- if (stream->synced_data.api_state != AWS_H1_STREAM_API_STATE_ACTIVE) {
- error_code = (stream->synced_data.api_state == AWS_H1_STREAM_API_STATE_INIT)
- ? AWS_ERROR_HTTP_STREAM_NOT_ACTIVATED
- : AWS_ERROR_HTTP_STREAM_HAS_COMPLETED;
- goto unlock;
- }
- /* Prevent user trying to submit chunks without having set the required headers.
- * This check also prevents a server-user submitting chunks before the response has been submitted. */
- if (!stream->synced_data.using_chunked_encoding) {
- AWS_LOGF_ERROR(
- AWS_LS_HTTP_STREAM,
- "id=%p: Cannot write chunks without 'transfer-encoding: chunked' header.",
- (void *)stream_base);
- error_code = AWS_ERROR_INVALID_STATE;
- goto unlock;
- }
- if (stream->synced_data.has_final_chunk) {
- AWS_LOGF_ERROR(
- AWS_LS_HTTP_STREAM, "id=%p: Cannot write additional chunk after final chunk.", (void *)stream_base);
- error_code = AWS_ERROR_INVALID_STATE;
- goto unlock;
- }
- /* success */
- if (chunk->data_size == 0) {
- stream->synced_data.has_final_chunk = true;
- }
- aws_linked_list_push_back(&stream->synced_data.pending_chunk_list, &chunk->node);
- should_schedule_task = !stream->synced_data.is_cross_thread_work_task_scheduled;
- stream->synced_data.is_cross_thread_work_task_scheduled = true;
- unlock:
- s_stream_unlock_synced_data(stream);
- } /* END CRITICAL SECTION */
- if (error_code) {
- AWS_LOGF_ERROR(
- AWS_LS_HTTP_STREAM,
- "id=%p: Failed to add chunk, error %d (%s)",
- (void *)stream_base,
- error_code,
- aws_error_name(error_code));
- aws_h1_chunk_destroy(chunk);
- return aws_raise_error(error_code);
- }
- AWS_LOGF_TRACE(
- AWS_LS_HTTP_STREAM,
- "id=%p: Adding chunk with size %" PRIu64 " to stream",
- (void *)stream,
- options->chunk_data_size);
- if (should_schedule_task) {
- /* Keep stream alive until task completes */
- aws_atomic_fetch_add(&stream->base.refcount, 1);
- AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Scheduling stream cross-thread work task.", (void *)stream_base);
- aws_channel_schedule_task_now(
- stream->base.owning_connection->channel_slot->channel, &stream->cross_thread_work_task);
- } else {
- AWS_LOGF_TRACE(
- AWS_LS_HTTP_STREAM, "id=%p: Stream cross-thread work task was already scheduled.", (void *)stream_base);
- }
- return AWS_OP_SUCCESS;
- }
- static int s_stream_add_trailer(struct aws_http_stream *stream_base, const struct aws_http_headers *trailing_headers) {
- AWS_PRECONDITION(stream_base);
- AWS_PRECONDITION(trailing_headers);
- struct aws_h1_stream *stream = AWS_CONTAINER_OF(stream_base, struct aws_h1_stream, base);
- struct aws_h1_trailer *trailer = aws_h1_trailer_new(stream_base->alloc, trailing_headers);
- if (AWS_UNLIKELY(NULL == trailer)) {
- AWS_LOGF_ERROR(
- AWS_LS_HTTP_STREAM,
- "id=%p: Failed to initialize streamed trailer, error %d (%s).",
- (void *)stream_base,
- aws_last_error(),
- aws_error_name(aws_last_error()));
- return AWS_OP_ERR;
- }
- int error_code = 0;
- bool should_schedule_task = false;
- { /* BEGIN CRITICAL SECTION */
- s_stream_lock_synced_data(stream);
- /* Can only add trailers while stream is active. */
- if (stream->synced_data.api_state != AWS_H1_STREAM_API_STATE_ACTIVE) {
- error_code = (stream->synced_data.api_state == AWS_H1_STREAM_API_STATE_INIT)
- ? AWS_ERROR_HTTP_STREAM_NOT_ACTIVATED
- : AWS_ERROR_HTTP_STREAM_HAS_COMPLETED;
- goto unlock;
- }
- if (!stream->synced_data.using_chunked_encoding) {
- AWS_LOGF_ERROR(
- AWS_LS_HTTP_STREAM,
- "id=%p: Cannot write trailers without 'transfer-encoding: chunked' header.",
- (void *)stream_base);
- error_code = AWS_ERROR_INVALID_STATE;
- goto unlock;
- }
- if (stream->synced_data.has_added_trailer) {
- AWS_LOGF_ERROR(AWS_LS_HTTP_STREAM, "id=%p: Cannot write trailers twice.", (void *)stream_base);
- error_code = AWS_ERROR_INVALID_STATE;
- goto unlock;
- }
- if (stream->synced_data.has_final_chunk) {
- AWS_LOGF_ERROR(AWS_LS_HTTP_STREAM, "id=%p: Cannot write trailers after final chunk.", (void *)stream_base);
- error_code = AWS_ERROR_INVALID_STATE;
- goto unlock;
- }
- stream->synced_data.has_added_trailer = true;
- stream->synced_data.pending_trailer = trailer;
- should_schedule_task = !stream->synced_data.is_cross_thread_work_task_scheduled;
- stream->synced_data.is_cross_thread_work_task_scheduled = true;
- unlock:
- s_stream_unlock_synced_data(stream);
- } /* END CRITICAL SECTION */
- if (error_code) {
- AWS_LOGF_ERROR(
- AWS_LS_HTTP_STREAM,
- "id=%p: Failed to add trailer, error %d (%s)",
- (void *)stream_base,
- error_code,
- aws_error_name(error_code));
- aws_h1_trailer_destroy(trailer);
- return aws_raise_error(error_code);
- }
- AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Adding trailer to stream", (void *)stream);
- if (should_schedule_task) {
- /* Keep stream alive until task completes */
- aws_atomic_fetch_add(&stream->base.refcount, 1);
- AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Scheduling stream cross-thread work task.", (void *)stream_base);
- aws_channel_schedule_task_now(
- stream->base.owning_connection->channel_slot->channel, &stream->cross_thread_work_task);
- } else {
- AWS_LOGF_TRACE(
- AWS_LS_HTTP_STREAM, "id=%p: Stream cross-thread work task was already scheduled.", (void *)stream_base);
- }
- return AWS_OP_SUCCESS;
- }
- static const struct aws_http_stream_vtable s_stream_vtable = {
- .destroy = s_stream_destroy,
- .update_window = s_stream_update_window,
- .activate = aws_h1_stream_activate,
- .http1_write_chunk = s_stream_write_chunk,
- .http1_add_trailer = s_stream_add_trailer,
- .http2_reset_stream = NULL,
- .http2_get_received_error_code = NULL,
- .http2_get_sent_error_code = NULL,
- };
- static struct aws_h1_stream *s_stream_new_common(
- struct aws_http_connection *connection_base,
- void *user_data,
- aws_http_on_incoming_headers_fn *on_incoming_headers,
- aws_http_on_incoming_header_block_done_fn *on_incoming_header_block_done,
- aws_http_on_incoming_body_fn *on_incoming_body,
- aws_http_on_stream_complete_fn *on_complete,
- aws_http_on_stream_destroy_fn *on_destroy) {
- struct aws_h1_connection *connection = AWS_CONTAINER_OF(connection_base, struct aws_h1_connection, base);
- struct aws_h1_stream *stream = aws_mem_calloc(connection_base->alloc, 1, sizeof(struct aws_h1_stream));
- if (!stream) {
- return NULL;
- }
- stream->base.vtable = &s_stream_vtable;
- stream->base.alloc = connection_base->alloc;
- stream->base.owning_connection = connection_base;
- stream->base.user_data = user_data;
- stream->base.on_incoming_headers = on_incoming_headers;
- stream->base.on_incoming_header_block_done = on_incoming_header_block_done;
- stream->base.on_incoming_body = on_incoming_body;
- stream->base.on_complete = on_complete;
- stream->base.on_destroy = on_destroy;
- aws_channel_task_init(
- &stream->cross_thread_work_task, s_stream_cross_thread_work_task, stream, "http1_stream_cross_thread_work");
- aws_linked_list_init(&stream->thread_data.pending_chunk_list);
- aws_linked_list_init(&stream->synced_data.pending_chunk_list);
- stream->thread_data.stream_window = connection->initial_stream_window_size;
- /* Stream refcount starts at 1 for user and is incremented upon activation for the connection */
- aws_atomic_init_int(&stream->base.refcount, 1);
- return stream;
- }
- struct aws_h1_stream *aws_h1_stream_new_request(
- struct aws_http_connection *client_connection,
- const struct aws_http_make_request_options *options) {
- struct aws_h1_stream *stream = s_stream_new_common(
- client_connection,
- options->user_data,
- options->on_response_headers,
- options->on_response_header_block_done,
- options->on_response_body,
- options->on_complete,
- options->on_destroy);
- if (!stream) {
- return NULL;
- }
- /* Transform request if necessary */
- if (client_connection->proxy_request_transform) {
- if (client_connection->proxy_request_transform(options->request, client_connection->user_data)) {
- goto error;
- }
- }
- stream->base.client_data = &stream->base.client_or_server_data.client;
- stream->base.client_data->response_status = AWS_HTTP_STATUS_CODE_UNKNOWN;
- /* Validate request and cache info that the encoder will eventually need */
- if (aws_h1_encoder_message_init_from_request(
- &stream->encoder_message,
- client_connection->alloc,
- options->request,
- &stream->thread_data.pending_chunk_list)) {
- goto error;
- }
- /* RFC-7230 Section 6.3: The "close" connection option is used to signal
- * that a connection will not persist after the current request/response*/
- if (stream->encoder_message.has_connection_close_header) {
- stream->is_final_stream = true;
- }
- stream->synced_data.using_chunked_encoding = stream->encoder_message.has_chunked_encoding_header;
- return stream;
- error:
- s_stream_destroy(&stream->base);
- return NULL;
- }
- struct aws_h1_stream *aws_h1_stream_new_request_handler(const struct aws_http_request_handler_options *options) {
- struct aws_h1_stream *stream = s_stream_new_common(
- options->server_connection,
- options->user_data,
- options->on_request_headers,
- options->on_request_header_block_done,
- options->on_request_body,
- options->on_complete,
- options->on_destroy);
- if (!stream) {
- return NULL;
- }
- /* This code is only executed in server mode and can only be invoked from the event-loop thread so don't worry
- * with the lock here. */
- stream->base.id = aws_http_connection_get_next_stream_id(options->server_connection);
- /* Request-handler (server) streams don't need user to call activate() on them.
- * Since these these streams can only be created on the event-loop thread,
- * it's not possible for callbacks to fire before the stream pointer is returned.
- * (Clients must call stream.activate() because they might create a stream on any thread) */
- stream->synced_data.api_state = AWS_H1_STREAM_API_STATE_ACTIVE;
- stream->base.server_data = &stream->base.client_or_server_data.server;
- stream->base.server_data->on_request_done = options->on_request_done;
- aws_atomic_fetch_add(&stream->base.refcount, 1);
- return stream;
- }
- int aws_h1_stream_send_response(struct aws_h1_stream *stream, struct aws_http_message *response) {
- struct aws_h1_connection *connection = s_get_h1_connection(stream);
- int error_code = 0;
- /* Validate the response and cache info that encoder will eventually need.
- * The encoder_message object will be moved into the stream later while holding the lock */
- struct aws_h1_encoder_message encoder_message;
- bool body_headers_ignored = stream->base.request_method == AWS_HTTP_METHOD_HEAD;
- if (aws_h1_encoder_message_init_from_response(
- &encoder_message,
- stream->base.alloc,
- response,
- body_headers_ignored,
- &stream->thread_data.pending_chunk_list)) {
- error_code = aws_last_error();
- goto error;
- }
- bool should_schedule_task = false;
- { /* BEGIN CRITICAL SECTION */
- s_stream_lock_synced_data(stream);
- if (stream->synced_data.api_state == AWS_H1_STREAM_API_STATE_COMPLETE) {
- error_code = AWS_ERROR_HTTP_STREAM_HAS_COMPLETED;
- } else if (stream->synced_data.has_outgoing_response) {
- AWS_LOGF_ERROR(AWS_LS_HTTP_STREAM, "id=%p: Response already created on the stream", (void *)&stream->base);
- error_code = AWS_ERROR_INVALID_STATE;
- } else {
- stream->synced_data.has_outgoing_response = true;
- stream->encoder_message = encoder_message;
- if (encoder_message.has_connection_close_header) {
- /* This will be the last stream connection will process, new streams will be rejected */
- stream->is_final_stream = true;
- /* Note: We're touching the connection's synced_data, which is OK
- * because an h1_connection and all its h1_streams share a single lock. */
- connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
- }
- stream->synced_data.using_chunked_encoding = stream->encoder_message.has_chunked_encoding_header;
- should_schedule_task = !stream->synced_data.is_cross_thread_work_task_scheduled;
- stream->synced_data.is_cross_thread_work_task_scheduled = true;
- }
- s_stream_unlock_synced_data(stream);
- } /* END CRITICAL SECTION */
- if (error_code) {
- goto error;
- }
- /* Success! */
- AWS_LOGF_DEBUG(
- AWS_LS_HTTP_STREAM, "id=%p: Created response on connection=%p: ", (void *)stream, (void *)connection);
- if (should_schedule_task) {
- /* Keep stream alive until task completes */
- aws_atomic_fetch_add(&stream->base.refcount, 1);
- AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Scheduling stream cross-thread work task.", (void *)&stream->base);
- aws_channel_schedule_task_now(
- stream->base.owning_connection->channel_slot->channel, &stream->cross_thread_work_task);
- } else {
- AWS_LOGF_TRACE(
- AWS_LS_HTTP_STREAM, "id=%p: Stream cross-thread work task was already scheduled.", (void *)&stream->base);
- }
- return AWS_OP_SUCCESS;
- error:
- AWS_LOGF_ERROR(
- AWS_LS_HTTP_STREAM,
- "id=%p: Sending response on the stream failed, error %d (%s)",
- (void *)&stream->base,
- error_code,
- aws_error_name(error_code));
- aws_h1_encoder_message_clean_up(&encoder_message);
- return aws_raise_error(error_code);
- }
|