ThreadPool.cpp 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. //==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==//
  2. //
  3. // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
  4. // See https://llvm.org/LICENSE.txt for license information.
  5. // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
  6. //
  7. //===----------------------------------------------------------------------===//
  8. //
  9. // This file implements a crude C++11 based thread pool.
  10. //
  11. //===----------------------------------------------------------------------===//
  12. #include "llvm/Support/ThreadPool.h"
  13. #include "llvm/Config/llvm-config.h"
  14. #if LLVM_ENABLE_THREADS
  15. #include "llvm/Support/Threading.h"
  16. #else
  17. #include "llvm/Support/raw_ostream.h"
  18. #endif
  19. using namespace llvm;
  20. #if LLVM_ENABLE_THREADS
  21. ThreadPool::ThreadPool(ThreadPoolStrategy S)
  22. : Strategy(S), MaxThreadCount(S.compute_thread_count()) {}
  23. void ThreadPool::grow(int requested) {
  24. std::unique_lock<std::mutex> LockGuard(ThreadsLock);
  25. if (Threads.size() >= MaxThreadCount)
  26. return; // Already hit the max thread pool size.
  27. int newThreadCount = std::min<int>(requested, MaxThreadCount);
  28. while (static_cast<int>(Threads.size()) < newThreadCount) {
  29. int ThreadID = Threads.size();
  30. Threads.emplace_back([this, ThreadID] {
  31. Strategy.apply_thread_strategy(ThreadID);
  32. while (true) {
  33. std::function<void()> Task;
  34. {
  35. std::unique_lock<std::mutex> LockGuard(QueueLock);
  36. // Wait for tasks to be pushed in the queue
  37. QueueCondition.wait(LockGuard,
  38. [&] { return !EnableFlag || !Tasks.empty(); });
  39. // Exit condition
  40. if (!EnableFlag && Tasks.empty())
  41. return;
  42. // Yeah, we have a task, grab it and release the lock on the queue
  43. // We first need to signal that we are active before popping the queue
  44. // in order for wait() to properly detect that even if the queue is
  45. // empty, there is still a task in flight.
  46. ++ActiveThreads;
  47. Task = std::move(Tasks.front());
  48. Tasks.pop();
  49. }
  50. // Run the task we just grabbed
  51. Task();
  52. bool Notify;
  53. {
  54. // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
  55. std::lock_guard<std::mutex> LockGuard(QueueLock);
  56. --ActiveThreads;
  57. Notify = workCompletedUnlocked();
  58. }
  59. // Notify task completion if this is the last active thread, in case
  60. // someone waits on ThreadPool::wait().
  61. if (Notify)
  62. CompletionCondition.notify_all();
  63. }
  64. });
  65. }
  66. }
  67. void ThreadPool::wait() {
  68. // Wait for all threads to complete and the queue to be empty
  69. std::unique_lock<std::mutex> LockGuard(QueueLock);
  70. CompletionCondition.wait(LockGuard, [&] { return workCompletedUnlocked(); });
  71. }
  72. bool ThreadPool::isWorkerThread() const {
  73. std::unique_lock<std::mutex> LockGuard(ThreadsLock);
  74. llvm::thread::id CurrentThreadId = llvm::this_thread::get_id();
  75. for (const llvm::thread &Thread : Threads)
  76. if (CurrentThreadId == Thread.get_id())
  77. return true;
  78. return false;
  79. }
  80. // The destructor joins all threads, waiting for completion.
  81. ThreadPool::~ThreadPool() {
  82. {
  83. std::unique_lock<std::mutex> LockGuard(QueueLock);
  84. EnableFlag = false;
  85. }
  86. QueueCondition.notify_all();
  87. std::unique_lock<std::mutex> LockGuard(ThreadsLock);
  88. for (auto &Worker : Threads)
  89. Worker.join();
  90. }
  91. #else // LLVM_ENABLE_THREADS Disabled
  92. // No threads are launched, issue a warning if ThreadCount is not 0
  93. ThreadPool::ThreadPool(ThreadPoolStrategy S) : MaxThreadCount(1) {
  94. int ThreadCount = S.compute_thread_count();
  95. if (ThreadCount != 1) {
  96. errs() << "Warning: request a ThreadPool with " << ThreadCount
  97. << " threads, but LLVM_ENABLE_THREADS has been turned off\n";
  98. }
  99. }
  100. void ThreadPool::wait() {
  101. // Sequential implementation running the tasks
  102. while (!Tasks.empty()) {
  103. auto Task = std::move(Tasks.front());
  104. Tasks.pop();
  105. Task();
  106. }
  107. }
  108. bool ThreadPool::isWorkerThread() const {
  109. report_fatal_error("LLVM compiled without multithreading");
  110. }
  111. ThreadPool::~ThreadPool() { wait(); }
  112. #endif