/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

#include <aws/common/bus.h>

#include <aws/common/allocator.h>
#include <aws/common/atomics.h>
#include <aws/common/byte_buf.h>
#include <aws/common/condition_variable.h>
#include <aws/common/hash_table.h>
#include <aws/common/linked_list.h>
#include <aws/common/logging.h>
#include <aws/common/mutex.h>
#include <aws/common/thread.h>

#include <inttypes.h>

#ifdef _MSC_VER
#    pragma warning(push)
#    pragma warning(disable : 4204) /* nonstandard extension used: non-constant aggregate initializer */
#endif
struct aws_bus {
    struct aws_allocator *allocator;

    /* vtable and additional data structures for delivery policy */
    void *impl;
};

/* MUST be the first member of any impl to allow blind casting */
struct bus_vtable {
    void (*clean_up)(struct aws_bus *bus);
    int (*send)(struct aws_bus *bus, uint64_t address, void *payload, void (*destructor)(void *));
    int (*subscribe)(struct aws_bus *bus, uint64_t address, aws_bus_listener_fn *callback, void *user_data);
    void (*unsubscribe)(struct aws_bus *bus, uint64_t address, aws_bus_listener_fn *callback, void *user_data);
};

/* each bound callback is stored as a bus_listener in the slots table */
struct bus_listener {
    struct aws_linked_list_node list_node;
    void *user_data;
    aws_bus_listener_fn *deliver;
};

/* value type stored in each slot in the slots table in a bus */
struct listener_list {
    struct aws_allocator *allocator;
    struct aws_linked_list listeners;
};
/* find a listener list (or NULL) by address */
static struct listener_list *bus_find_listeners(struct aws_hash_table *slots, uint64_t address) {
    struct aws_hash_element *elem = NULL;
    if (aws_hash_table_find(slots, (void *)(uintptr_t)address, &elem)) {
        return NULL;
    }

    if (!elem) {
        return NULL;
    }

    struct listener_list *list = elem->value;
    return list;
}

/* find a listener list by address, or create/insert/return a new one */
static struct listener_list *bus_find_or_create_listeners(
    struct aws_allocator *allocator,
    struct aws_hash_table *slots,
    uint64_t address) {
    struct listener_list *list = bus_find_listeners(slots, address);
    if (list) {
        return list;
    }

    list = aws_mem_calloc(allocator, 1, sizeof(struct listener_list));
    list->allocator = allocator;
    aws_linked_list_init(&list->listeners);
    aws_hash_table_put(slots, (void *)(uintptr_t)address, list, NULL);
    return list;
}

static void s_bus_deliver_msg_to_slot(
    struct aws_bus *bus,
    uint64_t slot,
    uint64_t address,
    struct aws_hash_table *slots,
    const void *payload) {
    (void)bus;
    struct listener_list *list = bus_find_listeners(slots, slot);
    if (!list) {
        return;
    }

    struct aws_linked_list_node *node = aws_linked_list_begin(&list->listeners);
    for (; node != aws_linked_list_end(&list->listeners); node = aws_linked_list_next(node)) {
        struct bus_listener *listener = AWS_CONTAINER_OF(node, struct bus_listener, list_node);
        listener->deliver(address, payload, listener->user_data);
    }
}
/* common delivery logic */
static void s_bus_deliver_msg(
    struct aws_bus *bus,
    uint64_t address,
    struct aws_hash_table *slots,
    const void *payload) {
    s_bus_deliver_msg_to_slot(bus, AWS_BUS_ADDRESS_ALL, address, slots, payload);
    s_bus_deliver_msg_to_slot(bus, address, address, slots, payload);
}
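/*
 * NOTE: s_bus_deliver_msg delivers to the wildcard slot (AWS_BUS_ADDRESS_ALL) before the slot
 * for the specific address, so wildcard listeners observe every message first.
 */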
/* common subscribe logic */
static int s_bus_subscribe(
    struct aws_bus *bus,
    uint64_t address,
    struct aws_hash_table *slots,
    aws_bus_listener_fn *callback,
    void *user_data) {
    if (address == AWS_BUS_ADDRESS_CLOSE) {
        AWS_LOGF_ERROR(AWS_LS_COMMON_BUS, "Cannot directly subscribe to AWS_BUS_ADDRESS_CLOSE(0)");
        return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
    }

    struct listener_list *list = bus_find_or_create_listeners(bus->allocator, slots, address);
    struct bus_listener *listener = aws_mem_calloc(bus->allocator, 1, sizeof(struct bus_listener));
    listener->deliver = callback;
    listener->user_data = user_data;
    aws_linked_list_push_back(&list->listeners, &listener->list_node);
    return AWS_OP_SUCCESS;
}
/* common unsubscribe logic */
static void s_bus_unsubscribe(
    struct aws_bus *bus,
    uint64_t address,
    struct aws_hash_table *slots,
    aws_bus_listener_fn *callback,
    void *user_data) {
    (void)bus;
    if (address == AWS_BUS_ADDRESS_CLOSE) {
        AWS_LOGF_WARN(AWS_LS_COMMON_BUS, "Attempted to unsubscribe from invalid address AWS_BUS_ADDRESS_CLOSE");
        return;
    }

    struct listener_list *list = bus_find_listeners(slots, address);
    if (!list) {
        return;
    }

    struct aws_linked_list_node *node;
    for (node = aws_linked_list_begin(&list->listeners); node != aws_linked_list_end(&list->listeners);
         node = aws_linked_list_next(node)) {
        struct bus_listener *listener = AWS_CONTAINER_OF(node, struct bus_listener, list_node);
        if (listener->deliver == callback && listener->user_data == user_data) {
            aws_linked_list_remove(node);
            aws_mem_release(list->allocator, listener);
            return;
        }
    }
}
/* destructor for listener lists in the slots tables */
static void s_bus_destroy_listener_list(void *data) {
    struct listener_list *list = data;
    AWS_PRECONDITION(list->allocator);
    /* call all listeners with an AWS_BUS_ADDRESS_CLOSE message type to clean up */
    while (!aws_linked_list_empty(&list->listeners)) {
        struct aws_linked_list_node *back = aws_linked_list_back(&list->listeners);
        struct bus_listener *listener = AWS_CONTAINER_OF(back, struct bus_listener, list_node);
        listener->deliver(AWS_BUS_ADDRESS_CLOSE, NULL, listener->user_data);
        aws_linked_list_pop_back(&list->listeners);
        aws_mem_release(list->allocator, listener);
    }
    aws_mem_release(list->allocator, list);
}
/*
 * AWS_BUS_SYNC implementation
 */
struct bus_sync_impl {
    struct bus_vtable vtable;
    struct {
        /* Map of address -> list of listeners */
        struct aws_hash_table table;
    } slots;
};

static void s_bus_sync_clean_up(struct aws_bus *bus) {
    struct bus_sync_impl *impl = bus->impl;
    aws_hash_table_clean_up(&impl->slots.table);
    aws_mem_release(bus->allocator, impl);
}

static int s_bus_sync_send(struct aws_bus *bus, uint64_t address, void *payload, void (*destructor)(void *)) {
    struct bus_sync_impl *impl = bus->impl;
    s_bus_deliver_msg(bus, address, &impl->slots.table, payload);
    if (destructor) {
        destructor(payload);
    }
    return AWS_OP_SUCCESS;
}

static int s_bus_sync_subscribe(struct aws_bus *bus, uint64_t address, aws_bus_listener_fn *callback, void *user_data) {
    struct bus_sync_impl *impl = bus->impl;
    return s_bus_subscribe(bus, address, &impl->slots.table, callback, user_data);
}

static void s_bus_sync_unsubscribe(
    struct aws_bus *bus,
    uint64_t address,
    aws_bus_listener_fn *callback,
    void *user_data) {
    struct bus_sync_impl *impl = bus->impl;
    s_bus_unsubscribe(bus, address, &impl->slots.table, callback, user_data);
}

static struct bus_vtable bus_sync_vtable = {
    .clean_up = s_bus_sync_clean_up,
    .send = s_bus_sync_send,
    .subscribe = s_bus_sync_subscribe,
    .unsubscribe = s_bus_sync_unsubscribe,
};

static void s_bus_sync_init(struct aws_bus *bus, const struct aws_bus_options *options) {
    (void)options;

    struct bus_sync_impl *impl = bus->impl = aws_mem_calloc(bus->allocator, 1, sizeof(struct bus_sync_impl));
    impl->vtable = bus_sync_vtable;

    if (aws_hash_table_init(
            &impl->slots.table, bus->allocator, 8, aws_hash_ptr, aws_ptr_eq, NULL, s_bus_destroy_listener_list)) {
        goto error;
    }

    return;

error:
    aws_mem_release(bus->allocator, impl);
    /* signal failure to aws_bus_new, which checks bus->impl (mirrors the async error path) */
    bus->impl = NULL;
}
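/*
 * NOTE: the sync bus delivers on the caller's thread: s_bus_sync_send invokes every matching
 * listener, then the payload destructor, before returning to the sender.
 */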
/*
 * AWS_BUS_ASYNC implementation
 */
struct bus_async_impl {
    struct bus_vtable vtable;
    struct {
        /* Map of address -> list of listeners */
        struct aws_hash_table table;
    } slots;

    /* Queue of bus_messages to deliver */
    struct {
        struct aws_mutex mutex;
        /* backing memory for the message free list */
        void *buffer;
        void *buffer_end; /* 1 past the end of buffer */
        /* message free list */
        struct aws_linked_list free; /* struct bus_message */
        /* message delivery queue */
        struct aws_linked_list msgs; /* struct bus_message */
        /* list of pending adds/removes of listeners */
        struct aws_linked_list subs; /* struct pending_listener */
    } queue;

    /* dispatch thread */
    struct {
        struct aws_thread thread;
        struct aws_condition_variable notify;
        bool running;
        struct aws_atomic_var started;
        struct aws_atomic_var exited;
    } dispatch;

    bool reliable;
};
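/*
 * NOTE: dispatch.started and dispatch.exited only hand off thread lifetime:
 * s_bus_async_init spin-waits on started before returning, and s_bus_async_clean_up
 * spin-waits on exited before joining the dispatch thread.
 */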
/* represents a message in the queue on impls that queue */
struct bus_message {
    struct aws_linked_list_node list_node;
    uint64_t address;
    void *payload;
    void (*destructor)(void *);
};

struct pending_listener {
    struct aws_linked_list_node list_node;
    uint64_t address;
    aws_bus_listener_fn *listener;
    void *user_data;
    uint32_t add : 1;
    uint32_t remove : 1;
};
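/*
 * NOTE: async subscribes/unsubscribes are queued as pending_listener records and applied by
 * the dispatch thread (see s_bus_apply_listeners). While the bus is running, the async slots
 * table is therefore only touched from the dispatch thread, so delivery never has to lock it.
 */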
static void s_bus_message_clean_up(struct bus_message *msg) {
    if (msg->destructor) {
        msg->destructor(msg->payload);
    }
    msg->destructor = NULL;
    msg->payload = NULL;
}

/* Assumes the caller holds the lock */
static void s_bus_async_free_message(struct aws_bus *bus, struct bus_message *msg) {
    struct bus_async_impl *impl = bus->impl;
    s_bus_message_clean_up(msg);
    if ((void *)msg >= impl->queue.buffer && (void *)msg < impl->queue.buffer_end) {
        AWS_ZERO_STRUCT(*msg);
        aws_linked_list_push_back(&impl->queue.free, &msg->list_node);
        return;
    }
    aws_mem_release(bus->allocator, msg);
}
/* Assumes the caller holds the lock */
static struct bus_message *s_bus_async_alloc_message(struct aws_bus *bus) {
    struct bus_async_impl *impl = bus->impl;

    /* try the free list first */
    if (!aws_linked_list_empty(&impl->queue.free)) {
        struct aws_linked_list_node *msg_node = aws_linked_list_pop_back(&impl->queue.free);
        struct bus_message *msg = AWS_CONTAINER_OF(msg_node, struct bus_message, list_node);
        return msg;
    }

    /* unreliable will re-use the oldest message */
    if (!impl->reliable) {
        struct aws_linked_list_node *msg_node = aws_linked_list_pop_front(&impl->queue.msgs);
        struct bus_message *msg = AWS_CONTAINER_OF(msg_node, struct bus_message, list_node);
        s_bus_async_free_message(bus, msg);
        return s_bus_async_alloc_message(bus);
    }

    return aws_mem_calloc(bus->allocator, 1, sizeof(struct bus_message));
}
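/*
 * NOTE: when the pre-allocated free list is exhausted, a reliable bus falls back to heap
 * allocation, while an unreliable bus recycles the oldest still-queued message (running its
 * destructor and dropping it) rather than growing the queue.
 */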
/*
 * resolve all adds and removes of listeners, in FIFO order
 * NOTE: called only from the dispatch thread, on a pending_subs list that has already been
 * swapped out of the queue under the lock; the queue mutex is not held here
 */
static void s_bus_apply_listeners(struct aws_bus *bus, struct aws_linked_list *pending_subs) {
    struct bus_async_impl *impl = bus->impl;
    while (!aws_linked_list_empty(pending_subs)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(pending_subs);
        struct pending_listener *listener = AWS_CONTAINER_OF(node, struct pending_listener, list_node);
        if (listener->add) {
            s_bus_subscribe(bus, listener->address, &impl->slots.table, listener->listener, listener->user_data);
        } else if (listener->remove) {
            s_bus_unsubscribe(bus, listener->address, &impl->slots.table, listener->listener, listener->user_data);
        }
        aws_mem_release(bus->allocator, listener);
    }
}
static void s_bus_async_deliver_messages(struct aws_bus *bus, struct aws_linked_list *pending_msgs) {
    struct bus_async_impl *impl = bus->impl;
    struct aws_linked_list_node *msg_node = aws_linked_list_begin(pending_msgs);
    for (; msg_node != aws_linked_list_end(pending_msgs); msg_node = aws_linked_list_next(msg_node)) {
        struct bus_message *msg = AWS_CONTAINER_OF(msg_node, struct bus_message, list_node);
        s_bus_deliver_msg(bus, msg->address, &impl->slots.table, msg->payload);
        s_bus_message_clean_up(msg);
    }

    /* push all pending messages back on the free list */
    aws_mutex_lock(&impl->queue.mutex);
    {
        while (!aws_linked_list_empty(pending_msgs)) {
            msg_node = aws_linked_list_pop_front(pending_msgs);
            struct bus_message *msg = AWS_CONTAINER_OF(msg_node, struct bus_message, list_node);
            s_bus_async_free_message(bus, msg);
        }
    }
    aws_mutex_unlock(&impl->queue.mutex);
}
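/*
 * NOTE: delivery in s_bus_async_deliver_messages happens without the queue lock held; the
 * mutex is only re-acquired afterward to return the delivered messages to the free list.
 */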
static void s_bus_async_clean_up(struct aws_bus *bus) {
    struct bus_async_impl *impl = bus->impl;

    /* shut down delivery thread, clean up dispatch */
    AWS_LOGF_TRACE(AWS_LS_COMMON_BUS, "bus: %p clean_up: starting final drain", (void *)bus);
    aws_mutex_lock(&impl->queue.mutex);
    impl->dispatch.running = false;
    aws_mutex_unlock(&impl->queue.mutex);
    aws_condition_variable_notify_one(&impl->dispatch.notify);

    /* Spin wait for the final drain and dispatch thread to complete */
    while (!aws_atomic_load_int(&impl->dispatch.exited)) {
        aws_thread_current_sleep(1000 * 1000); /* 1 millisecond (argument is in nanoseconds) */
    }
    AWS_LOGF_TRACE(AWS_LS_COMMON_BUS, "bus: %p clean_up: finished final drain", (void *)bus);

    aws_thread_join(&impl->dispatch.thread);
    aws_thread_clean_up(&impl->dispatch.thread);
    aws_condition_variable_clean_up(&impl->dispatch.notify);

    /* should be impossible for subs or msgs to remain after final drain */
    AWS_FATAL_ASSERT(aws_linked_list_empty(&impl->queue.msgs));
    AWS_FATAL_ASSERT(aws_linked_list_empty(&impl->queue.subs));

    /* this frees everything that the free/msgs lists point to */
    if (impl->queue.buffer) {
        aws_mem_release(bus->allocator, impl->queue.buffer);
    }

    aws_mutex_clean_up(&impl->queue.mutex);
    aws_hash_table_clean_up(&impl->slots.table);
    aws_mem_release(bus->allocator, impl);
}
/* predicate for the dispatch thread's condition variable; invoked with queue.mutex held */
static bool s_bus_async_should_wake_up(void *user_data) {
    struct bus_async_impl *impl = user_data;
    return !impl->dispatch.running || !aws_linked_list_empty(&impl->queue.subs) ||
           !aws_linked_list_empty(&impl->queue.msgs);
}

static bool s_bus_async_is_running(struct bus_async_impl *impl) {
    aws_mutex_lock(&impl->queue.mutex);
    bool running = impl->dispatch.running;
    aws_mutex_unlock(&impl->queue.mutex);
    return running;
}
/* Async bus delivery thread loop */
static void s_bus_async_deliver(void *user_data) {
    struct aws_bus *bus = user_data;
    struct bus_async_impl *impl = bus->impl;

    aws_atomic_store_int(&impl->dispatch.started, 1);
    AWS_LOGF_DEBUG(AWS_LS_COMMON_BUS, "bus %p: delivery thread loop started", (void *)bus);

    /* once shutdown has been triggered, need to drain one more time to ensure all queues are empty */
    int pending_drains = 1;
    do {
        struct aws_linked_list pending_msgs;
        aws_linked_list_init(&pending_msgs);

        struct aws_linked_list pending_subs;
        aws_linked_list_init(&pending_subs);

        aws_mutex_lock(&impl->queue.mutex);
        {
            aws_condition_variable_wait_pred(
                &impl->dispatch.notify, &impl->queue.mutex, s_bus_async_should_wake_up, impl);

            /* copy out any queued subs/unsubs */
            aws_linked_list_swap_contents(&impl->queue.subs, &pending_subs);
            /* copy out any queued messages */
            aws_linked_list_swap_contents(&impl->queue.msgs, &pending_msgs);
        }
        aws_mutex_unlock(&impl->queue.mutex);

        /* first resolve subs/unsubs */
        if (!aws_linked_list_empty(&pending_subs)) {
            s_bus_apply_listeners(bus, &pending_subs);
        }

        /* Then deliver queued messages */
        if (!aws_linked_list_empty(&pending_msgs)) {
            s_bus_async_deliver_messages(bus, &pending_msgs);
        }
    } while (s_bus_async_is_running(impl) || pending_drains--);

    /* record that the dispatch thread is done */
    aws_atomic_store_int(&impl->dispatch.exited, 1);
}
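/*
 * NOTE: once clean_up flips dispatch.running to false, the loop condition above lets the
 * thread run exactly one more drain pass (pending_drains) so anything queued before the flag
 * flipped is still applied/delivered; send/subscribe/unsubscribe refuse new work after the
 * flag is cleared, which is why the queues are guaranteed empty when the thread exits.
 */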
static int s_bus_async_send(struct aws_bus *bus, uint64_t address, void *payload, void (*destructor)(void *)) {
    struct bus_async_impl *impl = bus->impl;

    aws_mutex_lock(&impl->queue.mutex);
    {
        if (!impl->dispatch.running) {
            AWS_LOGF_WARN(
                AWS_LS_COMMON_BUS, "bus %p: message sent after clean_up: address: %" PRIu64 "", (void *)bus, address);
            aws_mutex_unlock(&impl->queue.mutex);
            return aws_raise_error(AWS_ERROR_INVALID_STATE);
        }

        struct bus_message *msg = s_bus_async_alloc_message(bus);
        msg->address = address;
        msg->payload = payload;
        msg->destructor = destructor;

        /* push the message onto the delivery queue */
        aws_linked_list_push_back(&impl->queue.msgs, &msg->list_node);
    }
    aws_mutex_unlock(&impl->queue.mutex);

    /* notify the delivery thread to wake up */
    aws_condition_variable_notify_one(&impl->dispatch.notify);

    return AWS_OP_SUCCESS;
}
static int s_bus_async_subscribe(
    struct aws_bus *bus,
    uint64_t address,
    aws_bus_listener_fn *listener,
    void *user_data) {
    struct bus_async_impl *impl = bus->impl;

    if (address == AWS_BUS_ADDRESS_CLOSE) {
        AWS_LOGF_ERROR(AWS_LS_COMMON_BUS, "Cannot subscribe to AWS_BUS_ADDRESS_CLOSE");
        return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
    }

    aws_mutex_lock(&impl->queue.mutex);
    {
        if (!impl->dispatch.running) {
            AWS_LOGF_WARN(
                AWS_LS_COMMON_BUS,
                "bus %p: subscribe requested after clean_up: address: %" PRIu64 "",
                (void *)bus,
                address);
            aws_mutex_unlock(&impl->queue.mutex);
            return aws_raise_error(AWS_ERROR_INVALID_STATE);
        }

        struct pending_listener *sub = aws_mem_calloc(bus->allocator, 1, sizeof(struct pending_listener));
        sub->address = address;
        sub->listener = listener;
        sub->user_data = user_data;
        sub->add = true;
        aws_linked_list_push_back(&impl->queue.subs, &sub->list_node);
    }
    aws_mutex_unlock(&impl->queue.mutex);

    /* notify the delivery thread to wake up */
    aws_condition_variable_notify_one(&impl->dispatch.notify);

    return AWS_OP_SUCCESS;
}
static void s_bus_async_unsubscribe(
    struct aws_bus *bus,
    uint64_t address,
    aws_bus_listener_fn *listener,
    void *user_data) {
    struct bus_async_impl *impl = bus->impl;

    if (address == AWS_BUS_ADDRESS_CLOSE) {
        AWS_LOGF_ERROR(AWS_LS_COMMON_BUS, "Cannot unsubscribe from AWS_BUS_ADDRESS_CLOSE");
        return;
    }

    aws_mutex_lock(&impl->queue.mutex);
    {
        if (!impl->dispatch.running) {
            AWS_LOGF_WARN(
                AWS_LS_COMMON_BUS,
                "bus %p: unsubscribe requested after clean_up: address: %" PRIu64 "",
                (void *)bus,
                address);
            aws_mutex_unlock(&impl->queue.mutex);
            return;
        }

        struct pending_listener *unsub = aws_mem_calloc(bus->allocator, 1, sizeof(struct pending_listener));
        unsub->address = address;
        unsub->listener = listener;
        unsub->user_data = user_data;
        unsub->remove = true;
        aws_linked_list_push_back(&impl->queue.subs, &unsub->list_node);
    }
    aws_mutex_unlock(&impl->queue.mutex);

    /* notify the delivery thread to wake up */
    aws_condition_variable_notify_one(&impl->dispatch.notify);
}
static struct bus_vtable bus_async_vtable = {
    .clean_up = s_bus_async_clean_up,
    .send = s_bus_async_send,
    .subscribe = s_bus_async_subscribe,
    .unsubscribe = s_bus_async_unsubscribe,
};
static void s_bus_async_init(struct aws_bus *bus, const struct aws_bus_options *options) {
    struct bus_async_impl *impl = bus->impl = aws_mem_calloc(bus->allocator, 1, sizeof(struct bus_async_impl));
    impl->vtable = bus_async_vtable;
    impl->reliable = (options->policy == AWS_BUS_ASYNC_RELIABLE);

    /* init msg queue */
    if (aws_mutex_init(&impl->queue.mutex)) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to initialize queue synchronization: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }
    aws_linked_list_init(&impl->queue.msgs);
    aws_linked_list_init(&impl->queue.free);
    aws_linked_list_init(&impl->queue.subs);

    /* push as many bus_messages as we can into the free list from the buffer */
    if (options->buffer_size) {
        impl->queue.buffer = aws_mem_calloc(bus->allocator, 1, options->buffer_size);
        impl->queue.buffer_end = ((uint8_t *)impl->queue.buffer) + options->buffer_size;
        const int msg_count = (int)(options->buffer_size / sizeof(struct bus_message));
        for (int msg_idx = 0; msg_idx < msg_count; ++msg_idx) {
            struct bus_message *msg = (void *)&((char *)impl->queue.buffer)[msg_idx * sizeof(struct bus_message)];
            aws_linked_list_push_back(&impl->queue.free, &msg->list_node);
        }
    }

    /* init subscription table */
    if (aws_hash_table_init(
            &impl->slots.table, bus->allocator, 8, aws_hash_ptr, aws_ptr_eq, NULL, s_bus_destroy_listener_list)) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to initialize bus addressing table: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }

    /* Setup dispatch thread */
    if (aws_condition_variable_init(&impl->dispatch.notify)) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to initialize async notify: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }

    if (aws_thread_init(&impl->dispatch.thread, bus->allocator)) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to initialize background thread: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }

    impl->dispatch.running = true;
    aws_atomic_init_int(&impl->dispatch.started, 0);
    aws_atomic_init_int(&impl->dispatch.exited, 0);
    if (aws_thread_launch(&impl->dispatch.thread, s_bus_async_deliver, bus, aws_default_thread_options())) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to launch delivery thread: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }

    /* wait for dispatch thread to start before returning control */
    AWS_LOGF_TRACE(AWS_LS_COMMON_BUS, "bus %p: Waiting for delivery thread to start", (void *)bus);
    while (!aws_atomic_load_int(&impl->dispatch.started)) {
        aws_thread_current_sleep(1000 * 1000); /* 1 millisecond */
    }
    AWS_LOGF_TRACE(AWS_LS_COMMON_BUS, "bus %p: Delivery thread started", (void *)bus);

    return;

error:
    aws_thread_clean_up(&impl->dispatch.thread);
    aws_condition_variable_clean_up(&impl->dispatch.notify);
    aws_hash_table_clean_up(&impl->slots.table);
    /* release the buffer itself, not the address of the pointer field */
    if (impl->queue.buffer) {
        aws_mem_release(bus->allocator, impl->queue.buffer);
    }
    aws_mutex_clean_up(&impl->queue.mutex);
    aws_mem_release(bus->allocator, impl);
    bus->impl = NULL;
}
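/*
 * Sizing note (illustrative, not normative): on a typical LP64 build, struct bus_message is
 * 40 bytes (two list-node pointers, a 64-bit address, a payload pointer, and a destructor
 * pointer), so an options->buffer_size of 4096 pre-carves roughly 100 messages into the free
 * list above. Exact counts depend on the target ABI and padding.
 */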
/*
 * Public API
 */
struct aws_bus *aws_bus_new(struct aws_allocator *allocator, const struct aws_bus_options *options) {
    struct aws_bus *bus = aws_mem_calloc(allocator, 1, sizeof(struct aws_bus));
    bus->allocator = allocator;

    switch (options->policy) {
        case AWS_BUS_ASYNC_RELIABLE:
        case AWS_BUS_ASYNC_UNRELIABLE:
            s_bus_async_init(bus, options);
            break;
        case AWS_BUS_SYNC_RELIABLE:
            s_bus_sync_init(bus, options);
            break;
    }

    if (!bus->impl) {
        aws_mem_release(allocator, bus);
        return NULL;
    }

    return bus;
}
void aws_bus_destroy(struct aws_bus *bus) {
    struct bus_vtable *vtable = bus->impl;
    vtable->clean_up(bus);
    aws_mem_release(bus->allocator, bus);
}

int aws_bus_subscribe(struct aws_bus *bus, uint64_t address, aws_bus_listener_fn *listener, void *user_data) {
    struct bus_vtable *vtable = bus->impl;
    return vtable->subscribe(bus, address, listener, user_data);
}

void aws_bus_unsubscribe(struct aws_bus *bus, uint64_t address, aws_bus_listener_fn *listener, void *user_data) {
    struct bus_vtable *vtable = bus->impl;
    vtable->unsubscribe(bus, address, listener, user_data);
}

int aws_bus_send(struct aws_bus *bus, uint64_t address, void *payload, void (*destructor)(void *)) {
    struct bus_vtable *vtable = bus->impl;
    return vtable->send(bus, address, payload, destructor);
}
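/*
 * Illustrative usage sketch (comment only, not compiled with this file). The listener,
 * payload helpers, and address value (42) below are hypothetical; the calls match the public
 * API above. On AWS_BUS_ADDRESS_CLOSE the payload passed to listeners is NULL.
 *
 *   static void s_on_event(uint64_t address, const void *payload, void *user_data) {
 *       (void)user_data;
 *       if (address == AWS_BUS_ADDRESS_CLOSE) {
 *           return;
 *       }
 *       handle_event(payload);
 *   }
 *
 *   struct aws_bus_options options = {
 *       .policy = AWS_BUS_ASYNC_RELIABLE,
 *       .buffer_size = 4 * 1024,
 *   };
 *   struct aws_bus *bus = aws_bus_new(aws_default_allocator(), &options);
 *   aws_bus_subscribe(bus, 42, s_on_event, NULL);
 *   aws_bus_send(bus, 42, some_payload, some_payload_destructor);
 *   aws_bus_unsubscribe(bus, 42, s_on_event, NULL);
 *   aws_bus_destroy(bus);
 */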
#ifdef _MSC_VER
#    pragma warning(pop)
#endif