12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724 |
- /*
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License").
- * You may not use this file except in compliance with the License.
- * A copy of the License is located at
- *
- * http://aws.amazon.com/apache2.0
- *
- * or in the "license" file accompanying this file. This file is distributed
- * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
- * express or implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
- #include <aws/common/bus.h>
- #include <aws/common/allocator.h>
- #include <aws/common/atomics.h>
- #include <aws/common/byte_buf.h>
- #include <aws/common/condition_variable.h>
- #include <aws/common/hash_table.h>
- #include <aws/common/linked_list.h>
- #include <aws/common/logging.h>
- #include <aws/common/mutex.h>
- #include <aws/common/thread.h>
- #include <inttypes.h>
- #ifdef _MSC_VER
- # pragma warning(push)
- # pragma warning(disable : 4204) /* nonstandard extension used: non-constant aggregate initializer */
- #endif
/* Bus handle returned to callers; all behavior lives behind `impl`. */
struct aws_bus {
    /* allocator used to create the bus and everything its impl allocates */
    struct aws_allocator *allocator;

    /* policy-specific state; its first member is a struct bus_vtable used for dispatch */
    void *impl;
};
- /* MUST be the first member of any impl to allow blind casting */
- struct bus_vtable {
- void (*clean_up)(struct aws_bus *bus);
- int (*send)(struct aws_bus *bus, uint64_t address, void *payload, void (*destructor)(void *));
- int (*subscribe)(struct aws_bus *bus, uint64_t address, aws_bus_listener_fn *callback, void *user_data);
- void (*unsubscribe)(struct aws_bus *bus, uint64_t address, aws_bus_listener_fn *callback, void *user_data);
- };
- /* each bound callback is stored as a bus_listener in the slots table */
- struct bus_listener {
- struct aws_linked_list_node list_node;
- void *user_data;
- aws_bus_listener_fn *deliver;
- };
- /* value type stored in each slot in the slots table in a bus */
- struct listener_list {
- struct aws_allocator *allocator;
- struct aws_linked_list listeners;
- };
- /* find a listener list (or NULL) by address */
- static struct listener_list *bus_find_listeners(struct aws_hash_table *slots, uint64_t address) {
- struct aws_hash_element *elem = NULL;
- if (aws_hash_table_find(slots, (void *)(uintptr_t)address, &elem)) {
- return NULL;
- }
- if (!elem) {
- return NULL;
- }
- struct listener_list *list = elem->value;
- return list;
- }
- /* find a listener list by address, or create/insert/return a new one */
- static struct listener_list *bus_find_or_create_listeners(
- struct aws_allocator *allocator,
- struct aws_hash_table *slots,
- uint64_t address) {
- struct listener_list *list = bus_find_listeners(slots, address);
- if (list) {
- return list;
- }
- list = aws_mem_calloc(allocator, 1, sizeof(struct listener_list));
- list->allocator = allocator;
- aws_linked_list_init(&list->listeners);
- aws_hash_table_put(slots, (void *)(uintptr_t)address, list, NULL);
- return list;
- }
- static void s_bus_deliver_msg_to_slot(
- struct aws_bus *bus,
- uint64_t slot,
- uint64_t address,
- struct aws_hash_table *slots,
- const void *payload) {
- (void)bus;
- struct listener_list *list = bus_find_listeners(slots, slot);
- if (!list) {
- return;
- }
- struct aws_linked_list_node *node = aws_linked_list_begin(&list->listeners);
- for (; node != aws_linked_list_end(&list->listeners); node = aws_linked_list_next(node)) {
- struct bus_listener *listener = AWS_CONTAINER_OF(node, struct bus_listener, list_node);
- listener->deliver(address, payload, listener->user_data);
- }
- }
- /* common delivery logic */
- static void s_bus_deliver_msg(
- struct aws_bus *bus,
- uint64_t address,
- struct aws_hash_table *slots,
- const void *payload) {
- s_bus_deliver_msg_to_slot(bus, AWS_BUS_ADDRESS_ALL, address, slots, payload);
- s_bus_deliver_msg_to_slot(bus, address, address, slots, payload);
- }
- /* common subscribe logic */
- static int s_bus_subscribe(
- struct aws_bus *bus,
- uint64_t address,
- struct aws_hash_table *slots,
- aws_bus_listener_fn *callback,
- void *user_data) {
- if (address == AWS_BUS_ADDRESS_CLOSE) {
- AWS_LOGF_ERROR(AWS_LS_COMMON_BUS, "Cannot directly subscribe to AWS_BUS_ADDRESS_CLOSE(0)");
- return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- }
- struct listener_list *list = bus_find_or_create_listeners(bus->allocator, slots, address);
- struct bus_listener *listener = aws_mem_calloc(bus->allocator, 1, sizeof(struct bus_listener));
- listener->deliver = callback;
- listener->user_data = user_data;
- aws_linked_list_push_back(&list->listeners, &listener->list_node);
- return AWS_OP_SUCCESS;
- }
- /* common unsubscribe logic */
- static void s_bus_unsubscribe(
- struct aws_bus *bus,
- uint64_t address,
- struct aws_hash_table *slots,
- aws_bus_listener_fn *callback,
- void *user_data) {
- (void)bus;
- if (address == AWS_BUS_ADDRESS_CLOSE) {
- AWS_LOGF_WARN(AWS_LS_COMMON_BUS, "Attempted to unsubscribe from invalid address AWS_BUS_ADDRESS_CLOSE")
- return;
- }
- struct listener_list *list = bus_find_listeners(slots, address);
- if (!list) {
- return;
- }
- struct aws_linked_list_node *node;
- for (node = aws_linked_list_begin(&list->listeners); node != aws_linked_list_end(&list->listeners);
- node = aws_linked_list_next(node)) {
- struct bus_listener *listener = AWS_CONTAINER_OF(node, struct bus_listener, list_node);
- if (listener->deliver == callback && listener->user_data == user_data) {
- aws_linked_list_remove(node);
- aws_mem_release(list->allocator, listener);
- return;
- }
- }
- }
- /* destructor for listener lists in the slots tables */
- void s_bus_destroy_listener_list(void *data) {
- struct listener_list *list = data;
- AWS_PRECONDITION(list->allocator);
- /* call all listeners with an AWS_BUS_ADDRESS_CLOSE message type to clean up */
- while (!aws_linked_list_empty(&list->listeners)) {
- struct aws_linked_list_node *back = aws_linked_list_back(&list->listeners);
- struct bus_listener *listener = AWS_CONTAINER_OF(back, struct bus_listener, list_node);
- listener->deliver(AWS_BUS_ADDRESS_CLOSE, NULL, listener->user_data);
- aws_linked_list_pop_back(&list->listeners);
- aws_mem_release(list->allocator, listener);
- }
- aws_mem_release(list->allocator, list);
- }
- /*
- * AWS_BUS_SYNC implementation
- */
- struct bus_sync_impl {
- struct bus_vtable vtable;
- struct {
- /* Map of address -> list of listeners */
- struct aws_hash_table table;
- } slots;
- };
- static void s_bus_sync_clean_up(struct aws_bus *bus) {
- struct bus_sync_impl *impl = bus->impl;
- aws_hash_table_clean_up(&impl->slots.table);
- aws_mem_release(bus->allocator, impl);
- }
- static int s_bus_sync_send(struct aws_bus *bus, uint64_t address, void *payload, void (*destructor)(void *)) {
- struct bus_sync_impl *impl = bus->impl;
- s_bus_deliver_msg(bus, address, &impl->slots.table, payload);
- if (destructor) {
- destructor(payload);
- }
- return AWS_OP_SUCCESS;
- }
- static int s_bus_sync_subscribe(struct aws_bus *bus, uint64_t address, aws_bus_listener_fn *callback, void *user_data) {
- struct bus_sync_impl *impl = bus->impl;
- return s_bus_subscribe(bus, address, &impl->slots.table, callback, user_data);
- }
- static void s_bus_sync_unsubscribe(
- struct aws_bus *bus,
- uint64_t address,
- aws_bus_listener_fn *callback,
- void *user_data) {
- struct bus_sync_impl *impl = bus->impl;
- s_bus_unsubscribe(bus, address, &impl->slots.table, callback, user_data);
- }
- static struct bus_vtable bus_sync_vtable = {
- .clean_up = s_bus_sync_clean_up,
- .send = s_bus_sync_send,
- .subscribe = s_bus_sync_subscribe,
- .unsubscribe = s_bus_sync_unsubscribe,
- };
- static void s_bus_sync_init(struct aws_bus *bus, const struct aws_bus_options *options) {
- (void)options;
- struct bus_sync_impl *impl = bus->impl = aws_mem_calloc(bus->allocator, 1, sizeof(struct bus_sync_impl));
- impl->vtable = bus_sync_vtable;
- if (aws_hash_table_init(
- &impl->slots.table, bus->allocator, 8, aws_hash_ptr, aws_ptr_eq, NULL, s_bus_destroy_listener_list)) {
- goto error;
- }
- return;
- error:
- aws_mem_release(bus->allocator, impl);
- }
- /*
- * AWS_BUS_ASYNC implementation
- */
- struct bus_async_impl {
- struct bus_vtable vtable;
- struct {
- /* Map of address -> list of listeners */
- struct aws_hash_table table;
- } slots;
- /* Queue of bus_messages to deliver */
- struct {
- struct aws_mutex mutex;
- /* backing memory for the message free list */
- void *buffer;
- void *buffer_end; /* 1 past the end of buffer */
- /* message free list */
- struct aws_linked_list free; /* struct bus_message */
- /* message delivery queue */
- struct aws_linked_list msgs; /* struct bus_message */
- /* list of pending adds/removes of listeners */
- struct aws_linked_list subs; /* struct pending_listener */
- } queue;
- /* dispatch thread */
- struct {
- struct aws_thread thread;
- struct aws_condition_variable notify;
- bool running;
- struct aws_atomic_var started;
- struct aws_atomic_var exited;
- } dispatch;
- bool reliable;
- };
- /* represents a message in the queue on impls that queue */
- struct bus_message {
- struct aws_linked_list_node list_node;
- uint64_t address;
- void *payload;
- void (*destructor)(void *);
- };
- struct pending_listener {
- struct aws_linked_list_node list_node;
- uint64_t address;
- aws_bus_listener_fn *listener;
- void *user_data;
- uint32_t add : 1;
- uint32_t remove : 1;
- };
- static void s_bus_message_clean_up(struct bus_message *msg) {
- if (msg->destructor) {
- msg->destructor(msg->payload);
- }
- msg->destructor = NULL;
- msg->payload = NULL;
- }
- /* Assumes the caller holds the lock */
- static void s_bus_async_free_message(struct aws_bus *bus, struct bus_message *msg) {
- struct bus_async_impl *impl = bus->impl;
- s_bus_message_clean_up(msg);
- if ((void *)msg >= impl->queue.buffer && (void *)msg < impl->queue.buffer_end) {
- AWS_ZERO_STRUCT(*msg);
- aws_linked_list_push_back(&impl->queue.free, &msg->list_node);
- return;
- }
- aws_mem_release(bus->allocator, msg);
- }
- /* Assumes the caller holds the lock */
- struct bus_message *s_bus_async_alloc_message(struct aws_bus *bus) {
- struct bus_async_impl *impl = bus->impl;
- /* try the free list first */
- if (!aws_linked_list_empty(&impl->queue.free)) {
- struct aws_linked_list_node *msg_node = aws_linked_list_pop_back(&impl->queue.free);
- struct bus_message *msg = AWS_CONTAINER_OF(msg_node, struct bus_message, list_node);
- return msg;
- }
- /* unreliable will re-use the oldest message */
- if (!impl->reliable) {
- struct aws_linked_list_node *msg_node = aws_linked_list_pop_front(&impl->queue.msgs);
- struct bus_message *msg = AWS_CONTAINER_OF(msg_node, struct bus_message, list_node);
- s_bus_async_free_message(bus, msg);
- return s_bus_async_alloc_message(bus);
- }
- return aws_mem_calloc(bus->allocator, 1, sizeof(struct bus_message));
- }
- /*
- * resolve all adds and removes of listeners, in FIFO order
- * NOTE: expects mutex to be held by caller
- */
- static void s_bus_apply_listeners(struct aws_bus *bus, struct aws_linked_list *pending_subs) {
- struct bus_async_impl *impl = bus->impl;
- while (!aws_linked_list_empty(pending_subs)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(pending_subs);
- struct pending_listener *listener = AWS_CONTAINER_OF(node, struct pending_listener, list_node);
- if (listener->add) {
- s_bus_subscribe(bus, listener->address, &impl->slots.table, listener->listener, listener->user_data);
- } else if (listener->remove) {
- s_bus_unsubscribe(bus, listener->address, &impl->slots.table, listener->listener, listener->user_data);
- }
- aws_mem_release(bus->allocator, listener);
- }
- }
- static void s_bus_async_deliver_messages(struct aws_bus *bus, struct aws_linked_list *pending_msgs) {
- struct bus_async_impl *impl = bus->impl;
- struct aws_linked_list_node *msg_node = aws_linked_list_begin(pending_msgs);
- for (; msg_node != aws_linked_list_end(pending_msgs); msg_node = aws_linked_list_next(msg_node)) {
- struct bus_message *msg = AWS_CONTAINER_OF(msg_node, struct bus_message, list_node);
- s_bus_deliver_msg(bus, msg->address, &impl->slots.table, msg->payload);
- s_bus_message_clean_up(msg);
- }
- /* push all pending messages back on the free list */
- aws_mutex_lock(&impl->queue.mutex);
- {
- while (!aws_linked_list_empty(pending_msgs)) {
- msg_node = aws_linked_list_pop_front(pending_msgs);
- struct bus_message *msg = AWS_CONTAINER_OF(msg_node, struct bus_message, list_node);
- s_bus_async_free_message(bus, msg);
- }
- }
- aws_mutex_unlock(&impl->queue.mutex);
- }
- static void s_bus_async_clean_up(struct aws_bus *bus) {
- struct bus_async_impl *impl = bus->impl;
- /* shut down delivery thread, clean up dispatch */
- AWS_LOGF_TRACE(AWS_LS_COMMON_BUS, "bus: %p clean_up: starting final drain", (void *)bus);
- aws_mutex_lock(&impl->queue.mutex);
- impl->dispatch.running = false;
- aws_mutex_unlock(&impl->queue.mutex);
- aws_condition_variable_notify_one(&impl->dispatch.notify);
- /* Spin wait for the final drain and dispatch thread to complete */
- while (!aws_atomic_load_int(&impl->dispatch.exited)) {
- aws_thread_current_sleep(1000 * 1000); /* 1 microsecond */
- }
- AWS_LOGF_TRACE(AWS_LS_COMMON_BUS, "bus: %p clean_up: finished final drain", (void *)bus);
- aws_thread_join(&impl->dispatch.thread);
- aws_thread_clean_up(&impl->dispatch.thread);
- aws_condition_variable_clean_up(&impl->dispatch.notify);
- /* should be impossible for subs or msgs to remain after final drain */
- AWS_FATAL_ASSERT(aws_linked_list_empty(&impl->queue.msgs));
- AWS_FATAL_ASSERT(aws_linked_list_empty(&impl->queue.subs));
- /* this frees everything that the free/msgs lists point to */
- if (impl->queue.buffer) {
- aws_mem_release(bus->allocator, impl->queue.buffer);
- }
- aws_mutex_clean_up(&impl->queue.mutex);
- aws_hash_table_clean_up(&impl->slots.table);
- aws_mem_release(bus->allocator, impl);
- }
- static bool s_bus_async_should_wake_up(void *user_data) {
- struct bus_async_impl *impl = user_data;
- return !impl->dispatch.running || !aws_linked_list_empty(&impl->queue.subs) ||
- !aws_linked_list_empty(&impl->queue.msgs);
- }
- static bool s_bus_async_is_running(struct bus_async_impl *impl) {
- aws_mutex_lock(&impl->queue.mutex);
- bool running = impl->dispatch.running;
- aws_mutex_unlock(&impl->queue.mutex);
- return running;
- }
- /* Async bus delivery thread loop */
- static void s_bus_async_deliver(void *user_data) {
- struct aws_bus *bus = user_data;
- struct bus_async_impl *impl = bus->impl;
- aws_atomic_store_int(&impl->dispatch.started, 1);
- AWS_LOGF_DEBUG(AWS_LS_COMMON_BUS, "bus %p: delivery thread loop started", (void *)bus);
- /* once shutdown has been triggered, need to drain one more time to ensure all queues are empty */
- int pending_drains = 1;
- do {
- struct aws_linked_list pending_msgs;
- aws_linked_list_init(&pending_msgs);
- struct aws_linked_list pending_subs;
- aws_linked_list_init(&pending_subs);
- aws_mutex_lock(&impl->queue.mutex);
- {
- aws_condition_variable_wait_pred(
- &impl->dispatch.notify, &impl->queue.mutex, s_bus_async_should_wake_up, impl);
- /* copy out any queued subs/unsubs */
- aws_linked_list_swap_contents(&impl->queue.subs, &pending_subs);
- /* copy out any queued messages */
- aws_linked_list_swap_contents(&impl->queue.msgs, &pending_msgs);
- }
- aws_mutex_unlock(&impl->queue.mutex);
- /* first resolve subs/unsubs */
- if (!aws_linked_list_empty(&pending_subs)) {
- s_bus_apply_listeners(bus, &pending_subs);
- }
- /* Then deliver queued messages */
- if (!aws_linked_list_empty(&pending_msgs)) {
- s_bus_async_deliver_messages(bus, &pending_msgs);
- }
- } while (s_bus_async_is_running(impl) || pending_drains--);
- /* record that the dispatch thread is done */
- aws_atomic_store_int(&impl->dispatch.exited, 1);
- }
- int s_bus_async_send(struct aws_bus *bus, uint64_t address, void *payload, void (*destructor)(void *)) {
- struct bus_async_impl *impl = bus->impl;
- aws_mutex_lock(&impl->queue.mutex);
- {
- if (!impl->dispatch.running) {
- AWS_LOGF_WARN(
- AWS_LS_COMMON_BUS, "bus %p: message sent after clean_up: address: %" PRIu64 "", (void *)bus, address);
- aws_mutex_unlock(&impl->queue.mutex);
- return aws_raise_error(AWS_ERROR_INVALID_STATE);
- }
- struct bus_message *msg = s_bus_async_alloc_message(bus);
- msg->address = address;
- msg->payload = payload;
- msg->destructor = destructor;
- /* push the message onto the delivery queue */
- aws_linked_list_push_back(&impl->queue.msgs, &msg->list_node);
- }
- aws_mutex_unlock(&impl->queue.mutex);
- /* notify the delivery thread to wake up */
- aws_condition_variable_notify_one(&impl->dispatch.notify);
- return AWS_OP_SUCCESS;
- }
- int s_bus_async_subscribe(struct aws_bus *bus, uint64_t address, aws_bus_listener_fn *listener, void *user_data) {
- struct bus_async_impl *impl = bus->impl;
- if (address == AWS_BUS_ADDRESS_CLOSE) {
- AWS_LOGF_ERROR(AWS_LS_COMMON_BUS, "Cannot subscribe to AWS_BUS_ADDRESS_CLOSE");
- return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
- }
- aws_mutex_lock(&impl->queue.mutex);
- {
- if (!impl->dispatch.running) {
- AWS_LOGF_WARN(
- AWS_LS_COMMON_BUS,
- "bus %p: subscribe requested after clean_up: address: %" PRIu64 "",
- (void *)bus,
- address);
- aws_mutex_unlock(&impl->queue.mutex);
- return aws_raise_error(AWS_ERROR_INVALID_STATE);
- }
- struct pending_listener *sub = aws_mem_calloc(bus->allocator, 1, sizeof(struct pending_listener));
- sub->address = address;
- sub->listener = listener;
- sub->user_data = user_data;
- sub->add = true;
- aws_linked_list_push_back(&impl->queue.subs, &sub->list_node);
- }
- aws_mutex_unlock(&impl->queue.mutex);
- /* notify the delivery thread to wake up */
- aws_condition_variable_notify_one(&impl->dispatch.notify);
- return AWS_OP_SUCCESS;
- }
- void s_bus_async_unsubscribe(struct aws_bus *bus, uint64_t address, aws_bus_listener_fn *listener, void *user_data) {
- struct bus_async_impl *impl = bus->impl;
- if (address == AWS_BUS_ADDRESS_CLOSE) {
- AWS_LOGF_ERROR(AWS_LS_COMMON_BUS, "Cannot unsubscribe from AWS_BUS_ADDRESS_CLOSE");
- return;
- }
- aws_mutex_lock(&impl->queue.mutex);
- {
- if (!impl->dispatch.running) {
- AWS_LOGF_WARN(
- AWS_LS_COMMON_BUS,
- "bus %p: unsubscribe requested after clean_up: address: %" PRIu64 "",
- (void *)bus,
- address);
- aws_mutex_unlock(&impl->queue.mutex);
- return;
- }
- struct pending_listener *unsub = aws_mem_calloc(bus->allocator, 1, sizeof(struct pending_listener));
- unsub->address = address;
- unsub->listener = listener;
- unsub->user_data = user_data;
- unsub->remove = true;
- aws_linked_list_push_back(&impl->queue.subs, &unsub->list_node);
- }
- aws_mutex_unlock(&impl->queue.mutex);
- /* notify the delivery thread to wake up */
- aws_condition_variable_notify_one(&impl->dispatch.notify);
- }
- static struct bus_vtable bus_async_vtable = {
- .clean_up = s_bus_async_clean_up,
- .send = s_bus_async_send,
- .subscribe = s_bus_async_subscribe,
- .unsubscribe = s_bus_async_unsubscribe,
- };
- static void s_bus_async_init(struct aws_bus *bus, const struct aws_bus_options *options) {
- struct bus_async_impl *impl = bus->impl = aws_mem_calloc(bus->allocator, 1, sizeof(struct bus_async_impl));
- impl->vtable = bus_async_vtable;
- impl->reliable = (options->policy == AWS_BUS_ASYNC_RELIABLE);
- /* init msg queue */
- if (aws_mutex_init(&impl->queue.mutex)) {
- AWS_LOGF_ERROR(
- AWS_LS_COMMON_BUS,
- "bus %p: Unable to initialize queue synchronization: %s",
- (void *)bus,
- aws_error_name(aws_last_error()));
- goto error;
- }
- aws_linked_list_init(&impl->queue.msgs);
- aws_linked_list_init(&impl->queue.free);
- aws_linked_list_init(&impl->queue.subs);
- /* push as many bus_messages as we can into the free list from the buffer */
- if (options->buffer_size) {
- impl->queue.buffer = aws_mem_calloc(bus->allocator, 1, options->buffer_size);
- impl->queue.buffer_end = ((uint8_t *)impl->queue.buffer) + options->buffer_size;
- const int msg_count = (int)(options->buffer_size / sizeof(struct bus_message));
- for (int msg_idx = 0; msg_idx < msg_count; ++msg_idx) {
- struct bus_message *msg = (void *)&((char *)impl->queue.buffer)[msg_idx * sizeof(struct bus_message)];
- aws_linked_list_push_back(&impl->queue.free, &msg->list_node);
- }
- }
- /* init subscription table */
- if (aws_hash_table_init(
- &impl->slots.table, bus->allocator, 8, aws_hash_ptr, aws_ptr_eq, NULL, s_bus_destroy_listener_list)) {
- AWS_LOGF_ERROR(
- AWS_LS_COMMON_BUS,
- "bus %p: Unable to initialize bus addressing table: %s",
- (void *)bus,
- aws_error_name(aws_last_error()));
- goto error;
- }
- /* Setup dispatch thread */
- if (aws_condition_variable_init(&impl->dispatch.notify)) {
- AWS_LOGF_ERROR(
- AWS_LS_COMMON_BUS,
- "bus %p: Unable to initialize async notify: %s",
- (void *)bus,
- aws_error_name(aws_last_error()));
- goto error;
- }
- if (aws_thread_init(&impl->dispatch.thread, bus->allocator)) {
- AWS_LOGF_ERROR(
- AWS_LS_COMMON_BUS,
- "bus %p: Unable to initialize background thread: %s",
- (void *)bus,
- aws_error_name(aws_last_error()));
- goto error;
- }
- impl->dispatch.running = true;
- aws_atomic_init_int(&impl->dispatch.started, 0);
- aws_atomic_init_int(&impl->dispatch.exited, 0);
- if (aws_thread_launch(&impl->dispatch.thread, s_bus_async_deliver, bus, aws_default_thread_options())) {
- AWS_LOGF_ERROR(
- AWS_LS_COMMON_BUS,
- "bus %p: Unable to launch delivery thread: %s",
- (void *)bus,
- aws_error_name(aws_last_error()));
- goto error;
- }
- /* wait for dispatch thread to start before returning control */
- AWS_LOGF_TRACE(AWS_LS_COMMON_BUS, "bus %p: Waiting for delivery thread to start", (void *)bus);
- while (!aws_atomic_load_int(&impl->dispatch.started)) {
- aws_thread_current_sleep(1000 * 1000);
- }
- AWS_LOGF_TRACE(AWS_LS_COMMON_BUS, "bus %p: Delivery thread started", (void *)bus);
- return;
- error:
- aws_thread_clean_up(&impl->dispatch.thread);
- aws_condition_variable_clean_up(&impl->dispatch.notify);
- aws_hash_table_clean_up(&impl->slots.table);
- aws_mem_release(bus->allocator, &impl->queue.buffer);
- aws_mutex_clean_up(&impl->queue.mutex);
- aws_mem_release(bus->allocator, impl);
- bus->impl = NULL;
- }
- /*
- * Public API
- */
- struct aws_bus *aws_bus_new(struct aws_allocator *allocator, const struct aws_bus_options *options) {
- struct aws_bus *bus = aws_mem_calloc(allocator, 1, sizeof(struct aws_bus));
- bus->allocator = allocator;
- switch (options->policy) {
- case AWS_BUS_ASYNC_RELIABLE:
- case AWS_BUS_ASYNC_UNRELIABLE:
- s_bus_async_init(bus, options);
- break;
- case AWS_BUS_SYNC_RELIABLE:
- s_bus_sync_init(bus, options);
- break;
- }
- if (!bus->impl) {
- aws_mem_release(allocator, bus);
- return NULL;
- }
- return bus;
- }
- void aws_bus_destroy(struct aws_bus *bus) {
- struct bus_vtable *vtable = bus->impl;
- vtable->clean_up(bus);
- aws_mem_release(bus->allocator, bus);
- }
- int aws_bus_subscribe(struct aws_bus *bus, uint64_t address, aws_bus_listener_fn *listener, void *user_data) {
- struct bus_vtable *vtable = bus->impl;
- return vtable->subscribe(bus, address, listener, user_data);
- }
- void aws_bus_unsubscribe(struct aws_bus *bus, uint64_t address, aws_bus_listener_fn *listener, void *user_data) {
- struct bus_vtable *vtable = bus->impl;
- vtable->unsubscribe(bus, address, listener, user_data);
- }
- int aws_bus_send(struct aws_bus *bus, uint64_t address, void *payload, void (*destructor)(void *)) {
- struct bus_vtable *vtable = bus->impl;
- return vtable->send(bus, address, payload, destructor);
- }
- #ifdef _MSC_VER
- # pragma warning(pop)
- #endif
|