topic_tree.c 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/mqtt/private/topic_tree.h>
  6. #include <aws/io/logging.h>
  7. #include <aws/common/byte_buf.h>
  8. #include <aws/common/task_scheduler.h>
  9. #ifdef _MSC_VER
  10. /* disables warning non const declared initializers for Microsoft compilers */
  11. # pragma warning(disable : 4204)
  12. #endif /* _MSC_VER */
  13. AWS_STATIC_STRING_FROM_LITERAL(s_single_level_wildcard, "+");
  14. AWS_STATIC_STRING_FROM_LITERAL(s_multi_level_wildcard, "#");
  15. /*******************************************************************************
  16. * Transactions
  17. ******************************************************************************/
  18. struct topic_tree_action {
  19. enum {
  20. AWS_MQTT_TOPIC_TREE_ADD,
  21. AWS_MQTT_TOPIC_TREE_UPDATE,
  22. AWS_MQTT_TOPIC_TREE_REMOVE,
  23. } mode;
  24. /* All Nodes */
  25. struct aws_mqtt_topic_node *node_to_update;
  26. /* ADD/UPDATE */
  27. struct aws_byte_cursor topic;
  28. const struct aws_string *topic_filter;
  29. enum aws_mqtt_qos qos;
  30. aws_mqtt_publish_received_fn *callback;
  31. aws_mqtt_userdata_cleanup_fn *cleanup;
  32. void *userdata;
  33. /* ADD */
  34. struct aws_mqtt_topic_node *last_found;
  35. struct aws_mqtt_topic_node *first_created;
  36. /* REMOVE */
  37. struct aws_array_list to_remove; /* topic_tree_node* */
  38. };
  39. size_t aws_mqtt_topic_tree_action_size = sizeof(struct topic_tree_action);
  40. static struct topic_tree_action *s_topic_tree_action_create(struct aws_array_list *transaction) {
  41. struct topic_tree_action *action = NULL;
  42. /* Push an empty action into the transaction and get a pointer to it. */
  43. struct topic_tree_action empty_action;
  44. AWS_ZERO_STRUCT(empty_action);
  45. if (aws_array_list_push_back(transaction, &empty_action)) {
  46. AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "Failed to insert action into transaction, array_list_push_back failed");
  47. goto push_back_failed;
  48. }
  49. if (aws_array_list_get_at_ptr(transaction, (void **)&action, aws_array_list_length(transaction) - 1)) {
  50. AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "Failed to retrieve most recent action from transaction");
  51. goto get_at_failed;
  52. }
  53. AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "action=%p: Created action", (void *)action);
  54. return action;
  55. get_at_failed:
  56. aws_array_list_pop_back(transaction);
  57. push_back_failed:
  58. return NULL;
  59. }
  60. static void s_topic_tree_action_destroy(struct topic_tree_action *action) {
  61. AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "action=%p: Destroying action", (void *)action);
  62. if (action->mode == AWS_MQTT_TOPIC_TREE_REMOVE) {
  63. aws_array_list_clean_up(&action->to_remove);
  64. }
  65. AWS_ZERO_STRUCT(*action);
  66. }
  67. static int s_topic_tree_action_to_remove(
  68. struct topic_tree_action *action,
  69. struct aws_allocator *allocator,
  70. size_t size_hint) {
  71. if (action->mode != AWS_MQTT_TOPIC_TREE_REMOVE) {
  72. if (aws_array_list_init_dynamic(&action->to_remove, allocator, size_hint, sizeof(void *))) {
  73. AWS_LOGF_ERROR(
  74. AWS_LS_MQTT_TOPIC_TREE, "action=%p: Failed to initialize to_remove list in action", (void *)action);
  75. return AWS_OP_ERR;
  76. }
  77. action->mode = AWS_MQTT_TOPIC_TREE_REMOVE;
  78. }
  79. return AWS_OP_SUCCESS;
  80. }
  81. static bool byte_cursor_eq(const void *a, const void *b) {
  82. const struct aws_byte_cursor *cur_a = a;
  83. const struct aws_byte_cursor *cur_b = b;
  84. return aws_byte_cursor_eq(cur_a, cur_b);
  85. }
  86. /*******************************************************************************
  87. * Init
  88. ******************************************************************************/
  89. static struct aws_mqtt_topic_node *s_topic_node_new(
  90. struct aws_allocator *allocator,
  91. const struct aws_byte_cursor *topic_filter,
  92. const struct aws_string *full_topic) {
  93. AWS_PRECONDITION(!topic_filter || full_topic);
  94. struct aws_mqtt_topic_node *node = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_topic_node));
  95. if (!node) {
  96. AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "Failed to allocate new topic node");
  97. return NULL;
  98. }
  99. if (topic_filter) {
  100. AWS_LOGF_TRACE(
  101. AWS_LS_MQTT_TOPIC_TREE,
  102. "node=%p: Creating new node with topic filter " PRInSTR,
  103. (void *)node,
  104. AWS_BYTE_CURSOR_PRI(*topic_filter));
  105. }
  106. if (topic_filter) {
  107. node->topic = *topic_filter;
  108. node->topic_filter = full_topic;
  109. }
  110. /* Init the sub topics map */
  111. if (aws_hash_table_init(&node->subtopics, allocator, 0, aws_hash_byte_cursor_ptr, byte_cursor_eq, NULL, NULL)) {
  112. AWS_LOGF_ERROR(
  113. AWS_LS_MQTT_TOPIC_TREE, "node=%p: Failed to initialize subtopics table in topic node", (void *)node);
  114. aws_mem_release(allocator, node);
  115. return NULL;
  116. }
  117. return node;
  118. }
  119. static int s_topic_node_destroy_hash_foreach_wrap(void *context, struct aws_hash_element *elem);
  120. static void s_topic_node_destroy(struct aws_mqtt_topic_node *node, struct aws_allocator *allocator) {
  121. AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "node=%p: Destroying topic tree node", (void *)node);
  122. /* Traverse all children and remove */
  123. aws_hash_table_foreach(&node->subtopics, s_topic_node_destroy_hash_foreach_wrap, allocator);
  124. if (node->cleanup && node->userdata) {
  125. node->cleanup(node->userdata);
  126. }
  127. if (node->owns_topic_filter) {
  128. aws_string_destroy((void *)node->topic_filter);
  129. }
  130. aws_hash_table_clean_up(&node->subtopics);
  131. aws_mem_release(allocator, node);
  132. }
  133. static int s_topic_node_destroy_hash_foreach_wrap(void *context, struct aws_hash_element *elem) {
  134. s_topic_node_destroy(elem->value, context);
  135. return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE;
  136. }
  137. int aws_mqtt_topic_tree_init(struct aws_mqtt_topic_tree *tree, struct aws_allocator *allocator) {
  138. AWS_PRECONDITION(tree);
  139. AWS_PRECONDITION(allocator);
  140. AWS_LOGF_DEBUG(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Creating new topic tree", (void *)tree);
  141. tree->root = s_topic_node_new(allocator, NULL, NULL);
  142. if (!tree->root) {
  143. /* Error raised by s_topic_node_new */
  144. return AWS_OP_ERR;
  145. }
  146. tree->allocator = allocator;
  147. return AWS_OP_SUCCESS;
  148. }
  149. /*******************************************************************************
  150. * Clean Up
  151. ******************************************************************************/
  152. void aws_mqtt_topic_tree_clean_up(struct aws_mqtt_topic_tree *tree) {
  153. AWS_PRECONDITION(tree);
  154. AWS_LOGF_DEBUG(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Cleaning up topic tree", (void *)tree);
  155. if (tree->allocator && tree->root) {
  156. s_topic_node_destroy(tree->root, tree->allocator);
  157. AWS_ZERO_STRUCT(*tree);
  158. }
  159. }
  160. /*******************************************************************************
  161. * Iterate
  162. ******************************************************************************/
  163. bool s_topic_node_is_subscription(const struct aws_mqtt_topic_node *node) {
  164. return node->callback;
  165. }
  166. struct topic_tree_iterate_context {
  167. bool should_continue;
  168. aws_mqtt_topic_tree_iterator_fn *iterator;
  169. void *user_data;
  170. };
  171. static int s_topic_tree_iterate_do_recurse(void *context, struct aws_hash_element *current_elem) {
  172. struct topic_tree_iterate_context *ctx = context;
  173. struct aws_mqtt_topic_node *current = current_elem->value;
  174. if (s_topic_node_is_subscription(current)) {
  175. const struct aws_byte_cursor topic_filter = aws_byte_cursor_from_string(current->topic_filter);
  176. ctx->should_continue = ctx->iterator(&topic_filter, current->qos, ctx->user_data);
  177. }
  178. if (ctx->should_continue) {
  179. aws_hash_table_foreach(&current->subtopics, s_topic_tree_iterate_do_recurse, context);
  180. }
  181. /* One of the children could have updated should_continue, so check again */
  182. if (ctx->should_continue) {
  183. return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
  184. }
  185. /* If false returned, return immediately. */
  186. return 0;
  187. }
  188. void aws_mqtt_topic_tree_iterate(
  189. const struct aws_mqtt_topic_tree *tree,
  190. aws_mqtt_topic_tree_iterator_fn *iterator,
  191. void *user_data) {
  192. AWS_PRECONDITION(tree);
  193. AWS_PRECONDITION(tree->root);
  194. AWS_PRECONDITION(iterator);
  195. struct topic_tree_iterate_context itr;
  196. itr.should_continue = true;
  197. itr.iterator = iterator;
  198. itr.user_data = user_data;
  199. aws_hash_table_foreach(&tree->root->subtopics, s_topic_tree_iterate_do_recurse, &itr);
  200. }
  201. bool s_topic_tree_sub_count_iterator(const struct aws_byte_cursor *topic, enum aws_mqtt_qos qos, void *user_data) {
  202. (void)topic;
  203. (void)qos;
  204. size_t *sub_count = user_data;
  205. *sub_count += 1;
  206. return true;
  207. }
  208. size_t aws_mqtt_topic_tree_get_sub_count(const struct aws_mqtt_topic_tree *tree) {
  209. AWS_PRECONDITION(tree);
  210. AWS_PRECONDITION(tree->root);
  211. size_t sub_count = 0;
  212. aws_mqtt_topic_tree_iterate(tree, s_topic_tree_sub_count_iterator, &sub_count);
  213. return sub_count;
  214. }
  215. /*******************************************************************************
  216. * Action Commit
  217. ******************************************************************************/
  218. /* Searches subtree until a topic_filter with a different pointer value is found. */
  219. static int s_topic_node_string_finder(void *userdata, struct aws_hash_element *elem) {
  220. const struct aws_string **topic_filter = userdata;
  221. struct aws_mqtt_topic_node *node = elem->value;
  222. /* We've found this node again, search it's children */
  223. if (*topic_filter == node->topic_filter) {
  224. if (0 == aws_hash_table_get_entry_count(&node->subtopics)) {
  225. /* If no children, then there must be siblings, so we can use those */
  226. return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
  227. }
  228. aws_hash_table_foreach(&node->subtopics, s_topic_node_string_finder, userdata);
  229. if (*topic_filter == node->topic_filter) {
  230. /* If the topic filter still hasn't changed, continue iterating */
  231. return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
  232. }
  233. AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, " Found matching topic string, using %s", node->topic_filter->bytes);
  234. return 0;
  235. }
  236. AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, " Found matching topic string, using %s", node->topic_filter->bytes);
  237. *topic_filter = node->topic_filter;
  238. return 0;
  239. }
  240. static void s_topic_tree_action_commit(struct topic_tree_action *action, struct aws_mqtt_topic_tree *tree) {
  241. (void)tree;
  242. AWS_PRECONDITION(action->node_to_update);
  243. switch (action->mode) {
  244. case AWS_MQTT_TOPIC_TREE_ADD:
  245. case AWS_MQTT_TOPIC_TREE_UPDATE: {
  246. AWS_LOGF_TRACE(
  247. AWS_LS_MQTT_TOPIC_TREE,
  248. "tree=%p action=%p: Committing %s topic tree action",
  249. (void *)tree,
  250. (void *)action,
  251. (action->mode == AWS_MQTT_TOPIC_TREE_ADD) ? "add" : "update");
  252. /* Destroy old userdata */
  253. if (action->node_to_update->cleanup && action->node_to_update->userdata) {
  254. /* If there was userdata assigned to this node, pass it out. */
  255. action->node_to_update->cleanup(action->node_to_update->userdata);
  256. }
  257. /* Update data */
  258. action->node_to_update->callback = action->callback;
  259. action->node_to_update->cleanup = action->cleanup;
  260. action->node_to_update->userdata = action->userdata;
  261. action->node_to_update->qos = action->qos;
  262. if (action->topic.ptr) {
  263. action->node_to_update->topic = action->topic;
  264. }
  265. if (action->topic_filter) {
  266. if (action->node_to_update->owns_topic_filter && action->node_to_update->topic_filter) {
  267. /* The topic filer is already there, destory the new filter to keep all the byte cursor valid */
  268. aws_string_destroy((void *)action->topic_filter);
  269. } else {
  270. action->node_to_update->topic_filter = action->topic_filter;
  271. action->node_to_update->owns_topic_filter = true;
  272. }
  273. }
  274. break;
  275. }
  276. case AWS_MQTT_TOPIC_TREE_REMOVE: {
  277. AWS_LOGF_TRACE(
  278. AWS_LS_MQTT_TOPIC_TREE,
  279. "tree=%p action=%p: Committing remove topic tree action",
  280. (void *)tree,
  281. (void *)action);
  282. struct aws_mqtt_topic_node *current = action->node_to_update;
  283. const size_t sub_parts_len = aws_array_list_length(&action->to_remove) - 1;
  284. if (current) {
  285. /* If found the node, traverse up and remove each with no sub-topics.
  286. * Then update all nodes that were using current's topic_filter for topic. */
  287. /* "unsubscribe" current. */
  288. if (current->cleanup && current->userdata) {
  289. AWS_LOGF_TRACE(AWS_LS_MQTT_TOPIC_TREE, "node=%p: Cleaning up node's userdata", (void *)current);
  290. /* If there was userdata assigned to this node, pass it out. */
  291. current->cleanup(current->userdata);
  292. }
  293. current->callback = NULL;
  294. current->cleanup = NULL;
  295. current->userdata = NULL;
  296. /* Set to true if current needs to be cleaned up. */
  297. bool destroy_current = false;
  298. /* How many nodes are left after the great purge. */
  299. size_t nodes_left = sub_parts_len;
  300. /* Remove all subscription-less and child-less nodes. */
  301. for (size_t i = sub_parts_len; i > 0; --i) {
  302. struct aws_mqtt_topic_node *node = NULL;
  303. aws_array_list_get_at(&action->to_remove, &node, i);
  304. AWS_ASSUME(node); /* Must be in bounds */
  305. if (!s_topic_node_is_subscription(node) && 0 == aws_hash_table_get_entry_count(&node->subtopics)) {
  306. /* No subscription and no children, this node needs to go. */
  307. struct aws_mqtt_topic_node *grandma = NULL;
  308. aws_array_list_get_at(&action->to_remove, &grandma, i - 1);
  309. AWS_ASSUME(grandma); /* Must be in bounds */
  310. AWS_LOGF_TRACE(
  311. AWS_LS_MQTT_TOPIC_TREE,
  312. "tree=%p node=%p: Removing child node %p with topic \"" PRInSTR "\"",
  313. (void *)tree,
  314. (void *)grandma,
  315. (void *)node,
  316. AWS_BYTE_CURSOR_PRI(node->topic));
  317. aws_hash_table_remove(&grandma->subtopics, &node->topic, NULL, NULL);
  318. /* Make sure the following loop doesn't hit this node. */
  319. --nodes_left;
  320. if (i != sub_parts_len) {
  321. /* Clean up and delete */
  322. s_topic_node_destroy(node, tree->allocator);
  323. } else {
  324. destroy_current = true;
  325. }
  326. } else {
  327. AWS_LOGF_TRACE(
  328. AWS_LS_MQTT_TOPIC_TREE,
  329. "tree=%p: Node %p with topic \"" PRInSTR
  330. "\" has children or is a subscription, leaving in place",
  331. (void *)tree,
  332. (void *)node,
  333. AWS_BYTE_CURSOR_PRI(node->topic));
  334. /* Once we've found one node with children, the rest are guaranteed to. */
  335. break;
  336. }
  337. }
  338. /* If current owns the full string, go fixup the pointer references. */
  339. if (nodes_left > 0) {
  340. /* If a new viable topic filter is found once, it can be used for all parents. */
  341. const struct aws_string *new_topic_filter = NULL;
  342. const struct aws_string *const old_topic_filter = current->topic_filter;
  343. /* How much of new_topic_filter should be lopped off the beginning. */
  344. struct aws_mqtt_topic_node *parent = NULL;
  345. aws_array_list_get_at(&action->to_remove, &parent, nodes_left);
  346. AWS_ASSUME(parent);
  347. size_t topic_offset =
  348. parent->topic.ptr - aws_string_bytes(parent->topic_filter) + parent->topic.len + 1;
  349. /* -1 to avoid touching current */
  350. for (size_t i = nodes_left; i > 0; --i) {
  351. aws_array_list_get_at(&action->to_remove, &parent, i);
  352. AWS_ASSUME(parent); /* Must be in bounds */
  353. /* Remove this topic and following / from offset. */
  354. topic_offset -= (parent->topic.len + 1);
  355. if (parent->topic_filter == old_topic_filter) {
  356. /* Uh oh, Mom's using my topic string again! Steal it and replace it with a new one, Indiana
  357. * Jones style. */
  358. AWS_LOGF_TRACE(
  359. AWS_LS_MQTT_TOPIC_TREE,
  360. "tree=%p: Found node %p reusing topic filter part, replacing with next child",
  361. (void *)tree,
  362. (void *)parent);
  363. if (!new_topic_filter) {
  364. /* Set new_tf to old_tf so it's easier to check against the existing node.
  365. * Basically, it's an INOUT param. */
  366. new_topic_filter = old_topic_filter;
  367. /* Search all subtopics until we find one that isn't current. */
  368. aws_hash_table_foreach(
  369. &parent->subtopics, s_topic_node_string_finder, (void *)&new_topic_filter);
  370. /* This would only happen if there is only one topic in subtopics (current's) and
  371. * it has no children (in which case it should have been removed above). */
  372. AWS_ASSERT(new_topic_filter != old_topic_filter);
  373. /* Now that the new string has been found, the old one can be destroyed. */
  374. aws_string_destroy((void *)current->topic_filter);
  375. current->owns_topic_filter = false;
  376. }
  377. /* Update the pointers. */
  378. parent->topic_filter = new_topic_filter;
  379. parent->topic.ptr = (uint8_t *)aws_string_bytes(new_topic_filter) + topic_offset;
  380. }
  381. }
  382. }
  383. /* Now that the strings are update, remove current. */
  384. if (destroy_current) {
  385. s_topic_node_destroy(current, tree->allocator);
  386. }
  387. current = NULL;
  388. }
  389. break;
  390. }
  391. }
  392. s_topic_tree_action_destroy(action);
  393. }
  394. /*******************************************************************************
  395. * Action Roll Back
  396. ******************************************************************************/
  397. static void s_topic_tree_action_roll_back(struct topic_tree_action *action, struct aws_mqtt_topic_tree *tree) {
  398. AWS_PRECONDITION(action);
  399. switch (action->mode) {
  400. case AWS_MQTT_TOPIC_TREE_ADD: {
  401. AWS_LOGF_TRACE(
  402. AWS_LS_MQTT_TOPIC_TREE,
  403. "tree=%p action=%p: Rolling back add transaction action",
  404. (void *)tree,
  405. (void *)action);
  406. /* Remove the first new node from it's parent's map */
  407. aws_hash_table_remove(&action->last_found->subtopics, &action->first_created->topic, NULL, NULL);
  408. /* Recursively destroy all other created nodes */
  409. s_topic_node_destroy(action->first_created, tree->allocator);
  410. if (action->topic_filter) {
  411. aws_string_destroy((void *)action->topic_filter);
  412. }
  413. break;
  414. }
  415. case AWS_MQTT_TOPIC_TREE_REMOVE:
  416. case AWS_MQTT_TOPIC_TREE_UPDATE: {
  417. AWS_LOGF_TRACE(
  418. AWS_LS_MQTT_TOPIC_TREE,
  419. "tree=%p action=%p: Rolling back remove/update transaction, no changes made",
  420. (void *)tree,
  421. (void *)action);
  422. /* Aborting a remove or update doesn't require any actions. */
  423. break;
  424. }
  425. }
  426. s_topic_tree_action_destroy(action);
  427. }
  428. /*******************************************************************************
  429. * Insert
  430. ******************************************************************************/
  431. int aws_mqtt_topic_tree_transaction_insert(
  432. struct aws_mqtt_topic_tree *tree,
  433. struct aws_array_list *transaction,
  434. const struct aws_string *topic_filter_ori,
  435. enum aws_mqtt_qos qos,
  436. aws_mqtt_publish_received_fn *callback,
  437. aws_mqtt_userdata_cleanup_fn *cleanup,
  438. void *userdata) {
  439. AWS_PRECONDITION(tree);
  440. AWS_PRECONDITION(transaction);
  441. AWS_PRECONDITION(topic_filter_ori);
  442. AWS_PRECONDITION(callback);
  443. /* let topic tree take the ownership of the new string and leave the caller string alone. */
  444. struct aws_string *topic_filter = aws_string_new_from_string(tree->allocator, topic_filter_ori);
  445. AWS_LOGF_DEBUG(
  446. AWS_LS_MQTT_TOPIC_TREE,
  447. "tree=%p: Inserting topic filter %s into topic tree",
  448. (void *)tree,
  449. topic_filter->bytes);
  450. struct aws_mqtt_topic_node *current = tree->root;
  451. struct topic_tree_action *action = s_topic_tree_action_create(transaction);
  452. if (!action) {
  453. return AWS_OP_ERR;
  454. }
  455. /* Default to update unless a node was added */
  456. action->mode = AWS_MQTT_TOPIC_TREE_UPDATE;
  457. action->qos = qos;
  458. action->callback = callback;
  459. action->cleanup = cleanup;
  460. action->userdata = userdata;
  461. struct aws_byte_cursor topic_filter_cur = aws_byte_cursor_from_string(topic_filter);
  462. struct aws_byte_cursor sub_part;
  463. AWS_ZERO_STRUCT(sub_part);
  464. struct aws_byte_cursor last_part;
  465. AWS_ZERO_STRUCT(last_part);
  466. while (aws_byte_cursor_next_split(&topic_filter_cur, '/', &sub_part)) {
  467. last_part = sub_part;
  468. /* Add or find mid-node */
  469. struct aws_hash_element *elem = NULL;
  470. int was_created = 0;
  471. aws_hash_table_create(&current->subtopics, &sub_part, &elem, &was_created);
  472. if (was_created) {
  473. if (action->mode == AWS_MQTT_TOPIC_TREE_UPDATE) {
  474. /* Store the last found node */
  475. action->last_found = current;
  476. }
  477. /* Node does not exist, add new one */
  478. current = s_topic_node_new(tree->allocator, &sub_part, topic_filter);
  479. if (!current) {
  480. /* Don't do handle_error logic, the action needs to persist to be rolled back */
  481. return AWS_OP_ERR;
  482. }
  483. /* Stash in the hash map */
  484. elem->key = &current->topic;
  485. elem->value = current;
  486. if (action->mode == AWS_MQTT_TOPIC_TREE_UPDATE) {
  487. AWS_LOGF_TRACE(
  488. AWS_LS_MQTT_TOPIC_TREE,
  489. "tree=%p: Topic part \"" PRInSTR "\" is new, it and all children will be added as new nodes",
  490. (void *)tree,
  491. AWS_BYTE_CURSOR_PRI(sub_part));
  492. /* Store the node we just made, and make sure we don't store again */
  493. action->mode = AWS_MQTT_TOPIC_TREE_ADD;
  494. action->first_created = current;
  495. }
  496. } else {
  497. AWS_ASSERT(action->mode == AWS_MQTT_TOPIC_TREE_UPDATE); /* Can't have found an existing node while adding */
  498. /* If the node exists, just traverse it */
  499. current = elem->value;
  500. }
  501. }
  502. action->node_to_update = current;
  503. /* Node found (or created), add the topic filter and callbacks */
  504. if (current->owns_topic_filter) {
  505. AWS_LOGF_TRACE(
  506. AWS_LS_MQTT_TOPIC_TREE,
  507. "tree=%p node=%p: Updating existing node that already owns its topic_filter, throwing out parameter",
  508. (void *)tree,
  509. (void *)current);
  510. /* If the topic filter was already here, this is already a subscription.
  511. Free the new topic_filter so all existing byte_cursors remain valid. */
  512. aws_string_destroy(topic_filter);
  513. } else {
  514. /* Node already existed (or was created) but wasn't subscription. */
  515. action->topic = last_part;
  516. action->topic_filter = topic_filter;
  517. }
  518. return AWS_OP_SUCCESS;
  519. }
  520. /*******************************************************************************
  521. * Remove
  522. ******************************************************************************/
  523. int aws_mqtt_topic_tree_transaction_remove(
  524. struct aws_mqtt_topic_tree *tree,
  525. struct aws_array_list *transaction,
  526. const struct aws_byte_cursor *topic_filter,
  527. void **old_userdata) {
  528. AWS_PRECONDITION(tree);
  529. AWS_PRECONDITION(transaction);
  530. AWS_PRECONDITION(topic_filter);
  531. AWS_LOGF_DEBUG(
  532. AWS_LS_MQTT_TOPIC_TREE,
  533. "tree=%p: Removing topic filter \"" PRInSTR "\" from topic tree",
  534. (void *)tree,
  535. AWS_BYTE_CURSOR_PRI(*topic_filter));
  536. /* Initialize output parameter to a safe default */
  537. if (old_userdata) {
  538. *old_userdata = NULL;
  539. }
  540. /* Default to error because that's what handle_error will do in all cases except node not found */
  541. int result = AWS_OP_ERR;
  542. struct topic_tree_action *action = s_topic_tree_action_create(transaction);
  543. if (!action) {
  544. return AWS_OP_ERR;
  545. }
  546. struct aws_array_list sub_topic_parts;
  547. AWS_ZERO_STRUCT(sub_topic_parts);
  548. if (aws_array_list_init_dynamic(&sub_topic_parts, tree->allocator, 1, sizeof(struct aws_byte_cursor))) {
  549. AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to initialize topic parts array", (void *)tree);
  550. goto handle_error;
  551. }
  552. if (aws_byte_cursor_split_on_char(topic_filter, '/', &sub_topic_parts)) {
  553. AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to split topic filter", (void *)tree);
  554. goto handle_error;
  555. }
  556. const size_t sub_parts_len = aws_array_list_length(&sub_topic_parts);
  557. if (!sub_parts_len) {
  558. AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to get topic parts length", (void *)tree);
  559. goto handle_error;
  560. }
  561. s_topic_tree_action_to_remove(action, tree->allocator, sub_parts_len);
  562. struct aws_mqtt_topic_node *current = tree->root;
  563. if (aws_array_list_push_back(&action->to_remove, &current)) {
  564. AWS_LOGF_ERROR(AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to insert root node into to_remove list", (void *)tree);
  565. goto handle_error;
  566. }
  567. for (size_t i = 0; i < sub_parts_len; ++i) {
  568. /* Get the current topic part */
  569. struct aws_byte_cursor *sub_part = NULL;
  570. aws_array_list_get_at_ptr(&sub_topic_parts, (void **)&sub_part, i);
  571. /* Find mid-node */
  572. struct aws_hash_element *elem = NULL;
  573. aws_hash_table_find(&current->subtopics, sub_part, &elem);
  574. if (elem) {
  575. /* If the node exists, just traverse it */
  576. current = elem->value;
  577. if (aws_array_list_push_back(&action->to_remove, &current)) {
  578. AWS_LOGF_ERROR(
  579. AWS_LS_MQTT_TOPIC_TREE, "tree=%p: Failed to insert topic node into to_remove list", (void *)tree);
  580. goto handle_error;
  581. }
  582. } else {
  583. /* If not, abandon ship */
  584. goto handle_not_found;
  585. }
  586. }
  587. action->node_to_update = current;
  588. aws_array_list_clean_up(&sub_topic_parts);
  589. if (old_userdata) {
  590. *old_userdata = current->userdata;
  591. }
  592. return AWS_OP_SUCCESS;
  593. handle_not_found:
  594. result = AWS_OP_SUCCESS;
  595. handle_error:
  596. aws_array_list_clean_up(&sub_topic_parts);
  597. s_topic_tree_action_destroy(action);
  598. aws_array_list_pop_back(transaction);
  599. return result;
  600. }
  601. /*******************************************************************************
  602. * Commit
  603. ******************************************************************************/
  604. void aws_mqtt_topic_tree_transaction_commit(struct aws_mqtt_topic_tree *tree, struct aws_array_list *transaction) {
  605. const size_t num_actions = aws_array_list_length(transaction);
  606. for (size_t i = 0; i < num_actions; ++i) {
  607. struct topic_tree_action *action = NULL;
  608. aws_array_list_get_at_ptr(transaction, (void **)&action, i);
  609. AWS_ASSUME(action); /* Within bounds */
  610. s_topic_tree_action_commit(action, tree);
  611. }
  612. aws_array_list_clear(transaction);
  613. }
  614. /*******************************************************************************
  615. * Roll Back
  616. ******************************************************************************/
  617. void aws_mqtt_topic_tree_transaction_roll_back(struct aws_mqtt_topic_tree *tree, struct aws_array_list *transaction) {
  618. const size_t num_actions = aws_array_list_length(transaction);
  619. for (size_t i = 1; i <= num_actions; ++i) {
  620. struct topic_tree_action *action = NULL;
  621. aws_array_list_get_at_ptr(transaction, (void **)&action, num_actions - i);
  622. AWS_ASSUME(action); /* Within bounds */
  623. s_topic_tree_action_roll_back(action, tree);
  624. }
  625. aws_array_list_clear(transaction);
  626. }
  627. int aws_mqtt_topic_tree_insert(
  628. struct aws_mqtt_topic_tree *tree,
  629. const struct aws_string *topic_filter,
  630. enum aws_mqtt_qos qos,
  631. aws_mqtt_publish_received_fn *callback,
  632. aws_mqtt_userdata_cleanup_fn *cleanup,
  633. void *userdata) {
  634. AWS_VARIABLE_LENGTH_ARRAY(uint8_t, transaction_buf, aws_mqtt_topic_tree_action_size);
  635. struct aws_array_list transaction;
  636. aws_array_list_init_static(&transaction, transaction_buf, 1, aws_mqtt_topic_tree_action_size);
  637. if (aws_mqtt_topic_tree_transaction_insert(tree, &transaction, topic_filter, qos, callback, cleanup, userdata)) {
  638. aws_mqtt_topic_tree_transaction_roll_back(tree, &transaction);
  639. return AWS_OP_ERR;
  640. }
  641. aws_mqtt_topic_tree_transaction_commit(tree, &transaction);
  642. return AWS_OP_SUCCESS;
  643. }
  644. int aws_mqtt_topic_tree_remove(struct aws_mqtt_topic_tree *tree, const struct aws_byte_cursor *topic_filter) {
  645. AWS_PRECONDITION(tree);
  646. AWS_PRECONDITION(topic_filter);
  647. AWS_VARIABLE_LENGTH_ARRAY(uint8_t, transaction_buf, aws_mqtt_topic_tree_action_size);
  648. struct aws_array_list transaction;
  649. aws_array_list_init_static(&transaction, transaction_buf, 1, aws_mqtt_topic_tree_action_size);
  650. if (aws_mqtt_topic_tree_transaction_remove(tree, &transaction, topic_filter, NULL)) {
  651. aws_mqtt_topic_tree_transaction_roll_back(tree, &transaction);
  652. return AWS_OP_ERR;
  653. }
  654. aws_mqtt_topic_tree_transaction_commit(tree, &transaction);
  655. return AWS_OP_SUCCESS;
  656. }
  657. /*******************************************************************************
  658. * Publish
  659. ******************************************************************************/
  660. static void s_topic_tree_publish_do_recurse(
  661. const struct aws_byte_cursor *current_sub_part,
  662. const struct aws_mqtt_topic_node *current,
  663. const struct aws_mqtt_packet_publish *pub) {
  664. struct aws_byte_cursor hash_cur = aws_byte_cursor_from_string(s_multi_level_wildcard);
  665. struct aws_byte_cursor plus_cur = aws_byte_cursor_from_string(s_single_level_wildcard);
  666. struct aws_hash_element *elem = NULL;
  667. struct aws_byte_cursor sub_part = *current_sub_part;
  668. if (!aws_byte_cursor_next_split(&pub->topic_name, '/', &sub_part)) {
  669. /* If this is the last node and is a sub, call it */
  670. if (s_topic_node_is_subscription(current)) {
  671. bool dup = aws_mqtt_packet_publish_get_dup(pub);
  672. enum aws_mqtt_qos qos = aws_mqtt_packet_publish_get_qos(pub);
  673. bool retain = aws_mqtt_packet_publish_get_retain(pub);
  674. current->callback(&pub->topic_name, &pub->payload, dup, qos, retain, current->userdata);
  675. }
  676. return;
  677. }
  678. /* Check multi-level wildcard */
  679. aws_hash_table_find(&current->subtopics, &hash_cur, &elem);
  680. if (elem) {
  681. /* Match! */
  682. struct aws_mqtt_topic_node *multi_wildcard = elem->value;
  683. /* Must be a subscription and have no children */
  684. AWS_ASSERT(s_topic_node_is_subscription(multi_wildcard));
  685. AWS_ASSERT(0 == aws_hash_table_get_entry_count(&multi_wildcard->subtopics));
  686. bool dup = aws_mqtt_packet_publish_get_dup(pub);
  687. enum aws_mqtt_qos qos = aws_mqtt_packet_publish_get_qos(pub);
  688. bool retain = aws_mqtt_packet_publish_get_retain(pub);
  689. multi_wildcard->callback(&pub->topic_name, &pub->payload, dup, qos, retain, multi_wildcard->userdata);
  690. }
  691. /* Check single level wildcard */
  692. aws_hash_table_find(&current->subtopics, &plus_cur, &elem);
  693. if (elem) {
  694. /* Recurse sub topics */
  695. s_topic_tree_publish_do_recurse(&sub_part, elem->value, pub);
  696. }
  697. /* Check actual topic name */
  698. aws_hash_table_find(&current->subtopics, &sub_part, &elem);
  699. if (elem) {
  700. /* Found the actual topic, recurse to it */
  701. s_topic_tree_publish_do_recurse(&sub_part, elem->value, pub);
  702. }
  703. }
  704. void aws_mqtt_topic_tree_publish(const struct aws_mqtt_topic_tree *tree, struct aws_mqtt_packet_publish *pub) {
  705. AWS_PRECONDITION(tree);
  706. AWS_PRECONDITION(pub);
  707. AWS_LOGF_TRACE(
  708. AWS_LS_MQTT_TOPIC_TREE,
  709. "tree=%p: Publishing on topic " PRInSTR,
  710. (void *)tree,
  711. AWS_BYTE_CURSOR_PRI(pub->topic_name));
  712. struct aws_byte_cursor sub_part;
  713. AWS_ZERO_STRUCT(sub_part);
  714. s_topic_tree_publish_do_recurse(&sub_part, tree->root, pub);
  715. }