//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ManagedStatic.h"
#include "llvm/Support/Threading.h"

#include <atomic>
#include <future>
#include <stack>
#include <thread>
#include <vector>

llvm::ThreadPoolStrategy llvm::parallel::strategy;
#if LLVM_ENABLE_THREADS

namespace llvm {
namespace parallel {
namespace detail {

namespace {

/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  virtual void add(std::function<void()> func) = 0;

  static Executor *getDefaultExecutor();
};
/// An implementation of an Executor that runs closures on a thread pool
/// in FILO (first-in, last-out) order.
class ThreadPoolExecutor : public Executor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
    unsigned ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    Threads[0] = std::thread([this, ThreadCount, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }
  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();
  }

  ~ThreadPoolExecutor() override {
    stop();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  void add(std::function<void()> F) override {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      WorkStack.push(F);
    }
    Cond.notify_one();
  }
private:
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    S.apply_thread_strategy(ThreadID);
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
      if (Stop)
        break;
      auto Task = WorkStack.top();
      WorkStack.pop();
      Lock.unlock();
      Task();
    }
  }

  std::atomic<bool> Stop{false};
  std::stack<std::function<void()>> WorkStack;
  std::mutex Mutex;
  std::condition_variable Cond;
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
};
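
// Scheduling sketch (illustrative, not part of the original file): because
// add() pushes onto WorkStack and work() pops from the top, the most recently
// queued closure is dequeued first, though with several worker threads the
// actual completion order is not guaranteed:
//
//   Executor *E = Executor::getDefaultExecutor();
//   E->add([] { /* queued first, likely runs later */ });
//   E->add([] { /* queued second, likely dequeued first */ });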

Executor *Executor::getDefaultExecutor() {
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr
  // to it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes and
  // are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.
  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
}
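
// Fast-exit sketch (illustrative, not part of the original file): a tool that
// wants the "clean" fast exit described above calls llvm_shutdown(), which
// runs Deleter::call() and stops the pool without joining the workers:
//
//   int main() {
//     // ... run parallel work ...
//     llvm::llvm_shutdown(); // stops the ThreadPoolExecutor via its Deleter
//     _exit(0);              // fast exit; worker threads are never joined
//   }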
} // namespace

static std::atomic<int> TaskGroupInstances;

// Latch::sync() called by the dtor may cause one thread to block. It is a
// deadlock if all threads in the default executor are blocked. To prevent the
// deadlock, only the first TaskGroup is allowed to run tasks in parallel. In
// the scenario of nested parallel_for_each(), only the outermost one runs in
// parallel.
TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {}

TaskGroup::~TaskGroup() {
  // We must ensure that all the workloads have finished before decrementing
  // the instances count.
  L.sync();
  --TaskGroupInstances;
}

void TaskGroup::spawn(std::function<void()> F) {
  if (Parallel) {
    L.inc();
    Executor::getDefaultExecutor()->add([&, F] {
      F();
      L.dec();
    });
  } else {
    F();
  }
}
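
// Usage sketch (illustrative, not part of the original file): the Latch L
// counts outstanding tasks, so the TaskGroup destructor blocks until every
// spawned closure has run. Chunks and process() are hypothetical:
//
//   {
//     TaskGroup TG;
//     for (auto &Chunk : Chunks)
//       TG.spawn([&Chunk] { process(Chunk); });
//   } // ~TaskGroup() calls L.sync() and waits for all spawned tasks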

} // namespace detail
} // namespace parallel
} // namespace llvm
#endif // LLVM_ENABLE_THREADS

void llvm::parallelForEachN(size_t Begin, size_t End,
                            llvm::function_ref<void(size_t)> Fn) {
  // If we have zero or one items, then do not incur the overhead of spinning
  // up a task group. They are surprisingly expensive, and because they do not
  // support nested parallelism, a single-entry task group can block parallel
  // execution underneath it.
#if LLVM_ENABLE_THREADS
  auto NumItems = End - Begin;
  if (NumItems > 1 && parallel::strategy.ThreadsRequested != 1) {
    // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
    // overhead on large inputs.
    auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
    if (TaskSize == 0)
      TaskSize = 1;

    parallel::detail::TaskGroup TG;
    for (; Begin + TaskSize < End; Begin += TaskSize) {
      TG.spawn([=, &Fn] {
        for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
          Fn(I);
      });
    }
    for (; Begin != End; ++Begin)
      Fn(Begin);
    return;
  }
#endif
  for (; Begin != End; ++Begin)
    Fn(Begin);
}
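
// Usage sketch (illustrative, not part of the original file): Fn may be
// invoked on different threads concurrently, so it must be thread-safe.
// computeValue() is a hypothetical per-index function:
//
//   std::vector<int> V(1024);
//   llvm::parallelForEachN(0, V.size(),
//                          [&](size_t I) { V[I] = computeValue(I); });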