123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- // Copyright 2017 The Abseil Authors.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // https://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- #ifndef Y_ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
- #define Y_ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
- #include <cassert>
- #include <cstddef>
- #include <functional>
- #include <queue>
- #include <thread> // NOLINT(build/c++11)
- #include <utility>
- #include <vector>
- #include "y_absl/base/thread_annotations.h"
- #include "y_absl/functional/any_invocable.h"
- #include "y_absl/synchronization/mutex.h"
- namespace y_absl {
- Y_ABSL_NAMESPACE_BEGIN
- namespace synchronization_internal {
- // A simple ThreadPool implementation for tests.
- class ThreadPool {
- public:
- explicit ThreadPool(int num_threads) {
- threads_.reserve(num_threads);
- for (int i = 0; i < num_threads; ++i) {
- threads_.push_back(std::thread(&ThreadPool::WorkLoop, this));
- }
- }
- ThreadPool(const ThreadPool &) = delete;
- ThreadPool &operator=(const ThreadPool &) = delete;
- ~ThreadPool() {
- {
- y_absl::MutexLock l(&mu_);
- for (size_t i = 0; i < threads_.size(); i++) {
- queue_.push(nullptr); // Shutdown signal.
- }
- }
- for (auto &t : threads_) {
- t.join();
- }
- }
- // Schedule a function to be run on a ThreadPool thread immediately.
- void Schedule(y_absl::AnyInvocable<void()> func) {
- assert(func != nullptr);
- y_absl::MutexLock l(&mu_);
- queue_.push(std::move(func));
- }
- private:
- bool WorkAvailable() const Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
- return !queue_.empty();
- }
- void WorkLoop() {
- while (true) {
- y_absl::AnyInvocable<void()> func;
- {
- y_absl::MutexLock l(&mu_);
- mu_.Await(y_absl::Condition(this, &ThreadPool::WorkAvailable));
- func = std::move(queue_.front());
- queue_.pop();
- }
- if (func == nullptr) { // Shutdown signal.
- break;
- }
- func();
- }
- }
- y_absl::Mutex mu_;
- std::queue<y_absl::AnyInvocable<void()>> queue_ Y_ABSL_GUARDED_BY(mu_);
- std::vector<std::thread> threads_;
- };
- } // namespace synchronization_internal
- Y_ABSL_NAMESPACE_END
- } // namespace y_absl
- #endif // Y_ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
|