123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929 |
- /**
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * SPDX-License-Identifier: Apache-2.0.
- */
- #include <aws/mqtt/private/topic_tree.h>
- #include <aws/io/logging.h>
- #include <aws/common/byte_buf.h>
- #include <aws/common/task_scheduler.h>
- #ifdef _MSC_VER
- /* disables warning non const declared initializers for Microsoft compilers */
- # pragma warning(disable : 4204)
- #endif /* _MSC_VER */
- AWS_STATIC_STRING_FROM_LITERAL(s_single_level_wildcard, "+");
- AWS_STATIC_STRING_FROM_LITERAL(s_multi_level_wildcard, "#");
- /*******************************************************************************
- * Transactions
- ******************************************************************************/
- struct topic_tree_action {
- enum {
- AWS_MQTT_TOPIC_TREE_ADD,
- AWS_MQTT_TOPIC_TREE_UPDATE,
- AWS_MQTT_TOPIC_TREE_REMOVE,
- } mode;
- /* All Nodes */
- struct aws_mqtt_topic_node *node_to_update;
- /* ADD/UPDATE */
- struct aws_byte_cursor topic;
- const struct aws_string *topic_filter;
- enum aws_mqtt_qos qos;
- aws_mqtt_publish_received_fn *callback;
- aws_mqtt_userdata_cleanup_fn *cleanup;
- void *userdata;
- /* ADD */
- struct aws_mqtt_topic_node *last_found;
- struct aws_mqtt_topic_node *first_created;
- /* REMOVE */
- struct aws_array_list to_remove; /* topic_tree_node* */
- };
- size_t aws_mqtt_topic_tree_action_size = sizeof(struct topic_tree_action);
- static struct topic_tree_action *s_topic_tree_action_create(struct aws_array_list *transaction) {
- struct topic_tree_action *action = NULL;
- /* Push an empty action into the transaction and get a pointer to it. */
- struct topic_tree_action empty_action;
- AWS_ZERO_STRUCT(empty_action);
- if (aws_array_list_push_back(transaction, &empty_action)) {
- AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "Failed to insert action into transaction, array_list_push_back failed");
- goto push_back_failed;
- }
- if (aws_array_list_get_at_ptr(transaction, (void **)&action, aws_array_list_length(transaction) - 1)) {
- AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "Failed to retrieve most recent action from transaction");
- goto get_at_failed;
- }
- AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "action=%p: Created action", (void *)action);
- return action;
- get_at_failed:
- aws_array_list_pop_back(transaction);
- push_back_failed:
- return NULL;
- }
- static void s_topic_tree_action_destroy(struct topic_tree_action *action) {
- AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "action=%p: Destroying action", (void *)action);
- if (action->mode == AWS_MQTT_TOPIC_TREE_REMOVE) {
- aws_array_list_clean_up(&action->to_remove);
- }
- AWS_ZERO_STRUCT(*action);
- }
- static int s_topic_tree_action_to_remove(
- struct topic_tree_action *action,
- struct aws_allocator *allocator,
- size_t size_hint) {
- if (action->mode != AWS_MQTT_TOPIC_TREE_REMOVE) {
- if (aws_array_list_init_dynamic(&action->to_remove, allocator, size_hint, sizeof(void *))) {
- AWS_LOGF_ERROR(
- AWS_LS_MQTT_TOPIC_TREE, "action=%p: Failed to initialize to_remove list in action", (void *)action);
- return AWS_OP_ERR;
- }
- action->mode = AWS_MQTT_TOPIC_TREE_REMOVE;
- }
- return AWS_OP_SUCCESS;
- }
- static bool byte_cursor_eq(const void *a, const void *b) {
- const struct aws_byte_cursor *cur_a = a;
- const struct aws_byte_cursor *cur_b = b;
- return aws_byte_cursor_eq(cur_a, cur_b);
- }
- /*******************************************************************************
- * Init
- ******************************************************************************/
- static struct aws_mqtt_topic_node *s_topic_node_new(
- struct aws_allocator *allocator,
- const struct aws_byte_cursor *topic_filter,
- const struct aws_string *full_topic) {
- AWS_PRECONDITION(!topic_filter || full_topic);
- struct aws_mqtt_topic_node *node = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_topic_node));
- if (!node) {
- AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "Failed to allocate new topic node");
- return NULL;
- }
- if (topic_filter) {
- AWS_LOGF_TRACE(
- AWS_LS_MQTT_TOPIC_TREE,
- "node=%p: Creating new node with topic filter " PRInSTR,
- (void *)node,
- AWS_BYTE_CURSOR_PRI(*topic_filter));
- }
- if (topic_filter) {
- node->topic = *topic_filter;
- node->topic_filter = full_topic;
- }
- /* Init the sub topics map */
- if (aws_hash_table_init(&node->subtopics, allocator, 0, aws_hash_byte_cursor_ptr, byte_cursor_eq, NULL, NULL)) {
- AWS_LOGF_ERROR(
- AWS_LS_MQTT_TOPIC_TREE, "node=%p: Failed to initialize subtopics table in topic node", (void *)node);
- aws_mem_release(allocator, node);
- return NULL;
- }
- return node;
- }
- static int s_topic_node_destroy_hash_foreach_wrap(void *context, struct aws_hash_element *elem);
- static void s_topic_node_destroy(struct aws_mqtt_topic_node *node, struct aws_allocator *allocator) {
- AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "node=%p: Destroying topic tree node", (void *)node);
- /* Traverse all children and remove */
- aws_hash_table_foreach(&node->subtopics, s_topic_node_destroy_hash_foreach_wrap, allocator);
- if (node->cleanup && node->userdata) {
- node->cleanup(node->userdata);
- }
- if (node->owns_topic_filter) {
- aws_string_destroy((void *)node->topic_filter);
- }
- aws_hash_table_clean_up(&node->subtopics);
- aws_mem_release(allocator, node);
- }
- static int s_topic_node_destroy_hash_foreach_wrap(void *context, struct aws_hash_element *elem) {
- s_topic_node_destroy(elem->value, context);
- return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE;
- }
- int aws_mqtt_topic_tree_init(struct aws_mqtt_topic_tree *tree, struct aws_allocator *allocator) {
- AWS_PRECONDITION(tree);
- AWS_PRECONDITION(allocator);
- AWS_LOGF_DEBUG(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Creating new topic tree", (void *)tree);
- tree->root = s_topic_node_new(allocator, NULL, NULL);
- if (!tree->root) {
- /* Error raised by s_topic_node_new */
- return AWS_OP_ERR;
- }
- tree->allocator = allocator;
- return AWS_OP_SUCCESS;
- }
- /*******************************************************************************
- * Clean Up
- ******************************************************************************/
- void aws_mqtt_topic_tree_clean_up(struct aws_mqtt_topic_tree *tree) {
- AWS_PRECONDITION(tree);
- AWS_LOGF_DEBUG(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Cleaning up topic tree", (void *)tree);
- if (tree->allocator && tree->root) {
- s_topic_node_destroy(tree->root, tree->allocator);
- AWS_ZERO_STRUCT(*tree);
- }
- }
- /*******************************************************************************
- * Iterate
- ******************************************************************************/
- bool s_topic_node_is_subscription(const struct aws_mqtt_topic_node *node) {
- return node->callback;
- }
- struct topic_tree_iterate_context {
- bool should_continue;
- aws_mqtt_topic_tree_iterator_fn *iterator;
- void *user_data;
- };
- static int s_topic_tree_iterate_do_recurse(void *context, struct aws_hash_element *current_elem) {
- struct topic_tree_iterate_context *ctx = context;
- struct aws_mqtt_topic_node *current = current_elem->value;
- if (s_topic_node_is_subscription(current)) {
- const struct aws_byte_cursor topic_filter = aws_byte_cursor_from_string(current->topic_filter);
- ctx->should_continue = ctx->iterator(&topic_filter, current->qos, ctx->user_data);
- }
- if (ctx->should_continue) {
- aws_hash_table_foreach(¤t->subtopics, s_topic_tree_iterate_do_recurse, context);
- }
- /* One of the children could have updated should_continue, so check again */
- if (ctx->should_continue) {
- return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
- }
- /* If false returned, return immediately. */
- return 0;
- }
- void aws_mqtt_topic_tree_iterate(
- const struct aws_mqtt_topic_tree *tree,
- aws_mqtt_topic_tree_iterator_fn *iterator,
- void *user_data) {
- AWS_PRECONDITION(tree);
- AWS_PRECONDITION(tree->root);
- AWS_PRECONDITION(iterator);
- struct topic_tree_iterate_context itr;
- itr.should_continue = true;
- itr.iterator = iterator;
- itr.user_data = user_data;
- aws_hash_table_foreach(&tree->root->subtopics, s_topic_tree_iterate_do_recurse, &itr);
- }
- bool s_topic_tree_sub_count_iterator(const struct aws_byte_cursor *topic, enum aws_mqtt_qos qos, void *user_data) {
- (void)topic;
- (void)qos;
- size_t *sub_count = user_data;
- *sub_count += 1;
- return true;
- }
- size_t aws_mqtt_topic_tree_get_sub_count(const struct aws_mqtt_topic_tree *tree) {
- AWS_PRECONDITION(tree);
- AWS_PRECONDITION(tree->root);
- size_t sub_count = 0;
- aws_mqtt_topic_tree_iterate(tree, s_topic_tree_sub_count_iterator, &sub_count);
- return sub_count;
- }
- /*******************************************************************************
- * Action Commit
- ******************************************************************************/
- /* Searches subtree until a topic_filter with a different pointer value is found. */
- static int s_topic_node_string_finder(void *userdata, struct aws_hash_element *elem) {
- const struct aws_string **topic_filter = userdata;
- struct aws_mqtt_topic_node *node = elem->value;
- /* We've found this node again, search it's children */
- if (*topic_filter == node->topic_filter) {
- if (0 == aws_hash_table_get_entry_count(&node->subtopics)) {
- /* If no children, then there must be siblings, so we can use those */
- return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
- }
- aws_hash_table_foreach(&node->subtopics, s_topic_node_string_finder, userdata);
- if (*topic_filter == node->topic_filter) {
- /* If the topic filter still hasn't changed, continue iterating */
- return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
- }
- AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, " Found matching topic string, using %s", node->topic_filter->bytes);
- return 0;
- }
- AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, " Found matching topic string, using %s", node->topic_filter->bytes);
- *topic_filter = node->topic_filter;
- return 0;
- }
- static void s_topic_tree_action_commit(struct topic_tree_action *action, struct aws_mqtt_topic_tree *tree) {
- (void)tree;
- AWS_PRECONDITION(action->node_to_update);
- switch (action->mode) {
- case AWS_MQTT_TOPIC_TREE_ADD:
- case AWS_MQTT_TOPIC_TREE_UPDATE: {
- AWS_LOGF_TRACE(
- AWS_LS_MQTT_TOPIC_TREE,
- "tree=%p action=%p: Committing %s topic tree action",
- (void *)tree,
- (void *)action,
- (action->mode == AWS_MQTT_TOPIC_TREE_ADD) ? "add" : "update");
- /* Destroy old userdata */
- if (action->node_to_update->cleanup && action->node_to_update->userdata) {
- /* If there was userdata assigned to this node, pass it out. */
- action->node_to_update->cleanup(action->node_to_update->userdata);
- }
- /* Update data */
- action->node_to_update->callback = action->callback;
- action->node_to_update->cleanup = action->cleanup;
- action->node_to_update->userdata = action->userdata;
- action->node_to_update->qos = action->qos;
- if (action->topic.ptr) {
- action->node_to_update->topic = action->topic;
- }
- if (action->topic_filter) {
- if (action->node_to_update->owns_topic_filter && action->node_to_update->topic_filter) {
- /* The topic filer is already there, destory the new filter to keep all the byte cursor valid */
- aws_string_destroy((void *)action->topic_filter);
- } else {
- action->node_to_update->topic_filter = action->topic_filter;
- action->node_to_update->owns_topic_filter = true;
- }
- }
- break;
- }
- case AWS_MQTT_TOPIC_TREE_REMOVE: {
- AWS_LOGF_TRACE(
- AWS_LS_MQTT_TOPIC_TREE,
- "tree=%p action=%p: Committing remove topic tree action",
- (void *)tree,
- (void *)action);
- struct aws_mqtt_topic_node *current = action->node_to_update;
- const size_t sub_parts_len = aws_array_list_length(&action->to_remove) - 1;
- if (current) {
- /* If found the node, traverse up and remove each with no sub-topics.
- * Then update all nodes that were using current's topic_filter for topic. */
- /* "unsubscribe" current. */
- if (current->cleanup && current->userdata) {
- AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "node=%p: Cleaning up node's userdata", (void *)current);
- /* If there was userdata assigned to this node, pass it out. */
- current->cleanup(current->userdata);
- }
- current->callback = NULL;
- current->cleanup = NULL;
- current->userdata = NULL;
- /* Set to true if current needs to be cleaned up. */
- bool destroy_current = false;
- /* How many nodes are left after the great purge. */
- size_t nodes_left = sub_parts_len;
- /* Remove all subscription-less and child-less nodes. */
- for (size_t i = sub_parts_len; i > 0; --i) {
- struct aws_mqtt_topic_node *node = NULL;
- aws_array_list_get_at(&action->to_remove, &node, i);
- AWS_ASSUME(node); /* Must be in bounds */
- if (!s_topic_node_is_subscription(node) && 0 == aws_hash_table_get_entry_count(&node->subtopics)) {
- /* No subscription and no children, this node needs to go. */
- struct aws_mqtt_topic_node *grandma = NULL;
- aws_array_list_get_at(&action->to_remove, &grandma, i - 1);
- AWS_ASSUME(grandma); /* Must be in bounds */
- AWS_LOGF_TRACE(
- AWS_LS_MQTT_TOPIC_TREE,
- "tree=%p node=%p: Removing child node %p with topic \"" PRInSTR "\"",
- (void *)tree,
- (void *)grandma,
- (void *)node,
- AWS_BYTE_CURSOR_PRI(node->topic));
- aws_hash_table_remove(&grandma->subtopics, &node->topic, NULL, NULL);
- /* Make sure the following loop doesn't hit this node. */
- --nodes_left;
- if (i != sub_parts_len) {
- /* Clean up and delete */
- s_topic_node_destroy(node, tree->allocator);
- } else {
- destroy_current = true;
- }
- } else {
- AWS_LOGF_TRACE(
- AWS_LS_MQTT_TOPIC_TREE,
- "tree=%p: Node %p with topic \"" PRInSTR
- "\" has children or is a subscription, leaving in place",
- (void *)tree,
- (void *)node,
- AWS_BYTE_CURSOR_PRI(node->topic));
- /* Once we've found one node with children, the rest are guaranteed to. */
- break;
- }
- }
- /* If current owns the full string, go fixup the pointer references. */
- if (nodes_left > 0) {
- /* If a new viable topic filter is found once, it can be used for all parents. */
- const struct aws_string *new_topic_filter = NULL;
- const struct aws_string *const old_topic_filter = current->topic_filter;
- /* How much of new_topic_filter should be lopped off the beginning. */
- struct aws_mqtt_topic_node *parent = NULL;
- aws_array_list_get_at(&action->to_remove, &parent, nodes_left);
- AWS_ASSUME(parent);
- size_t topic_offset =
- parent->topic.ptr - aws_string_bytes(parent->topic_filter) + parent->topic.len + 1;
- /* -1 to avoid touching current */
- for (size_t i = nodes_left; i > 0; --i) {
- aws_array_list_get_at(&action->to_remove, &parent, i);
- AWS_ASSUME(parent); /* Must be in bounds */
- /* Remove this topic and following / from offset. */
- topic_offset -= (parent->topic.len + 1);
- if (parent->topic_filter == old_topic_filter) {
- /* Uh oh, Mom's using my topic string again! Steal it and replace it with a new one, Indiana
- * Jones style. */
- AWS_LOGF_TRACE(
- AWS_LS_MQTT_TOPIC_TREE,
- "tree=%p: Found node %p reusing topic filter part, replacing with next child",
- (void *)tree,
- (void *)parent);
- if (!new_topic_filter) {
- /* Set new_tf to old_tf so it's easier to check against the existing node.
- * Basically, it's an INOUT param. */
- new_topic_filter = old_topic_filter;
- /* Search all subtopics until we find one that isn't current. */
- aws_hash_table_foreach(
- &parent->subtopics, s_topic_node_string_finder, (void *)&new_topic_filter);
- /* This would only happen if there is only one topic in subtopics (current's) and
- * it has no children (in which case it should have been removed above). */
- AWS_ASSERT(new_topic_filter != old_topic_filter);
- /* Now that the new string has been found, the old one can be destroyed. */
- aws_string_destroy((void *)current->topic_filter);
- current->owns_topic_filter = false;
- }
- /* Update the pointers. */
- parent->topic_filter = new_topic_filter;
- parent->topic.ptr = (uint8_t *)aws_string_bytes(new_topic_filter) + topic_offset;
- }
- }
- }
- /* Now that the strings are update, remove current. */
- if (destroy_current) {
- s_topic_node_destroy(current, tree->allocator);
- }
- current = NULL;
- }
- break;
- }
- }
- s_topic_tree_action_destroy(action);
- }
- /*******************************************************************************
- * Action Roll Back
- ******************************************************************************/
- static void s_topic_tree_action_roll_back(struct topic_tree_action *action, struct aws_mqtt_topic_tree *tree) {
- AWS_PRECONDITION(action);
- switch (action->mode) {
- case AWS_MQTT_TOPIC_TREE_ADD: {
- AWS_LOGF_TRACE(
- AWS_LS_MQTT_TOPIC_TREE,
- "tree=%p action=%p: Rolling back add transaction action",
- (void *)tree,
- (void *)action);
- /* Remove the first new node from it's parent's map */
- aws_hash_table_remove(&action->last_found->subtopics, &action->first_created->topic, NULL, NULL);
- /* Recursively destroy all other created nodes */
- s_topic_node_destroy(action->first_created, tree->allocator);
- if (action->topic_filter) {
- aws_string_destroy((void *)action->topic_filter);
- }
- break;
- }
- case AWS_MQTT_TOPIC_TREE_REMOVE:
- case AWS_MQTT_TOPIC_TREE_UPDATE: {
- AWS_LOGF_TRACE(
- AWS_LS_MQTT_TOPIC_TREE,
- "tree=%p action=%p: Rolling back remove/update transaction, no changes made",
- (void *)tree,
- (void *)action);
- /* Aborting a remove or update doesn't require any actions. */
- break;
- }
- }
- s_topic_tree_action_destroy(action);
- }
- /*******************************************************************************
- * Insert
- ******************************************************************************/
- int aws_mqtt_topic_tree_transaction_insert(
- struct aws_mqtt_topic_tree *tree,
- struct aws_array_list *transaction,
- const struct aws_string *topic_filter_ori,
- enum aws_mqtt_qos qos,
- aws_mqtt_publish_received_fn *callback,
- aws_mqtt_userdata_cleanup_fn *cleanup,
- void *userdata) {
- AWS_PRECONDITION(tree);
- AWS_PRECONDITION(transaction);
- AWS_PRECONDITION(topic_filter_ori);
- AWS_PRECONDITION(callback);
- /* let topic tree take the ownership of the new string and leave the caller string alone. */
- struct aws_string *topic_filter = aws_string_new_from_string(tree->allocator, topic_filter_ori);
- AWS_LOGF_DEBUG(
- AWS_LS_MQTT_TOPIC_TREE,
- "tree=%p: Inserting topic filter %s into topic tree",
- (void *)tree,
- topic_filter->bytes);
- struct aws_mqtt_topic_node *current = tree->root;
- struct topic_tree_action *action = s_topic_tree_action_create(transaction);
- if (!action) {
- return AWS_OP_ERR;
- }
- /* Default to update unless a node was added */
- action->mode = AWS_MQTT_TOPIC_TREE_UPDATE;
- action->qos = qos;
- action->callback = callback;
- action->cleanup = cleanup;
- action->userdata = userdata;
- struct aws_byte_cursor topic_filter_cur = aws_byte_cursor_from_string(topic_filter);
- struct aws_byte_cursor sub_part;
- AWS_ZERO_STRUCT(sub_part);
- struct aws_byte_cursor last_part;
- AWS_ZERO_STRUCT(last_part);
- while (aws_byte_cursor_next_split(&topic_filter_cur, '/', &sub_part)) {
- last_part = sub_part;
- /* Add or find mid-node */
- struct aws_hash_element *elem = NULL;
- int was_created = 0;
- aws_hash_table_create(¤t->subtopics, &sub_part, &elem, &was_created);
- if (was_created) {
- if (action->mode == AWS_MQTT_TOPIC_TREE_UPDATE) {
- /* Store the last found node */
- action->last_found = current;
- }
- /* Node does not exist, add new one */
- current = s_topic_node_new(tree->allocator, &sub_part, topic_filter);
- if (!current) {
- /* Don't do handle_error logic, the action needs to persist to be rolled back */
- return AWS_OP_ERR;
- }
- /* Stash in the hash map */
- elem->key = ¤t->topic;
- elem->value = current;
- if (action->mode == AWS_MQTT_TOPIC_TREE_UPDATE) {
- AWS_LOGF_TRACE(
- AWS_LS_MQTT_TOPIC_TREE,
- "tree=%p: Topic part \"" PRInSTR "\" is new, it and all children will be added as new nodes",
- (void *)tree,
- AWS_BYTE_CURSOR_PRI(sub_part));
- /* Store the node we just made, and make sure we don't store again */
- action->mode = AWS_MQTT_TOPIC_TREE_ADD;
- action->first_created = current;
- }
- } else {
- AWS_ASSERT(action->mode == AWS_MQTT_TOPIC_TREE_UPDATE); /* Can't have found an existing node while adding */
- /* If the node exists, just traverse it */
- current = elem->value;
- }
- }
- action->node_to_update = current;
- /* Node found (or created), add the topic filter and callbacks */
- if (current->owns_topic_filter) {
- AWS_LOGF_TRACE(
- AWS_LS_MQTT_TOPIC_TREE,
- "tree=%p node=%p: Updating existing node that already owns its topic_filter, throwing out parameter",
- (void *)tree,
- (void *)current);
- /* If the topic filter was already here, this is already a subscription.
- Free the new topic_filter so all existing byte_cursors remain valid. */
- aws_string_destroy(topic_filter);
- } else {
- /* Node already existed (or was created) but wasn't subscription. */
- action->topic = last_part;
- action->topic_filter = topic_filter;
- }
- return AWS_OP_SUCCESS;
- }
- /*******************************************************************************
- * Remove
- ******************************************************************************/
- int aws_mqtt_topic_tree_transaction_remove(
- struct aws_mqtt_topic_tree *tree,
- struct aws_array_list *transaction,
- const struct aws_byte_cursor *topic_filter,
- void **old_userdata) {
- AWS_PRECONDITION(tree);
- AWS_PRECONDITION(transaction);
- AWS_PRECONDITION(topic_filter);
- AWS_LOGF_DEBUG(
- AWS_LS_MQTT_TOPIC_TREE,
- "tree=%p: Removing topic filter \"" PRInSTR "\" from topic tree",
- (void *)tree,
- AWS_BYTE_CURSOR_PRI(*topic_filter));
- /* Initialize output parameter to a safe default */
- if (old_userdata) {
- *old_userdata = NULL;
- }
- /* Default to error because that's what handle_error will do in all cases except node not found */
- int result = AWS_OP_ERR;
- struct topic_tree_action *action = s_topic_tree_action_create(transaction);
- if (!action) {
- return AWS_OP_ERR;
- }
- struct aws_array_list sub_topic_parts;
- AWS_ZERO_STRUCT(sub_topic_parts);
- if (aws_array_list_init_dynamic(&sub_topic_parts, tree->allocator, 1, sizeof(struct aws_byte_cursor))) {
- AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to initialize topic parts array", (void *)tree);
- goto handle_error;
- }
- if (aws_byte_cursor_split_on_char(topic_filter, '/', &sub_topic_parts)) {
- AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to split topic filter", (void *)tree);
- goto handle_error;
- }
- const size_t sub_parts_len = aws_array_list_length(&sub_topic_parts);
- if (!sub_parts_len) {
- AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to get topic parts length", (void *)tree);
- goto handle_error;
- }
- s_topic_tree_action_to_remove(action, tree->allocator, sub_parts_len);
- struct aws_mqtt_topic_node *current = tree->root;
- if (aws_array_list_push_back(&action->to_remove, ¤t)) {
- AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to insert root node into to_remove list", (void *)tree);
- goto handle_error;
- }
- for (size_t i = 0; i < sub_parts_len; ++i) {
- /* Get the current topic part */
- struct aws_byte_cursor *sub_part = NULL;
- aws_array_list_get_at_ptr(&sub_topic_parts, (void **)&sub_part, i);
- /* Find mid-node */
- struct aws_hash_element *elem = NULL;
- aws_hash_table_find(¤t->subtopics, sub_part, &elem);
- if (elem) {
- /* If the node exists, just traverse it */
- current = elem->value;
- if (aws_array_list_push_back(&action->to_remove, ¤t)) {
- AWS_LOGF_ERROR(
- AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to insert topic node into to_remove list", (void *)tree);
- goto handle_error;
- }
- } else {
- /* If not, abandon ship */
- goto handle_not_found;
- }
- }
- action->node_to_update = current;
- aws_array_list_clean_up(&sub_topic_parts);
- if (old_userdata) {
- *old_userdata = current->userdata;
- }
- return AWS_OP_SUCCESS;
- handle_not_found:
- result = AWS_OP_SUCCESS;
- handle_error:
- aws_array_list_clean_up(&sub_topic_parts);
- s_topic_tree_action_destroy(action);
- aws_array_list_pop_back(transaction);
- return result;
- }
- /*******************************************************************************
- * Commit
- ******************************************************************************/
- void aws_mqtt_topic_tree_transaction_commit(struct aws_mqtt_topic_tree *tree, struct aws_array_list *transaction) {
- const size_t num_actions = aws_array_list_length(transaction);
- for (size_t i = 0; i < num_actions; ++i) {
- struct topic_tree_action *action = NULL;
- aws_array_list_get_at_ptr(transaction, (void **)&action, i);
- AWS_ASSUME(action); /* Within bounds */
- s_topic_tree_action_commit(action, tree);
- }
- aws_array_list_clear(transaction);
- }
- /*******************************************************************************
- * Roll Back
- ******************************************************************************/
- void aws_mqtt_topic_tree_transaction_roll_back(struct aws_mqtt_topic_tree *tree, struct aws_array_list *transaction) {
- const size_t num_actions = aws_array_list_length(transaction);
- for (size_t i = 1; i <= num_actions; ++i) {
- struct topic_tree_action *action = NULL;
- aws_array_list_get_at_ptr(transaction, (void **)&action, num_actions - i);
- AWS_ASSUME(action); /* Within bounds */
- s_topic_tree_action_roll_back(action, tree);
- }
- aws_array_list_clear(transaction);
- }
- int aws_mqtt_topic_tree_insert(
- struct aws_mqtt_topic_tree *tree,
- const struct aws_string *topic_filter,
- enum aws_mqtt_qos qos,
- aws_mqtt_publish_received_fn *callback,
- aws_mqtt_userdata_cleanup_fn *cleanup,
- void *userdata) {
- AWS_VARIABLE_LENGTH_ARRAY(uint8_t, transaction_buf, aws_mqtt_topic_tree_action_size);
- struct aws_array_list transaction;
- aws_array_list_init_static(&transaction, transaction_buf, 1, aws_mqtt_topic_tree_action_size);
- if (aws_mqtt_topic_tree_transaction_insert(tree, &transaction, topic_filter, qos, callback, cleanup, userdata)) {
- aws_mqtt_topic_tree_transaction_roll_back(tree, &transaction);
- return AWS_OP_ERR;
- }
- aws_mqtt_topic_tree_transaction_commit(tree, &transaction);
- return AWS_OP_SUCCESS;
- }
- int aws_mqtt_topic_tree_remove(struct aws_mqtt_topic_tree *tree, const struct aws_byte_cursor *topic_filter) {
- AWS_PRECONDITION(tree);
- AWS_PRECONDITION(topic_filter);
- AWS_VARIABLE_LENGTH_ARRAY(uint8_t, transaction_buf, aws_mqtt_topic_tree_action_size);
- struct aws_array_list transaction;
- aws_array_list_init_static(&transaction, transaction_buf, 1, aws_mqtt_topic_tree_action_size);
- if (aws_mqtt_topic_tree_transaction_remove(tree, &transaction, topic_filter, NULL)) {
- aws_mqtt_topic_tree_transaction_roll_back(tree, &transaction);
- return AWS_OP_ERR;
- }
- aws_mqtt_topic_tree_transaction_commit(tree, &transaction);
- return AWS_OP_SUCCESS;
- }
- /*******************************************************************************
- * Publish
- ******************************************************************************/
- static void s_topic_tree_publish_do_recurse(
- const struct aws_byte_cursor *current_sub_part,
- const struct aws_mqtt_topic_node *current,
- const struct aws_mqtt_packet_publish *pub) {
- struct aws_byte_cursor hash_cur = aws_byte_cursor_from_string(s_multi_level_wildcard);
- struct aws_byte_cursor plus_cur = aws_byte_cursor_from_string(s_single_level_wildcard);
- struct aws_hash_element *elem = NULL;
- struct aws_byte_cursor sub_part = *current_sub_part;
- if (!aws_byte_cursor_next_split(&pub->topic_name, '/', &sub_part)) {
- /* If this is the last node and is a sub, call it */
- if (s_topic_node_is_subscription(current)) {
- bool dup = aws_mqtt_packet_publish_get_dup(pub);
- enum aws_mqtt_qos qos = aws_mqtt_packet_publish_get_qos(pub);
- bool retain = aws_mqtt_packet_publish_get_retain(pub);
- current->callback(&pub->topic_name, &pub->payload, dup, qos, retain, current->userdata);
- }
- return;
- }
- /* Check multi-level wildcard */
- aws_hash_table_find(¤t->subtopics, &hash_cur, &elem);
- if (elem) {
- /* Match! */
- struct aws_mqtt_topic_node *multi_wildcard = elem->value;
- /* Must be a subscription and have no children */
- AWS_ASSERT(s_topic_node_is_subscription(multi_wildcard));
- AWS_ASSERT(0 == aws_hash_table_get_entry_count(&multi_wildcard->subtopics));
- bool dup = aws_mqtt_packet_publish_get_dup(pub);
- enum aws_mqtt_qos qos = aws_mqtt_packet_publish_get_qos(pub);
- bool retain = aws_mqtt_packet_publish_get_retain(pub);
- multi_wildcard->callback(&pub->topic_name, &pub->payload, dup, qos, retain, multi_wildcard->userdata);
- }
- /* Check single level wildcard */
- aws_hash_table_find(¤t->subtopics, &plus_cur, &elem);
- if (elem) {
- /* Recurse sub topics */
- s_topic_tree_publish_do_recurse(&sub_part, elem->value, pub);
- }
- /* Check actual topic name */
- aws_hash_table_find(¤t->subtopics, &sub_part, &elem);
- if (elem) {
- /* Found the actual topic, recurse to it */
- s_topic_tree_publish_do_recurse(&sub_part, elem->value, pub);
- }
- }
- void aws_mqtt_topic_tree_publish(const struct aws_mqtt_topic_tree *tree, struct aws_mqtt_packet_publish *pub) {
- AWS_PRECONDITION(tree);
- AWS_PRECONDITION(pub);
- AWS_LOGF_TRACE(
- AWS_LS_MQTT_TOPIC_TREE,
- "tree=%p: Publishing on topic " PRInSTR,
- (void *)tree,
- AWS_BYTE_CURSOR_PRI(pub->topic_name));
- struct aws_byte_cursor sub_part;
- AWS_ZERO_STRUCT(sub_part);
- s_topic_tree_publish_do_recurse(&sub_part, tree->root, pub);
- }
|