// Job queue for NNeh: a fixed set of executor threads, each draining its own
// queue of IJob pointers, exposed through NNeh::JobQueue().
#include "utils.h"
#include "lfqueue.h"
#include "jobqueue.h"
#include "pipequeue.h"
// NOTE(review): the three #include directives below have no header name in
// this copy of the file (the <...> targets look like they were stripped) --
// restore them from the upstream source before building.
#include
#include
#include

using namespace NNeh;

namespace {
    // One worker: owns a thread handle (T_) and a job queue (Q_).
    // The constructor hands `this` to SystemThreadFactory()->Run(); the
    // destructor enqueues `this` as a job and then joins the thread.
    class TExecThread: public IThreadFactory::IThreadAble, public IJob {
    public:
        // Passes this object to SystemThreadFactory()->Run() and keeps the
        // returned handle in T_. Q_ is declared before T_, so by C++ member
        // initialization order it is constructed before Run(this) is called.
        TExecThread()
            : T_(SystemThreadFactory()->Run(this))
        {
        }

        // Shutdown: queue this object itself as a job, then wait for the
        // thread. When the dispatcher picks it up, DoRun() below aborts the
        // executor -- presumably that is what lets Join() return; confirm
        // against the executor's Abort() semantics.
        ~TExecThread() override {
            Enqueue(this);
            T_->Join();
        }

        // Adds `job` to this worker's queue. The queue stores the raw pointer;
        // nothing in this file takes ownership of the job.
        inline void Enqueue(IJob* job) {
            Q_.Enqueue(job);
        }

    private:
        // Body run when this object is executed as a job (presumably the IJob
        // hook -- the base declaration is not visible here): aborts the
        // executor that owns continuation `c`.
        void DoRun(TCont* c) override {
            c->Executor()->Abort();
        }

        // Thread body (presumably the IThreadAble hook -- verify): raises the
        // thread priority, builds an executor with a stack size derived from
        // RealStackSize(20000), and runs it.
        // NOTE(review): `e.Execute(this)` seems to have lost its template
        // arguments in this copy; nothing visible here ties it to Dispatcher()
        // -- confirm against the upstream source.
        void DoExecute() override {
            SetHighestThreadPriority();
            TContExecutor e(RealStackSize(20000));
            e.Execute(this);
        }

        // Drains Q_ until Dequeue() returns a null pointer. Each job is handed
        // to the executor via Create(*job, "job"); if that throws anything,
        // the job is invoked directly with `c` instead of being dropped.
        inline void Dispatcher(TCont* c) {
            IJob* job;

            while ((job = Q_.Dequeue(c))) {
                try {
                    c->Executor()->Create(*job, "job");
                } catch (...) {
                    // Fallback: call the job in place, passing the
                    // dispatcher's own `c`.
                    (*job)(c);
                }
            }
        }

        // NOTE(review): the template arguments of TAutoPtr and
        // TOneConsumerPipeQueue are missing in this copy -- restore them.
        typedef TAutoPtr IThreadRef;
        TOneConsumerPipeQueue Q_; // jobs waiting for Dispatcher()
        IThreadRef T_;            // handle returned by SystemThreadFactory()->Run(this)
    };

    // IJobQueue implementation that spreads scheduled jobs over a fixed set
    // of TExecThread workers.
    class TJobScatter: public IJobQueue {
    public:
        // Creates two workers; the count is hard-coded.
        inline TJobScatter() {
            for (size_t i = 0; i < 2; ++i) {
                // The element type of E_ is not visible in this copy;
                // presumably the TExecThreadRef smart pointer below takes
                // ownership of the raw pointer -- confirm.
                E_.push_back(new TExecThread());
            }
        }

        // Picks a worker by the calling thread's id modulo the number of
        // workers and enqueues `job` on it, so calls from the same thread
        // always reach the same worker.
        void ScheduleImpl(IJob* job) override {
            E_[TThread::CurrentThreadId() % E_.size()]->Enqueue(job);
        }

    private:
        // NOTE(review): the template arguments of TAutoPtr and TVector are
        // missing in this copy -- restore them.
        typedef TAutoPtr TExecThreadRef;
        TVector E_; // the workers created in the constructor
    };
}

// Returns the job queue obtained from Singleton().
// NOTE(review): Singleton() appears to have lost its template argument in
// this copy (presumably TJobScatter) -- confirm.
IJobQueue* NNeh::JobQueue() {
    return Singleton();
}