/* epoll_event_loop.c */
/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include <aws/io/event_loop.h>

#include <aws/common/atomics.h>
#include <aws/common/clock.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>
#include <aws/common/thread.h>

#include <aws/io/logging.h>

#include <sys/epoll.h>

#include <errno.h>
#include <limits.h>
#include <unistd.h>

#if !defined(COMPAT_MODE) && defined(__GLIBC__) && ((__GLIBC__ == 2 && __GLIBC_MINOR__ >= 8) || __GLIBC__ > 2)
#    define USE_EFD 1
#else
#    define USE_EFD 0
#endif

#if USE_EFD
#    include <aws/io/io.h>
#    include <sys/eventfd.h>
#else
#    include <aws/io/pipe.h>
#endif

/* This isn't defined on ancient linux distros (breaking the builds).
 * However, if this is a prebuild, we purposely build on an ancient system, but
 * we want the kernel calls to still be the same as a modern build since that's likely the target of the application
 * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag
 * gets passed as long as it does.
 */
#ifndef EPOLLRDHUP
#    define EPOLLRDHUP 0x2000
#endif

static void s_destroy(struct aws_event_loop *event_loop);
static int s_run(struct aws_event_loop *event_loop);
static int s_stop(struct aws_event_loop *event_loop);
static int s_wait_for_stop_completion(struct aws_event_loop *event_loop);
static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task);
static int s_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data);
static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
static void s_free_io_event_resources(void *user_data);
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop);

static void aws_event_loop_thread(void *args);

static struct aws_event_loop_vtable s_vtable = {
    .destroy = s_destroy,
    .run = s_run,
    .stop = s_stop,
    .wait_for_stop_completion = s_wait_for_stop_completion,
    .schedule_task_now = s_schedule_task_now,
    .schedule_task_future = s_schedule_task_future,
    .cancel_task = s_cancel_task,
    .subscribe_to_io_events = s_subscribe_to_io_events,
    .unsubscribe_from_io_events = s_unsubscribe_from_io_events,
    .free_io_event_resources = s_free_io_event_resources,
    .is_on_callers_thread = s_is_on_callers_thread,
};

struct epoll_loop {
    struct aws_task_scheduler scheduler;
    struct aws_thread thread_created_on;
    struct aws_thread_options thread_options;
    aws_thread_id_t thread_joined_to;
    struct aws_atomic_var running_thread_id;
    struct aws_io_handle read_task_handle;
    struct aws_io_handle write_task_handle;
    struct aws_mutex task_pre_queue_mutex;
    struct aws_linked_list task_pre_queue;
    struct aws_task stop_task;
    struct aws_atomic_var stop_task_ptr;
    int epoll_fd;
    bool should_process_task_pre_queue;
    bool should_continue;
};
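
/* Note on read_task_handle/write_task_handle above: when USE_EFD is set, both hold the same eventfd
 * descriptor; in the pipe fallback they are the read and write ends of a nonblocking pipe. Either way,
 * writing to write_task_handle wakes the loop thread out of epoll_wait() so it can drain task_pre_queue. */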

struct epoll_event_data {
    struct aws_allocator *alloc;
    struct aws_io_handle *handle;
    aws_event_loop_on_event_fn *on_event;
    void *user_data;
    struct aws_task cleanup_task;
    bool is_subscribed; /* false when handle is unsubscribed, but this struct hasn't been cleaned up yet */
};

/* default timeout is 100 seconds (expressed in milliseconds, since that's the unit epoll_wait() takes) */
enum {
    DEFAULT_TIMEOUT = 100 * 1000,
    MAX_EVENTS = 100,
};

int aws_open_nonblocking_posix_pipe(int pipe_fds[2]);

/* Setup edge triggered epoll with a scheduler. */
struct aws_event_loop *aws_event_loop_new_default_with_options(
    struct aws_allocator *alloc,
    const struct aws_event_loop_options *options) {
    AWS_PRECONDITION(options);
    AWS_PRECONDITION(options->clock);

    struct aws_event_loop *loop = aws_mem_calloc(alloc, 1, sizeof(struct aws_event_loop));
    if (!loop) {
        return NULL;
    }

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing edge-triggered epoll", (void *)loop);
    if (aws_event_loop_init_base(loop, alloc, options->clock)) {
        goto clean_up_loop;
    }

    struct epoll_loop *epoll_loop = aws_mem_calloc(alloc, 1, sizeof(struct epoll_loop));
    if (!epoll_loop) {
        goto cleanup_base_loop;
    }

    if (options->thread_options) {
        epoll_loop->thread_options = *options->thread_options;
    } else {
        epoll_loop->thread_options = *aws_default_thread_options();
    }

    /* initialize thread id to NULL, it should be updated when the event loop thread starts. */
    aws_atomic_init_ptr(&epoll_loop->running_thread_id, NULL);

    aws_linked_list_init(&epoll_loop->task_pre_queue);
    epoll_loop->task_pre_queue_mutex = (struct aws_mutex)AWS_MUTEX_INIT;
    aws_atomic_init_ptr(&epoll_loop->stop_task_ptr, NULL);

    epoll_loop->epoll_fd = epoll_create(100);
    if (epoll_loop->epoll_fd < 0) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open epoll handle.", (void *)loop);
        aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
        goto clean_up_epoll;
    }

    if (aws_thread_init(&epoll_loop->thread_created_on, alloc)) {
        goto clean_up_epoll;
    }

#if USE_EFD
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Using eventfd for cross-thread notifications.", (void *)loop);

    int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
    if (fd < 0) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open eventfd handle.", (void *)loop);
        aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
        goto clean_up_thread;
    }

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: eventfd descriptor %d.", (void *)loop, fd);
    epoll_loop->write_task_handle = (struct aws_io_handle){.data.fd = fd, .additional_data = NULL};
    epoll_loop->read_task_handle = (struct aws_io_handle){.data.fd = fd, .additional_data = NULL};
#else
    AWS_LOGF_DEBUG(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: Eventfd not available, falling back to pipe for cross-thread notification.",
        (void *)loop);

    int pipe_fds[2] = {0};
    /* this pipe is for task scheduling. */
    if (aws_open_nonblocking_posix_pipe(pipe_fds)) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to open pipe handle.", (void *)loop);
        goto clean_up_thread;
    }

    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP, "id=%p: pipe descriptors read %d, write %d.", (void *)loop, pipe_fds[0], pipe_fds[1]);
    epoll_loop->write_task_handle.data.fd = pipe_fds[1];
    epoll_loop->read_task_handle.data.fd = pipe_fds[0];
#endif

    if (aws_task_scheduler_init(&epoll_loop->scheduler, alloc)) {
        goto clean_up_pipe;
    }

    epoll_loop->should_continue = false;

    loop->impl_data = epoll_loop;
    loop->vtable = &s_vtable;

    return loop;

clean_up_pipe:
#if USE_EFD
    close(epoll_loop->write_task_handle.data.fd);
    epoll_loop->write_task_handle.data.fd = -1;
    epoll_loop->read_task_handle.data.fd = -1;
#else
    close(epoll_loop->read_task_handle.data.fd);
    close(epoll_loop->write_task_handle.data.fd);
#endif

clean_up_thread:
    aws_thread_clean_up(&epoll_loop->thread_created_on);

clean_up_epoll:
    if (epoll_loop->epoll_fd >= 0) {
        close(epoll_loop->epoll_fd);
    }

    aws_mem_release(alloc, epoll_loop);

cleanup_base_loop:
    aws_event_loop_clean_up_base(loop);

clean_up_loop:
    aws_mem_release(alloc, loop);

    return NULL;
}
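
/* Usage sketch (not part of this file): how a caller typically constructs, runs, and tears down this loop.
 * This is a minimal sketch assuming only the public aws-c-io / aws-c-common headers; "allocator" stands in
 * for whatever struct aws_allocator the application already uses.
 *
 *     struct aws_event_loop_options options = {
 *         .clock = aws_high_res_clock_get_ticks,
 *         .thread_options = NULL, (NULL means aws_default_thread_options() is used, see above)
 *     };
 *     struct aws_event_loop *loop = aws_event_loop_new_default_with_options(allocator, &options);
 *     aws_event_loop_run(loop);
 *     ...
 *     aws_event_loop_destroy(loop); (stops the loop and joins the thread, per s_destroy() below)
 */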

static void s_destroy(struct aws_event_loop *event_loop) {
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Destroying event_loop", (void *)event_loop);

    struct epoll_loop *epoll_loop = event_loop->impl_data;

    /* we don't know if stop() has been called by someone else,
     * just call stop() again and wait for event-loop to finish. */
    aws_event_loop_stop(event_loop);
    s_wait_for_stop_completion(event_loop);

    /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
    epoll_loop->thread_joined_to = aws_thread_current_thread_id();
    aws_atomic_store_ptr(&epoll_loop->running_thread_id, &epoll_loop->thread_joined_to);
    aws_task_scheduler_clean_up(&epoll_loop->scheduler);

    while (!aws_linked_list_empty(&epoll_loop->task_pre_queue)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&epoll_loop->task_pre_queue);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
        task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
    }

    aws_thread_clean_up(&epoll_loop->thread_created_on);

#if USE_EFD
    close(epoll_loop->write_task_handle.data.fd);
    epoll_loop->write_task_handle.data.fd = -1;
    epoll_loop->read_task_handle.data.fd = -1;
#else
    close(epoll_loop->read_task_handle.data.fd);
    close(epoll_loop->write_task_handle.data.fd);
#endif

    close(epoll_loop->epoll_fd);
    aws_mem_release(event_loop->alloc, epoll_loop);
    aws_event_loop_clean_up_base(event_loop);
    aws_mem_release(event_loop->alloc, event_loop);
}

static int s_run(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting event-loop thread.", (void *)event_loop);

    epoll_loop->should_continue = true;
    aws_thread_increment_unjoined_count();
    if (aws_thread_launch(
            &epoll_loop->thread_created_on, &aws_event_loop_thread, event_loop, &epoll_loop->thread_options)) {

        aws_thread_decrement_unjoined_count();
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop);
        epoll_loop->should_continue = false;
        return AWS_OP_ERR;
    }

    return AWS_OP_SUCCESS;
}

static void s_stop_task(struct aws_task *task, void *args, enum aws_task_status status) {
    (void)task;
    struct aws_event_loop *event_loop = args;
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    /* now okay to reschedule stop tasks. */
    aws_atomic_store_ptr(&epoll_loop->stop_task_ptr, NULL);

    if (status == AWS_TASK_STATUS_RUN_READY) {
        /*
         * this allows the event loop to invoke the callback once the event loop has completed.
         */
        epoll_loop->should_continue = false;
    }
}

static int s_stop(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    void *expected_ptr = NULL;
    bool update_succeeded =
        aws_atomic_compare_exchange_ptr(&epoll_loop->stop_task_ptr, &expected_ptr, &epoll_loop->stop_task);
    if (!update_succeeded) {
        /* the stop task is already scheduled. */
        return AWS_OP_SUCCESS;
    }

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Stopping event-loop thread.", (void *)event_loop);
    aws_task_init(&epoll_loop->stop_task, s_stop_task, event_loop, "epoll_event_loop_stop");
    s_schedule_task_now(event_loop, &epoll_loop->stop_task);

    return AWS_OP_SUCCESS;
}

static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;
    int result = aws_thread_join(&epoll_loop->thread_created_on);
    aws_thread_decrement_unjoined_count();
    return result;
}
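
/* Note on the stop/join protocol above: s_stop() does a compare-exchange on stop_task_ptr so only one stop
 * task can be in flight at a time; the task itself flips should_continue on the loop thread, which lets the
 * current tick finish before the thread exits. The unjoined-count increment in s_run() is balanced by the
 * decrement here (or by s_run()'s failure path). */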

static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    /* if event loop and the caller are the same thread, just schedule and be done with it. */
    if (s_is_on_callers_thread(event_loop)) {
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: scheduling task %p in-thread for timestamp %llu",
            (void *)event_loop,
            (void *)task,
            (unsigned long long)run_at_nanos);
        if (run_at_nanos == 0) {
            /* zero denotes "now" task */
            aws_task_scheduler_schedule_now(&epoll_loop->scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&epoll_loop->scheduler, task, run_at_nanos);
        }
        return;
    }

    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: Scheduling task %p cross-thread for timestamp %llu",
        (void *)event_loop,
        (void *)task,
        (unsigned long long)run_at_nanos);
    task->timestamp = run_at_nanos;

    aws_mutex_lock(&epoll_loop->task_pre_queue_mutex);

    uint64_t counter = 1;

    bool is_first_task = aws_linked_list_empty(&epoll_loop->task_pre_queue);

    aws_linked_list_push_back(&epoll_loop->task_pre_queue, &task->node);

    /* if the list was not empty, we already have a pending read on the pipe/eventfd, no need to write again. */
    if (is_first_task) {
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: Waking up event-loop thread", (void *)event_loop);

        /* If the write fails because the buffer is full, we don't actually care because that means there's a pending
         * read on the pipe/eventfd and thus the event loop will end up checking to see if something has been queued.*/
        ssize_t do_not_care = write(epoll_loop->write_task_handle.data.fd, (void *)&counter, sizeof(counter));
        (void)do_not_care;
    }

    aws_mutex_unlock(&epoll_loop->task_pre_queue_mutex);
}
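
/* Cross-thread scheduling handshake, end to end:
 *   producer thread: lock task_pre_queue_mutex -> push task -> if the queue was empty, write 8 bytes to the
 *                    eventfd/pipe -> unlock.
 *   loop thread:     epoll_wait() wakes on the readable eventfd/pipe -> s_on_tasks_to_schedule() sets
 *                    should_process_task_pre_queue -> s_process_task_pre_queue() drains the descriptor,
 *                    swaps the queue under the same mutex, and hands the tasks to the scheduler.
 * Only the first task pushed onto an empty queue triggers a write, which keeps the critical section and the
 * syscall count per batch small. */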

static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
    s_schedule_task_common(event_loop, task, 0 /* zero denotes "now" task */);
}

static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
    s_schedule_task_common(event_loop, task, run_at_nanos);
}

static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) {
    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: cancelling task %p", (void *)event_loop, (void *)task);
    struct epoll_loop *epoll_loop = event_loop->impl_data;
    aws_task_scheduler_cancel_task(&epoll_loop->scheduler, task);
}

static int s_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data) {

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: subscribing to events on fd %d", (void *)event_loop, handle->data.fd);
    struct epoll_event_data *epoll_event_data = aws_mem_calloc(event_loop->alloc, 1, sizeof(struct epoll_event_data));
    handle->additional_data = epoll_event_data;
    if (!epoll_event_data) {
        return AWS_OP_ERR;
    }

    struct epoll_loop *epoll_loop = event_loop->impl_data;
    epoll_event_data->alloc = event_loop->alloc;
    epoll_event_data->user_data = user_data;
    epoll_event_data->handle = handle;
    epoll_event_data->on_event = on_event;
    epoll_event_data->is_subscribed = true;

    /* everyone is always registered for edge-triggered, hang up, remote hang up, errors. */
    uint32_t event_mask = EPOLLET | EPOLLHUP | EPOLLRDHUP | EPOLLERR;

    if (events & AWS_IO_EVENT_TYPE_READABLE) {
        event_mask |= EPOLLIN;
    }

    if (events & AWS_IO_EVENT_TYPE_WRITABLE) {
        event_mask |= EPOLLOUT;
    }

    /* this guy is copied by epoll_ctl */
    struct epoll_event epoll_event = {
        .data = {.ptr = epoll_event_data},
        .events = event_mask,
    };

    if (epoll_ctl(epoll_loop->epoll_fd, EPOLL_CTL_ADD, handle->data.fd, &epoll_event)) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_EVENT_LOOP, "id=%p: failed to subscribe to events on fd %d", (void *)event_loop, handle->data.fd);
        handle->additional_data = NULL;
        aws_mem_release(event_loop->alloc, epoll_event_data);
        return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
    }

    return AWS_OP_SUCCESS;
}
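
/* Subscription sketch (illustrative only; my_socket_fd, my_on_event, and my_user_data are placeholders, the
 * rest comes from the public aws-c-io headers). Registrations are always edge-triggered (EPOLLET), so the
 * handler should read/write until EAGAIN before returning:
 *
 *     static void my_on_event(
 *         struct aws_event_loop *event_loop, struct aws_io_handle *handle, int events, void *user_data) {
 *         if (events & AWS_IO_EVENT_TYPE_READABLE) {
 *             (read until EAGAIN; a partial read will not re-fire under edge-triggering)
 *         }
 *     }
 *
 *     struct aws_io_handle handle = {.data = {.fd = my_socket_fd}};
 *     aws_event_loop_subscribe_to_io_events(
 *         loop, &handle, AWS_IO_EVENT_TYPE_READABLE | AWS_IO_EVENT_TYPE_WRITABLE, my_on_event, my_user_data);
 */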

static void s_free_io_event_resources(void *user_data) {
    struct epoll_event_data *event_data = user_data;
    aws_mem_release(event_data->alloc, (void *)event_data);
}

static void s_unsubscribe_cleanup_task(struct aws_task *task, void *arg, enum aws_task_status status) {
    (void)task;
    (void)status;
    struct epoll_event_data *event_data = (struct epoll_event_data *)arg;
    s_free_io_event_resources(event_data);
}

static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP, "id=%p: un-subscribing from events on fd %d", (void *)event_loop, handle->data.fd);
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    AWS_ASSERT(handle->additional_data);
    struct epoll_event_data *additional_handle_data = handle->additional_data;

    struct epoll_event dummy_event;

    if (AWS_UNLIKELY(epoll_ctl(epoll_loop->epoll_fd, EPOLL_CTL_DEL, handle->data.fd, &dummy_event /*ignored*/))) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: failed to un-subscribe from events on fd %d",
            (void *)event_loop,
            handle->data.fd);
        return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
    }
    /* We can't clean up yet, because we still have scheduled tasks and more events to process;
     * mark it as unsubscribed and schedule a cleanup task. */
    additional_handle_data->is_subscribed = false;

    aws_task_init(
        &additional_handle_data->cleanup_task,
        s_unsubscribe_cleanup_task,
        additional_handle_data,
        "epoll_event_loop_unsubscribe_cleanup");
    s_schedule_task_now(event_loop, &additional_handle_data->cleanup_task);

    handle->additional_data = NULL;

    return AWS_OP_SUCCESS;
}
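
/* Note on the deferred free above: epoll may already have returned events whose data.ptr points at this
 * epoll_event_data within the current batch. Freeing via a scheduled task keeps the memory valid until after
 * the current batch of events has been dispatched, and the is_subscribed flag keeps those stale events from
 * reaching the user callback. */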

static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    aws_thread_id_t *thread_id = aws_atomic_load_ptr(&epoll_loop->running_thread_id);
    return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id());
}

/* We treat the pipe fd with a subscription to io events just like any other managed file descriptor.
 * This is the event handler for events on that pipe. */
static void s_on_tasks_to_schedule(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    void *user_data) {

    (void)handle;
    (void)user_data;

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: notified of cross-thread tasks to schedule", (void *)event_loop);
    struct epoll_loop *epoll_loop = event_loop->impl_data;
    if (events & AWS_IO_EVENT_TYPE_READABLE) {
        epoll_loop->should_process_task_pre_queue = true;
    }
}

static void s_process_task_pre_queue(struct aws_event_loop *event_loop) {
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    if (!epoll_loop->should_process_task_pre_queue) {
        return;
    }

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: processing cross-thread tasks", (void *)event_loop);
    epoll_loop->should_process_task_pre_queue = false;

    struct aws_linked_list task_pre_queue;
    aws_linked_list_init(&task_pre_queue);

    uint64_t count_ignore = 0;

    aws_mutex_lock(&epoll_loop->task_pre_queue_mutex);

    /* several tasks could theoretically have been written (though this should never happen), make sure we drain the
     * eventfd/pipe. */
    while (read(epoll_loop->read_task_handle.data.fd, &count_ignore, sizeof(count_ignore)) > -1) {
    }

    aws_linked_list_swap_contents(&epoll_loop->task_pre_queue, &task_pre_queue);

    aws_mutex_unlock(&epoll_loop->task_pre_queue_mutex);

    while (!aws_linked_list_empty(&task_pre_queue)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&task_pre_queue);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: task %p pulled to event-loop, scheduling now.",
            (void *)event_loop,
            (void *)task);
        /* Timestamp 0 is used to denote "now" tasks */
        if (task->timestamp == 0) {
            aws_task_scheduler_schedule_now(&epoll_loop->scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&epoll_loop->scheduler, task, task->timestamp);
        }
    }
}

/**
 * This just calls epoll_wait()
 *
 * We broke this out into its own function so that the stacktrace clearly shows
 * what this thread is doing. We've had a lot of cases where users think this
 * thread is deadlocked because it's stuck here. We want it to be clear
 * that it's doing nothing on purpose. It's waiting for events to happen...
 */
AWS_NO_INLINE
static int aws_event_loop_listen_for_io_events(int epoll_fd, struct epoll_event events[MAX_EVENTS], int timeout) {
    return epoll_wait(epoll_fd, events, MAX_EVENTS, timeout);
}

static void aws_event_loop_thread(void *args) {
    struct aws_event_loop *event_loop = args;
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
    struct epoll_loop *epoll_loop = event_loop->impl_data;

    /* set thread id to the thread of the event loop */
    aws_atomic_store_ptr(&epoll_loop->running_thread_id, &epoll_loop->thread_created_on.thread_id);

    int err = s_subscribe_to_io_events(
        event_loop, &epoll_loop->read_task_handle, AWS_IO_EVENT_TYPE_READABLE, s_on_tasks_to_schedule, NULL);
    if (err) {
        return;
    }

    int timeout = DEFAULT_TIMEOUT;

    struct epoll_event events[MAX_EVENTS];

    AWS_LOGF_INFO(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: default timeout %d, and max events to process per tick %d",
        (void *)event_loop,
        timeout,
        MAX_EVENTS);

    /*
     * until stop is called,
     * call epoll_wait, if a task is scheduled, or a file descriptor has activity, it will
     * return.
     *
     * process all events,
     *
     * run all scheduled tasks.
     *
     * process queued subscription cleanups.
     */
    while (epoll_loop->should_continue) {
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: waiting for a maximum of %d ms", (void *)event_loop, timeout);
        int event_count = aws_event_loop_listen_for_io_events(epoll_loop->epoll_fd, events, timeout);

        aws_event_loop_register_tick_start(event_loop);

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, event_count);
        for (int i = 0; i < event_count; ++i) {
            struct epoll_event_data *event_data = (struct epoll_event_data *)events[i].data.ptr;

            int event_mask = 0;
            if (events[i].events & EPOLLIN) {
                event_mask |= AWS_IO_EVENT_TYPE_READABLE;
            }

            if (events[i].events & EPOLLOUT) {
                event_mask |= AWS_IO_EVENT_TYPE_WRITABLE;
            }

            if (events[i].events & EPOLLRDHUP) {
                event_mask |= AWS_IO_EVENT_TYPE_REMOTE_HANG_UP;
            }

            if (events[i].events & EPOLLHUP) {
                event_mask |= AWS_IO_EVENT_TYPE_CLOSED;
            }

            if (events[i].events & EPOLLERR) {
                event_mask |= AWS_IO_EVENT_TYPE_ERROR;
            }

            if (event_data->is_subscribed) {
                AWS_LOGF_TRACE(
                    AWS_LS_IO_EVENT_LOOP,
                    "id=%p: activity on fd %d, invoking handler.",
                    (void *)event_loop,
                    event_data->handle->data.fd);
                event_data->on_event(event_loop, event_data->handle, event_mask, event_data->user_data);
            }
        }

        /* run scheduled tasks */
        s_process_task_pre_queue(event_loop);

        uint64_t now_ns = 0;
        event_loop->clock(&now_ns); /* if clock fails, now_ns will be 0 and tasks scheduled for a specific time
                                       will not be run. That's ok, we'll handle them next time around. */
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
        aws_task_scheduler_run_all(&epoll_loop->scheduler, now_ns);

        /* set timeout for next epoll_wait() call.
         * if clock fails, or scheduler has no tasks, use default timeout */
        bool use_default_timeout = false;

        if (event_loop->clock(&now_ns)) {
            use_default_timeout = true;
        }

        uint64_t next_run_time_ns;
        if (!aws_task_scheduler_has_tasks(&epoll_loop->scheduler, &next_run_time_ns)) {
            use_default_timeout = true;
        }

        if (use_default_timeout) {
            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP, "id=%p: no more scheduled tasks using default timeout.", (void *)event_loop);
            timeout = DEFAULT_TIMEOUT;
        } else {
            /* Translate timestamp (in nanoseconds) to timeout (in milliseconds) */
            uint64_t timeout_ns = (next_run_time_ns > now_ns) ? (next_run_time_ns - now_ns) : 0;
            uint64_t timeout_ms64 = aws_timestamp_convert(timeout_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
            timeout = timeout_ms64 > INT_MAX ? INT_MAX : (int)timeout_ms64;
            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP,
                "id=%p: detected more scheduled tasks with the next occurring at "
                "%llu, using timeout of %d.",
                (void *)event_loop,
                (unsigned long long)timeout_ns,
                timeout);
        }

        aws_event_loop_register_tick_end(event_loop);
    }

    AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
    s_unsubscribe_from_io_events(event_loop, &epoll_loop->read_task_handle);
    /* set thread id back to NULL. This should be updated again in destroy, before tasks are canceled. */
    aws_atomic_store_ptr(&epoll_loop->running_thread_id, NULL);
}