// IO dispatcher actor: runs TEvInvokeQuery callbacks on a pool of dedicated IO threads.
- #include "io_dispatcher.h"
- #include "actor_bootstrapped.h"
- #include "hfunc.h"
- #include <util/system/mutex.h>
- #include <util/system/condvar.h>
- #include <util/system/thread.h>
- #include <map>
- #include <list>
- namespace NActors {
- class TIoDispatcherActor : public TActorBootstrapped<TIoDispatcherActor> {
- enum {
- EvNotifyThreadStopped = EventSpaceBegin(TEvents::ES_PRIVATE),
- };
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // IO task queue
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- class TTask {
- TInstant Timestamp;
- std::function<void()> Callback;
- public:
- TTask(TInstant timestamp, TEvInvokeQuery *ev)
- : Timestamp(timestamp)
- , Callback(std::move(ev->Callback))
- {}
- void Execute() {
- Callback();
- }
- TInstant GetTimestamp() const {
- return Timestamp;
- }
- };
- class TTaskQueue {
- std::list<TTask> Tasks;
- TMutex Mutex;
- TCondVar CondVar;
- size_t NumThreadsToStop = 0;
- public:
- void Enqueue(TInstant timestamp, TEvInvokeQuery *ev) {
- std::list<TTask> list;
- list.emplace_back(timestamp, ev);
- with_lock (Mutex) {
- Tasks.splice(Tasks.end(), std::move(list));
- }
- CondVar.Signal();
- }
- bool Dequeue(std::list<TTask>& list, bool *sendNotify) {
- with_lock (Mutex) {
- CondVar.Wait(Mutex, [&] { return NumThreadsToStop || !Tasks.empty(); });
- if (NumThreadsToStop) {
- *sendNotify = NumThreadsToStop != Max<size_t>();
- if (*sendNotify) {
- --NumThreadsToStop;
- }
- return false;
- } else {
- list.splice(list.end(), Tasks, Tasks.begin());
- return true;
- }
- }
- }
- void Stop() {
- with_lock (Mutex) {
- NumThreadsToStop = Max<size_t>();
- }
- CondVar.BroadCast();
- }
- void StopOne() {
- with_lock (Mutex) {
- ++NumThreadsToStop;
- Y_VERIFY(NumThreadsToStop);
- }
- CondVar.Signal();
- }
- std::optional<TInstant> GetEarliestTaskTimestamp() {
- with_lock (Mutex) {
- return Tasks.empty() ? std::nullopt : std::make_optional(Tasks.front().GetTimestamp());
- }
- }
- };
- TTaskQueue TaskQueue;
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // IO dispatcher threads
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- class TThread : public ISimpleThread {
- TIoDispatcherActor& Actor;
- TActorSystem* const ActorSystem;
- public:
- TThread(TIoDispatcherActor& actor, TActorSystem *actorSystem)
- : Actor(actor)
- , ActorSystem(actorSystem)
- {
- Start();
- }
- void *ThreadProc() override {
- SetCurrentThreadName("kikimr IO");
- for (;;) {
- std::list<TTask> tasks;
- bool sendNotify;
- if (!Actor.TaskQueue.Dequeue(tasks, &sendNotify)) {
- if (sendNotify) {
- ActorSystem->Send(new IEventHandle(EvNotifyThreadStopped, 0, Actor.SelfId(), TActorId(),
- nullptr, TThread::CurrentThreadId()));
- }
- break;
- }
- for (TTask& task : tasks) {
- task.Execute();
- ++*Actor.TasksCompleted;
- }
- }
- return nullptr;
- }
- };
- static constexpr size_t MinThreadCount = 4;
- static constexpr size_t MaxThreadCount = 64;
- std::map<TThread::TId, std::unique_ptr<TThread>> Threads;
- size_t NumRunningThreads = 0;
- void StartThread() {
- auto thread = std::make_unique<TThread>(*this, TlsActivationContext->ExecutorThread.ActorSystem);
- const TThread::TId id = thread->Id();
- Threads.emplace(id, std::move(thread));
- *NumThreads = ++NumRunningThreads;
- ++*ThreadsStarted;
- }
- void StopThread() {
- Y_VERIFY(Threads.size());
- TaskQueue.StopOne();
- *NumThreads = --NumRunningThreads;
- ++*ThreadsStopped;
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Counters
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- NMonitoring::TDynamicCounters::TCounterPtr NumThreads;
- NMonitoring::TDynamicCounters::TCounterPtr TasksAdded;
- NMonitoring::TDynamicCounters::TCounterPtr TasksCompleted;
- NMonitoring::TDynamicCounters::TCounterPtr ThreadsStarted;
- NMonitoring::TDynamicCounters::TCounterPtr ThreadsStopped;
- public:
- TIoDispatcherActor(const NMonitoring::TDynamicCounterPtr& counters)
- : NumThreads(counters->GetCounter("NumThreads"))
- , TasksAdded(counters->GetCounter("TasksAdded", true))
- , TasksCompleted(counters->GetCounter("TasksCompleted", true))
- , ThreadsStarted(counters->GetCounter("ThreadsStarted", true))
- , ThreadsStopped(counters->GetCounter("ThreadsStopped", true))
- {}
- ~TIoDispatcherActor() override {
- TaskQueue.Stop();
- }
- void Bootstrap() {
- while (NumRunningThreads < MinThreadCount) {
- StartThread();
- }
- HandleWakeup();
- Become(&TThis::StateFunc);
- }
- void HandleThreadStopped(TAutoPtr<IEventHandle> ev) {
- auto it = Threads.find(ev->Cookie);
- Y_VERIFY(it != Threads.end());
- it->second->Join();
- Threads.erase(it);
- }
- void Handle(TEvInvokeQuery::TPtr ev) {
- ++*TasksAdded;
- TaskQueue.Enqueue(TActivationContext::Now(), ev->Get());
- }
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Thread usage counter logic
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- std::optional<TInstant> IdleTimestamp;
- static constexpr TDuration ThreadStartTime = TDuration::MilliSeconds(500);
- static constexpr TDuration ThreadStopTime = TDuration::MilliSeconds(500);
- void HandleWakeup() {
- const TInstant now = TActivationContext::Now();
- std::optional<TInstant> earliest = TaskQueue.GetEarliestTaskTimestamp();
- if (earliest) {
- if (now >= *earliest + ThreadStartTime && NumRunningThreads < MaxThreadCount) {
- StartThread();
- }
- IdleTimestamp.reset();
- } else if (!IdleTimestamp) {
- IdleTimestamp = now;
- } else if (now >= *IdleTimestamp + ThreadStopTime) {
- IdleTimestamp.reset();
- if (NumRunningThreads > MinThreadCount) {
- StopThread();
- }
- }
- Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup);
- }
- STRICT_STFUNC(StateFunc, {
- fFunc(EvNotifyThreadStopped, HandleThreadStopped);
- hFunc(TEvInvokeQuery, Handle);
- cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
- cFunc(TEvents::TSystem::Poison, PassAway);
- })
- };
- IActor *CreateIoDispatcherActor(const NMonitoring::TDynamicCounterPtr& counters) {
- return new TIoDispatcherActor(counters);
- }
- } // NActors
|