#include "task_scheduler.h"

// NOTE(review): the three header names below were missing from the file as
// received (only bare `#include` tokens remained). They are restored from the
// symbols this file uses (ISimpleThread/TThread, ToString, Cerr/Endl);
// confirm against version control.
#include <util/system/thread.h>
#include <util/string/cast.h>
#include <util/stream/output.h>

// Out-of-line destructors for the task interfaces.
TTaskScheduler::ITask::~ITask() {}

TTaskScheduler::IRepeatedTask::~IRepeatedTask() {}

// A worker thread. Its thread body simply runs TTaskScheduler::WorkerFunc on
// the owning scheduler. DebugState / DebugId are labels read and written by
// TTaskScheduler::ChangeDebugState.
class TTaskScheduler::TWorkerThread
    : public ISimpleThread
{
public:
    TWorkerThread(TTaskScheduler& state)
        : Scheduler_(state)
    {
    }

    TString DebugState = "?";
    TString DebugId = "";
private:
    void* ThreadProc() noexcept override {
        Scheduler_.WorkerFunc(this);
        return nullptr;
    }
private:
    TTaskScheduler& Scheduler_;
};

// Creates `threadCount` worker objects (not started until Start() is called)
// and remembers the limit on the number of tasks accepted by Add().
// Each worker gets its index as DebugId.
TTaskScheduler::TTaskScheduler(size_t threadCount, size_t maxTaskCount)
    : MaxTaskCount_(maxTaskCount)
{
    for (size_t i = 0; i < threadCount; ++i) {
        Workers_.push_back(new TWorkerThread(*this));
        Workers_.back()->DebugId = ToString(i);
    }
}

// Stops the scheduler. Any exception from Stop() is reported to Cdbg instead
// of being allowed to escape the destructor.
TTaskScheduler::~TTaskScheduler() {
    try {
        Stop();
    } catch (...) {
        Cdbg << "task scheduled destruction error: " << CurrentExceptionMessage();
    }
}

// Starts every worker thread.
void TTaskScheduler::Start() {
    for (auto& w : Workers_) {
        w->Start();
    }
}

// Raises the stop flag and wakes all workers while holding the lock, then
// joins the workers with the lock released (WorkerFunc needs the lock to
// observe the flag), and finally drops the workers and all queued tasks.
void TTaskScheduler::Stop() {
    with_lock (Lock_) {
        IsStopped_ = true;
        CondVar_.BroadCast();
    }

    for (auto& w: Workers_) {
        w->Join();
    }

    Workers_.clear();
    Queue_.clear();
}

// Returns the current value of the task counter, i.e. the number of live
// TTaskWrapper objects created by Add().
size_t TTaskScheduler::GetTaskCount() const {
    // The target type of this cast was missing in the file as received;
    // size_t matches the declared return type.
    return static_cast<size_t>(AtomicGet(TaskCounter_));
}

namespace {
    // Forwards Process() to the wrapped task and keeps the shared counter in
    // step with the wrapper's lifetime: incremented on construction,
    // decremented on destruction.
    class TTaskWrapper
        : public TTaskScheduler::ITask
        , TNonCopyable
    {
    public:
        TTaskWrapper(TTaskScheduler::ITaskRef task, TAtomic& counter)
            : Task_(task)
            , Counter_(counter)
        {
            AtomicIncrement(Counter_);
        }

        ~TTaskWrapper() override {
            AtomicDecrement(Counter_);
        }
    private:
        TInstant Process() override {
            return Task_->Process();
        }
    private:
        TTaskScheduler::ITaskRef Task_;
        TAtomic& Counter_;
    };
}

// Queues `task` to be run at `expire`.
// Returns true if the task was queued; false if the scheduler is stopped, has
// no workers, or accepting the task would exceed MaxTaskCount_.
bool TTaskScheduler::Add(ITaskRef task, TInstant expire) {
    with_lock (Lock_) {
        if (!IsStopped_ && Workers_.size() > 0 && GetTaskCount() + 1 <= MaxTaskCount_) {
            ITaskRef newTask = new TTaskWrapper(task, TaskCounter_);
            Queue_.insert(std::make_pair(expire, TTaskHolder(newTask)));

            // Wake a worker only when the earliest queue entry has no worker
            // assigned to it yet.
            if (!Queue_.begin()->second.WaitingWorker) {
                CondVar_.Signal();
            }
            return true;
        }
    }

    return false;
}

namespace {
    // Adapts an IRepeatedTask to ITask. Each Process() call advances the
    // stored deadline by one period, runs the task, and returns the new
    // deadline if the task asked to continue, or TInstant::Max() otherwise
    // (WorkerFunc does not re-queue a task that returns TInstant::Max()).
    class TRepeatedTask
        : public TTaskScheduler::ITask
    {
    public:
        TRepeatedTask(TTaskScheduler::IRepeatedTaskRef task, TDuration period, TInstant deadline)
            : Task_(task)
            , Period_(period)
            , Deadline_(deadline)
        {
        }
    private:
        TInstant Process() final {
            // The deadline is advanced before the task runs, so the next run
            // time does not depend on how long the task itself takes.
            Deadline_ += Period_;
            if (Task_->Process()) {
                return Deadline_;
            } else {
                return TInstant::Max();
            }
        }
    private:
        TTaskScheduler::IRepeatedTaskRef Task_;
        TDuration Period_;
        TInstant Deadline_;
    };
}

