// Copyright 2017 The Abseil Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef Y_ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_ #define Y_ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_ #include #include #include #include #include // NOLINT(build/c++11) #include #include #include "y_absl/base/thread_annotations.h" #include "y_absl/functional/any_invocable.h" #include "y_absl/synchronization/mutex.h" namespace y_absl { Y_ABSL_NAMESPACE_BEGIN namespace synchronization_internal { // A simple ThreadPool implementation for tests. class ThreadPool { public: explicit ThreadPool(int num_threads) { threads_.reserve(num_threads); for (int i = 0; i < num_threads; ++i) { threads_.push_back(std::thread(&ThreadPool::WorkLoop, this)); } } ThreadPool(const ThreadPool &) = delete; ThreadPool &operator=(const ThreadPool &) = delete; ~ThreadPool() { { y_absl::MutexLock l(&mu_); for (size_t i = 0; i < threads_.size(); i++) { queue_.push(nullptr); // Shutdown signal. } } for (auto &t : threads_) { t.join(); } } // Schedule a function to be run on a ThreadPool thread immediately. void Schedule(y_absl::AnyInvocable func) { assert(func != nullptr); y_absl::MutexLock l(&mu_); queue_.push(std::move(func)); } private: bool WorkAvailable() const Y_ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return !queue_.empty(); } void WorkLoop() { while (true) { y_absl::AnyInvocable func; { y_absl::MutexLock l(&mu_); mu_.Await(y_absl::Condition(this, &ThreadPool::WorkAvailable)); func = std::move(queue_.front()); queue_.pop(); } if (func == nullptr) { // Shutdown signal. break; } func(); } } y_absl::Mutex mu_; std::queue> queue_ Y_ABSL_GUARDED_BY(mu_); std::vector threads_; }; } // namespace synchronization_internal Y_ABSL_NAMESPACE_END } // namespace y_absl #endif // Y_ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_