kqueue_event_loop.c

/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

#include <aws/io/event_loop.h>

#include <aws/io/logging.h>

#include <aws/common/atomics.h>
#include <aws/common/clock.h>
#include <aws/common/mutex.h>
#include <aws/common/task_scheduler.h>
#include <aws/common/thread.h>

#if defined(__FreeBSD__) || defined(__NetBSD__)
#    define __BSD_VISIBLE 1
#    include <sys/types.h>
#endif

#include <sys/event.h>

#include <aws/io/io.h>

#include <limits.h>
#include <unistd.h>

static void s_destroy(struct aws_event_loop *event_loop);
static int s_run(struct aws_event_loop *event_loop);
static int s_stop(struct aws_event_loop *event_loop);
static int s_wait_for_stop_completion(struct aws_event_loop *event_loop);
static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task);
static int s_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data);
static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
static void s_free_io_event_resources(void *user_data);
static bool s_is_event_thread(struct aws_event_loop *event_loop);

static void aws_event_loop_thread(void *user_data);

int aws_open_nonblocking_posix_pipe(int pipe_fds[2]);
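
/* aws_open_nonblocking_posix_pipe() is implemented in the library's shared POSIX code, not in this file.
 * A rough sketch of what it is assumed to do (not the library's exact implementation):
 *
 *     if (pipe(pipe_fds)) {
 *         return aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
 *     }
 *     for (int i = 0; i < 2; ++i) {
 *         int flags = fcntl(pipe_fds[i], F_GETFL);
 *         fcntl(pipe_fds[i], F_SETFL, flags | O_NONBLOCK);
 *     }
 *     return AWS_OP_SUCCESS;
 */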

enum event_thread_state {
    EVENT_THREAD_STATE_READY_TO_RUN,
    EVENT_THREAD_STATE_RUNNING,
    EVENT_THREAD_STATE_STOPPING,
};
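
/* State transitions: READY_TO_RUN -> RUNNING in s_run(), RUNNING -> STOPPING in s_stop()
 * (picked up by the event-thread in s_process_cross_thread_data()), and back to READY_TO_RUN
 * in s_wait_for_stop_completion() once the thread has been joined. */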

enum pipe_fd_index {
    READ_FD,
    WRITE_FD,
};

struct kqueue_loop {
    /* thread_created_on is the handle to the event loop thread. */
    struct aws_thread thread_created_on;
    /* thread_joined_to is used by the thread destroying the event loop. */
    aws_thread_id_t thread_joined_to;
    /* running_thread_id is NULL if the event loop thread is stopped, or points to the thread_id of the thread running
     * the event loop (either thread_created_on or thread_joined_to). Atomic because of concurrent writes (e.g.,
     * run/stop) and reads (e.g., is_event_loop_thread).
     * An aws_thread_id_t variable itself cannot be atomic because it is an opaque, platform-dependent type. */
    struct aws_atomic_var running_thread_id;

    int kq_fd; /* kqueue file descriptor */

    /* Pipe for signaling to event-thread that cross_thread_data has changed. */
    int cross_thread_signal_pipe[2];

    /* cross_thread_data holds things that must be communicated across threads.
     * When the event-thread is running, the mutex must be locked while anyone touches anything in cross_thread_data.
     * If this data is modified outside the thread, the thread is signaled via activity on a pipe. */
    struct {
        struct aws_mutex mutex;
        bool thread_signaled; /* whether thread has been signaled about changes to cross_thread_data */
        struct aws_linked_list tasks_to_schedule;
        enum event_thread_state state;
    } cross_thread_data;

    /* thread_data holds things which, when the event-thread is running, may only be touched by the thread */
    struct {
        struct aws_task_scheduler scheduler;

        int connected_handle_count;

        /* These variables duplicate ones in cross_thread_data. We move values out while holding the mutex and operate
         * on them later */
        enum event_thread_state state;
    } thread_data;

    struct aws_thread_options thread_options;
};

/* Data attached to aws_io_handle while the handle is subscribed to io events */
struct handle_data {
    struct aws_io_handle *owner;
    struct aws_event_loop *event_loop;
    aws_event_loop_on_event_fn *on_event;
    void *on_event_user_data;

    int events_subscribed; /* aws_io_event_types this handle should be subscribed to */
    int events_this_loop;  /* aws_io_event_types received during current loop of the event-thread */

    enum { HANDLE_STATE_SUBSCRIBING, HANDLE_STATE_SUBSCRIBED, HANDLE_STATE_UNSUBSCRIBED } state;

    struct aws_task subscribe_task;
    struct aws_task cleanup_task;
};
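
/* handle_data lifecycle: s_subscribe_to_io_events() allocates it in state SUBSCRIBING and schedules
 * subscribe_task; s_subscribe_task() adds the kevents on the event-thread and moves it to SUBSCRIBED;
 * s_unsubscribe_from_io_events() deletes the kevents, marks it UNSUBSCRIBED, and schedules cleanup_task,
 * which frees the memory via s_free_io_event_resources(). */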

enum {
    DEFAULT_TIMEOUT_SEC = 100, /* Max kevent() timeout per loop of the event-thread */
    MAX_EVENTS = 100,          /* Max kevents to process per loop of the event-thread */
};

struct aws_event_loop_vtable s_kqueue_vtable = {
    .destroy = s_destroy,
    .run = s_run,
    .stop = s_stop,
    .wait_for_stop_completion = s_wait_for_stop_completion,
    .schedule_task_now = s_schedule_task_now,
    .schedule_task_future = s_schedule_task_future,
    .subscribe_to_io_events = s_subscribe_to_io_events,
    .cancel_task = s_cancel_task,
    .unsubscribe_from_io_events = s_unsubscribe_from_io_events,
    .free_io_event_resources = s_free_io_event_resources,
    .is_on_callers_thread = s_is_event_thread,
};
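
/* The generic aws_event_loop_* API dispatches through this vtable; it is wired up to a new loop
 * at the end of aws_event_loop_new_default_with_options() below. */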

struct aws_event_loop *aws_event_loop_new_default_with_options(
    struct aws_allocator *alloc,
    const struct aws_event_loop_options *options) {

    AWS_ASSERT(alloc);
    AWS_ASSERT(options);
    AWS_ASSERT(options->clock);

    bool clean_up_event_loop_mem = false;
    bool clean_up_event_loop_base = false;
    bool clean_up_impl_mem = false;
    bool clean_up_thread = false;
    bool clean_up_kqueue = false;
    bool clean_up_signal_pipe = false;
    bool clean_up_signal_kevent = false;
    bool clean_up_mutex = false;

    struct aws_event_loop *event_loop = aws_mem_acquire(alloc, sizeof(struct aws_event_loop));
    if (!event_loop) {
        return NULL;
    }

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Initializing edge-triggered kqueue", (void *)event_loop);
    clean_up_event_loop_mem = true;

    int err = aws_event_loop_init_base(event_loop, alloc, options->clock);
    if (err) {
        goto clean_up;
    }
    clean_up_event_loop_base = true;

    struct kqueue_loop *impl = aws_mem_calloc(alloc, 1, sizeof(struct kqueue_loop));
    if (!impl) {
        goto clean_up;
    }

    if (options->thread_options) {
        impl->thread_options = *options->thread_options;
    } else {
        impl->thread_options = *aws_default_thread_options();
    }

    /* initialize thread id to NULL. It will be set when the event loop thread starts. */
    aws_atomic_init_ptr(&impl->running_thread_id, NULL);
    clean_up_impl_mem = true;

    err = aws_thread_init(&impl->thread_created_on, alloc);
    if (err) {
        goto clean_up;
    }
    clean_up_thread = true;

    impl->kq_fd = kqueue();
    if (impl->kq_fd == -1) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: Failed to open kqueue handle.", (void *)event_loop);
        aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
        goto clean_up;
    }
    clean_up_kqueue = true;

    err = aws_open_nonblocking_posix_pipe(impl->cross_thread_signal_pipe);
    if (err) {
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: failed to open pipe handle.", (void *)event_loop);
        goto clean_up;
    }
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: pipe descriptors read %d, write %d.",
        (void *)event_loop,
        impl->cross_thread_signal_pipe[READ_FD],
        impl->cross_thread_signal_pipe[WRITE_FD]);
    clean_up_signal_pipe = true;

    /* Set up kevent to handle activity on the cross_thread_signal_pipe */
    struct kevent thread_signal_kevent;
    EV_SET(
        &thread_signal_kevent,
        impl->cross_thread_signal_pipe[READ_FD],
        EVFILT_READ /*filter*/,
        EV_ADD | EV_CLEAR /*flags*/,
        0 /*fflags*/,
        0 /*data*/,
        NULL /*udata*/);
    int res = kevent(
        impl->kq_fd,
        &thread_signal_kevent /*changelist*/,
        1 /*nchanges*/,
        NULL /*eventlist*/,
        0 /*nevents*/,
        NULL /*timeout*/);
    if (res == -1) {
        AWS_LOGF_FATAL(
            AWS_LS_IO_EVENT_LOOP, "id=%p: failed to create cross-thread signal kevent.", (void *)event_loop);
        aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);
        goto clean_up;
    }
    clean_up_signal_kevent = true;

    err = aws_mutex_init(&impl->cross_thread_data.mutex);
    if (err) {
        goto clean_up;
    }
    clean_up_mutex = true;

    impl->cross_thread_data.thread_signaled = false;
    aws_linked_list_init(&impl->cross_thread_data.tasks_to_schedule);
    impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;

    err = aws_task_scheduler_init(&impl->thread_data.scheduler, alloc);
    if (err) {
        goto clean_up;
    }

    impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;

    event_loop->impl_data = impl;
    event_loop->vtable = &s_kqueue_vtable;

    /* success */
    return event_loop;

clean_up:
    if (clean_up_mutex) {
        aws_mutex_clean_up(&impl->cross_thread_data.mutex);
    }

    if (clean_up_signal_kevent) {
        thread_signal_kevent.flags = EV_DELETE;
        kevent(
            impl->kq_fd,
            &thread_signal_kevent /*changelist*/,
            1 /*nchanges*/,
            NULL /*eventlist*/,
            0 /*nevents*/,
            NULL /*timeout*/);
    }

    if (clean_up_signal_pipe) {
        close(impl->cross_thread_signal_pipe[READ_FD]);
        close(impl->cross_thread_signal_pipe[WRITE_FD]);
    }

    if (clean_up_kqueue) {
        close(impl->kq_fd);
    }

    if (clean_up_thread) {
        aws_thread_clean_up(&impl->thread_created_on);
    }

    if (clean_up_impl_mem) {
        aws_mem_release(alloc, impl);
    }

    if (clean_up_event_loop_base) {
        aws_event_loop_clean_up_base(event_loop);
    }

    if (clean_up_event_loop_mem) {
        aws_mem_release(alloc, event_loop);
    }

    return NULL;
}
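
/* Typical creation/teardown from calling code (sketch; assumes the usual aws-c-common clock helper):
 *
 *     struct aws_event_loop_options options = {.clock = aws_high_res_clock_get_ticks};
 *     struct aws_event_loop *loop = aws_event_loop_new_default_with_options(alloc, &options);
 *     aws_event_loop_run(loop);
 *     ...
 *     aws_event_loop_destroy(loop); // dispatches to s_destroy() below via the vtable
 */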

static void s_destroy(struct aws_event_loop *event_loop) {
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: destroying event_loop", (void *)event_loop);

    struct kqueue_loop *impl = event_loop->impl_data;

    /* Stop the event-thread. This might have already happened. It's safe to call multiple times. */
    s_stop(event_loop);
    int err = s_wait_for_stop_completion(event_loop);
    if (err) {
        AWS_LOGF_WARN(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: failed to destroy event-thread, resources have been leaked",
            (void *)event_loop);
        AWS_ASSERT("Failed to destroy event-thread, resources have been leaked." == NULL);
        return;
    }

    /* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
    impl->thread_joined_to = aws_thread_current_thread_id();
    aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_joined_to);

    /* Clean up task-related stuff first. It's possible that a cancelled task adds further tasks to this event_loop.
     * Tasks added in this way will be in cross_thread_data.tasks_to_schedule, so we clean that up last. */
    aws_task_scheduler_clean_up(&impl->thread_data.scheduler); /* Tasks in scheduler get cancelled */

    while (!aws_linked_list_empty(&impl->cross_thread_data.tasks_to_schedule)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(&impl->cross_thread_data.tasks_to_schedule);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
        task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
    }

    /* Warn user if aws_io_handle was subscribed, but never unsubscribed. This would cause memory leaks. */
    AWS_ASSERT(impl->thread_data.connected_handle_count == 0);

    /* Clean up everything else */
    aws_mutex_clean_up(&impl->cross_thread_data.mutex);

    struct kevent thread_signal_kevent;
    EV_SET(
        &thread_signal_kevent,
        impl->cross_thread_signal_pipe[READ_FD],
        EVFILT_READ /*filter*/,
        EV_DELETE /*flags*/,
        0 /*fflags*/,
        0 /*data*/,
        NULL /*udata*/);
    kevent(
        impl->kq_fd,
        &thread_signal_kevent /*changelist*/,
        1 /*nchanges*/,
        NULL /*eventlist*/,
        0 /*nevents*/,
        NULL /*timeout*/);

    close(impl->cross_thread_signal_pipe[READ_FD]);
    close(impl->cross_thread_signal_pipe[WRITE_FD]);
    close(impl->kq_fd);
    aws_thread_clean_up(&impl->thread_created_on);
    aws_mem_release(event_loop->alloc, impl);
    aws_event_loop_clean_up_base(event_loop);
    aws_mem_release(event_loop->alloc, event_loop);
}

static int s_run(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: starting event-loop thread.", (void *)event_loop);
    /* to re-run, call stop() and wait_for_stop_completion() */
    AWS_ASSERT(impl->cross_thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
    AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);

    /* Since thread isn't running it's ok to touch thread_data,
     * and it's ok to touch cross_thread_data without locking the mutex */
    impl->cross_thread_data.state = EVENT_THREAD_STATE_RUNNING;

    aws_thread_increment_unjoined_count();
    int err =
        aws_thread_launch(&impl->thread_created_on, aws_event_loop_thread, (void *)event_loop, &impl->thread_options);

    if (err) {
        aws_thread_decrement_unjoined_count();
        AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop);
        goto clean_up;
    }

    return AWS_OP_SUCCESS;

clean_up:
    impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
    return AWS_OP_ERR;
}

/* This function can't fail; we're relying on the thread responding to critical messages (ex: stop thread) */
void signal_cross_thread_data_changed(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: signaling event-loop that cross-thread tasks need to be scheduled.",
        (void *)event_loop);
    /* It doesn't actually matter what we write; any activity on the pipe signals that cross_thread_data has changed.
     * If the pipe is full and the write fails, that's fine: the event-thread will get the signal from some previous
     * write. */
    uint32_t write_whatever = 0xC0FFEE;
    write(impl->cross_thread_signal_pipe[WRITE_FD], &write_whatever, sizeof(write_whatever));
}

static int s_stop(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    bool signal_thread = false;

    { /* Begin critical section */
        aws_mutex_lock(&impl->cross_thread_data.mutex);
        if (impl->cross_thread_data.state == EVENT_THREAD_STATE_RUNNING) {
            impl->cross_thread_data.state = EVENT_THREAD_STATE_STOPPING;
            signal_thread = !impl->cross_thread_data.thread_signaled;
            impl->cross_thread_data.thread_signaled = true;
        }
        aws_mutex_unlock(&impl->cross_thread_data.mutex);
    } /* End critical section */

    if (signal_thread) {
        signal_cross_thread_data_changed(event_loop);
    }

    return AWS_OP_SUCCESS;
}

static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

#ifdef DEBUG_BUILD
    aws_mutex_lock(&impl->cross_thread_data.mutex);
    /* call stop() before wait_for_stop_completion() or you'll wait forever */
    AWS_ASSERT(impl->cross_thread_data.state != EVENT_THREAD_STATE_RUNNING);
    aws_mutex_unlock(&impl->cross_thread_data.mutex);
#endif

    int err = aws_thread_join(&impl->thread_created_on);
    aws_thread_decrement_unjoined_count();
    if (err) {
        return AWS_OP_ERR;
    }

    /* Since thread is no longer running it's ok to touch thread_data,
     * and it's ok to touch cross_thread_data without locking the mutex */
    impl->cross_thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;
    impl->thread_data.state = EVENT_THREAD_STATE_READY_TO_RUN;

    return AWS_OP_SUCCESS;
}
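
/* Stopping and restarting the same loop from calling code (sketch; the public wrappers dispatch
 * through the vtable):
 *
 *     aws_event_loop_stop(loop);                      // s_stop(): flips state, signals the pipe
 *     aws_event_loop_wait_for_stop_completion(loop);  // s_wait_for_stop_completion(): joins the thread
 *     aws_event_loop_run(loop);                       // safe to run again once the wait has returned
 */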

/* Common functionality for "now" and "future" task scheduling.
 * If `run_at_nanos` is zero then the task is scheduled as a "now" task. */
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
    AWS_ASSERT(task);
    struct kqueue_loop *impl = event_loop->impl_data;

    /* If we're on the event-thread, just schedule it directly */
    if (s_is_event_thread(event_loop)) {
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: scheduling task %p in-thread for timestamp %llu",
            (void *)event_loop,
            (void *)task,
            (unsigned long long)run_at_nanos);
        if (run_at_nanos == 0) {
            aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, run_at_nanos);
        }
        return;
    }

    /* Otherwise, add it to cross_thread_data.tasks_to_schedule and signal the event-thread to process it */
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: scheduling task %p cross-thread for timestamp %llu",
        (void *)event_loop,
        (void *)task,
        (unsigned long long)run_at_nanos);
    task->timestamp = run_at_nanos;

    bool should_signal_thread = false;

    /* Begin critical section */
    aws_mutex_lock(&impl->cross_thread_data.mutex);
    aws_linked_list_push_back(&impl->cross_thread_data.tasks_to_schedule, &task->node);

    /* Signal thread that cross_thread_data has changed (unless it's been signaled already) */
    if (!impl->cross_thread_data.thread_signaled) {
        should_signal_thread = true;
        impl->cross_thread_data.thread_signaled = true;
    }

    aws_mutex_unlock(&impl->cross_thread_data.mutex);
    /* End critical section */

    if (should_signal_thread) {
        signal_cross_thread_data_changed(event_loop);
    }
}

static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
    s_schedule_task_common(event_loop, task, 0); /* Zero is used to denote "now" tasks */
}

static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
    s_schedule_task_common(event_loop, task, run_at_nanos);
}
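
/* Scheduling a task onto this loop from any thread (sketch; s_example_task_fn, example_arg, and my_task
 * are hypothetical names, not part of this file):
 *
 *     static void s_example_task_fn(struct aws_task *task, void *arg, enum aws_task_status status) {
 *         if (status == AWS_TASK_STATUS_RUN_READY) { ... }
 *     }
 *
 *     struct aws_task my_task;
 *     aws_task_init(&my_task, s_example_task_fn, example_arg, "example");
 *     aws_event_loop_schedule_task_now(loop, &my_task); // routes to s_schedule_task_now via the vtable
 */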

static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task) {
    struct kqueue_loop *kqueue_loop = event_loop->impl_data;
    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: cancelling task %p", (void *)event_loop, (void *)task);
    aws_task_scheduler_cancel_task(&kqueue_loop->thread_data.scheduler, task);
}

/* Scheduled task that connects aws_io_handle with the kqueue */
static void s_subscribe_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
    (void)task;
    struct handle_data *handle_data = user_data;
    struct aws_event_loop *event_loop = handle_data->event_loop;
    struct kqueue_loop *impl = handle_data->event_loop->impl_data;

    impl->thread_data.connected_handle_count++;

    /* if task was cancelled, nothing to do */
    if (status == AWS_TASK_STATUS_CANCELED) {
        return;
    }

    /* If handle was unsubscribed before this task could execute, nothing to do */
    if (handle_data->state == HANDLE_STATE_UNSUBSCRIBED) {
        return;
    }

    AWS_ASSERT(handle_data->state == HANDLE_STATE_SUBSCRIBING);

    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP, "id=%p: subscribing to events on fd %d", (void *)event_loop, handle_data->owner->data.fd);

    /* In order to monitor both reads and writes, kqueue requires you to add two separate kevents.
     * If we're adding two separate kevents, but one of them fails, we need to remove the other kevent.
     * Therefore we use the EV_RECEIPT flag. This causes kevent() to tell us whether each EV_ADD succeeded,
     * rather than the usual behavior of telling us about recent events. */
    struct kevent changelist[2];
    AWS_ZERO_ARRAY(changelist);

    int changelist_size = 0;
    if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
        EV_SET(
            &changelist[changelist_size++],
            handle_data->owner->data.fd,
            EVFILT_READ /*filter*/,
            EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/,
            0 /*fflags*/,
            0 /*data*/,
            handle_data /*udata*/);
    }
    if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
        EV_SET(
            &changelist[changelist_size++],
            handle_data->owner->data.fd,
            EVFILT_WRITE /*filter*/,
            EV_ADD | EV_RECEIPT | EV_CLEAR /*flags*/,
            0 /*fflags*/,
            0 /*data*/,
            handle_data /*udata*/);
    }

    int num_events = kevent(
        impl->kq_fd,
        changelist /*changelist*/,
        changelist_size /*nchanges*/,
        changelist /*eventlist. It's OK to re-use the same memory for changelist input and eventlist output*/,
        changelist_size /*nevents*/,
        NULL /*timeout*/);

    if (num_events == -1) {
        goto subscribe_failed;
    }

    /* Look through results to see if any failed */
    for (int i = 0; i < num_events; ++i) {
        /* Every result should be flagged as error, that's just how EV_RECEIPT works */
        AWS_ASSERT(changelist[i].flags & EV_ERROR);

        /* If a real error occurred, .data contains the error code */
        if (changelist[i].data != 0) {
            goto subscribe_failed;
        }
    }

    /* Success */
    handle_data->state = HANDLE_STATE_SUBSCRIBED;
    return;

subscribe_failed:
    AWS_LOGF_ERROR(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: failed to subscribe to events on fd %d",
        (void *)event_loop,
        handle_data->owner->data.fd);

    /* Remove any related kevents that succeeded */
    for (int i = 0; i < num_events; ++i) {
        if (changelist[i].data == 0) {
            changelist[i].flags = EV_DELETE;
            kevent(
                impl->kq_fd,
                &changelist[i] /*changelist*/,
                1 /*nchanges*/,
                NULL /*eventlist*/,
                0 /*nevents*/,
                NULL /*timeout*/);
        }
    }

    /* We can't return an error code because this was a scheduled task.
     * Notify the user of the failed subscription by passing AWS_IO_EVENT_TYPE_ERROR to the callback. */
    handle_data->on_event(event_loop, handle_data->owner, AWS_IO_EVENT_TYPE_ERROR, handle_data->on_event_user_data);
}

static int s_subscribe_to_io_events(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    aws_event_loop_on_event_fn *on_event,
    void *user_data) {

    AWS_ASSERT(event_loop);
    AWS_ASSERT(handle->data.fd != -1);
    AWS_ASSERT(handle->additional_data == NULL);
    AWS_ASSERT(on_event);
    /* Must subscribe for read, write, or both */
    AWS_ASSERT(events & (AWS_IO_EVENT_TYPE_READABLE | AWS_IO_EVENT_TYPE_WRITABLE));

    struct handle_data *handle_data = aws_mem_calloc(event_loop->alloc, 1, sizeof(struct handle_data));
    if (!handle_data) {
        return AWS_OP_ERR;
    }

    handle_data->owner = handle;
    handle_data->event_loop = event_loop;
    handle_data->on_event = on_event;
    handle_data->on_event_user_data = user_data;
    handle_data->events_subscribed = events;
    handle_data->state = HANDLE_STATE_SUBSCRIBING;

    handle->additional_data = handle_data;

    /* We schedule a task to perform the actual changes to the kqueue, read on for an explanation why...
     *
     * kqueue requires separate registrations for read and write events.
     * If the user wants to know about both read and write, we need to register once for read and once for write.
     * If the first registration succeeds, but the second registration fails, we need to delete the first registration.
     * If this all happened outside the event-thread, the successful registration's events could begin processing
     * in the brief window of time before the registration is deleted. */

    aws_task_init(&handle_data->subscribe_task, s_subscribe_task, handle_data, "kqueue_event_loop_subscribe");
    s_schedule_task_now(event_loop, &handle_data->subscribe_task);

    return AWS_OP_SUCCESS;
}
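
/* How a socket/channel implementation would use the subscribe path (sketch; s_on_io_event and
 * connection are hypothetical names, not part of this file):
 *
 *     static void s_on_io_event(
 *         struct aws_event_loop *loop, struct aws_io_handle *handle, int events, void *user_data) {
 *         if (events & AWS_IO_EVENT_TYPE_READABLE) { ... }
 *         if (events & AWS_IO_EVENT_TYPE_CLOSED) { ... }
 *     }
 *
 *     aws_event_loop_subscribe_to_io_events(
 *         loop,
 *         &connection->handle,
 *         AWS_IO_EVENT_TYPE_READABLE | AWS_IO_EVENT_TYPE_WRITABLE,
 *         s_on_io_event,
 *         connection);
 */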

static void s_free_io_event_resources(void *user_data) {
    struct handle_data *handle_data = user_data;
    struct kqueue_loop *impl = handle_data->event_loop->impl_data;

    impl->thread_data.connected_handle_count--;

    aws_mem_release(handle_data->event_loop->alloc, handle_data);
}

static void s_clean_up_handle_data_task(struct aws_task *task, void *user_data, enum aws_task_status status) {
    (void)task;
    (void)status;

    struct handle_data *handle_data = user_data;
    s_free_io_event_resources(handle_data);
}

static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
    AWS_LOGF_TRACE(
        AWS_LS_IO_EVENT_LOOP, "id=%p: un-subscribing from events on fd %d", (void *)event_loop, handle->data.fd);
    AWS_ASSERT(handle->additional_data);
    struct handle_data *handle_data = handle->additional_data;
    struct kqueue_loop *impl = event_loop->impl_data;

    AWS_ASSERT(event_loop == handle_data->event_loop);

    /* If the handle was successfully subscribed to kqueue, then remove it. */
    if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
        struct kevent changelist[2];
        int changelist_size = 0;

        if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_READABLE) {
            EV_SET(
                &changelist[changelist_size++],
                handle_data->owner->data.fd,
                EVFILT_READ /*filter*/,
                EV_DELETE /*flags*/,
                0 /*fflags*/,
                0 /*data*/,
                handle_data /*udata*/);
        }
        if (handle_data->events_subscribed & AWS_IO_EVENT_TYPE_WRITABLE) {
            EV_SET(
                &changelist[changelist_size++],
                handle_data->owner->data.fd,
                EVFILT_WRITE /*filter*/,
                EV_DELETE /*flags*/,
                0 /*fflags*/,
                0 /*data*/,
                handle_data /*udata*/);
        }

        kevent(impl->kq_fd, changelist, changelist_size, NULL /*eventlist*/, 0 /*nevents*/, NULL /*timeout*/);
    }

    /* Schedule a task to clean up the memory. This is done in a task to prevent the following scenario:
     * - While processing a batch of events, some callback unsubscribes another aws_io_handle.
     * - One of the other events in this batch belongs to that other aws_io_handle.
     * - If the handle_data were already deleted, we would access invalid memory. */
    aws_task_init(
        &handle_data->cleanup_task, s_clean_up_handle_data_task, handle_data, "kqueue_event_loop_clean_up_handle_data");
    aws_event_loop_schedule_task_now(event_loop, &handle_data->cleanup_task);

    handle_data->state = HANDLE_STATE_UNSUBSCRIBED;
    handle->additional_data = NULL;

    return AWS_OP_SUCCESS;
}

static bool s_is_event_thread(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    aws_thread_id_t *thread_id = aws_atomic_load_ptr(&impl->running_thread_id);
    return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id());
}

/* Called from thread.
 * Takes tasks from tasks_to_schedule and adds them to the scheduler. */
static void s_process_tasks_to_schedule(struct aws_event_loop *event_loop, struct aws_linked_list *tasks_to_schedule) {
    struct kqueue_loop *impl = event_loop->impl_data;
    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: processing cross-thread tasks", (void *)event_loop);

    while (!aws_linked_list_empty(tasks_to_schedule)) {
        struct aws_linked_list_node *node = aws_linked_list_pop_front(tasks_to_schedule);
        struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: task %p pulled to event-loop, scheduling now.",
            (void *)event_loop,
            (void *)task);
        /* Timestamp 0 is used to denote "now" tasks */
        if (task->timestamp == 0) {
            aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
        } else {
            aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, task->timestamp);
        }
    }
}

static void s_process_cross_thread_data(struct aws_event_loop *event_loop) {
    struct kqueue_loop *impl = event_loop->impl_data;

    AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: notified of cross-thread data to process", (void *)event_loop);

    /* If there are tasks to schedule, grab them all out of cross_thread_data.tasks_to_schedule.
     * We'll process them later, so that we minimize time spent holding the mutex. */
    struct aws_linked_list tasks_to_schedule;
    aws_linked_list_init(&tasks_to_schedule);

    { /* Begin critical section */
        aws_mutex_lock(&impl->cross_thread_data.mutex);
        impl->cross_thread_data.thread_signaled = false;

        bool initiate_stop = (impl->cross_thread_data.state == EVENT_THREAD_STATE_STOPPING) &&
                             (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING);
        if (AWS_UNLIKELY(initiate_stop)) {
            impl->thread_data.state = EVENT_THREAD_STATE_STOPPING;
        }

        aws_linked_list_swap_contents(&impl->cross_thread_data.tasks_to_schedule, &tasks_to_schedule);

        aws_mutex_unlock(&impl->cross_thread_data.mutex);
    } /* End critical section */

    s_process_tasks_to_schedule(event_loop, &tasks_to_schedule);
}

static int s_aws_event_flags_from_kevent(struct kevent *kevent) {
    int event_flags = 0;

    if (kevent->flags & EV_ERROR) {
        event_flags |= AWS_IO_EVENT_TYPE_ERROR;
    } else if (kevent->filter == EVFILT_READ) {
        if (kevent->data != 0) {
            event_flags |= AWS_IO_EVENT_TYPE_READABLE;
        }

        if (kevent->flags & EV_EOF) {
            event_flags |= AWS_IO_EVENT_TYPE_CLOSED;
        }
    } else if (kevent->filter == EVFILT_WRITE) {
        if (kevent->data != 0) {
            event_flags |= AWS_IO_EVENT_TYPE_WRITABLE;
        }

        if (kevent->flags & EV_EOF) {
            event_flags |= AWS_IO_EVENT_TYPE_CLOSED;
        }
    }

    return event_flags;
}

/**
 * This just calls kevent()
 *
 * We broke this out into its own function so that the stacktrace clearly shows
 * what this thread is doing. We've had a lot of cases where users think this
 * thread is deadlocked because it's stuck here. We want it to be clear
 * that it's doing nothing on purpose. It's waiting for events to happen...
 */
AWS_NO_INLINE
static int aws_event_loop_listen_for_io_events(int kq_fd, struct kevent kevents[MAX_EVENTS], struct timespec *timeout) {
    return kevent(kq_fd, NULL /*changelist*/, 0 /*nchanges*/, kevents /*eventlist*/, MAX_EVENTS /*nevents*/, timeout);
}

static void aws_event_loop_thread(void *user_data) {
    struct aws_event_loop *event_loop = user_data;
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
    struct kqueue_loop *impl = event_loop->impl_data;

    /* set thread id to the event-loop's thread. */
    aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_created_on.thread_id);

    AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
    impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;

    struct kevent kevents[MAX_EVENTS];

    /* A single aws_io_handle could have two separate kevents if subscribed for both read and write.
     * If both the read and write kevents fire in the same loop of the event-thread,
     * combine the event-flags and deliver them in a single callback.
     * This makes the kqueue_event_loop behave more like the other platform implementations. */
    struct handle_data *io_handle_events[MAX_EVENTS];

    struct timespec timeout = {
        .tv_sec = DEFAULT_TIMEOUT_SEC,
        .tv_nsec = 0,
    };

    AWS_LOGF_INFO(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: default timeout %ds, and max events to process per tick %d",
        (void *)event_loop,
        DEFAULT_TIMEOUT_SEC,
        MAX_EVENTS);

    while (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING) {
        int num_io_handle_events = 0;
        bool should_process_cross_thread_data = false;

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: waiting for a maximum of %ds %lluns",
            (void *)event_loop,
            (int)timeout.tv_sec,
            (unsigned long long)timeout.tv_nsec);

        /* Process kqueue events */
        int num_kevents = aws_event_loop_listen_for_io_events(impl->kq_fd, kevents, &timeout);

        aws_event_loop_register_tick_start(event_loop);
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, num_kevents);
        if (num_kevents == -1) {
            /* Raise an error, in case this is interesting to anyone monitoring,
             * and continue on with this loop. We can't process events,
             * but we can still process scheduled tasks */
            aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);

            /* Force the cross_thread_data to be processed.
             * There might be valuable info in there, like the message to stop the thread.
             * It's fine to do this even if nothing has changed, it just costs a mutex lock/unlock. */
            should_process_cross_thread_data = true;
        }

        for (int i = 0; i < num_kevents; ++i) {
            struct kevent *kevent = &kevents[i];

            /* Was this event to signal that cross_thread_data has changed? */
            if ((int)kevent->ident == impl->cross_thread_signal_pipe[READ_FD]) {
                should_process_cross_thread_data = true;

                /* Drain whatever data was written to the signaling pipe */
                uint32_t read_whatever;
                while (read((int)kevent->ident, &read_whatever, sizeof(read_whatever)) > 0) {
                }

                continue;
            }

            /* Otherwise this was a normal event on a subscribed handle. Figure out which flags to report. */
            int event_flags = s_aws_event_flags_from_kevent(kevent);
            if (event_flags == 0) {
                continue;
            }

            /* Combine flags, in case multiple kevents correspond to one handle. (see notes at top of function) */
            struct handle_data *handle_data = kevent->udata;
            if (handle_data->events_this_loop == 0) {
                io_handle_events[num_io_handle_events++] = handle_data;
            }
            handle_data->events_this_loop |= event_flags;
        }

        /* Invoke each handle's event callback (unless the handle has been unsubscribed) */
        for (int i = 0; i < num_io_handle_events; ++i) {
            struct handle_data *handle_data = io_handle_events[i];

            if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
                AWS_LOGF_TRACE(
                    AWS_LS_IO_EVENT_LOOP,
                    "id=%p: activity on fd %d, invoking handler.",
                    (void *)event_loop,
                    handle_data->owner->data.fd);
                handle_data->on_event(
                    event_loop, handle_data->owner, handle_data->events_this_loop, handle_data->on_event_user_data);
            }

            handle_data->events_this_loop = 0;
        }

        /* Process cross_thread_data */
        if (should_process_cross_thread_data) {
            s_process_cross_thread_data(event_loop);
        }

        /* Run scheduled tasks */
        uint64_t now_ns = 0;
        event_loop->clock(&now_ns); /* If clock fails, now_ns will be 0 and tasks scheduled for a specific time
                                       will not be run. That's ok, we'll handle them next time around. */
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
        aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns);

        /* Set timeout for next kevent() call.
         * If clock fails, or scheduler has no tasks, use default timeout */
        bool use_default_timeout = false;

        int err = event_loop->clock(&now_ns);
        if (err) {
            use_default_timeout = true;
        }

        uint64_t next_run_time_ns;
        if (!aws_task_scheduler_has_tasks(&impl->thread_data.scheduler, &next_run_time_ns)) {
            use_default_timeout = true;
        }

        if (use_default_timeout) {
            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP, "id=%p: no more scheduled tasks using default timeout.", (void *)event_loop);
            timeout.tv_sec = DEFAULT_TIMEOUT_SEC;
            timeout.tv_nsec = 0;
        } else {
            /* Convert from timestamp in nanoseconds, to timeout in seconds with nanosecond remainder */
            uint64_t timeout_ns = next_run_time_ns > now_ns ? next_run_time_ns - now_ns : 0;

            uint64_t timeout_remainder_ns = 0;
            uint64_t timeout_sec =
                aws_timestamp_convert(timeout_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &timeout_remainder_ns);

            if (timeout_sec > LONG_MAX) { /* Check for overflow. On Darwin, these values are stored as longs */
                timeout_sec = LONG_MAX;
                timeout_remainder_ns = 0;
            }

            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP,
                "id=%p: detected more scheduled tasks with the next occurring at "
                "%llu using timeout of %ds %lluns.",
                (void *)event_loop,
                (unsigned long long)timeout_ns,
                (int)timeout_sec,
                (unsigned long long)timeout_remainder_ns);

            timeout.tv_sec = (time_t)(timeout_sec);
            timeout.tv_nsec = (long)(timeout_remainder_ns);
        }

        aws_event_loop_register_tick_end(event_loop);
    }

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
    /* reset to NULL. This should be updated again during destroy before tasks are canceled. */
    aws_atomic_store_ptr(&impl->running_thread_id, NULL);
}