task_scheduler.c 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/common/task_scheduler.h>
  6. #include <aws/common/logging.h>
  7. #include <inttypes.h>
/* Size argument handed to aws_priority_queue_init_dynamic() when a scheduler's timed_queue is created
 * (presumably the initial item capacity -- confirm against the priority queue API). */
static const size_t DEFAULT_QUEUE_SIZE = 7;
  9. void aws_task_init(struct aws_task *task, aws_task_fn *fn, void *arg, const char *type_tag) {
  10. AWS_ZERO_STRUCT(*task);
  11. task->fn = fn;
  12. task->arg = arg;
  13. task->type_tag = type_tag;
  14. }
  15. const char *aws_task_status_to_c_str(enum aws_task_status status) {
  16. switch (status) {
  17. case AWS_TASK_STATUS_RUN_READY:
  18. return "<Running>";
  19. case AWS_TASK_STATUS_CANCELED:
  20. return "<Canceled>";
  21. default:
  22. return "<Unknown>";
  23. }
  24. }
  25. void aws_task_run(struct aws_task *task, enum aws_task_status status) {
  26. AWS_ASSERT(task->fn);
  27. AWS_LOGF_DEBUG(
  28. AWS_LS_COMMON_TASK_SCHEDULER,
  29. "id=%p: Running %s task with %s status",
  30. (void *)task,
  31. task->type_tag,
  32. aws_task_status_to_c_str(status));
  33. task->abi_extension.scheduled = false;
  34. task->fn(task, task->arg, status);
  35. }
  36. static int s_compare_timestamps(const void *a, const void *b) {
  37. uint64_t a_time = (*(struct aws_task **)a)->timestamp;
  38. uint64_t b_time = (*(struct aws_task **)b)->timestamp;
  39. return a_time > b_time; /* min-heap */
  40. }
  41. static void s_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time, enum aws_task_status status);
  42. int aws_task_scheduler_init(struct aws_task_scheduler *scheduler, struct aws_allocator *alloc) {
  43. AWS_ASSERT(alloc);
  44. AWS_ZERO_STRUCT(*scheduler);
  45. if (aws_priority_queue_init_dynamic(
  46. &scheduler->timed_queue, alloc, DEFAULT_QUEUE_SIZE, sizeof(struct aws_task *), &s_compare_timestamps)) {
  47. return AWS_OP_ERR;
  48. };
  49. scheduler->alloc = alloc;
  50. aws_linked_list_init(&scheduler->timed_list);
  51. aws_linked_list_init(&scheduler->asap_list);
  52. AWS_POSTCONDITION(aws_task_scheduler_is_valid(scheduler));
  53. return AWS_OP_SUCCESS;
  54. }
  55. void aws_task_scheduler_clean_up(struct aws_task_scheduler *scheduler) {
  56. AWS_ASSERT(scheduler);
  57. if (aws_task_scheduler_is_valid(scheduler)) {
  58. /* Execute all remaining tasks as CANCELED.
  59. * Do this in a loop so that tasks scheduled by other tasks are executed */
  60. while (aws_task_scheduler_has_tasks(scheduler, NULL)) {
  61. s_run_all(scheduler, UINT64_MAX, AWS_TASK_STATUS_CANCELED);
  62. }
  63. }
  64. aws_priority_queue_clean_up(&scheduler->timed_queue);
  65. AWS_ZERO_STRUCT(*scheduler);
  66. }
  67. bool aws_task_scheduler_is_valid(const struct aws_task_scheduler *scheduler) {
  68. return scheduler && scheduler->alloc && aws_priority_queue_is_valid(&scheduler->timed_queue) &&
  69. aws_linked_list_is_valid(&scheduler->asap_list) && aws_linked_list_is_valid(&scheduler->timed_list);
  70. }
  71. bool aws_task_scheduler_has_tasks(const struct aws_task_scheduler *scheduler, uint64_t *next_task_time) {
  72. AWS_ASSERT(scheduler);
  73. uint64_t timestamp = UINT64_MAX;
  74. bool has_tasks = false;
  75. if (!aws_linked_list_empty(&scheduler->asap_list)) {
  76. timestamp = 0;
  77. has_tasks = true;
  78. } else {
  79. /* Check whether timed_list or timed_queue has the earlier task */
  80. if (AWS_UNLIKELY(!aws_linked_list_empty(&scheduler->timed_list))) {
  81. struct aws_linked_list_node *node = aws_linked_list_front(&scheduler->timed_list);
  82. struct aws_task *task = AWS_CONTAINER_OF(node, struct aws_task, node);
  83. timestamp = task->timestamp;
  84. has_tasks = true;
  85. }
  86. struct aws_task **task_ptrptr = NULL;
  87. if (aws_priority_queue_top(&scheduler->timed_queue, (void **)&task_ptrptr) == AWS_OP_SUCCESS) {
  88. if ((*task_ptrptr)->timestamp < timestamp) {
  89. timestamp = (*task_ptrptr)->timestamp;
  90. }
  91. has_tasks = true;
  92. }
  93. }
  94. if (next_task_time) {
  95. *next_task_time = timestamp;
  96. }
  97. return has_tasks;
  98. }
  99. void aws_task_scheduler_schedule_now(struct aws_task_scheduler *scheduler, struct aws_task *task) {
  100. AWS_ASSERT(scheduler);
  101. AWS_ASSERT(task);
  102. AWS_ASSERT(task->fn);
  103. AWS_LOGF_DEBUG(
  104. AWS_LS_COMMON_TASK_SCHEDULER,
  105. "id=%p: Scheduling %s task for immediate execution",
  106. (void *)task,
  107. task->type_tag);
  108. task->priority_queue_node.current_index = SIZE_MAX;
  109. aws_linked_list_node_reset(&task->node);
  110. task->timestamp = 0;
  111. aws_linked_list_push_back(&scheduler->asap_list, &task->node);
  112. task->abi_extension.scheduled = true;
  113. }
  114. void aws_task_scheduler_schedule_future(
  115. struct aws_task_scheduler *scheduler,
  116. struct aws_task *task,
  117. uint64_t time_to_run) {
  118. AWS_ASSERT(scheduler);
  119. AWS_ASSERT(task);
  120. AWS_ASSERT(task->fn);
  121. AWS_LOGF_DEBUG(
  122. AWS_LS_COMMON_TASK_SCHEDULER,
  123. "id=%p: Scheduling %s task for future execution at time %" PRIu64,
  124. (void *)task,
  125. task->type_tag,
  126. time_to_run);
  127. task->timestamp = time_to_run;
  128. task->priority_queue_node.current_index = SIZE_MAX;
  129. aws_linked_list_node_reset(&task->node);
  130. int err = aws_priority_queue_push_ref(&scheduler->timed_queue, &task, &task->priority_queue_node);
  131. if (AWS_UNLIKELY(err)) {
  132. /* In the (very unlikely) case that we can't push into the timed_queue,
  133. * perform a sorted insertion into timed_list. */
  134. struct aws_linked_list_node *node_i;
  135. for (node_i = aws_linked_list_begin(&scheduler->timed_list);
  136. node_i != aws_linked_list_end(&scheduler->timed_list);
  137. node_i = aws_linked_list_next(node_i)) {
  138. struct aws_task *task_i = AWS_CONTAINER_OF(node_i, struct aws_task, node);
  139. if (task_i->timestamp > time_to_run) {
  140. break;
  141. }
  142. }
  143. aws_linked_list_insert_before(node_i, &task->node);
  144. }
  145. task->abi_extension.scheduled = true;
  146. }
  147. void aws_task_scheduler_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time) {
  148. AWS_ASSERT(scheduler);
  149. s_run_all(scheduler, current_time, AWS_TASK_STATUS_RUN_READY);
  150. }
  151. static void s_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time, enum aws_task_status status) {
  152. /* Move scheduled tasks to running_list before executing.
  153. * This gives us the desired behavior that: if executing a task results in another task being scheduled,
  154. * that new task is not executed until the next time run() is invoked. */
  155. struct aws_linked_list running_list;
  156. aws_linked_list_init(&running_list);
  157. /* First move everything from asap_list */
  158. aws_linked_list_swap_contents(&running_list, &scheduler->asap_list);
  159. /* Next move tasks from timed_queue and timed_list, based on whichever's next-task is sooner.
  160. * It's very unlikely that any tasks are in timed_list, so once it has no more valid tasks,
  161. * break out of this complex loop in favor of a simpler one. */
  162. while (AWS_UNLIKELY(!aws_linked_list_empty(&scheduler->timed_list))) {
  163. struct aws_linked_list_node *timed_list_node = aws_linked_list_begin(&scheduler->timed_list);
  164. struct aws_task *timed_list_task = AWS_CONTAINER_OF(timed_list_node, struct aws_task, node);
  165. if (timed_list_task->timestamp > current_time) {
  166. /* timed_list is out of valid tasks, break out of complex loop */
  167. break;
  168. }
  169. /* Check if timed_queue has a task which is sooner */
  170. struct aws_task **timed_queue_task_ptrptr = NULL;
  171. if (aws_priority_queue_top(&scheduler->timed_queue, (void **)&timed_queue_task_ptrptr) == AWS_OP_SUCCESS) {
  172. if ((*timed_queue_task_ptrptr)->timestamp <= current_time) {
  173. if ((*timed_queue_task_ptrptr)->timestamp < timed_list_task->timestamp) {
  174. /* Take task from timed_queue */
  175. struct aws_task *timed_queue_task;
  176. aws_priority_queue_pop(&scheduler->timed_queue, &timed_queue_task);
  177. aws_linked_list_push_back(&running_list, &timed_queue_task->node);
  178. continue;
  179. }
  180. }
  181. }
  182. /* Take task from timed_list */
  183. aws_linked_list_pop_front(&scheduler->timed_list);
  184. aws_linked_list_push_back(&running_list, &timed_list_task->node);
  185. }
  186. /* Simpler loop that moves remaining valid tasks from timed_queue */
  187. struct aws_task **timed_queue_task_ptrptr = NULL;
  188. while (aws_priority_queue_top(&scheduler->timed_queue, (void **)&timed_queue_task_ptrptr) == AWS_OP_SUCCESS) {
  189. if ((*timed_queue_task_ptrptr)->timestamp > current_time) {
  190. break;
  191. }
  192. struct aws_task *next_timed_task;
  193. aws_priority_queue_pop(&scheduler->timed_queue, &next_timed_task);
  194. aws_linked_list_push_back(&running_list, &next_timed_task->node);
  195. }
  196. /* Run tasks */
  197. while (!aws_linked_list_empty(&running_list)) {
  198. struct aws_linked_list_node *task_node = aws_linked_list_pop_front(&running_list);
  199. struct aws_task *task = AWS_CONTAINER_OF(task_node, struct aws_task, node);
  200. aws_task_run(task, status);
  201. }
  202. }
  203. void aws_task_scheduler_cancel_task(struct aws_task_scheduler *scheduler, struct aws_task *task) {
  204. /* attempt the linked lists first since those will be faster access and more likely to occur
  205. * anyways.
  206. */
  207. if (task->node.next) {
  208. aws_linked_list_remove(&task->node);
  209. } else if (task->abi_extension.scheduled) {
  210. aws_priority_queue_remove(&scheduler->timed_queue, &task, &task->priority_queue_node);
  211. }
  212. /*
  213. * No need to log cancellation specially; it will get logged during the run call with the canceled status
  214. */
  215. aws_task_run(task, AWS_TASK_STATUS_CANCELED);
  216. }