// Queues a repeated task; its first run is scheduled one `period` from now.
// Returns the result of Add(ITaskRef, TInstant).
bool TTaskScheduler::Add(IRepeatedTaskRef task, TDuration period) {
    const TInstant deadline = Now() + period;
    ITaskRef t = new TRepeatedTask(task, period, deadline);
    return Add(t, deadline);
}

// Compile-time switch for the tracing done by ChangeDebugState.
const bool debugOutput = false;

// When debugOutput is enabled, records `state` on `thread` and prints one
// line to Cerr with the time, the thread's id, every worker's state, the
// queue size and the task counter. Does nothing when debugOutput is false.
void TTaskScheduler::ChangeDebugState(TWorkerThread* thread, const TString& state) {
    if (!debugOutput) {
        Y_UNUSED(thread);
        Y_UNUSED(state);
        return;
    }

    thread->DebugState = state;

    TStringStream ss;
    ss << Now() << " " << thread->DebugId << ":\t";
    for (auto& w : Workers_) {
        ss << w->DebugState << " ";
    }
    ss << " [" << Queue_.size() << "] [" << TaskCounter_ << "]" << Endl;
    Cerr << ss.Str();
}

// Marks `toWait` as owned by `thread` and waits on the condition variable
// until the entry's scheduled time. Returns the negation of WaitD's result.
// NOTE(review): presumably WaitD returns true when signalled and false on
// timeout, making this "true == deadline reached" — confirm against TCondVar.
bool TTaskScheduler::Wait(TWorkerThread* thread, TQueueIterator& toWait) {
    ChangeDebugState(thread, "w");
    toWait->second.WaitingWorker = thread;
    return !CondVar_.WaitD(Lock_, toWait->first);
}

// Looks at the first queue entry that has no worker assigned (only that one;
// the loop stops after it). If the caller holds nothing (`toWait` is end()),
// that entry becomes `toWait`. If the caller already holds an entry and the
// found one is scheduled earlier, the held entry is released and replaced.
void TTaskScheduler::ChooseFromQueue(TQueueIterator& toWait) {
    for (TQueueIterator it = Queue_.begin(); it != Queue_.end(); ++it) {
        if (!it->second.WaitingWorker) {
            if (toWait == Queue_.end()) {
                toWait = it;
            } else if (it->first < toWait->first) {
                toWait->second.WaitingWorker = nullptr;
                toWait = it;
            }
            break;
        }
    }
}

// Body of every worker thread. Loops until IsStopped_ is observed:
//   1. runs the task picked on the previous iteration (outside the lock);
//   2. under the lock, re-queues it if it returned a time below TInstant::Max();
//   3. picks an entry to wait for and either takes it (if due, or once Wait()
//      reports true) or, if the queue offers nothing, waits for a signal.
void TTaskScheduler::WorkerFunc(TWorkerThread* thread) {
    TThread::SetCurrentThreadName("TaskSchedWorker");

    TQueueIterator toWait = Queue_.end();
    ITaskRef toDo;

    for (;;) {
        TInstant repeat = TInstant::Max();

        if (!!toDo) {
            try {
                repeat = toDo->Process();
            } catch (...) {
                // A throwing task is logged and not re-queued
                // (repeat stays at TInstant::Max()).
                Cdbg << "task scheduler error: " << CurrentExceptionMessage();
            }
        }

        with_lock (Lock_) {
            ChangeDebugState(thread, "f");

            if (IsStopped_) {
                ChangeDebugState(thread, "s");
                return ;
            }

            if (!!toDo) {
                if (repeat < TInstant::Max()) {
                    Queue_.insert(std::make_pair(repeat, TTaskHolder(toDo)));
                }
            }

            toDo = nullptr;

            ChooseFromQueue(toWait);

            if (toWait != Queue_.end()) {
                if (toWait->first <= Now() || Wait(thread, toWait)) {
                    toDo = toWait->second.Task;
                    Queue_.erase(toWait);
                    toWait = Queue_.end();

                    // Hand the new head of the queue to another worker if
                    // nobody is assigned to it yet and another worker exists.
                    if (!Queue_.empty() && !Queue_.begin()->second.WaitingWorker && Workers_.size() > 1) {
                        CondVar_.Signal();
                    }

                    ChangeDebugState(thread, "p");
                }
            } else {
                ChangeDebugState(thread, "e");
                CondVar_.WaitI(Lock_);
            }
        }
    }
}