thread_scheduler.c 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/common/clock.h>
  6. #include <aws/common/condition_variable.h>
  7. #include <aws/common/mutex.h>
  8. #include <aws/common/ref_count.h>
  9. #include <aws/common/task_scheduler.h>
  10. #include <aws/common/thread.h>
  11. #include <aws/common/thread_scheduler.h>
  12. struct aws_thread_scheduler {
  13. struct aws_allocator *allocator;
  14. struct aws_ref_count ref_count;
  15. struct aws_thread thread;
  16. struct aws_task_scheduler scheduler;
  17. struct aws_atomic_var should_exit;
  18. struct {
  19. struct aws_linked_list scheduling_queue;
  20. struct aws_linked_list cancel_queue;
  21. struct aws_mutex mutex;
  22. struct aws_condition_variable c_var;
  23. } thread_data;
  24. };
  25. struct cancellation_node {
  26. struct aws_task *task_to_cancel;
  27. struct aws_linked_list_node node;
  28. };
  29. static void s_destroy_callback(void *arg) {
  30. struct aws_thread_scheduler *scheduler = arg;
  31. aws_atomic_store_int(&scheduler->should_exit, 1u);
  32. aws_condition_variable_notify_all(&scheduler->thread_data.c_var);
  33. aws_thread_join(&scheduler->thread);
  34. aws_task_scheduler_clean_up(&scheduler->scheduler);
  35. aws_condition_variable_clean_up(&scheduler->thread_data.c_var);
  36. aws_mutex_clean_up(&scheduler->thread_data.mutex);
  37. aws_thread_clean_up(&scheduler->thread);
  38. aws_mem_release(scheduler->allocator, scheduler);
  39. }
  40. static bool s_thread_should_wake(void *arg) {
  41. struct aws_thread_scheduler *scheduler = arg;
  42. uint64_t current_time = 0;
  43. aws_high_res_clock_get_ticks(&current_time);
  44. uint64_t next_scheduled_task = 0;
  45. aws_task_scheduler_has_tasks(&scheduler->scheduler, &next_scheduled_task);
  46. return aws_atomic_load_int(&scheduler->should_exit) ||
  47. !aws_linked_list_empty(&scheduler->thread_data.scheduling_queue) ||
  48. !aws_linked_list_empty(&scheduler->thread_data.cancel_queue) || (next_scheduled_task <= current_time);
  49. }
  50. static void s_thread_fn(void *arg) {
  51. struct aws_thread_scheduler *scheduler = arg;
  52. while (!aws_atomic_load_int(&scheduler->should_exit)) {
  53. /* move tasks from the mutex protected list to the scheduler. This is because we don't want to hold the lock
  54. * for the scheduler during run_all and then try and acquire the lock from another thread to schedule something
  55. * because that potentially would block the calling thread. */
  56. struct aws_linked_list list_cpy;
  57. aws_linked_list_init(&list_cpy);
  58. struct aws_linked_list cancel_list_cpy;
  59. aws_linked_list_init(&cancel_list_cpy);
  60. AWS_FATAL_ASSERT(!aws_mutex_lock(&scheduler->thread_data.mutex) && "mutex lock failed!");
  61. aws_linked_list_swap_contents(&scheduler->thread_data.scheduling_queue, &list_cpy);
  62. aws_linked_list_swap_contents(&scheduler->thread_data.cancel_queue, &cancel_list_cpy);
  63. AWS_FATAL_ASSERT(!aws_mutex_unlock(&scheduler->thread_data.mutex) && "mutex unlock failed!");
  64. while (!aws_linked_list_empty(&list_cpy)) {
  65. struct aws_linked_list_node *node = aws_linked_list_pop_front(&list_cpy);
  66. struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
  67. if (task->timestamp) {
  68. aws_task_scheduler_schedule_future(&scheduler->scheduler, task, task->timestamp);
  69. } else {
  70. aws_task_scheduler_schedule_now(&scheduler->scheduler, task);
  71. }
  72. }
  73. /* now cancel the tasks. */
  74. while (!aws_linked_list_empty(&cancel_list_cpy)) {
  75. struct aws_linked_list_node *node = aws_linked_list_pop_front(&cancel_list_cpy);
  76. struct cancellation_node *cancellation_node = AWS_CONTAINER_OF(node, struct cancellation_node, node);
  77. aws_task_scheduler_cancel_task(&scheduler->scheduler, cancellation_node->task_to_cancel);
  78. aws_mem_release(scheduler->allocator, cancellation_node);
  79. }
  80. /* now run everything */
  81. uint64_t current_time = 0;
  82. aws_high_res_clock_get_ticks(&current_time);
  83. aws_task_scheduler_run_all(&scheduler->scheduler, current_time);
  84. uint64_t next_scheduled_task = 0;
  85. aws_task_scheduler_has_tasks(&scheduler->scheduler, &next_scheduled_task);
  86. int64_t timeout = 0;
  87. if (next_scheduled_task == UINT64_MAX) {
  88. /* at least wake up once per 30 seconds. */
  89. timeout = (int64_t)30 * (int64_t)AWS_TIMESTAMP_NANOS;
  90. } else {
  91. timeout = (int64_t)(next_scheduled_task - current_time);
  92. }
  93. if (timeout > 0) {
  94. AWS_FATAL_ASSERT(!aws_mutex_lock(&scheduler->thread_data.mutex) && "mutex lock failed!");
  95. aws_condition_variable_wait_for_pred(
  96. &scheduler->thread_data.c_var, &scheduler->thread_data.mutex, timeout, s_thread_should_wake, scheduler);
  97. AWS_FATAL_ASSERT(!aws_mutex_unlock(&scheduler->thread_data.mutex) && "mutex unlock failed!");
  98. }
  99. }
  100. }
  101. struct aws_thread_scheduler *aws_thread_scheduler_new(
  102. struct aws_allocator *allocator,
  103. const struct aws_thread_options *thread_options) {
  104. struct aws_thread_scheduler *scheduler = aws_mem_calloc(allocator, 1, sizeof(struct aws_thread_scheduler));
  105. if (!scheduler) {
  106. return NULL;
  107. }
  108. if (aws_thread_init(&scheduler->thread, allocator)) {
  109. goto clean_up;
  110. }
  111. AWS_FATAL_ASSERT(!aws_mutex_init(&scheduler->thread_data.mutex) && "mutex init failed!");
  112. AWS_FATAL_ASSERT(!aws_condition_variable_init(&scheduler->thread_data.c_var) && "condition variable init failed!");
  113. if (aws_task_scheduler_init(&scheduler->scheduler, allocator)) {
  114. goto thread_init;
  115. }
  116. scheduler->allocator = allocator;
  117. aws_atomic_init_int(&scheduler->should_exit, 0u);
  118. aws_ref_count_init(&scheduler->ref_count, scheduler, s_destroy_callback);
  119. aws_linked_list_init(&scheduler->thread_data.scheduling_queue);
  120. aws_linked_list_init(&scheduler->thread_data.cancel_queue);
  121. if (aws_thread_launch(&scheduler->thread, s_thread_fn, scheduler, thread_options)) {
  122. goto scheduler_init;
  123. }
  124. return scheduler;
  125. scheduler_init:
  126. aws_task_scheduler_clean_up(&scheduler->scheduler);
  127. thread_init:
  128. aws_condition_variable_clean_up(&scheduler->thread_data.c_var);
  129. aws_mutex_clean_up(&scheduler->thread_data.mutex);
  130. aws_thread_clean_up(&scheduler->thread);
  131. clean_up:
  132. aws_mem_release(allocator, scheduler);
  133. return NULL;
  134. }
  135. void aws_thread_scheduler_acquire(struct aws_thread_scheduler *scheduler) {
  136. aws_ref_count_acquire(&scheduler->ref_count);
  137. }
  138. void aws_thread_scheduler_release(const struct aws_thread_scheduler *scheduler) {
  139. aws_ref_count_release((struct aws_ref_count *)&scheduler->ref_count);
  140. }
  141. void aws_thread_scheduler_schedule_future(
  142. struct aws_thread_scheduler *scheduler,
  143. struct aws_task *task,
  144. uint64_t time_to_run) {
  145. task->timestamp = time_to_run;
  146. AWS_FATAL_ASSERT(!aws_mutex_lock(&scheduler->thread_data.mutex) && "mutex lock failed!");
  147. aws_linked_list_push_back(&scheduler->thread_data.scheduling_queue, &task->node);
  148. AWS_FATAL_ASSERT(!aws_mutex_unlock(&scheduler->thread_data.mutex) && "mutex unlock failed!");
  149. aws_condition_variable_notify_one(&scheduler->thread_data.c_var);
  150. }
  /**
   * Queues a task to be run as soon as the service thread picks it up.
   *
   * @param scheduler the thread scheduler to queue the task on.
   * @param task      the task to run.
   */
  void aws_thread_scheduler_schedule_now(struct aws_thread_scheduler *scheduler, struct aws_task *task) {
      /* A zero timestamp is what the service thread treats as "run now". */
      const uint64_t run_immediately = 0u;
      aws_thread_scheduler_schedule_future(scheduler, task, run_immediately);
  }
  154. void aws_thread_scheduler_cancel_task(struct aws_thread_scheduler *scheduler, struct aws_task *task) {
  155. struct cancellation_node *cancellation_node =
  156. aws_mem_calloc(scheduler->allocator, 1, sizeof(struct cancellation_node));
  157. AWS_FATAL_ASSERT(cancellation_node && "allocation failed for cancellation node!");
  158. AWS_FATAL_ASSERT(!aws_mutex_lock(&scheduler->thread_data.mutex) && "mutex lock failed!");
  159. struct aws_task *found_task = NULL;
  160. /* remove tasks that are still in the scheduling queue, but haven't made it to the scheduler yet. */
  161. struct aws_linked_list_node *node = aws_linked_list_empty(&scheduler->thread_data.scheduling_queue)
  162. ? NULL
  163. : aws_linked_list_front(&scheduler->thread_data.scheduling_queue);
  164. while (node != NULL) {
  165. struct aws_task *potential_task = AWS_CONTAINER_OF(node, struct aws_task, node);
  166. if (potential_task == task) {
  167. found_task = potential_task;
  168. break;
  169. }
  170. if (aws_linked_list_node_next_is_valid(node)) {
  171. node = aws_linked_list_next(node);
  172. } else {
  173. node = NULL;
  174. }
  175. }
  176. if (found_task) {
  177. aws_linked_list_remove(&found_task->node);
  178. }
  179. cancellation_node->task_to_cancel = task;
  180. /* regardless put it in the cancel queue so the thread can call the task with canceled status. */
  181. aws_linked_list_push_back(&scheduler->thread_data.cancel_queue, &cancellation_node->node);
  182. AWS_FATAL_ASSERT(!aws_mutex_unlock(&scheduler->thread_data.mutex) && "mutex unlock failed!");
  183. /* notify so the loop knows to wakeup and process the cancellations. */
  184. aws_condition_variable_notify_one(&scheduler->thread_data.c_var);
  185. }