#pragma once
#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#endif

//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//

#ifndef LLVM_SUPPORT_THREADPOOL_H
#define LLVM_SUPPORT_THREADPOOL_H

#include "llvm/Config/llvm-config.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/thread.h"

#include <future>

#include <cassert>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

namespace llvm {

/// A ThreadPool for asynchronous parallel execution on a defined number of
/// threads.
///
/// The pool keeps a vector of threads alive, waiting on a condition variable
/// for some work to become available.
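///
/// Illustrative usage (a minimal sketch, not part of the original header; it
/// only assumes the declarations below behave as documented):
/// \code
///   llvm::ThreadPool Pool(llvm::hardware_concurrency(4));
///   // Each submission returns a std::shared_future for that task.
///   auto Sum = Pool.async([] { return 1 + 2; });
///   Pool.async([] { /* fire-and-forget side effect */ });
///   int Result = Sum.get(); // Blocks until this particular task has run.
///   Pool.wait();            // Blocks until the whole queue is drained.
/// \endcode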
class ThreadPool {
public:
  /// Construct a pool using the hardware strategy \p S for mapping hardware
  /// execution resources (threads, cores, CPUs).
  /// Defaults to using the maximum execution resources in the system, but
  /// accounting for the affinity mask.
  ThreadPool(ThreadPoolStrategy S = hardware_concurrency());

  /// Blocking destructor: the pool will wait for all the threads to complete.
  ~ThreadPool();

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename Function, typename... Args>
  inline auto async(Function &&F, Args &&...ArgList) {
    auto Task =
        std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...);
    return async(std::move(Task));
  }

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename Func>
  auto async(Func &&F) -> std::shared_future<decltype(F())> {
    return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)));
  }
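
  // For example (an illustrative sketch; `Pool`, `scale`, and the arguments
  // are hypothetical and not declared in this header):
  //
  //   int scale(int X, int Factor) { return X * Factor; }
  //   ...
  //   // The variadic overload binds the arguments and forwards a nullary
  //   // callable to the single-argument overload above.
  //   std::shared_future<int> F = Pool.async(scale, 21, 2);
  //   assert(F.get() == 42);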

  /// Blocking wait for all the threads to complete and the queue to be empty.
  /// It is an error to try to add new tasks while blocking on this call.
  void wait();

  // TODO: misleading legacy name warning!
  // Returns the maximum number of worker threads in the pool, not the current
  // number of threads!
  unsigned getThreadCount() const { return MaxThreadCount; }

  /// Returns true if the current thread is a worker thread of this thread pool.
  bool isWorkerThread() const;

private:
  /// Helpers to create a promise and a callable wrapper of \p Task that sets
  /// the result of the promise. Returns the callable and a future to access the
  /// result.
  template <typename ResTy>
  static std::pair<std::function<void()>, std::future<ResTy>>
  createTaskAndFuture(std::function<ResTy()> Task) {
    std::shared_ptr<std::promise<ResTy>> Promise =
        std::make_shared<std::promise<ResTy>>();
    auto F = Promise->get_future();
    return {
        [Promise = std::move(Promise), Task]() { Promise->set_value(Task()); },
        std::move(F)};
  }

  static std::pair<std::function<void()>, std::future<void>>
  createTaskAndFuture(std::function<void()> Task) {
    std::shared_ptr<std::promise<void>> Promise =
        std::make_shared<std::promise<void>>();
    auto F = Promise->get_future();
    return {[Promise = std::move(Promise), Task]() {
              Task();
              Promise->set_value();
            },
            std::move(F)};
  }
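
  // Illustrative sketch of how the helpers above are meant to be used (the
  // variable names here are hypothetical; asyncImpl below does the real
  // wiring):
  //
  //   std::pair<std::function<void()>, std::future<int>> P =
  //       createTaskAndFuture(std::function<int()>([] { return 42; }));
  //   P.first();                  // Runs the wrapper and fulfils the promise.
  //   int Value = P.second.get(); // Value == 42.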

  bool workCompletedUnlocked() { return !ActiveThreads && Tasks.empty(); }

  /// Asynchronous submission of a task to the pool. The returned future can be
  /// used to wait for the task to finish and is *non-blocking* on destruction.
  template <typename ResTy>
  std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task) {
#if LLVM_ENABLE_THREADS
    /// Wrap the Task in a std::function<void()> that sets the result of the
    /// corresponding future.
    auto R = createTaskAndFuture(Task);

    int requestedThreads;
    {
      // Lock the queue and push the new task
      std::unique_lock<std::mutex> LockGuard(QueueLock);

      // Don't allow enqueueing after disabling the pool
      assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
      Tasks.push(std::move(R.first));
      requestedThreads = ActiveThreads + Tasks.size();
    }
    QueueCondition.notify_one();
    grow(requestedThreads);
    return R.second.share();

#else // LLVM_ENABLE_THREADS Disabled

    // Get a Future with launch::deferred execution using std::async
    auto Future = std::async(std::launch::deferred, std::move(Task)).share();
    // Wrap the future so that both ThreadPool::wait() can operate and the
    // returned future can be sync'ed on.
    Tasks.push([Future]() { Future.get(); });
    return Future;
#endif
  }

#if LLVM_ENABLE_THREADS
  // Grow to ensure that we have at least `requested` Threads, but do not go
  // over MaxThreadCount.
  void grow(int requested);
#endif

  /// Threads in flight
  std::vector<llvm::thread> Threads;

  /// Lock protecting access to the Threads vector.
  mutable std::mutex ThreadsLock;

  /// Tasks waiting for execution in the pool.
  std::queue<std::function<void()>> Tasks;

  /// Locking and signaling for accessing the Tasks queue.
  std::mutex QueueLock;
  std::condition_variable QueueCondition;

  /// Signaling for job completion
  std::condition_variable CompletionCondition;

  /// Keep track of the number of threads actually busy
  unsigned ActiveThreads = 0;

#if LLVM_ENABLE_THREADS // avoids warning for unused variable
  /// Signal for the destruction of the pool, asking threads to exit.
  bool EnableFlag = true;
#endif

  const ThreadPoolStrategy Strategy;

  /// Maximum number of threads to potentially grow this pool to.
  const unsigned MaxThreadCount;
};

} // namespace llvm

#endif // LLVM_SUPPORT_THREADPOOL_H

#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif