/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include <aws/io/event_loop.h>

#include <aws/io/logging.h>

#include <aws/common/atomics.h>
#include <aws/common/clock.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>
#include <aws/common/thread.h>

#if defined(__FreeBSD__) || defined(__NetBSD__)
#    define __BSD_VISIBLE 1
#    include <sys/types.h>
#endif

#include <sys/event.h>

#include <aws/io/io.h>
#include <limits.h>
#include <unistd.h>

static void s_destroy(struct aws_event_loop *event_loop);
static int s_run(struct aws_event_loop *event_loop);
static int s_stop(struct aws_event_loop *event_loop);
static int s_wait_for_stop_completion(struct aws_event_loop *event_loop);
static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task);
static int s_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data);
static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
static void s_free_io_event_resources(void *user_data);
static bool s_is_event_thread(struct aws_event_loop *event_loop);

static void aws_event_loop_thread(void *user_data);

int aws_open_nonblocking_posix_pipe(int pipe_fds[2]);

enum event_thread_state {
    EVENT_THREAD_STATE_READY_TO_RUN,
    EVENT_THREAD_STATE_RUNNING,
    EVENT_THREAD_STATE_STOPPING,
};

enum pipe_fd_index {
    READ_FD,
    WRITE_FD,
};

struct kqueue_loop {
    /* thread_created_on is the handle to the event loop thread. */
    struct aws_thread thread_created_on;

    /* thread_joined_to is used by the thread destroying the event loop. */
    aws_thread_id_t thread_joined_to;
    /* running_thread_id is NULL if the event loop thread is stopped, or points to the thread_id of the thread running
     * the event loop (either thread_created_on or thread_joined_to). Atomic because of concurrent writes (e.g.,
     * run/stop) and reads (e.g., is_event_loop_thread).
     * An aws_thread_id_t variable itself cannot be atomic because it is an opaque, platform-dependent type. */
    struct aws_atomic_var running_thread_id;
    int kq_fd; /* kqueue file descriptor */

    /* Pipe for signaling to event-thread that cross_thread_data has changed. */
    int cross_thread_signal_pipe[2];

    /* cross_thread_data holds things that must be communicated across threads.
     * When the event-thread is running, the mutex must be locked while anyone touches anything in cross_thread_data.
     * If this data is modified outside the thread, the thread is signaled via activity on a pipe. */
    struct {
        struct aws_mutex mutex;
        bool thread_signaled; /* whether thread has been signaled about changes to cross_thread_data */
        struct aws_linked_list tasks_to_schedule;
        enum event_thread_state state;
    } cross_thread_data;

    /* thread_data holds things which, when the event-thread is running, may only be touched by the thread */
    struct {
        struct aws_task_scheduler scheduler;

        int connected_handle_count;

        /* These variables duplicate ones in cross_thread_data. We move values out while holding the mutex and operate
         * on them later */
        enum event_thread_state state;
    } thread_data;

    struct aws_thread_options thread_options;
};

/* Data attached to aws_io_handle while the handle is subscribed to io events */
struct handle_data {
    struct aws_io_handle *owner;
    struct aws_event_loop *event_loop;
    aws_event_loop_on_event_fn *on_event;
    void *on_event_user_data;

    int events_subscribed; /* aws_io_event_types this handle should be subscribed to */
    int events_this_loop;  /* aws_io_event_types received during current loop of the event-thread */

    enum { HANDLE_STATE_SUBSCRIBING, HANDLE_STATE_SUBSCRIBED, HANDLE_STATE_UNSUBSCRIBED } state;

    struct aws_task subscribe_task;
    struct aws_task cleanup_task;
};

enum {
    DEFAULT_TIMEOUT_SEC = 100, /* Max kevent() timeout per loop of the event-thread */
    MAX_EVENTS = 100,          /* Max kevents to process per loop of the event-thread */
};

struct aws_event_loop_vtable s_kqueue_vtable = {
    .destroy = s_destroy,
    .run = s_run,
    .stop = s_stop,
    .wait_for_stop_completion = s_wait_for_stop_completion,
    .schedule_task_now = s_schedule_task_now,
    .schedule_task_future = s_schedule_task_future,
    .subscribe_to_io_events = s_subscribe_to_io_events,
    .cancel_task = s_cancel_task,
    .unsubscribe_from_io_events = s_unsubscribe_from_io_events,
    .free_io_event_resources = s_free_io_event_resources,
    .is_on_callers_thread = s_is_event_thread,
};
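
/* The public aws_event_loop_* API dispatches through this vtable; for example,
 * aws_event_loop_run(loop) resolves (roughly) to loop->vtable->run(loop),
 * which lands in s_run() below. */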

struct aws_event_loop *aws_event_loop_new_default_with_options(
    struct aws_allocator *alloc,
    const struct aws_event_loop_options *options) {
    AWS_ASSERT(alloc);
    AWS_ASSERT(options);
    AWS_ASSERT(options->clock);
    bool clean_up_event_loop_mem = false;
    bool clean_up_event_loop_base = false;
    bool clean_up_impl_mem = false;
    bool clean_up_thread = false;
    bool clean_up_kqueue = false;
    bool clean_up_signal_pipe = false;
    bool clean_up_signal_kevent = false;
    bool clean_up_mutex = false;

    struct aws_event_loop *event_loop = aws_mem_acquire(alloc, sizeof(struct aws_event_loop));
    if (!event_loop) {
        return NULL;
    }

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing edge-triggered kqueue", (void *)event_loop);
    clean_up_event_loop_mem = true;

    int err = aws_event_loop_init_base(event_loop, alloc, options->clock);
    if (err) {
        goto clean_up;
    }
    clean_up_event_loop_base = true;

    struct kqueue_loop *impl = aws_mem_calloc(alloc, 1, sizeof(struct kqueue_loop));
    if (!impl) {
        goto clean_up;
    }

    if (options->thread_options) {
        impl->thread_options = *options->thread_options;
    } else {
        impl->thread_options = *aws_default_thread_options();
    }
    /* Initialize the thread id to NULL. It will be set when the event loop thread starts. */
    aws_atomic_init_ptr(&impl->running_thread_id, NULL);
    clean_up_impl_mem = true;

    err = aws_thread_init(&impl->thread_created_on, alloc);
    if (err) {
        goto clean_up;
    }
    clean_up_thread = true;

    impl->kq_fd = kqueue();
    if (impl->kq_fd == -1) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open kqueue handle.", (void *)event_loop);
        aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
        goto clean_up;
    }
    clean_up_kqueue = true;

    err = aws_open_nonblocking_posix_pipe(impl->cross_thread_signal_pipe);
    if (err) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to open pipe handle.", (void *)event_loop);
        goto clean_up;
    }
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: pipe descriptors read %d, write %d.",
        (void *)event_loop,
        impl->cross_thread_signal_pipe[READ_FD],
        impl->cross_thread_signal_pipe[WRITE_FD]);
    clean_up_signal_pipe = true;

    /* Set up kevent to handle activity on the cross_thread_signal_pipe */
    struct kevent thread_signal_kevent;
    EV_SET(
        &thread_signal_kevent,
        impl->cross_thread_signal_pipe[READ_FD],
        EVFILT_READ /*filter*/,
        EV_ADD | EV_CLEAR /*flags*/,
        0 /*fflags*/,
        0 /*data*/,
        NULL /*udata*/);

    int res = kevent(
        impl->kq_fd,
        &thread_signal_kevent /*changelist*/,
        1 /*nchanges*/,
        NULL /*eventlist*/,
        0 /*nevents*/,
        NULL /*timeout*/);

    if (res == -1) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to create cross-thread signal kevent.", (void *)event_loop);
        aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
        goto clean_up;
    }
    clean_up_signal_kevent = true;

    err = aws_mutex_init(&impl->cross_thread_data.mutex);
    if (err) {
        goto clean_up;
    }
    clean_up_mutex = true;

    impl->cross_thread_data.thread_signaled = false;

    aws_linked_list_init(&impl->cross_thread_data.tasks_to_schedule);

    impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;

    err = aws_task_scheduler_init(&impl->thread_data.scheduler, alloc);
    if (err) {
        goto clean_up;
    }

    impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;

    event_loop->impl_data = impl;

    event_loop->vtable = &s_kqueue_vtable;

    /* success */
    return event_loop;

clean_up:
    if (clean_up_mutex) {
        aws_mutex_clean_up(&impl->cross_thread_data.mutex);
    }

    if (clean_up_signal_kevent) {
        thread_signal_kevent.flags = EV_DELETE;
        kevent(
            impl->kq_fd,
            &thread_signal_kevent /*changelist*/,
            1 /*nchanges*/,
            NULL /*eventlist*/,
            0 /*nevents*/,
            NULL /*timeout*/);
    }

    if (clean_up_signal_pipe) {
        close(impl->cross_thread_signal_pipe[READ_FD]);
        close(impl->cross_thread_signal_pipe[WRITE_FD]);
    }

    if (clean_up_kqueue) {
        close(impl->kq_fd);
    }

    if (clean_up_thread) {
        aws_thread_clean_up(&impl->thread_created_on);
    }

    if (clean_up_impl_mem) {
        aws_mem_release(alloc, impl);
    }

    if (clean_up_event_loop_base) {
        aws_event_loop_clean_up_base(event_loop);
    }

    if (clean_up_event_loop_mem) {
        aws_mem_release(alloc, event_loop);
    }

    return NULL;
}
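
/* Illustrative lifecycle sketch (not part of this file; `alloc` is assumed to
 * be a valid aws_allocator, and error checks are omitted for brevity):
 *
 *     struct aws_event_loop_options options = {.clock = aws_high_res_clock_get_ticks};
 *     struct aws_event_loop *loop = aws_event_loop_new_default_with_options(alloc, &options);
 *     aws_event_loop_run(loop);      // launches the event-thread (s_run below)
 *     // ... schedule tasks, subscribe handles ...
 *     aws_event_loop_destroy(loop);  // stops, joins, and frees (s_destroy below)
 */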

static void s_destroy(struct aws_event_loop *event_loop) {
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: destroying event_loop", (void *)event_loop);
    struct kqueue_loop *impl = event_loop->impl_data;

    /* Stop the event-thread. This might have already happened. It's safe to call multiple times. */
    s_stop(event_loop);
    int err = s_wait_for_stop_completion(event_loop);
    if (err) {
        AWS_LOGF_WARN(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: failed to destroy event-thread, resources have been leaked",
            (void *)event_loop);
        AWS_ASSERT("Failed to destroy event-thread, resources have been leaked." == NULL);
        return;
    }

    /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
    impl->thread_joined_to = aws_thread_current_thread_id();
    aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_joined_to);
    /* Clean up task-related stuff first. It's possible that a cancelled task adds further tasks to this event_loop.
     * Tasks added in this way will be in cross_thread_data.tasks_to_schedule, so we clean that up last. */
    aws_task_scheduler_clean_up(&impl->thread_data.scheduler); /* Tasks in scheduler get cancelled */

    while (!aws_linked_list_empty(&impl->cross_thread_data.tasks_to_schedule)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&impl->cross_thread_data.tasks_to_schedule);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
        task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
    }

    /* Warn user if aws_io_handle was subscribed, but never unsubscribed. This would cause memory leaks. */
    AWS_ASSERT(impl->thread_data.connected_handle_count == 0);

    /* Clean up everything else */
    aws_mutex_clean_up(&impl->cross_thread_data.mutex);

    struct kevent thread_signal_kevent;
    EV_SET(
        &thread_signal_kevent,
        impl->cross_thread_signal_pipe[READ_FD],
        EVFILT_READ /*filter*/,
        EV_DELETE /*flags*/,
        0 /*fflags*/,
        0 /*data*/,
        NULL /*udata*/);

    kevent(
        impl->kq_fd,
        &thread_signal_kevent /*changelist*/,
        1 /*nchanges*/,
        NULL /*eventlist*/,
        0 /*nevents*/,
        NULL /*timeout*/);

    close(impl->cross_thread_signal_pipe[READ_FD]);
    close(impl->cross_thread_signal_pipe[WRITE_FD]);
    close(impl->kq_fd);
    aws_thread_clean_up(&impl->thread_created_on);
    aws_mem_release(event_loop->alloc, impl);
    aws_event_loop_clean_up_base(event_loop);
    aws_mem_release(event_loop->alloc, event_loop);
}

static int s_run(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: starting event-loop thread.", (void *)event_loop);

    /* to re-run, call stop() and wait_for_stop_completion() */
    AWS_ASSERT(impl->cross_thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
    AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);

    /* Since thread isn't running it's ok to touch thread_data,
     * and it's ok to touch cross_thread_data without locking the mutex */
    impl->cross_thread_data.state = EVENT_THREAD_STATE_RUNNING;

    aws_thread_increment_unjoined_count();
    int err =
        aws_thread_launch(&impl->thread_created_on, aws_event_loop_thread, (void *)event_loop, &impl->thread_options);
    if (err) {
        aws_thread_decrement_unjoined_count();
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop);
        goto clean_up;
    }

    return AWS_OP_SUCCESS;

clean_up:
    impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
    return AWS_OP_ERR;
}

/* This function can't fail, we're relying on the thread responding to critical messages (ex: stop thread) */
static void signal_cross_thread_data_changed(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: signaling event-loop that cross-thread tasks need to be scheduled.",
        (void *)event_loop);

    /* Doesn't actually matter what we write, any activity on the pipe signals that cross_thread_data has changed.
     * If the pipe is full and the write fails, that's fine, the event-thread will get the signal from some previous
     * write. */
    uint32_t write_whatever = 0xC0FFEE;
    write(impl->cross_thread_signal_pipe[WRITE_FD], &write_whatever, sizeof(write_whatever));
}
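
/* This is the classic self-pipe pattern: the constructor registered
 * cross_thread_signal_pipe[READ_FD] with the kqueue, so the write above wakes
 * the (possibly sleeping) kevent() call in aws_event_loop_thread(), which then
 * drains the pipe and re-reads cross_thread_data under the mutex. */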

static int s_stop(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    bool signal_thread = false;

    { /* Begin critical section */
        aws_mutex_lock(&impl->cross_thread_data.mutex);
        if (impl->cross_thread_data.state == EVENT_THREAD_STATE_RUNNING) {
            impl->cross_thread_data.state = EVENT_THREAD_STATE_STOPPING;
            signal_thread = !impl->cross_thread_data.thread_signaled;
            impl->cross_thread_data.thread_signaled = true;
        }
        aws_mutex_unlock(&impl->cross_thread_data.mutex);
    } /* End critical section */

    if (signal_thread) {
        signal_cross_thread_data_changed(event_loop);
    }

    return AWS_OP_SUCCESS;
}

static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

#ifdef DEBUG_BUILD
    aws_mutex_lock(&impl->cross_thread_data.mutex);
    /* call stop() before wait_for_stop_completion() or you'll wait forever */
    AWS_ASSERT(impl->cross_thread_data.state != EVENT_THREAD_STATE_RUNNING);
    aws_mutex_unlock(&impl->cross_thread_data.mutex);
#endif

    int err = aws_thread_join(&impl->thread_created_on);
    aws_thread_decrement_unjoined_count();
    if (err) {
        return AWS_OP_ERR;
    }

    /* Since thread is no longer running it's ok to touch thread_data,
     * and it's ok to touch cross_thread_data without locking the mutex */
    impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
    impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;

    return AWS_OP_SUCCESS;
}
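
/* Illustrative re-run sketch (not part of this file): a stopped loop can be
 * run again because both state fields are reset to READY_TO_RUN above:
 *
 *     aws_event_loop_stop(loop);
 *     aws_event_loop_wait_for_stop_completion(loop);
 *     aws_event_loop_run(loop);  // legal again; s_run asserts READY_TO_RUN
 */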

/* Common functionality for "now" and "future" task scheduling.
 * If `run_at_nanos` is zero then the task is scheduled as a "now" task. */
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
    AWS_ASSERT(task);
    struct kqueue_loop *impl = event_loop->impl_data;

    /* If we're on the event-thread, just schedule it directly */
    if (s_is_event_thread(event_loop)) {
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: scheduling task %p in-thread for timestamp %llu",
            (void *)event_loop,
            (void *)task,
            (unsigned long long)run_at_nanos);
        if (run_at_nanos == 0) {
            aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, run_at_nanos);
        }
        return;
    }

    /* Otherwise, add it to cross_thread_data.tasks_to_schedule and signal the event-thread to process it */
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: scheduling task %p cross-thread for timestamp %llu",
        (void *)event_loop,
        (void *)task,
        (unsigned long long)run_at_nanos);
    task->timestamp = run_at_nanos;

    bool should_signal_thread = false;

    /* Begin critical section */
    aws_mutex_lock(&impl->cross_thread_data.mutex);
    aws_linked_list_push_back(&impl->cross_thread_data.tasks_to_schedule, &task->node);

    /* Signal thread that cross_thread_data has changed (unless it's been signaled already) */
    if (!impl->cross_thread_data.thread_signaled) {
        should_signal_thread = true;
        impl->cross_thread_data.thread_signaled = true;
    }

    aws_mutex_unlock(&impl->cross_thread_data.mutex);
    /* End critical section */

    if (should_signal_thread) {
        signal_cross_thread_data_changed(event_loop);
    }
}

static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
    s_schedule_task_common(event_loop, task, 0); /* Zero is used to denote "now" tasks */
}

static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
    s_schedule_task_common(event_loop, task, run_at_nanos);
}
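
/* Illustrative scheduling sketch (not part of this file; s_my_task_fn and
 * my_arg are hypothetical):
 *
 *     static void s_my_task_fn(struct aws_task *task, void *arg, enum aws_task_status status) {
 *         if (status == AWS_TASK_STATUS_RUN_READY) { ... }  // vs. AWS_TASK_STATUS_CANCELED
 *     }
 *
 *     struct aws_task task;
 *     aws_task_init(&task, s_my_task_fn, my_arg, "my_task");
 *     uint64_t now_ns = 0;
 *     event_loop->clock(&now_ns);
 *     aws_event_loop_schedule_task_future(event_loop, &task, now_ns + 1000000000); // ~1 second out
 */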

static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) {
    struct kqueue_loop *kqueue_loop = event_loop->impl_data;
    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: cancelling task %p", (void *)event_loop, (void *)task);
    aws_task_scheduler_cancel_task(&kqueue_loop->thread_data.scheduler, task);
}

/* Scheduled task that connects aws_io_handle with the kqueue */
static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
    (void)task;
    struct handle_data *handle_data = user_data;
    struct aws_event_loop *event_loop = handle_data->event_loop;
    struct kqueue_loop *impl = handle_data->event_loop->impl_data;

    impl->thread_data.connected_handle_count++;

    /* if task was cancelled, nothing to do */
    if (status == AWS_TASK_STATUS_CANCELED) {
        return;
    }

    /* If handle was unsubscribed before this task could execute, nothing to do */
    if (handle_data->state == HANDLE_STATE_UNSUBSCRIBED) {
        return;
    }

    AWS_ASSERT(handle_data->state == HANDLE_STATE_SUBSCRIBING);

    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP, "id=%p: subscribing to events on fd %d", (void *)event_loop, handle_data->owner->data.fd);
    /* In order to monitor both reads and writes, kqueue requires you to add two separate kevents.
     * If we add two separate kevents, but one of them fails, we need to remove the kevent that succeeded.
     * Therefore we use the EV_RECEIPT flag. This causes kevent() to tell us whether each EV_ADD succeeded,
     * rather than the usual behavior of telling us about recent events. */
    struct kevent changelist[2];
    AWS_ZERO_ARRAY(changelist);

    int changelist_size = 0;
    if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
        EV_SET(
            &changelist[changelist_size++],
            handle_data->owner->data.fd,
            EVFILT_READ /*filter*/,
            EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/,
            0 /*fflags*/,
            0 /*data*/,
            handle_data /*udata*/);
    }
    if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
        EV_SET(
            &changelist[changelist_size++],
            handle_data->owner->data.fd,
            EVFILT_WRITE /*filter*/,
            EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/,
            0 /*fflags*/,
            0 /*data*/,
            handle_data /*udata*/);
    }

    int num_events = kevent(
        impl->kq_fd,
        changelist /*changelist*/,
        changelist_size /*nchanges*/,
        changelist /*eventlist. It's OK to re-use the same memory for changelist input and eventlist output*/,
        changelist_size /*nevents*/,
        NULL /*timeout*/);

    if (num_events == -1) {
        goto subscribe_failed;
    }
    /* Look through results to see if any failed */
    for (int i = 0; i < num_events; ++i) {
        /* Every result should be flagged as an error; that's just how EV_RECEIPT works */
        AWS_ASSERT(changelist[i].flags & EV_ERROR);

        /* If a real error occurred, .data contains the error code */
        if (changelist[i].data != 0) {
            goto subscribe_failed;
        }
    }
    /* Success */
    handle_data->state = HANDLE_STATE_SUBSCRIBED;
    return;

subscribe_failed:
    AWS_LOGF_ERROR(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: failed to subscribe to events on fd %d",
        (void *)event_loop,
        handle_data->owner->data.fd);

    /* Remove any related kevents that succeeded */
    for (int i = 0; i < num_events; ++i) {
        if (changelist[i].data == 0) {
            changelist[i].flags = EV_DELETE;
            kevent(
                impl->kq_fd,
                &changelist[i] /*changelist*/,
                1 /*nchanges*/,
                NULL /*eventlist*/,
                0 /*nevents*/,
                NULL /*timeout*/);
        }
    }

    /* We can't return an error code because this was a scheduled task.
     * Notify the user of the failed subscription by passing AWS_IO_EVENT_TYPE_ERROR to the callback. */
    handle_data->on_event(event_loop, handle_data->owner, AWS_IO_EVENT_TYPE_ERROR, handle_data->on_event_user_data);
}

static int s_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data) {

    AWS_ASSERT(event_loop);
    AWS_ASSERT(handle->data.fd != -1);
    AWS_ASSERT(handle->additional_data == NULL);
    AWS_ASSERT(on_event);

    /* Must subscribe for read, write, or both */
    AWS_ASSERT(events & (AWS_IO_EVENT_TYPE_READABLE | AWS_IO_EVENT_TYPE_WRITABLE));

    struct handle_data *handle_data = aws_mem_calloc(event_loop->alloc, 1, sizeof(struct handle_data));
    if (!handle_data) {
        return AWS_OP_ERR;
    }

    handle_data->owner = handle;
    handle_data->event_loop = event_loop;
    handle_data->on_event = on_event;
    handle_data->on_event_user_data = user_data;
    handle_data->events_subscribed = events;
    handle_data->state = HANDLE_STATE_SUBSCRIBING;

    handle->additional_data = handle_data;
    /* We schedule a task to perform the actual changes to the kqueue, read on for an explanation why...
     *
     * kqueue requires separate registrations for read and write events.
     * If the user wants to know about both read and write, we need to register once for read and once for write.
     * If the first registration succeeds, but the second registration fails, we need to delete the first registration.
     * If this all happened outside the event-thread, the successful registration's events could begin processing
     * in the brief window of time before the registration is deleted. */

    aws_task_init(&handle_data->subscribe_task, s_subscribe_task, handle_data, "kqueue_event_loop_subscribe");
    s_schedule_task_now(event_loop, &handle_data->subscribe_task);

    return AWS_OP_SUCCESS;
}
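
/* Illustrative subscription sketch (not part of this file; s_on_readable,
 * my_handle, and my_user_data are hypothetical):
 *
 *     static void s_on_readable(
 *         struct aws_event_loop *loop, struct aws_io_handle *handle, int events, void *user_data) {
 *         if (events & AWS_IO_EVENT_TYPE_READABLE) { ... read from handle->data.fd ... }
 *         if (events & (AWS_IO_EVENT_TYPE_CLOSED | AWS_IO_EVENT_TYPE_ERROR)) { ... tear down ... }
 *     }
 *
 *     aws_event_loop_subscribe_to_io_events(
 *         loop, &my_handle, AWS_IO_EVENT_TYPE_READABLE, s_on_readable, my_user_data);
 */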

static void s_free_io_event_resources(void *user_data) {
    struct handle_data *handle_data = user_data;
    struct kqueue_loop *impl = handle_data->event_loop->impl_data;

    impl->thread_data.connected_handle_count--;

    aws_mem_release(handle_data->event_loop->alloc, handle_data);
}

static void s_clean_up_handle_data_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
    (void)task;
    (void)status;

    struct handle_data *handle_data = user_data;
    s_free_io_event_resources(handle_data);
}

static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP, "id=%p: un-subscribing from events on fd %d", (void *)event_loop, handle->data.fd);
    AWS_ASSERT(handle->additional_data);
    struct handle_data *handle_data = handle->additional_data;
    struct kqueue_loop *impl = event_loop->impl_data;

    AWS_ASSERT(event_loop == handle_data->event_loop);

    /* If the handle was successfully subscribed to kqueue, then remove it. */
    if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
        struct kevent changelist[2];
        int changelist_size = 0;
        if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
            EV_SET(
                &changelist[changelist_size++],
                handle_data->owner->data.fd,
                EVFILT_READ /*filter*/,
                EV_DELETE /*flags*/,
                0 /*fflags*/,
                0 /*data*/,
                handle_data /*udata*/);
        }
        if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
            EV_SET(
                &changelist[changelist_size++],
                handle_data->owner->data.fd,
                EVFILT_WRITE /*filter*/,
                EV_DELETE /*flags*/,
                0 /*fflags*/,
                0 /*data*/,
                handle_data /*udata*/);
        }

        kevent(impl->kq_fd, changelist, changelist_size, NULL /*eventlist*/, 0 /*nevents*/, NULL /*timeout*/);
    }
    /* Schedule a task to clean up the memory. This is done in a task to prevent the following scenario:
     * - While processing a batch of events, some callback unsubscribes another aws_io_handle.
     * - One of the other events in this batch belongs to that other aws_io_handle.
     * - If the handle_data were already deleted, we would access invalid memory. */
    aws_task_init(
        &handle_data->cleanup_task, s_clean_up_handle_data_task, handle_data, "kqueue_event_loop_clean_up_handle_data");
    aws_event_loop_schedule_task_now(event_loop, &handle_data->cleanup_task);

    handle_data->state = HANDLE_STATE_UNSUBSCRIBED;
    handle->additional_data = NULL;

    return AWS_OP_SUCCESS;
}

static bool s_is_event_thread(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    aws_thread_id_t *thread_id = aws_atomic_load_ptr(&impl->running_thread_id);
    return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id());
}

/* Called from the event-thread.
 * Takes tasks from tasks_to_schedule and adds them to the scheduler. */
static void s_process_tasks_to_schedule(struct aws_event_loop *event_loop, struct aws_linked_list *tasks_to_schedule) {
    struct kqueue_loop *impl = event_loop->impl_data;
    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: processing cross-thread tasks", (void *)event_loop);

    while (!aws_linked_list_empty(tasks_to_schedule)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(tasks_to_schedule);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: task %p pulled to event-loop, scheduling now.",
            (void *)event_loop,
            (void *)task);

        /* Timestamp 0 is used to denote "now" tasks */
        if (task->timestamp == 0) {
            aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, task->timestamp);
        }
    }
}

static void s_process_cross_thread_data(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: notified of cross-thread data to process", (void *)event_loop);
    /* If there are tasks to schedule, grab them all out of cross_thread_data.tasks_to_schedule.
     * We'll process them later, so that we minimize time spent holding the mutex. */
    struct aws_linked_list tasks_to_schedule;
    aws_linked_list_init(&tasks_to_schedule);

    { /* Begin critical section */
        aws_mutex_lock(&impl->cross_thread_data.mutex);
        impl->cross_thread_data.thread_signaled = false;

        bool initiate_stop = (impl->cross_thread_data.state == EVENT_THREAD_STATE_STOPPING) &&
                             (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING);
        if (AWS_UNLIKELY(initiate_stop)) {
            impl->thread_data.state = EVENT_THREAD_STATE_STOPPING;
        }

        aws_linked_list_swap_contents(&impl->cross_thread_data.tasks_to_schedule, &tasks_to_schedule);

        aws_mutex_unlock(&impl->cross_thread_data.mutex);
    } /* End critical section */

    s_process_tasks_to_schedule(event_loop, &tasks_to_schedule);
}

static int s_aws_event_flags_from_kevent(struct kevent *kevent) {
    int event_flags = 0;

    if (kevent->flags & EV_ERROR) {
        event_flags |= AWS_IO_EVENT_TYPE_ERROR;
    } else if (kevent->filter == EVFILT_READ) {
        if (kevent->data != 0) {
            event_flags |= AWS_IO_EVENT_TYPE_READABLE;
        }

        if (kevent->flags & EV_EOF) {
            event_flags |= AWS_IO_EVENT_TYPE_CLOSED;
        }
    } else if (kevent->filter == EVFILT_WRITE) {
        if (kevent->data != 0) {
            event_flags |= AWS_IO_EVENT_TYPE_WRITABLE;
        }

        if (kevent->flags & EV_EOF) {
            event_flags |= AWS_IO_EVENT_TYPE_CLOSED;
        }
    }

    return event_flags;
}
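
/* Example: a socket that delivers its final bytes and then hits EOF arrives as
 * an EVFILT_READ kevent with nonzero .data and EV_EOF set, which the function
 * above reports as AWS_IO_EVENT_TYPE_READABLE | AWS_IO_EVENT_TYPE_CLOSED. */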

/**
 * This just calls kevent().
 *
 * We broke this out into its own function so that the stack trace clearly shows
 * what this thread is doing. We've had a lot of cases where users think this
 * thread is deadlocked because it's stuck here. We want it to be clear
 * that it's doing nothing on purpose. It's waiting for events to happen...
 */
AWS_NO_INLINE
static int aws_event_loop_listen_for_io_events(int kq_fd, struct kevent kevents[MAX_EVENTS], struct timespec *timeout) {
    return kevent(kq_fd, NULL /*changelist*/, 0 /*nchanges*/, kevents /*eventlist*/, MAX_EVENTS /*nevents*/, timeout);
}

static void aws_event_loop_thread(void *user_data) {
    struct aws_event_loop *event_loop = user_data;
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
    struct kqueue_loop *impl = event_loop->impl_data;

    /* set thread id to the event-loop's thread. */
    aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_created_on.thread_id);

    AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
    impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;

    struct kevent kevents[MAX_EVENTS];

    /* A single aws_io_handle could have two separate kevents if subscribed for both read and write.
     * If both the read and write kevents fire in the same loop of the event-thread,
     * combine the event-flags and deliver them in a single callback.
     * This makes the kqueue_event_loop behave more like the other platform implementations. */
    struct handle_data *io_handle_events[MAX_EVENTS];

    struct timespec timeout = {
        .tv_sec = DEFAULT_TIMEOUT_SEC,
        .tv_nsec = 0,
    };

    AWS_LOGF_INFO(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: default timeout %ds, and max events to process per tick %d",
        (void *)event_loop,
        DEFAULT_TIMEOUT_SEC,
        MAX_EVENTS);

    while (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING) {
        int num_io_handle_events = 0;
        bool should_process_cross_thread_data = false;

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: waiting for a maximum of %ds %lluns",
            (void *)event_loop,
            (int)timeout.tv_sec,
            (unsigned long long)timeout.tv_nsec);

        /* Process kqueue events */
        int num_kevents = aws_event_loop_listen_for_io_events(impl->kq_fd, kevents, &timeout);

        aws_event_loop_register_tick_start(event_loop);

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, num_kevents);
        if (num_kevents == -1) {
            /* Raise an error, in case this is interesting to anyone monitoring,
             * and continue on with this loop. We can't process events,
             * but we can still process scheduled tasks */
            aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);

            /* Force the cross_thread_data to be processed.
             * There might be valuable info in there, like the message to stop the thread.
             * It's fine to do this even if nothing has changed, it just costs a mutex lock/unlock. */
            should_process_cross_thread_data = true;
        }

        for (int i = 0; i < num_kevents; ++i) {
            struct kevent *kevent = &kevents[i];

            /* Was this event to signal that cross_thread_data has changed? */
            if ((int)kevent->ident == impl->cross_thread_signal_pipe[READ_FD]) {
                should_process_cross_thread_data = true;

                /* Drain whatever data was written to the signaling pipe */
                uint32_t read_whatever;
                while (read((int)kevent->ident, &read_whatever, sizeof(read_whatever)) > 0) {
                }

                continue;
            }

            /* Otherwise this was a normal event on a subscribed handle. Figure out which flags to report. */
            int event_flags = s_aws_event_flags_from_kevent(kevent);
            if (event_flags == 0) {
                continue;
            }

            /* Combine flags, in case multiple kevents correspond to one handle. (see notes at top of function) */
            struct handle_data *handle_data = kevent->udata;
            if (handle_data->events_this_loop == 0) {
                io_handle_events[num_io_handle_events++] = handle_data;
            }
            handle_data->events_this_loop |= event_flags;
        }

        /* Invoke each handle's event callback (unless the handle has been unsubscribed) */
        for (int i = 0; i < num_io_handle_events; ++i) {
            struct handle_data *handle_data = io_handle_events[i];

            if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
                AWS_LOGF_TRACE(
                    AWS_LS_IO_EVENT_LOOP,
                    "id=%p: activity on fd %d, invoking handler.",
                    (void *)event_loop,
                    handle_data->owner->data.fd);
                handle_data->on_event(
                    event_loop, handle_data->owner, handle_data->events_this_loop, handle_data->on_event_user_data);
            }

            handle_data->events_this_loop = 0;
        }

        /* Process cross_thread_data */
        if (should_process_cross_thread_data) {
            s_process_cross_thread_data(event_loop);
        }

        /* Run scheduled tasks */
        uint64_t now_ns = 0;
        event_loop->clock(&now_ns); /* If clock fails, now_ns will be 0 and tasks scheduled for a specific time
                                       will not be run. That's ok, we'll handle them next time around. */
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
        aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns);

        /* Set timeout for next kevent() call.
         * If clock fails, or scheduler has no tasks, use default timeout */
        bool use_default_timeout = false;

        int err = event_loop->clock(&now_ns);
        if (err) {
            use_default_timeout = true;
        }

        uint64_t next_run_time_ns;
        if (!aws_task_scheduler_has_tasks(&impl->thread_data.scheduler, &next_run_time_ns)) {
            use_default_timeout = true;
        }

        if (use_default_timeout) {
            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP, "id=%p: no more scheduled tasks using default timeout.", (void *)event_loop);
            timeout.tv_sec = DEFAULT_TIMEOUT_SEC;
            timeout.tv_nsec = 0;
        } else {
            /* Convert from timestamp in nanoseconds, to timeout in seconds with nanosecond remainder */
            uint64_t timeout_ns = next_run_time_ns > now_ns ? next_run_time_ns - now_ns : 0;

            uint64_t timeout_remainder_ns = 0;
            uint64_t timeout_sec =
                aws_timestamp_convert(timeout_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &timeout_remainder_ns);

            if (timeout_sec > LONG_MAX) { /* Check for overflow. On Darwin, these values are stored as longs */
                timeout_sec = LONG_MAX;
                timeout_remainder_ns = 0;
            }

            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP,
                "id=%p: detected more scheduled tasks with the next occurring at "
                "%llu using timeout of %ds %lluns.",
                (void *)event_loop,
                (unsigned long long)timeout_ns,
                (int)timeout_sec,
                (unsigned long long)timeout_remainder_ns);
            timeout.tv_sec = (time_t)(timeout_sec);
            timeout.tv_nsec = (long)(timeout_remainder_ns);
        }

        aws_event_loop_register_tick_end(event_loop);
    }

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);

    /* reset to NULL. This should be updated again during destroy, before tasks are canceled. */
    aws_atomic_store_ptr(&impl->running_thread_id, NULL);
}