
#pragma once

#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#endif

//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based task queue.
//
//===----------------------------------------------------------------------===//

#ifndef LLVM_SUPPORT_TASKQUEUE_H
#define LLVM_SUPPORT_TASKQUEUE_H

#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ThreadPool.h"
#include "llvm/Support/thread.h"

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <utility>

namespace llvm {
/// TaskQueue executes serialized work on a user-defined Thread Pool. It
/// guarantees that if task B is enqueued after task A, task B begins after
/// task A completes and there is no overlap between the two.
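///
/// A minimal usage sketch (illustrative only, assuming LLVM_ENABLE_THREADS
/// and a default-constructed ThreadPool; the variable names are made up):
/// \code
///   ThreadPool Pool;
///   TaskQueue Queue(Pool);
///   std::future<int> F = Queue.async([] { return 42; });
///   Queue.async([] { /* runs only after the first task has finished */ });
///   int Result = F.get(); // blocks until the first task completes
/// \endcode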
class TaskQueue {
  // Because we don't have init capture to use move-only local variables that
  // are captured into a lambda, we create the promise inside an explicit
  // callable struct. We want to do as much of the wrapping in the
  // type-specialized domain (before type erasure) and then erase this into a
  // std::function.
  template <typename Callable> struct Task {
    using ResultTy = std::invoke_result_t<Callable>;
    explicit Task(Callable C, TaskQueue &Parent)
        : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()),
          Parent(&Parent) {}

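    // Overload-resolution dispatch on the result type: for a non-void
    // ResultTy the function template below is an exact match and stores the
    // callable's return value in the promise; for a void ResultTy the
    // non-template void* overload is preferred, so the callable is invoked
    // and the promise is set with no value.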
    template <typename T>
    void invokeCallbackAndSetPromise(T *) {
      P->set_value(C());
    }

    void invokeCallbackAndSetPromise(void *) {
      C();
      P->set_value();
    }

    void operator()() noexcept {
      ResultTy *Dummy = nullptr;
      invokeCallbackAndSetPromise(Dummy);
      Parent->completeTask();
    }

    Callable C;
    std::shared_ptr<std::promise<ResultTy>> P;
    TaskQueue *Parent;
  };

public:
  /// Construct a task queue with no work.
  TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }

  /// Blocking destructor: the queue will wait for all work to complete.
  ~TaskQueue() {
    Scheduler.wait();
    assert(Tasks.empty());
  }

  /// Asynchronous submission of a task to the queue. The returned future can
  /// be used to wait for the task (and all previous tasks that have not yet
  /// completed) to finish.
  template <typename Callable>
  std::future<std::invoke_result_t<Callable>> async(Callable &&C) {
#if !LLVM_ENABLE_THREADS
    static_assert(false,
                  "TaskQueue requires building with LLVM_ENABLE_THREADS!");
#endif
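    // Construct the wrapper task and grab its future before the task is
    // queued or handed to the scheduler: the promise is heap-allocated behind
    // a shared_ptr, so the future stays valid after T itself is moved.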
    Task<Callable> T{std::move(C), *this};
    using ResultTy = std::invoke_result_t<Callable>;
    std::future<ResultTy> F = T.P->get_future();
    {
      std::lock_guard<std::mutex> Lock(QueueLock);

      // If there's already a task in flight, just queue this one up. If
      // there is not a task in flight, bypass the queue and schedule this
      // task immediately.
      if (IsTaskInFlight)
        Tasks.push_back(std::move(T));
      else {
        Scheduler.async(std::move(T));
        IsTaskInFlight = true;
      }
    }
    return F;
  }

private:
  void completeTask() {
    // We just completed a task. If there are no more tasks in the queue,
    // update IsTaskInFlight to false and stop doing work. Otherwise
    // schedule the next task (while not holding the lock).
    std::function<void()> Continuation;
    {
      std::lock_guard<std::mutex> Lock(QueueLock);
      if (Tasks.empty()) {
        IsTaskInFlight = false;
        return;
      }

      Continuation = std::move(Tasks.front());
      Tasks.pop_front();
    }

    Scheduler.async(std::move(Continuation));
  }

  /// The thread pool on which to run the work.
  ThreadPool &Scheduler;

  /// State which indicates whether the queue is currently processing any
  /// work.
  bool IsTaskInFlight = false;

  /// Mutex for synchronizing access to the Tasks queue.
  std::mutex QueueLock;

  /// Tasks waiting for execution in the queue.
  std::deque<std::function<void()>> Tasks;
};
} // namespace llvm

#endif // LLVM_SUPPORT_TASKQUEUE_H

#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif