#pragma once

#include <util/generic/ptr.h>
#include <util/generic/utility.h>
#include <util/system/atomic.h>
#include <util/system/condvar.h>
#include <util/system/guard.h>
#include <util/system/info.h>
#include <util/system/mutex.h>
#include <util/thread/pool.h>

#include <exception>
#include <functional>

class TMtpQueueHelper {
public:
    TMtpQueueHelper() {
        SetThreadCount(NSystemInfo::CachedNumberOfCpus());
    }

    IThreadPool* Get() {
        return q.Get();
    }

    size_t GetThreadCount() {
        return ThreadCount;
    }

    void SetThreadCount(size_t threads) {
        ThreadCount = threads;
        q = CreateThreadPool(ThreadCount);
    }

    static TMtpQueueHelper& Instance();

private:
    size_t ThreadCount;
    TAutoPtr<IThreadPool> q;
};

namespace NYmp {
    inline void SetThreadCount(size_t threads) {
        TMtpQueueHelper::Instance().SetThreadCount(threads);
    }

    inline size_t GetThreadCount() {
        return TMtpQueueHelper::Instance().GetThreadCount();
    }

    // Splits [begin, end) into chunks of chunkSize and processes them on the shared
    // thread pool: worker i handles chunks i, i + threadCount, i + 2 * threadCount, ...
    // Blocks until all chunks are done; the first captured exception is rethrown.
    template <typename T>
    inline void ParallelForStaticChunk(T begin, T end, size_t chunkSize, std::function<void(T)> func) {
        chunkSize = Max<size_t>(chunkSize, 1);

        const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount();
        IThreadPool* queue = TMtpQueueHelper::Instance().Get();
        TCondVar cv;
        TMutex mutex;
        TAtomic counter = threadCount;
        std::exception_ptr err;

        for (size_t i = 0; i < threadCount; ++i) {
            queue->SafeAddFunc([&cv, &counter, &mutex, &func, i, begin, end, chunkSize, threadCount, &err]() {
                try {
                    T currentChunkStart = begin + static_cast<T>(i * chunkSize);
                    while (currentChunkStart < end) {
                        T currentChunkEnd = Min(end, currentChunkStart + chunkSize);
                        for (T val = currentChunkStart; val < currentChunkEnd; ++val) {
                            func(val);
                        }
                        currentChunkStart += chunkSize * threadCount;
                    }
                } catch (...) {
                    with_lock (mutex) {
                        err = std::current_exception();
                    }
                }

                with_lock (mutex) {
                    if (AtomicDecrement(counter) == 0) {
                        // Last worker finished; wake up the waiting caller.
                        cv.Signal();
                    }
                }
            });
        }

        with_lock (mutex) {
            while (AtomicGet(counter) > 0) {
                cv.WaitI(mutex);
            }
        }

        if (err) {
            std::rethrow_exception(err);
        }
    }

    // Same as ParallelForStaticChunk, but picks the chunk size so that the range
    // [begin, end) is split into roughly one contiguous chunk per worker thread.
    template <typename T>
    inline void ParallelForStaticAutoChunk(T begin, T end, std::function<void(T)> func) {
        const size_t taskSize = end - begin;
        const size_t threadCount = TMtpQueueHelper::Instance().GetThreadCount();

        ParallelForStaticChunk(begin, end, (taskSize + threadCount - 1) / threadCount, func);
    }
}
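
// Example usage (illustrative sketch only, not part of the original header; the
// "ymp.h" file name and the SquareAll helper are assumptions made for the example):
//
//     #include "ymp.h"
//
//     #include <vector>
//
//     void SquareAll(const std::vector<double>& in, std::vector<double>& out) {
//         out.resize(in.size());
//         NYmp::SetThreadCount(4);
//         // The index type must be given explicitly: it cannot be deduced
//         // from a lambda when the parameter is a std::function.
//         NYmp::ParallelForStaticAutoChunk<size_t>(0, in.size(), [&](size_t i) {
//             out[i] = in[i] * in[i];
//         });
//     }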