/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include <aws/io/event_loop.h>

#include <aws/common/clock.h>
#include <aws/common/device_random.h>
#include <aws/common/system_info.h>
#include <aws/common/thread.h>
struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, aws_io_clock_fn *clock) {
    struct aws_event_loop_options options = {
        .thread_options = NULL,
        .clock = clock,
    };

    return aws_event_loop_new_default_with_options(alloc, &options);
}
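
/* Final teardown step for a group: frees the group's memory first, then invokes the user's
 * shutdown-complete callback, if one was provided. Registered as an at-thread-exit hook on the
 * async cleanup path below and called directly on the synchronous error path. */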
static void s_event_loop_group_thread_exit(void *user_data) {
    struct aws_event_loop_group *el_group = user_data;

    aws_simple_completion_callback *completion_callback = el_group->shutdown_options.shutdown_callback_fn;
    void *completion_user_data = el_group->shutdown_options.shutdown_callback_user_data;

    aws_mem_release(el_group->allocator, el_group);

    if (completion_callback != NULL) {
        completion_callback(completion_user_data);
    }
}

static void s_aws_event_loop_group_shutdown_sync(struct aws_event_loop_group *el_group) {
    while (aws_array_list_length(&el_group->event_loops) > 0) {
        struct aws_event_loop *loop = NULL;

        if (!aws_array_list_back(&el_group->event_loops, &loop)) {
            aws_event_loop_destroy(loop);
        }

        aws_array_list_pop_back(&el_group->event_loops);
    }

    aws_array_list_clean_up(&el_group->event_loops);
}
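
/* Entry point for the cleanup thread spawned below: destroys every loop in the group, then
 * registers an at-exit hook so the group itself is freed (and the user notified) only after this
 * cleanup thread has finished shutting down. */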
static void s_event_loop_destroy_async_thread_fn(void *thread_data) {
    struct aws_event_loop_group *el_group = thread_data;

    s_aws_event_loop_group_shutdown_sync(el_group);

    aws_thread_current_at_exit(s_event_loop_group_thread_exit, el_group);
}

static void s_aws_event_loop_group_shutdown_async(struct aws_event_loop_group *el_group) {

    /* It's possible that the last refcount was released on an event-loop thread,
     * so we would deadlock if we waited here for all the event-loop threads to shut down.
     * Therefore, we spawn a NEW thread and have it wait for all the event-loop threads to shut down.
     */
    struct aws_thread cleanup_thread;
    AWS_ZERO_STRUCT(cleanup_thread);

    aws_thread_init(&cleanup_thread, el_group->allocator);

    struct aws_thread_options thread_options = *aws_default_thread_options();
    thread_options.join_strategy = AWS_TJS_MANAGED;
    thread_options.name = aws_byte_cursor_from_c_str("EvntLoopCleanup"); /* 15 characters is max for Linux */

    aws_thread_launch(&cleanup_thread, s_event_loop_destroy_async_thread_fn, el_group, &thread_options);
}
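
/* Shared constructor behind the public group-creation variants: allocates the group, creates
 * el_count event loops with the supplied factory, optionally pins each loop's thread to a CPU in
 * cpu_group (skipping suspected hyper-threads), and starts every loop before returning. */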
static struct aws_event_loop_group *s_event_loop_group_new(
    struct aws_allocator *alloc,
    aws_io_clock_fn *clock,
    uint16_t el_count,
    uint16_t cpu_group,
    bool pin_threads,
    aws_new_event_loop_fn *new_loop_fn,
    void *new_loop_user_data,
    const struct aws_shutdown_callback_options *shutdown_options) {
    AWS_ASSERT(new_loop_fn);

    size_t group_cpu_count = 0;
    struct aws_cpu_info *usable_cpus = NULL;

    if (pin_threads) {
        group_cpu_count = aws_get_cpu_count_for_group(cpu_group);

        if (!group_cpu_count) {
            aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
            return NULL;
        }

        usable_cpus = aws_mem_calloc(alloc, group_cpu_count, sizeof(struct aws_cpu_info));

        if (usable_cpus == NULL) {
            return NULL;
        }

        aws_get_cpu_ids_for_group(cpu_group, usable_cpus, group_cpu_count);
    }

    struct aws_event_loop_group *el_group = aws_mem_calloc(alloc, 1, sizeof(struct aws_event_loop_group));

    if (el_group == NULL) {
        return NULL;
    }

    el_group->allocator = alloc;
    aws_ref_count_init(
        &el_group->ref_count, el_group, (aws_simple_completion_callback *)s_aws_event_loop_group_shutdown_async);

    if (aws_array_list_init_dynamic(&el_group->event_loops, alloc, el_count, sizeof(struct aws_event_loop *))) {
        goto on_error;
    }

    for (uint16_t i = 0; i < el_count; ++i) {
        /* Don't pin to hyper-threads if a user cared enough to specify a NUMA node */
        if (!pin_threads || (i < group_cpu_count && !usable_cpus[i].suspected_hyper_thread)) {
            struct aws_thread_options thread_options = *aws_default_thread_options();

            struct aws_event_loop_options options = {
                .clock = clock,
                .thread_options = &thread_options,
            };

            if (pin_threads) {
                thread_options.cpu_id = usable_cpus[i].cpu_id;
            }

            /* Thread name should be <= 15 characters */
            char thread_name[32] = {0};
            int thread_name_len = snprintf(thread_name, sizeof(thread_name), "AwsEventLoop %d", (int)i + 1);
            if (thread_name_len > AWS_THREAD_NAME_RECOMMENDED_STRLEN) {
                snprintf(thread_name, sizeof(thread_name), "AwsEventLoop");
            }
            thread_options.name = aws_byte_cursor_from_c_str(thread_name);

            struct aws_event_loop *loop = new_loop_fn(alloc, &options, new_loop_user_data);

            if (!loop) {
                goto on_error;
            }

            if (aws_array_list_push_back(&el_group->event_loops, (const void *)&loop)) {
                aws_event_loop_destroy(loop);
                goto on_error;
            }

            if (aws_event_loop_run(loop)) {
                goto on_error;
            }
        }
    }

    if (shutdown_options != NULL) {
        el_group->shutdown_options = *shutdown_options;
    }

    if (pin_threads) {
        aws_mem_release(alloc, usable_cpus);
    }

    return el_group;

on_error:
    aws_mem_release(alloc, usable_cpus);
    s_aws_event_loop_group_shutdown_sync(el_group);
    s_event_loop_group_thread_exit(el_group);

    return NULL;
}
struct aws_event_loop_group *aws_event_loop_group_new(
    struct aws_allocator *alloc,
    aws_io_clock_fn *clock,
    uint16_t el_count,
    aws_new_event_loop_fn *new_loop_fn,
    void *new_loop_user_data,
    const struct aws_shutdown_callback_options *shutdown_options) {

    AWS_ASSERT(new_loop_fn);
    AWS_ASSERT(el_count);

    return s_event_loop_group_new(alloc, clock, el_count, 0, false, new_loop_fn, new_loop_user_data, shutdown_options);
}
static struct aws_event_loop *s_default_new_event_loop(
    struct aws_allocator *allocator,
    const struct aws_event_loop_options *options,
    void *user_data) {

    (void)user_data;
    return aws_event_loop_new_default_with_options(allocator, options);
}
struct aws_event_loop_group *aws_event_loop_group_new_default(
    struct aws_allocator *alloc,
    uint16_t max_threads,
    const struct aws_shutdown_callback_options *shutdown_options) {
    if (!max_threads) {
        uint16_t processor_count = (uint16_t)aws_system_info_processor_count();
        /* cut them in half to avoid using hyper threads for the IO work. */
        max_threads = processor_count > 1 ? processor_count / 2 : processor_count;
    }

    return aws_event_loop_group_new(
        alloc, aws_high_res_clock_get_ticks, max_threads, s_default_new_event_loop, NULL, shutdown_options);
}
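
/* Illustrative usage sketch (not part of the library; kept out of the build with #if 0). It assumes
 * the aws-c-common task and allocator APIs (aws_task_init(), aws_default_allocator()) are available
 * and omits error handling for brevity. */
#if 0
static void s_example_task_fn(struct aws_task *task, void *arg, enum aws_task_status status) {
    (void)task;
    (void)arg;
    (void)status;
    /* Work scheduled onto the event loop runs here, on the loop's thread. */
}

static void s_example_usage(void) {
    struct aws_allocator *alloc = aws_default_allocator();

    /* Passing 0 for max_threads lets the group pick a default based on the processor count. */
    struct aws_event_loop_group *el_group = aws_event_loop_group_new_default(alloc, 0, NULL);

    /* Pick a loop (best-of-two load balancing) and schedule a task on it.
     * The task has static storage duration because it must stay valid until it has run. */
    struct aws_event_loop *loop = aws_event_loop_group_get_next_loop(el_group);
    static struct aws_task s_example_task;
    aws_task_init(&s_example_task, s_example_task_fn, NULL, "example");
    aws_event_loop_schedule_task_now(loop, &s_example_task);

    /* Releasing the last reference triggers the asynchronous shutdown path above. */
    aws_event_loop_group_release(el_group);
}
#endif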
struct aws_event_loop_group *aws_event_loop_group_new_pinned_to_cpu_group(
    struct aws_allocator *alloc,
    aws_io_clock_fn *clock,
    uint16_t el_count,
    uint16_t cpu_group,
    aws_new_event_loop_fn *new_loop_fn,
    void *new_loop_user_data,
    const struct aws_shutdown_callback_options *shutdown_options) {
    AWS_ASSERT(new_loop_fn);
    AWS_ASSERT(el_count);

    return s_event_loop_group_new(
        alloc, clock, el_count, cpu_group, true, new_loop_fn, new_loop_user_data, shutdown_options);
}

struct aws_event_loop_group *aws_event_loop_group_new_default_pinned_to_cpu_group(
    struct aws_allocator *alloc,
    uint16_t max_threads,
    uint16_t cpu_group,
    const struct aws_shutdown_callback_options *shutdown_options) {
    if (!max_threads) {
        uint16_t processor_count = (uint16_t)aws_system_info_processor_count();
        /* cut them in half to avoid using hyper threads for the IO work. */
        max_threads = processor_count > 1 ? processor_count / 2 : processor_count;
    }

    return aws_event_loop_group_new_pinned_to_cpu_group(
        alloc, aws_high_res_clock_get_ticks, max_threads, cpu_group, s_default_new_event_loop, NULL, shutdown_options);
}
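
/* The group is reference counted: the creator owns the initial reference, acquire/release adjust
 * it, and releasing the last reference kicks off s_aws_event_loop_group_shutdown_async(). */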
struct aws_event_loop_group *aws_event_loop_group_acquire(struct aws_event_loop_group *el_group) {
    if (el_group != NULL) {
        aws_ref_count_acquire(&el_group->ref_count);
    }

    return el_group;
}

void aws_event_loop_group_release(struct aws_event_loop_group *el_group) {
    if (el_group != NULL) {
        aws_ref_count_release(&el_group->ref_count);
    }
}

size_t aws_event_loop_group_get_loop_count(struct aws_event_loop_group *el_group) {
    return aws_array_list_length(&el_group->event_loops);
}

struct aws_event_loop *aws_event_loop_group_get_loop_at(struct aws_event_loop_group *el_group, size_t index) {
    struct aws_event_loop *el = NULL;
    aws_array_list_get_at(&el_group->event_loops, &el, index);
    return el;
}
struct aws_event_loop *aws_event_loop_group_get_next_loop(struct aws_event_loop_group *el_group) {
    size_t loop_count = aws_array_list_length(&el_group->event_loops);
    AWS_ASSERT(loop_count > 0);
    if (loop_count == 0) {
        return NULL;
    }

    /* do one call to get 32 random bits because this hits an actual entropy source and it's not cheap */
    uint32_t random_32_bit_num = 0;
    aws_device_random_u32(&random_32_bit_num);

    /* use the best-of-two algorithm to select the loop with the lowest load.
     * If we find device random is too hard on the kernel, we can seed it and use another random
     * number generator. */

    /* the truncation is fine and intentional: the cast throws away the top 16 bits, and that's what we want. */
    uint16_t random_num_a = (uint16_t)random_32_bit_num;
    random_num_a = random_num_a % loop_count;

    uint16_t random_num_b = (uint16_t)(random_32_bit_num >> 16);
    random_num_b = random_num_b % loop_count;

    struct aws_event_loop *random_loop_a = NULL;
    struct aws_event_loop *random_loop_b = NULL;
    aws_array_list_get_at(&el_group->event_loops, &random_loop_a, random_num_a);
    aws_array_list_get_at(&el_group->event_loops, &random_loop_b, random_num_b);

    /* there's no logical reason why this should ever be possible. It's just best to die if it happens. */
    AWS_FATAL_ASSERT((random_loop_a && random_loop_b) && "random_loop_a or random_loop_b is NULL.");

    size_t load_a = aws_event_loop_get_load_factor(random_loop_a);
    size_t load_b = aws_event_loop_get_load_factor(random_loop_b);

    return load_a < load_b ? random_loop_a : random_loop_b;
}
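
/* Value destructor for the event loop's local_data hash table: gives the owner of an event-loop
 * local object a chance to clean up when the object is removed or the table is destroyed. */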
static void s_object_removed(void *value) {
    struct aws_event_loop_local_object *object = (struct aws_event_loop_local_object *)value;
    if (object->on_object_removed) {
        object->on_object_removed(object);
    }
}

int aws_event_loop_init_base(struct aws_event_loop *event_loop, struct aws_allocator *alloc, aws_io_clock_fn *clock) {
    AWS_ZERO_STRUCT(*event_loop);

    event_loop->alloc = alloc;
    event_loop->clock = clock;
    aws_atomic_init_int(&event_loop->current_load_factor, 0u);
    aws_atomic_init_int(&event_loop->next_flush_time, 0u);

    if (aws_hash_table_init(&event_loop->local_data, alloc, 20, aws_hash_ptr, aws_ptr_eq, NULL, s_object_removed)) {
        return AWS_OP_ERR;
    }

    return AWS_OP_SUCCESS;
}

void aws_event_loop_clean_up_base(struct aws_event_loop *event_loop) {
    aws_hash_table_clean_up(&event_loop->local_data);
}
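
/* Load-factor accounting: the loop's own thread brackets each iteration with register_tick_start()
 * and register_tick_end(), accumulating the time spent per tick. Roughly once per second the sum is
 * flushed into an atomic, which aws_event_loop_get_load_factor() can read from any thread. */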
void aws_event_loop_register_tick_start(struct aws_event_loop *event_loop) {
    aws_high_res_clock_get_ticks(&event_loop->latest_tick_start);
}

void aws_event_loop_register_tick_end(struct aws_event_loop *event_loop) {
    /* increment the timestamp diff counter (this should always be called from the same thread), the concurrency
     * work happens during the flush. */
    uint64_t end_tick = 0;
    aws_high_res_clock_get_ticks(&end_tick);

    size_t elapsed = (size_t)aws_min_u64(end_tick - event_loop->latest_tick_start, SIZE_MAX);
    event_loop->current_tick_latency_sum = aws_add_size_saturating(event_loop->current_tick_latency_sum, elapsed);
    event_loop->latest_tick_start = 0;

    size_t next_flush_time_secs = aws_atomic_load_int(&event_loop->next_flush_time);
    /* store as seconds because we can't make a 64-bit integer reliably atomic across platforms. */
    uint64_t end_tick_secs = aws_timestamp_convert(end_tick, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, NULL);

    /* if a second has passed, flush the load-factor. */
    if (end_tick_secs > next_flush_time_secs) {
        aws_atomic_store_int(&event_loop->current_load_factor, event_loop->current_tick_latency_sum);
        event_loop->current_tick_latency_sum = 0;

        /* run again in a second. */
        aws_atomic_store_int(&event_loop->next_flush_time, (size_t)(end_tick_secs + 1));
    }
}
size_t aws_event_loop_get_load_factor(struct aws_event_loop *event_loop) {
    uint64_t current_time = 0;
    aws_high_res_clock_get_ticks(&current_time);

    uint64_t current_time_secs = aws_timestamp_convert(current_time, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, NULL);
    size_t next_flush_time_secs = aws_atomic_load_int(&event_loop->next_flush_time);

    /* Safety valve in case an event loop had heavy load and then went completely idle: if we haven't
     * had an update from the event loop in 10 seconds, just assume it's idle. Yes, this is racy, but it
     * should be good enough because an active loop updates its counter far more often than once per
     * 10 seconds. If we do hit the technical race condition, we don't care anyway; returning 0 is the
     * desired behavior. */
    if (current_time_secs > next_flush_time_secs + 10) {
        return 0;
    }

    return aws_atomic_load_int(&event_loop->current_load_factor);
}
void aws_event_loop_destroy(struct aws_event_loop *event_loop) {
    if (!event_loop) {
        return;
    }

    AWS_ASSERT(event_loop->vtable && event_loop->vtable->destroy);
    AWS_ASSERT(!aws_event_loop_thread_is_callers_thread(event_loop));

    event_loop->vtable->destroy(event_loop);
}
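
/* Event-loop local objects form a small per-loop key/value store keyed by pointer. All three
 * accessors below must be called on the loop's own thread (see the asserts). */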
int aws_event_loop_fetch_local_object(
    struct aws_event_loop *event_loop,
    void *key,
    struct aws_event_loop_local_object *obj) {
    AWS_ASSERT(aws_event_loop_thread_is_callers_thread(event_loop));

    struct aws_hash_element *object = NULL;
    if (!aws_hash_table_find(&event_loop->local_data, key, &object) && object) {
        *obj = *(struct aws_event_loop_local_object *)object->value;
        return AWS_OP_SUCCESS;
    }

    return AWS_OP_ERR;
}

int aws_event_loop_put_local_object(struct aws_event_loop *event_loop, struct aws_event_loop_local_object *obj) {
    AWS_ASSERT(aws_event_loop_thread_is_callers_thread(event_loop));

    struct aws_hash_element *object = NULL;
    int was_created = 0;

    if (!aws_hash_table_create(&event_loop->local_data, obj->key, &object, &was_created)) {
        object->key = obj->key;
        object->value = obj;
        return AWS_OP_SUCCESS;
    }

    return AWS_OP_ERR;
}

int aws_event_loop_remove_local_object(
    struct aws_event_loop *event_loop,
    void *key,
    struct aws_event_loop_local_object *removed_obj) {
    AWS_ASSERT(aws_event_loop_thread_is_callers_thread(event_loop));

    struct aws_hash_element existing_object;
    AWS_ZERO_STRUCT(existing_object);

    int was_present = 0;

    struct aws_hash_element *remove_candidate = removed_obj ? &existing_object : NULL;

    if (!aws_hash_table_remove(&event_loop->local_data, key, remove_candidate, &was_present)) {
        if (remove_candidate && was_present) {
            *removed_obj = *(struct aws_event_loop_local_object *)existing_object.value;
        }

        return AWS_OP_SUCCESS;
    }

    return AWS_OP_ERR;
}

int aws_event_loop_run(struct aws_event_loop *event_loop) {
    AWS_ASSERT(event_loop->vtable && event_loop->vtable->run);
    return event_loop->vtable->run(event_loop);
}

int aws_event_loop_stop(struct aws_event_loop *event_loop) {
    AWS_ASSERT(event_loop->vtable && event_loop->vtable->stop);
    return event_loop->vtable->stop(event_loop);
}

int aws_event_loop_wait_for_stop_completion(struct aws_event_loop *event_loop) {
    AWS_ASSERT(!aws_event_loop_thread_is_callers_thread(event_loop));
    AWS_ASSERT(event_loop->vtable && event_loop->vtable->wait_for_stop_completion);
    return event_loop->vtable->wait_for_stop_completion(event_loop);
}
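
/* Task scheduling: cancel_task() must run on the loop's own thread (asserted below), while the two
 * schedule calls carry no such assert here; the per-platform vtable implementations are responsible
 * for handing tasks scheduled from other threads over to the loop. */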
void aws_event_loop_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
    AWS_ASSERT(event_loop->vtable && event_loop->vtable->schedule_task_now);
    AWS_ASSERT(task);
    event_loop->vtable->schedule_task_now(event_loop, task);
}

void aws_event_loop_schedule_task_future(
    struct aws_event_loop *event_loop,
    struct aws_task *task,
    uint64_t run_at_nanos) {
    AWS_ASSERT(event_loop->vtable && event_loop->vtable->schedule_task_future);
    AWS_ASSERT(task);
    event_loop->vtable->schedule_task_future(event_loop, task, run_at_nanos);
}

void aws_event_loop_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) {
    AWS_ASSERT(event_loop->vtable && event_loop->vtable->cancel_task);
    AWS_ASSERT(aws_event_loop_thread_is_callers_thread(event_loop));
    AWS_ASSERT(task);
    event_loop->vtable->cancel_task(event_loop, task);
}
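
/* Platform split: when AWS_USE_IO_COMPLETION_PORTS is set (Windows builds), handles are connected
 * to the loop's I/O completion port; on other platforms callers instead subscribe to readable or
 * writable events on the handle. */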
#if AWS_USE_IO_COMPLETION_PORTS

int aws_event_loop_connect_handle_to_io_completion_port(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle) {

    AWS_ASSERT(event_loop->vtable && event_loop->vtable->connect_to_io_completion_port);
    return event_loop->vtable->connect_to_io_completion_port(event_loop, handle);
}

#else /* !AWS_USE_IO_COMPLETION_PORTS */

int aws_event_loop_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data) {

    AWS_ASSERT(event_loop->vtable && event_loop->vtable->subscribe_to_io_events);
    return event_loop->vtable->subscribe_to_io_events(event_loop, handle, events, on_event, user_data);
}
#endif /* AWS_USE_IO_COMPLETION_PORTS */

int aws_event_loop_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
    AWS_ASSERT(aws_event_loop_thread_is_callers_thread(event_loop));
    AWS_ASSERT(event_loop->vtable && event_loop->vtable->unsubscribe_from_io_events);
    return event_loop->vtable->unsubscribe_from_io_events(event_loop, handle);
}

void aws_event_loop_free_io_event_resources(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
    AWS_ASSERT(event_loop && event_loop->vtable->free_io_event_resources);
    event_loop->vtable->free_io_event_resources(handle->additional_data);
}

bool aws_event_loop_thread_is_callers_thread(struct aws_event_loop *event_loop) {
    AWS_ASSERT(event_loop->vtable && event_loop->vtable->is_on_callers_thread);
    return event_loop->vtable->is_on_callers_thread(event_loop);
}

int aws_event_loop_current_clock_time(struct aws_event_loop *event_loop, uint64_t *time_nanos) {
    AWS_ASSERT(event_loop->clock);
    return event_loop->clock(time_nanos);
}