123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585 |
- /**
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * SPDX-License-Identifier: Apache-2.0.
- */
- #include <aws/io/pipe.h>
- #include <aws/io/event_loop.h>
- #ifdef __GLIBC__
- # define __USE_GNU
- #endif
- /* TODO: move this detection to CMAKE and a config header */
- #if !defined(COMPAT_MODE) && defined(__GLIBC__) && ((__GLIBC__ == 2 && __GLIBC_MINOR__ >= 9) || __GLIBC__ > 2)
- # define HAVE_PIPE2 1
- #else
- # define HAVE_PIPE2 0
- #endif
- #include <errno.h>
- #include <fcntl.h>
- #include <unistd.h>
- /* This isn't defined on ancient linux distros (breaking the builds).
- * However, if this is a prebuild, we purposely build on an ancient system, but
- * we want the kernel calls to still be the same as a modern build since that's likely the target of the application
- * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag
- * gets passed as long as it does.
- */
- #ifndef O_CLOEXEC
- # define O_CLOEXEC 02000000
- #endif
- struct read_end_impl {
- struct aws_allocator *alloc;
- struct aws_io_handle handle;
- struct aws_event_loop *event_loop;
- aws_pipe_on_readable_fn *on_readable_user_callback;
- void *on_readable_user_data;
- /* Used in handshake for detecting whether user callback resulted in read-end being cleaned up.
- * If clean_up() sees that the pointer is set, the bool it points to will get set true. */
- bool *did_user_callback_clean_up_read_end;
- bool is_subscribed;
- };
- struct pipe_write_request {
- struct aws_byte_cursor original_cursor;
- struct aws_byte_cursor cursor; /* tracks progress of write */
- size_t num_bytes_written;
- aws_pipe_on_write_completed_fn *user_callback;
- void *user_data;
- struct aws_linked_list_node list_node;
- /* True if the write-end is cleaned up while the user callback is being invoked */
- bool did_user_callback_clean_up_write_end;
- };
- struct write_end_impl {
- struct aws_allocator *alloc;
- struct aws_io_handle handle;
- struct aws_event_loop *event_loop;
- struct aws_linked_list write_list;
- /* Valid while invoking user callback on a completed write request. */
- struct pipe_write_request *currently_invoking_write_callback;
- bool is_writable;
- /* Future optimization idea: avoid an allocation on each write by keeping 1 pre-allocated pipe_write_request around
- * and re-using it whenever possible */
- };
- static void s_write_end_on_event(
- struct aws_event_loop *event_loop,
- struct aws_io_handle *handle,
- int events,
- void *user_data);
- static int s_translate_posix_error(int err) {
- AWS_ASSERT(err);
- switch (err) {
- case EPIPE:
- return AWS_IO_BROKEN_PIPE;
- default:
- return AWS_ERROR_SYS_CALL_FAILURE;
- }
- }
- static int s_raise_posix_error(int err) {
- return aws_raise_error(s_translate_posix_error(err));
- }
- AWS_IO_API int aws_open_nonblocking_posix_pipe(int pipe_fds[2]) {
- int err;
- #if HAVE_PIPE2
- err = pipe2(pipe_fds, O_NONBLOCK | O_CLOEXEC);
- if (err) {
- return s_raise_posix_error(err);
- }
- return AWS_OP_SUCCESS;
- #else
- err = pipe(pipe_fds);
- if (err) {
- return s_raise_posix_error(err);
- }
- for (int i = 0; i < 2; ++i) {
- int flags = fcntl(pipe_fds[i], F_GETFL);
- if (flags == -1) {
- s_raise_posix_error(err);
- goto error;
- }
- flags |= O_NONBLOCK | O_CLOEXEC;
- if (fcntl(pipe_fds[i], F_SETFL, flags) == -1) {
- s_raise_posix_error(err);
- goto error;
- }
- }
- return AWS_OP_SUCCESS;
- error:
- close(pipe_fds[0]);
- close(pipe_fds[1]);
- return AWS_OP_ERR;
- #endif
- }
- int aws_pipe_init(
- struct aws_pipe_read_end *read_end,
- struct aws_event_loop *read_end_event_loop,
- struct aws_pipe_write_end *write_end,
- struct aws_event_loop *write_end_event_loop,
- struct aws_allocator *allocator) {
- AWS_ASSERT(read_end);
- AWS_ASSERT(read_end_event_loop);
- AWS_ASSERT(write_end);
- AWS_ASSERT(write_end_event_loop);
- AWS_ASSERT(allocator);
- AWS_ZERO_STRUCT(*read_end);
- AWS_ZERO_STRUCT(*write_end);
- struct read_end_impl *read_impl = NULL;
- struct write_end_impl *write_impl = NULL;
- int err;
- /* Open pipe */
- int pipe_fds[2];
- err = aws_open_nonblocking_posix_pipe(pipe_fds);
- if (err) {
- return AWS_OP_ERR;
- }
- /* Init read-end */
- read_impl = aws_mem_calloc(allocator, 1, sizeof(struct read_end_impl));
- if (!read_impl) {
- goto error;
- }
- read_impl->alloc = allocator;
- read_impl->handle.data.fd = pipe_fds[0];
- read_impl->event_loop = read_end_event_loop;
- /* Init write-end */
- write_impl = aws_mem_calloc(allocator, 1, sizeof(struct write_end_impl));
- if (!write_impl) {
- goto error;
- }
- write_impl->alloc = allocator;
- write_impl->handle.data.fd = pipe_fds[1];
- write_impl->event_loop = write_end_event_loop;
- write_impl->is_writable = true; /* Assume pipe is writable to start. Even if it's not, things shouldn't break */
- aws_linked_list_init(&write_impl->write_list);
- read_end->impl_data = read_impl;
- write_end->impl_data = write_impl;
- err = aws_event_loop_subscribe_to_io_events(
- write_end_event_loop, &write_impl->handle, AWS_IO_EVENT_TYPE_WRITABLE, s_write_end_on_event, write_end);
- if (err) {
- goto error;
- }
- return AWS_OP_SUCCESS;
- error:
- close(pipe_fds[0]);
- close(pipe_fds[1]);
- if (read_impl) {
- aws_mem_release(allocator, read_impl);
- }
- if (write_impl) {
- aws_mem_release(allocator, write_impl);
- }
- read_end->impl_data = NULL;
- write_end->impl_data = NULL;
- return AWS_OP_ERR;
- }
- int aws_pipe_clean_up_read_end(struct aws_pipe_read_end *read_end) {
- struct read_end_impl *read_impl = read_end->impl_data;
- if (!read_impl) {
- return aws_raise_error(AWS_IO_BROKEN_PIPE);
- }
- if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
- return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
- }
- if (read_impl->is_subscribed) {
- int err = aws_pipe_unsubscribe_from_readable_events(read_end);
- if (err) {
- return AWS_OP_ERR;
- }
- }
- /* If the event-handler is invoking a user callback, let it know that the read-end was cleaned up */
- if (read_impl->did_user_callback_clean_up_read_end) {
- *read_impl->did_user_callback_clean_up_read_end = true;
- }
- close(read_impl->handle.data.fd);
- aws_mem_release(read_impl->alloc, read_impl);
- AWS_ZERO_STRUCT(*read_end);
- return AWS_OP_SUCCESS;
- }
- struct aws_event_loop *aws_pipe_get_read_end_event_loop(const struct aws_pipe_read_end *read_end) {
- const struct read_end_impl *read_impl = read_end->impl_data;
- if (!read_impl) {
- aws_raise_error(AWS_IO_BROKEN_PIPE);
- return NULL;
- }
- return read_impl->event_loop;
- }
- struct aws_event_loop *aws_pipe_get_write_end_event_loop(const struct aws_pipe_write_end *write_end) {
- const struct write_end_impl *write_impl = write_end->impl_data;
- if (!write_impl) {
- aws_raise_error(AWS_IO_BROKEN_PIPE);
- return NULL;
- }
- return write_impl->event_loop;
- }
- int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_buffer, size_t *num_bytes_read) {
- AWS_ASSERT(dst_buffer && dst_buffer->buffer);
- struct read_end_impl *read_impl = read_end->impl_data;
- if (!read_impl) {
- return aws_raise_error(AWS_IO_BROKEN_PIPE);
- }
- if (num_bytes_read) {
- *num_bytes_read = 0;
- }
- size_t num_bytes_to_read = dst_buffer->capacity - dst_buffer->len;
- ssize_t read_val = read(read_impl->handle.data.fd, dst_buffer->buffer + dst_buffer->len, num_bytes_to_read);
- if (read_val < 0) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
- return aws_raise_error(AWS_IO_READ_WOULD_BLOCK);
- }
- return s_raise_posix_error(errno_value);
- }
- /* Success */
- dst_buffer->len += read_val;
- if (num_bytes_read) {
- *num_bytes_read = read_val;
- }
- return AWS_OP_SUCCESS;
- }
- static void s_read_end_on_event(
- struct aws_event_loop *event_loop,
- struct aws_io_handle *handle,
- int events,
- void *user_data) {
- (void)event_loop;
- (void)handle;
- /* Note that it should be impossible for this to run after read-end has been unsubscribed or cleaned up */
- struct aws_pipe_read_end *read_end = user_data;
- struct read_end_impl *read_impl = read_end->impl_data;
- AWS_ASSERT(read_impl);
- AWS_ASSERT(read_impl->event_loop == event_loop);
- AWS_ASSERT(&read_impl->handle == handle);
- AWS_ASSERT(read_impl->is_subscribed);
- AWS_ASSERT(events != 0);
- AWS_ASSERT(read_impl->did_user_callback_clean_up_read_end == NULL);
- /* Set up handshake, so we can be informed if the read-end is cleaned up while invoking a user callback */
- bool did_user_callback_clean_up_read_end = false;
- read_impl->did_user_callback_clean_up_read_end = &did_user_callback_clean_up_read_end;
- /* If readable event received, tell user to try and read, even if "error" events have also occurred. */
- if (events & AWS_IO_EVENT_TYPE_READABLE) {
- read_impl->on_readable_user_callback(read_end, AWS_ERROR_SUCCESS, read_impl->on_readable_user_data);
- if (did_user_callback_clean_up_read_end) {
- return;
- }
- events &= ~AWS_IO_EVENT_TYPE_READABLE;
- }
- if (events) {
- /* Check that user didn't unsubscribe in the previous callback */
- if (read_impl->is_subscribed) {
- read_impl->on_readable_user_callback(read_end, AWS_IO_BROKEN_PIPE, read_impl->on_readable_user_data);
- if (did_user_callback_clean_up_read_end) {
- return;
- }
- }
- }
- read_impl->did_user_callback_clean_up_read_end = NULL;
- }
- int aws_pipe_subscribe_to_readable_events(
- struct aws_pipe_read_end *read_end,
- aws_pipe_on_readable_fn *on_readable,
- void *user_data) {
- AWS_ASSERT(on_readable);
- struct read_end_impl *read_impl = read_end->impl_data;
- if (!read_impl) {
- return aws_raise_error(AWS_IO_BROKEN_PIPE);
- }
- if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
- return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
- }
- if (read_impl->is_subscribed) {
- return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED);
- }
- read_impl->is_subscribed = true;
- read_impl->on_readable_user_callback = on_readable;
- read_impl->on_readable_user_data = user_data;
- int err = aws_event_loop_subscribe_to_io_events(
- read_impl->event_loop, &read_impl->handle, AWS_IO_EVENT_TYPE_READABLE, s_read_end_on_event, read_end);
- if (err) {
- read_impl->is_subscribed = false;
- read_impl->on_readable_user_callback = NULL;
- read_impl->on_readable_user_data = NULL;
- return AWS_OP_ERR;
- }
- return AWS_OP_SUCCESS;
- }
- int aws_pipe_unsubscribe_from_readable_events(struct aws_pipe_read_end *read_end) {
- struct read_end_impl *read_impl = read_end->impl_data;
- if (!read_impl) {
- return aws_raise_error(AWS_IO_BROKEN_PIPE);
- }
- if (!aws_event_loop_thread_is_callers_thread(read_impl->event_loop)) {
- return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
- }
- if (!read_impl->is_subscribed) {
- return aws_raise_error(AWS_ERROR_IO_NOT_SUBSCRIBED);
- }
- int err = aws_event_loop_unsubscribe_from_io_events(read_impl->event_loop, &read_impl->handle);
- if (err) {
- return AWS_OP_ERR;
- }
- read_impl->is_subscribed = false;
- read_impl->on_readable_user_callback = NULL;
- read_impl->on_readable_user_data = NULL;
- return AWS_OP_SUCCESS;
- }
- /* Pop front write request, invoke its callback, and delete it.
- * Returns whether the callback resulted in the write-end getting cleaned up */
- static bool s_write_end_complete_front_write_request(struct aws_pipe_write_end *write_end, int error_code) {
- struct write_end_impl *write_impl = write_end->impl_data;
- AWS_ASSERT(!aws_linked_list_empty(&write_impl->write_list));
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list);
- struct pipe_write_request *request = AWS_CONTAINER_OF(node, struct pipe_write_request, list_node);
- struct aws_allocator *alloc = write_impl->alloc;
- /* Let the write-end know that a callback is in process, so the write-end can inform the callback
- * whether it resulted in clean_up() being called. */
- bool write_end_cleaned_up_during_callback = false;
- struct pipe_write_request *prev_invoking_request = write_impl->currently_invoking_write_callback;
- write_impl->currently_invoking_write_callback = request;
- if (request->user_callback) {
- request->user_callback(write_end, error_code, request->original_cursor, request->user_data);
- write_end_cleaned_up_during_callback = request->did_user_callback_clean_up_write_end;
- }
- if (!write_end_cleaned_up_during_callback) {
- write_impl->currently_invoking_write_callback = prev_invoking_request;
- }
- aws_mem_release(alloc, request);
- return write_end_cleaned_up_during_callback;
- }
- /* Process write requests as long as the pipe remains writable */
- static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) {
- struct write_end_impl *write_impl = write_end->impl_data;
- AWS_ASSERT(write_impl);
- while (!aws_linked_list_empty(&write_impl->write_list)) {
- struct aws_linked_list_node *node = aws_linked_list_front(&write_impl->write_list);
- struct pipe_write_request *request = AWS_CONTAINER_OF(node, struct pipe_write_request, list_node);
- int completed_error_code = AWS_ERROR_SUCCESS;
- if (request->cursor.len > 0) {
- ssize_t write_val = write(write_impl->handle.data.fd, request->cursor.ptr, request->cursor.len);
- if (write_val < 0) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
- /* The pipe is no longer writable. Bail out */
- write_impl->is_writable = false;
- return;
- }
- /* A non-recoverable error occurred during this write */
- completed_error_code = s_translate_posix_error(errno_value);
- } else {
- aws_byte_cursor_advance(&request->cursor, write_val);
- if (request->cursor.len > 0) {
- /* There was a partial write, loop again to try and write the rest. */
- continue;
- }
- }
- }
- /* If we got this far in the loop, then the write request is complete.
- * Note that the callback may result in the pipe being cleaned up. */
- bool write_end_cleaned_up = s_write_end_complete_front_write_request(write_end, completed_error_code);
- if (write_end_cleaned_up) {
- /* Bail out! Any remaining requests were canceled during clean_up() */
- return;
- }
- }
- }
- /* Handle events on the write-end's file handle */
- static void s_write_end_on_event(
- struct aws_event_loop *event_loop,
- struct aws_io_handle *handle,
- int events,
- void *user_data) {
- (void)event_loop;
- (void)handle;
- /* Note that it should be impossible for this to run after write-end has been unsubscribed or cleaned up */
- struct aws_pipe_write_end *write_end = user_data;
- struct write_end_impl *write_impl = write_end->impl_data;
- AWS_ASSERT(write_impl);
- AWS_ASSERT(write_impl->event_loop == event_loop);
- AWS_ASSERT(&write_impl->handle == handle);
- /* Only care about the writable event. */
- if ((events & AWS_IO_EVENT_TYPE_WRITABLE) == 0) {
- return;
- }
- write_impl->is_writable = true;
- s_write_end_process_requests(write_end);
- }
- int aws_pipe_write(
- struct aws_pipe_write_end *write_end,
- struct aws_byte_cursor src_buffer,
- aws_pipe_on_write_completed_fn *on_completed,
- void *user_data) {
- AWS_ASSERT(src_buffer.ptr);
- struct write_end_impl *write_impl = write_end->impl_data;
- if (!write_impl) {
- return aws_raise_error(AWS_IO_BROKEN_PIPE);
- }
- if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) {
- return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
- }
- struct pipe_write_request *request = aws_mem_calloc(write_impl->alloc, 1, sizeof(struct pipe_write_request));
- if (!request) {
- return AWS_OP_ERR;
- }
- request->original_cursor = src_buffer;
- request->cursor = src_buffer;
- request->user_callback = on_completed;
- request->user_data = user_data;
- aws_linked_list_push_back(&write_impl->write_list, &request->list_node);
- /* If the pipe is writable, process the request (unless pipe is already in the middle of processing, which could
- * happen if a this aws_pipe_write() call was made by another write's completion callback */
- if (write_impl->is_writable && !write_impl->currently_invoking_write_callback) {
- s_write_end_process_requests(write_end);
- }
- return AWS_OP_SUCCESS;
- }
- int aws_pipe_clean_up_write_end(struct aws_pipe_write_end *write_end) {
- struct write_end_impl *write_impl = write_end->impl_data;
- if (!write_impl) {
- return aws_raise_error(AWS_IO_BROKEN_PIPE);
- }
- if (!aws_event_loop_thread_is_callers_thread(write_impl->event_loop)) {
- return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
- }
- int err = aws_event_loop_unsubscribe_from_io_events(write_impl->event_loop, &write_impl->handle);
- if (err) {
- return AWS_OP_ERR;
- }
- close(write_impl->handle.data.fd);
- /* Zero out write-end before invoking user callbacks so that it won't work anymore with public functions. */
- AWS_ZERO_STRUCT(*write_end);
- /* If a request callback is currently being invoked, let it know that the write-end was cleaned up */
- if (write_impl->currently_invoking_write_callback) {
- write_impl->currently_invoking_write_callback->did_user_callback_clean_up_write_end = true;
- }
- /* Force any outstanding write requests to complete with an error status. */
- while (!aws_linked_list_empty(&write_impl->write_list)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&write_impl->write_list);
- struct pipe_write_request *request = AWS_CONTAINER_OF(node, struct pipe_write_request, list_node);
- if (request->user_callback) {
- request->user_callback(NULL, AWS_IO_BROKEN_PIPE, request->original_cursor, request->user_data);
- }
- aws_mem_release(write_impl->alloc, request);
- }
- aws_mem_release(write_impl->alloc, write_impl);
- return AWS_OP_SUCCESS;
- }
|