#include "local_executor.h" #include #include #include #include #include #include #include #include #include #ifdef _win_ static void RegularYield() { } #else // unix actually has cooperative multitasking! :) // without this function program runs slower and system lags for some magic reason static void RegularYield() { SchedYield(); } #endif namespace { struct TFunctionWrapper : NPar::ILocallyExecutable { NPar::TLocallyExecutableFunction Exec; TFunctionWrapper(NPar::TLocallyExecutableFunction exec) : Exec(std::move(exec)) { } void LocalExec(int id) override { Exec(id); } }; class TFunctionWrapperWithPromise: public NPar::ILocallyExecutable { private: NPar::TLocallyExecutableFunction Exec; int FirstId, LastId; TVector> Promises; public: TFunctionWrapperWithPromise(NPar::TLocallyExecutableFunction exec, int firstId, int lastId) : Exec(std::move(exec)) , FirstId(firstId) , LastId(lastId) { Y_ASSERT(FirstId <= LastId); const int rangeSize = LastId - FirstId; Promises.resize(rangeSize, NThreading::NewPromise()); for (auto& promise : Promises) { promise = NThreading::NewPromise(); } } void LocalExec(int id) override { Y_ASSERT(FirstId <= id && id < LastId); NThreading::NImpl::SetValue(Promises[id - FirstId], [=] { Exec(id); }); } TVector> GetFutures() const { TVector> out; out.reserve(Promises.ysize()); for (auto& promise : Promises) { out.push_back(promise.GetFuture()); } return out; } }; struct TSingleJob { TIntrusivePtr Exec; int Id{0}; TSingleJob() = default; TSingleJob(TIntrusivePtr exec, int id) : Exec(std::move(exec)) , Id(id) { } }; class TLocalRangeExecutor: public NPar::ILocallyExecutable { TIntrusivePtr Exec; alignas(64) TAtomic Counter; alignas(64) TAtomic WorkerCount; int LastId; void LocalExec(int) override { AtomicAdd(WorkerCount, 1); for (;;) { if (!DoSingleOp()) break; } AtomicAdd(WorkerCount, -1); } public: TLocalRangeExecutor(TIntrusivePtr exec, int firstId, int lastId) : Exec(std::move(exec)) , Counter(firstId) , WorkerCount(0) , LastId(lastId) { } bool DoSingleOp() { const int id = AtomicAdd(Counter, 1) - 1; if (id >= LastId) return false; Exec->LocalExec(id); RegularYield(); return true; } void WaitComplete() { while (AtomicGet(WorkerCount) > 0) RegularYield(); } int GetRangeSize() const { return Max(LastId - Counter, 0); } }; } ////////////////////////////////////////////////////////////////////////// class NPar::TLocalExecutor::TImpl { public: TLockFreeQueue JobQueue; TLockFreeQueue MedJobQueue; TLockFreeQueue LowJobQueue; alignas(64) TSystemEvent HasJob; TAtomic ThreadCount{0}; alignas(64) TAtomic QueueSize{0}; TAtomic MPQueueSize{0}; TAtomic LPQueueSize{0}; TAtomic ThreadId{0}; Y_THREAD(int) CurrentTaskPriority; Y_THREAD(int) WorkerThreadId; static void* HostWorkerThread(void* p); bool GetJob(TSingleJob* job); void RunNewThread(); void LaunchRange(TIntrusivePtr execRange, int queueSizeLimit, TAtomic* queueSize, TLockFreeQueue* jobQueue); TImpl() = default; ~TImpl(); }; NPar::TLocalExecutor::TImpl::~TImpl() { AtomicAdd(QueueSize, 1); JobQueue.Enqueue(TSingleJob(nullptr, 0)); HasJob.Signal(); while (AtomicGet(ThreadCount)) { ThreadYield(); } } void* NPar::TLocalExecutor::TImpl::HostWorkerThread(void* p) { static const int FAST_ITERATIONS = 200; auto* const ctx = (TImpl*)p; TThread::SetCurrentThreadName("ParLocalExecutor"); ctx->WorkerThreadId = AtomicAdd(ctx->ThreadId, 1); for (bool cont = true; cont;) { TSingleJob job; bool gotJob = false; for (int iter = 0; iter < FAST_ITERATIONS; ++iter) { if (ctx->GetJob(&job)) { gotJob = true; break; } } if (!gotJob) { 
            ctx->HasJob.Reset();
            if (!ctx->GetJob(&job)) {
                ctx->HasJob.Wait();
                continue;
            }
        }

        if (job.Exec.Get()) {
            job.Exec->LocalExec(job.Id);
            RegularYield();
        } else {
            // Null job: pass the shutdown sentinel on to the next worker and stop this thread.
            AtomicAdd(ctx->QueueSize, 1);
            ctx->JobQueue.Enqueue(job);
            ctx->HasJob.Signal();
            cont = false;
        }
    }
    AtomicAdd(ctx->ThreadCount, -1);
    return nullptr;
}

bool NPar::TLocalExecutor::TImpl::GetJob(TSingleJob* job) {
    if (JobQueue.Dequeue(job)) {
        CurrentTaskPriority = TLocalExecutor::HIGH_PRIORITY;
        AtomicAdd(QueueSize, -1);
        return true;
    } else if (MedJobQueue.Dequeue(job)) {
        CurrentTaskPriority = TLocalExecutor::MED_PRIORITY;
        AtomicAdd(MPQueueSize, -1);
        return true;
    } else if (LowJobQueue.Dequeue(job)) {
        CurrentTaskPriority = TLocalExecutor::LOW_PRIORITY;
        AtomicAdd(LPQueueSize, -1);
        return true;
    }
    return false;
}

void NPar::TLocalExecutor::TImpl::RunNewThread() {
    AtomicAdd(ThreadCount, 1);
    TThread thr(HostWorkerThread, this);
    thr.Start();
    thr.Detach();
}

void NPar::TLocalExecutor::TImpl::LaunchRange(TIntrusivePtr<TLocalRangeExecutor> rangeExec, int queueSizeLimit,
                                              TAtomic* queueSize, TLockFreeQueue<TSingleJob>* jobQueue) {
    int count = Min<int>(ThreadCount + 1, rangeExec->GetRangeSize());
    if (queueSizeLimit >= 0 && AtomicGet(*queueSize) >= queueSizeLimit) {
        return;
    }
    AtomicAdd(*queueSize, count);
    jobQueue->EnqueueAll(TVector<TSingleJob>{size_t(count), TSingleJob(rangeExec, 0)});
    HasJob.Signal();
}

NPar::TLocalExecutor::TLocalExecutor()
    : Impl_{MakeHolder<TImpl>()}
{
}

NPar::TLocalExecutor::~TLocalExecutor() = default;

void NPar::TLocalExecutor::RunAdditionalThreads(int threadCount) {
    for (int i = 0; i < threadCount; i++)
        Impl_->RunNewThread();
}

void NPar::TLocalExecutor::Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) {
    Y_ASSERT((flags & WAIT_COMPLETE) == 0); // unsupported
    int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
    switch (prior) {
        case HIGH_PRIORITY:
            AtomicAdd(Impl_->QueueSize, 1);
            Impl_->JobQueue.Enqueue(TSingleJob(std::move(exec), id));
            break;
        case MED_PRIORITY:
            AtomicAdd(Impl_->MPQueueSize, 1);
            Impl_->MedJobQueue.Enqueue(TSingleJob(std::move(exec), id));
            break;
        case LOW_PRIORITY:
            AtomicAdd(Impl_->LPQueueSize, 1);
            Impl_->LowJobQueue.Enqueue(TSingleJob(std::move(exec), id));
            break;
        default:
            Y_ASSERT(0);
            break;
    }
    Impl_->HasJob.Signal();
}

void NPar::ILocalExecutor::Exec(TLocallyExecutableFunction exec, int id, int flags) {
    Exec(new TFunctionWrapper(std::move(exec)), id, flags);
}

void NPar::TLocalExecutor::ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) {
    Y_ASSERT(lastId >= firstId);
    if (TryExecRangeSequentially([=](int id) { exec->LocalExec(id); }, firstId, lastId, flags)) {
        return;
    }
    auto rangeExec = MakeIntrusive<TLocalRangeExecutor>(std::move(exec), firstId, lastId);
    int queueSizeLimit = (flags & WAIT_COMPLETE) ? 10000 : -1;
    int prior = Max<int>(Impl_->CurrentTaskPriority, flags & PRIORITY_MASK);
    switch (prior) {
        case HIGH_PRIORITY:
            Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->QueueSize, &Impl_->JobQueue);
            break;
        case MED_PRIORITY:
            Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->MPQueueSize, &Impl_->MedJobQueue);
            break;
        case LOW_PRIORITY:
            Impl_->LaunchRange(rangeExec, queueSizeLimit, &Impl_->LPQueueSize, &Impl_->LowJobQueue);
            break;
        default:
            Y_ASSERT(0);
            break;
    }
    if (flags & WAIT_COMPLETE) {
        // The calling thread joins the range at the task's priority, then waits for the remaining workers.
        int keepPrior = Impl_->CurrentTaskPriority;
        Impl_->CurrentTaskPriority = prior;
        while (rangeExec->DoSingleOp()) {
        }
        Impl_->CurrentTaskPriority = keepPrior;
        rangeExec->WaitComplete();
    }
}

void NPar::ILocalExecutor::ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
    if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
        return;
    }
    ExecRange(new TFunctionWrapper(exec), firstId, lastId, flags);
}

void NPar::ILocalExecutor::ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
    Y_ABORT_UNLESS((flags & WAIT_COMPLETE) != 0, "ExecRangeWithThrow() requires WAIT_COMPLETE to wait if exceptions arise.");
    if (TryExecRangeSequentially(exec, firstId, lastId, flags)) {
        return;
    }
    TVector<NThreading::TFuture<void>> currentRun = ExecRangeWithFutures(exec, firstId, lastId, flags);
    for (auto& result : currentRun) {
        result.GetValueSync(); // Rethrows the stored exception, if any. If several ids threw, only the exception with the smallest id is rethrown.
    }
}

TVector<NThreading::TFuture<void>> NPar::ILocalExecutor::ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags) {
    TFunctionWrapperWithPromise* execWrapper = new TFunctionWrapperWithPromise(exec, firstId, lastId);
    TVector<NThreading::TFuture<void>> out = execWrapper->GetFutures();
    ExecRange(execWrapper, firstId, lastId, flags);
    return out;
}

void NPar::TLocalExecutor::ClearLPQueue() {
    for (bool cont = true; cont;) {
        cont = false;
        TSingleJob job;
        while (Impl_->LowJobQueue.Dequeue(&job)) {
            AtomicAdd(Impl_->LPQueueSize, -1);
            cont = true;
        }
        while (Impl_->MedJobQueue.Dequeue(&job)) {
            AtomicAdd(Impl_->MPQueueSize, -1);
            cont = true;
        }
    }
}

int NPar::TLocalExecutor::GetQueueSize() const noexcept {
    return AtomicGet(Impl_->QueueSize);
}

int NPar::TLocalExecutor::GetMPQueueSize() const noexcept {
    return AtomicGet(Impl_->MPQueueSize);
}

int NPar::TLocalExecutor::GetLPQueueSize() const noexcept {
    return AtomicGet(Impl_->LPQueueSize);
}

int NPar::TLocalExecutor::GetWorkerThreadId() const noexcept {
    return Impl_->WorkerThreadId;
}

int NPar::TLocalExecutor::GetThreadCount() const noexcept {
    return AtomicGet(Impl_->ThreadCount);
}

//////////////////////////////////////////////////////////////////////////
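
// A minimal usage sketch (comment only, not compiled as part of this translation unit),
// assuming the flag enumerators (MED_PRIORITY, WAIT_COMPLETE, ...) declared in
// local_executor.h; the thread count and the lambda body are hypothetical:
//
//     NPar::TLocalExecutor executor;
//     executor.RunAdditionalThreads(3); // 3 workers plus the calling thread
//     TVector<int> squares(100);
//     executor.ExecRange(
//         [&](int id) { squares[id] = id * id; }, // hypothetical per-id work
//         0, 100,
//         NPar::TLocalExecutor::MED_PRIORITY | NPar::TLocalExecutor::WAIT_COMPLETE);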