#pragma once
#ifdef __GNUC__
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#endif

//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based task queue.
//
//===----------------------------------------------------------------------===//
#ifndef LLVM_SUPPORT_TASKQUEUE_H
#define LLVM_SUPPORT_TASKQUEUE_H

#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ThreadPool.h"
#include "llvm/Support/thread.h"

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <utility>
namespace llvm {

/// TaskQueue executes serialized work on a user-defined Thread Pool. It
/// guarantees that if task B is enqueued after task A, task B begins after
/// task A completes and there is no overlap between the two.
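///
/// Example usage (a minimal sketch, assuming LLVM_ENABLE_THREADS and a
/// ThreadPool instance named Pool constructed elsewhere):
/// \code
///   TaskQueue Queue(Pool);
///   std::future<int> F = Queue.async([] { return 42; });
///   Queue.async([] { /* begins only after the first task has completed */ });
///   int Result = F.get();
/// \endcode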
class TaskQueue {
  // Because we don't have init capture to use move-only local variables that
  // are captured into a lambda, we create the promise inside an explicit
  // callable struct. We want to do as much of the wrapping in the
  // type-specialized domain (before type erasure) and then erase this into a
  // std::function.
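  //
  // For comparison, with C++14 generalized lambda capture this would roughly
  // correspond to a (hypothetical) lambda of the form
  //   [C = std::move(C), P = std::move(P)] { P->set_value(C()); }
  // which cannot be expressed in C++11; the Task struct below plays that role.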
  template <typename Callable> struct Task {
    using ResultTy = std::invoke_result_t<Callable>;
    explicit Task(Callable C, TaskQueue &Parent)
        : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()),
          Parent(&Parent) {}

    // Overload for non-void results: store the callable's return value in the
    // promise.
    template <typename T> void invokeCallbackAndSetPromise(T *) {
      P->set_value(C());
    }

    // Overload for void results: run the callable, then fulfill the promise
    // without a value.
    void invokeCallbackAndSetPromise(void *) {
      C();
      P->set_value();
    }

    // Dispatch on ResultTy: a null ResultTy* of type void* selects the void
    // overload above, anything else selects the template overload.
    void operator()() noexcept {
      ResultTy *Dummy = nullptr;
      invokeCallbackAndSetPromise(Dummy);
      Parent->completeTask();
    }

    Callable C;
    std::shared_ptr<std::promise<ResultTy>> P;
    TaskQueue *Parent;
  };

public:
  /// Construct a task queue with no work.
  TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }

  /// Blocking destructor: the queue will wait for all work to complete.
  ~TaskQueue() {
    Scheduler.wait();
    assert(Tasks.empty());
  }

  /// Asynchronous submission of a task to the queue. The returned future can be
  /// used to wait for the task (and all previous tasks that have not yet
  /// completed) to finish.
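  ///
  /// For example (a sketch; step1() and step2() are hypothetical helpers):
  /// \code
  ///   auto A = Queue.async([] { return step1(); });
  ///   auto B = Queue.async([] { return step2(); }); // runs after step1()
  ///   B.wait(); // when this returns, step1() has finished as well
  /// \endcode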
  template <typename Callable>
  std::future<std::invoke_result_t<Callable>> async(Callable &&C) {
#if !LLVM_ENABLE_THREADS
    static_assert(false,
                  "TaskQueue requires building with LLVM_ENABLE_THREADS!");
#endif
    Task<Callable> T{std::move(C), *this};
    using ResultTy = std::invoke_result_t<Callable>;
    std::future<ResultTy> F = T.P->get_future();
    {
      std::lock_guard<std::mutex> Lock(QueueLock);

      // If there's already a task in flight, just queue this one up. If
      // there is not a task in flight, bypass the queue and schedule this
      // task immediately.
      if (IsTaskInFlight)
        Tasks.push_back(std::move(T));
      else {
        Scheduler.async(std::move(T));
        IsTaskInFlight = true;
      }
    }
    return F;
  }

private:
  void completeTask() {
    // We just completed a task. If there are no more tasks in the queue,
    // update IsTaskInFlight to false and stop doing work. Otherwise
    // schedule the next task (while not holding the lock).
    std::function<void()> Continuation;
    {
      std::lock_guard<std::mutex> Lock(QueueLock);
      if (Tasks.empty()) {
        IsTaskInFlight = false;
        return;
      }

      Continuation = std::move(Tasks.front());
      Tasks.pop_front();
    }

    Scheduler.async(std::move(Continuation));
  }

  /// The thread pool on which to run the work.
  ThreadPool &Scheduler;

  /// State which indicates whether the queue is currently processing any
  /// work.
  bool IsTaskInFlight = false;

  /// Mutex for synchronizing access to the Tasks queue.
  std::mutex QueueLock;

  /// Tasks waiting for execution in the queue.
  std::deque<std::function<void()>> Tasks;
};

} // namespace llvm

#endif // LLVM_SUPPORT_TASKQUEUE_H

#ifdef __GNUC__
#pragma GCC diagnostic pop
#endif