  1. //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
  2. //
  3. // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
  4. // See https://llvm.org/LICENSE.txt for license information.
  5. // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
  6. //
  7. //===----------------------------------------------------------------------===//
#include "llvm/Support/Parallel.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <stack>
#include <thread>
#include <vector>
  17. llvm::ThreadPoolStrategy llvm::parallel::strategy;
  18. namespace llvm {
  19. namespace parallel {
  20. #if LLVM_ENABLE_THREADS
  21. #ifdef _WIN32
  22. static thread_local unsigned threadIndex;
  23. unsigned getThreadIndex() { return threadIndex; }
  24. #else
  25. thread_local unsigned threadIndex;
  26. #endif
  27. namespace detail {
  28. namespace {
  29. /// An abstract class that takes closures and runs them asynchronously.
  30. class Executor {
  31. public:
  32. virtual ~Executor() = default;
  33. virtual void add(std::function<void()> func) = 0;
  34. static Executor *getDefaultExecutor();
  35. };
  36. /// An implementation of an Executor that runs closures on a thread pool
  37. /// in filo order.
  38. class ThreadPoolExecutor : public Executor {
  39. public:
  40. explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
  41. unsigned ThreadCount = S.compute_thread_count();
  42. // Spawn all but one of the threads in another thread as spawning threads
  43. // can take a while.
  44. Threads.reserve(ThreadCount);
  45. Threads.resize(1);
  46. std::lock_guard<std::mutex> Lock(Mutex);
  47. Threads[0] = std::thread([this, ThreadCount, S] {
  48. for (unsigned I = 1; I < ThreadCount; ++I) {
  49. Threads.emplace_back([=] { work(S, I); });
  50. if (Stop)
  51. break;
  52. }
  53. ThreadsCreated.set_value();
  54. work(S, 0);
  55. });
  56. }
  57. void stop() {
  58. {
  59. std::lock_guard<std::mutex> Lock(Mutex);
  60. if (Stop)
  61. return;
  62. Stop = true;
  63. }
  64. Cond.notify_all();
  65. ThreadsCreated.get_future().wait();
  66. }
  67. ~ThreadPoolExecutor() override {
  68. stop();
  69. std::thread::id CurrentThreadId = std::this_thread::get_id();
  70. for (std::thread &T : Threads)
  71. if (T.get_id() == CurrentThreadId)
  72. T.detach();
  73. else
  74. T.join();
  75. }
  76. struct Creator {
  77. static void *call() { return new ThreadPoolExecutor(strategy); }
  78. };
  79. struct Deleter {
  80. static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  81. };
  82. void add(std::function<void()> F) override {
  83. {
  84. std::lock_guard<std::mutex> Lock(Mutex);
  85. WorkStack.push(std::move(F));
  86. }
  87. Cond.notify_one();
  88. }
  89. private:
  90. void work(ThreadPoolStrategy S, unsigned ThreadID) {
  91. threadIndex = ThreadID;
  92. S.apply_thread_strategy(ThreadID);
  93. while (true) {
  94. std::unique_lock<std::mutex> Lock(Mutex);
  95. Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
  96. if (Stop)
  97. break;
  98. auto Task = std::move(WorkStack.top());
  99. WorkStack.pop();
  100. Lock.unlock();
  101. Task();
  102. }
  103. }
  104. std::atomic<bool> Stop{false};
  105. std::stack<std::function<void()>> WorkStack;
  106. std::mutex Mutex;
  107. std::condition_variable Cond;
  108. std::promise<void> ThreadsCreated;
  109. std::vector<std::thread> Threads;
  110. };
  111. Executor *Executor::getDefaultExecutor() {
  112. // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  113. // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  114. // stops the thread pool and waits for any worker thread creation to complete
  115. // but does not wait for the threads to finish. The wait for worker thread
  116. // creation to complete is important as it prevents intermittent crashes on
  117. // Windows due to a race condition between thread creation and process exit.
  118. //
  119. // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
  120. // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  121. // destructor ensures it has been stopped and waits for worker threads to
  122. // finish. The wait is important as it prevents intermittent crashes on
  123. // Windows when the process is doing a full exit.
  124. //
  125. // The Windows crashes appear to only occur with the MSVC static runtimes and
  126. // are more frequent with the debug static runtime.
  127. //
  128. // This also prevents intermittent deadlocks on exit with the MinGW runtime.
  129. static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
  130. ThreadPoolExecutor::Deleter>
  131. ManagedExec;
  132. static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  133. return Exec.get();
  134. }
  135. } // namespace
  136. } // namespace detail
  137. #endif
  138. static std::atomic<int> TaskGroupInstances;
  139. // Latch::sync() called by the dtor may cause one thread to block. If is a dead
  140. // lock if all threads in the default executor are blocked. To prevent the dead
  141. // lock, only allow the first TaskGroup to run tasks parallelly. In the scenario
  142. // of nested parallel_for_each(), only the outermost one runs parallelly.
  143. TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {}
  144. TaskGroup::~TaskGroup() {
  145. // We must ensure that all the workloads have finished before decrementing the
  146. // instances count.
  147. L.sync();
  148. --TaskGroupInstances;
  149. }
  150. void TaskGroup::spawn(std::function<void()> F) {
  151. #if LLVM_ENABLE_THREADS
  152. if (Parallel) {
  153. L.inc();
  154. detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
  155. F();
  156. L.dec();
  157. });
  158. return;
  159. }
  160. #endif
  161. F();
  162. }
  163. void TaskGroup::execute(std::function<void()> F) {
  164. if (parallel::strategy.ThreadsRequested == 1)
  165. F();
  166. else
  167. spawn(F);
  168. }
  169. } // namespace parallel
  170. } // namespace llvm
  171. void llvm::parallelFor(size_t Begin, size_t End,
  172. llvm::function_ref<void(size_t)> Fn) {
  173. // If we have zero or one items, then do not incur the overhead of spinning up
  174. // a task group. They are surprisingly expensive, and because they do not
  175. // support nested parallelism, a single entry task group can block parallel
  176. // execution underneath them.
  177. #if LLVM_ENABLE_THREADS
  178. auto NumItems = End - Begin;
  179. if (NumItems > 1 && parallel::strategy.ThreadsRequested != 1) {
  180. // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
  181. // overhead on large inputs.
  182. auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
  183. if (TaskSize == 0)
  184. TaskSize = 1;
  185. parallel::TaskGroup TG;
  186. for (; Begin + TaskSize < End; Begin += TaskSize) {
  187. TG.spawn([=, &Fn] {
  188. for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
  189. Fn(I);
  190. });
  191. }
  192. if (Begin != End) {
  193. TG.spawn([=, &Fn] {
  194. for (size_t I = Begin; I != End; ++I)
  195. Fn(I);
  196. });
  197. }
  198. return;
  199. }
  200. #endif
  201. for (; Begin != End; ++Begin)
  202. Fn(Begin);
  203. }