#include "scheduler_basic.h"
#include "scheduler_queue.h"

// NOTE(review): the targets of the angle-bracket includes below were missing
// from the copy under review. The paths are restored on the assumption that
// they declare GetCycleCountFast/GetMonotonicMicroSeconds,
// SetCurrentThreadName and ThreadDisableBalloc respectively - confirm them
// against the build before merging.
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/util/thread.h>

#ifdef BALLOC
#include <library/cpp/balloc/optional/operators.h>
#endif

namespace NActors {

    // Monitoring counters published by the scheduler thread. All of them are
    // looked up under the "Scheduler/" prefix of the supplied counter group.
    // The boolean passed to GetCounter is false for TimeDelayMs and QueueSize
    // (values that are assigned / go up and down) and true for the counters
    // that only accumulate (presumably the "derivative" flag - confirm).
    struct TBasicSchedulerThread::TMonCounters {
        NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs;
        NMonitoring::TDynamicCounters::TCounterPtr QueueSize;
        NMonitoring::TDynamicCounters::TCounterPtr EventsSent;
        NMonitoring::TDynamicCounters::TCounterPtr EventsDropped;
        NMonitoring::TDynamicCounters::TCounterPtr EventsAdded;
        NMonitoring::TDynamicCounters::TCounterPtr Iterations;
        NMonitoring::TDynamicCounters::TCounterPtr Sleeps;
        NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;

        TMonCounters(const NMonitoring::TDynamicCounterPtr& counters)
            : TimeDelayMs(counters->GetCounter("Scheduler/TimeDelayMs", false))
            , QueueSize(counters->GetCounter("Scheduler/QueueSize", false))
            , EventsSent(counters->GetCounter("Scheduler/EventsSent", true))
            , EventsDropped(counters->GetCounter("Scheduler/EventsDropped", true))
            , EventsAdded(counters->GetCounter("Scheduler/EventsAdded", true))
            , Iterations(counters->GetCounter("Scheduler/Iterations", true))
            , Sleeps(counters->GetCounter("Scheduler/Sleeps", true))
            , ElapsedMicrosec(counters->GetCounter("Scheduler/ElapsedMicrosec", true))
        { }
    };

    // Stores the configuration and creates the monitoring counters only when
    // the configuration carries a counter group. Aborts if the configuration
    // asks for the scheduler actor, which this thread-based scheduler does not
    // implement.
    TBasicSchedulerThread::TBasicSchedulerThread(const TSchedulerConfig& config)
        : Config(config)
        , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr)
        , ActorSystem(nullptr)
        , CurrentTimestamp(nullptr)
        , CurrentMonotonic(nullptr)
        , TotalReaders(0)
        , StopFlag(false)
        , ScheduleMap(3600)
    {
        Y_VERIFY(!Config.UseSchedulerActor, "Cannot create scheduler thread because Config.UseSchedulerActor# true");
    }

    // The scheduler thread must already have been joined (see Stop()) by the
    // time the object is destroyed.
    TBasicSchedulerThread::~TBasicSchedulerThread() {
        Y_VERIFY(!MainCycle);
    }

    // Body of the scheduler thread. Until StopFlag is raised, each iteration:
    //   1. advances the throttled clock and publishes the current wall-clock
    //      and monotonic timestamps;
    //   2. sends (or drops, if the cookie was already detached) every event
    //      whose moment is not later than the throttled clock;
    //   3. drains all reader queues into the active bucket or into ScheduleMap;
    //   4. updates counters and sleeps if nothing was done and enough time is
    //      left until the next resolution slice.
    void TBasicSchedulerThread::CycleFunc() {
#ifdef BALLOC
        ThreadDisableBalloc();
#endif
        ::SetCurrentThreadName("Scheduler");

        ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic);
        // Clock used to decide what is due; it follows the monotonic clock but
        // moves forward by at most `threshold` per iteration (see below).
        ui64 throttledMonotonic = currentMonotonic;

        // Upper bound of the bucket currently held in activeSec.
        ui64 activeTick = AlignUp(throttledMonotonic, IntrasecondThreshold);
        TAutoPtr<TMomentMap> activeSec;

        NHPTimer::STime hpprev = GetCycleCountFast();
        ui64 nextTimestamp = TInstant::Now().MicroSeconds();
        ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());

        while (!AtomicLoad(&StopFlag)) {
            {
                const ui64 delta = nextMonotonic - throttledMonotonic;
                const ui64 elapsedDelta = nextMonotonic - currentMonotonic;
                // Step limit: at most ProgressThreshold, at most twice the time
                // elapsed since the previous iteration, and never zero.
                const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1));

                throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic;

                if (MonCounters) {
                    *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000;
                }
            }
            AtomicStore(CurrentTimestamp, nextTimestamp);
            AtomicStore(CurrentMonotonic, nextMonotonic);
            currentMonotonic = nextMonotonic;

            if (MonCounters) {
                ++*MonCounters->Iterations;
            }

            bool somethingDone = false;

            // first step - send everything triggered on schedule
            ui64 eventsSent = 0;
            ui64 eventsDropped = 0;
            for (;;) {
                while (!!activeSec && !activeSec->empty()) {
                    TMomentMap::iterator it = activeSec->begin();
                    if (it->first <= throttledMonotonic) {
                        if (NSchedulerQueue::TQueueType* q = it->second.Get()) {
                            while (NSchedulerQueue::TEntry* x = q->Reader.Pop()) {
                                somethingDone = true;
                                Y_VERIFY_DEBUG(x->InstantMicroseconds <= activeTick);
                                IEventHandle* ev = x->Ev;
                                ISchedulerCookie* cookie = x->Cookie;
                                // TODO: lazy send with backoff queue to not hang over contended mailboxes
                                if (cookie) {
                                    // An event with a cookie is delivered only if
                                    // Detach() succeeds; otherwise it is destroyed
                                    // here and counted as dropped.
                                    if (cookie->Detach()) {
                                        ActorSystem->Send(ev);
                                        ++eventsSent;
                                    } else {
                                        delete ev;
                                        ++eventsDropped;
                                    }
                                } else {
                                    ActorSystem->Send(ev);
                                    ++eventsSent;
                                }
                            }
                        }
                        activeSec->erase(it);
                    } else
                        break;
                }

                if (activeTick <= throttledMonotonic) {
                    // The active bucket is exhausted: move on to the next tick and
                    // take over its bucket from ScheduleMap, if one exists.
                    Y_VERIFY_DEBUG(!activeSec || activeSec->empty());
                    activeSec.Destroy();
                    activeTick += IntrasecondThreshold;
                    TScheduleMap::iterator it = ScheduleMap.find(activeTick);
                    if (it != ScheduleMap.end()) {
                        activeSec = it->second;
                        ScheduleMap.erase(it);
                    }
                    continue;
                }

                // ok, if we are here - then nothing is ready, so send step complete
                break;
            }

            // second step - collect everything from queues

            ui64 eventsAdded = 0;
            for (ui32 i = 0; i != TotalReaders; ++i) {
                while (NSchedulerQueue::TEntry* x = Readers[i]->Pop()) {
                    somethingDone = true;
                    const ui64 instant = AlignUp(x->InstantMicroseconds, Config.ResolutionMicroseconds);
                    IEventHandle* const ev = x->Ev;
                    ISchedulerCookie* const cookie = x->Cookie;

                    // check is cookie still valid? looks like it will hurt performance w/o significant memory save

                    if (instant <= activeTick) {
                        // Belongs to the bucket that is being processed right now.
                        if (!activeSec)
                            activeSec.Reset(new TMomentMap());
                        TAutoPtr<NSchedulerQueue::TQueueType>& queue = (*activeSec)[instant];
                        if (!queue)
                            queue.Reset(new NSchedulerQueue::TQueueType());
                        queue->Writer.Push(instant, ev, cookie);
                    } else {
                        // Belongs to a later bucket, keyed by its aligned tick.
                        const ui64 intrasecond = AlignUp(instant, IntrasecondThreshold);
                        TAutoPtr<TMomentMap>& msec = ScheduleMap[intrasecond];
                        if (!msec)
                            msec.Reset(new TMomentMap());
                        TAutoPtr<NSchedulerQueue::TQueueType>& queue = (*msec)[instant];
                        if (!queue)
                            queue.Reset(new NSchedulerQueue::TQueueType());
                        queue->Writer.Push(instant, ev, cookie);
                    }

                    ++eventsAdded;
                }
            }

            NHPTimer::STime hpnow = GetCycleCountFast();

            if (MonCounters) {
                *MonCounters->QueueSize -= eventsSent + eventsDropped;
                *MonCounters->QueueSize += eventsAdded;
                *MonCounters->EventsSent += eventsSent;
                *MonCounters->EventsDropped += eventsDropped;
                *MonCounters->EventsAdded += eventsAdded;
                *MonCounters->ElapsedMicrosec += NHPTimer::GetSeconds(hpnow - hpprev) * 1000000;
            }

            hpprev = hpnow;
            nextTimestamp = TInstant::Now().MicroSeconds();
            nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());

            // ok complete, if nothing left - sleep
            if (!somethingDone) {
                const ui64 nextInstant = AlignDown(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds);
                if (nextMonotonic >= nextInstant) // already in next time-slice
                    continue;

                const ui64 delta = nextInstant - nextMonotonic;
                if (delta < Config.SpinThreshold) // not so much time left, just spin
                    continue;

                if (MonCounters) {
                    ++*MonCounters->Sleeps;
                }

                NanoSleep(delta * 1000); // ok, looks like we should sleep a bit.

                // Don't count sleep in elapsed microseconds
                hpprev = GetCycleCountFast();
                nextTimestamp = TInstant::Now().MicroSeconds();
                nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
            }
        }
        // ok, die!
    }

    // Remembers the actor system and the two shared timestamp cells, and
    // initializes both cells with the current wall-clock / monotonic time.
    void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) {
        ActorSystem = actorSystem;
        CurrentTimestamp = currentTimestamp;
        CurrentMonotonic = currentMonotonic;
        *CurrentTimestamp = TInstant::Now().MicroSeconds();
        *CurrentMonotonic = GetMonotonicMicroSeconds();
    }

    // Copies the given array of reader pointers into an array owned by this
    // object. At least one reader is required.
    void TBasicSchedulerThread::PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) {
        Y_VERIFY(scheduleReadersCount > 0);
        TotalReaders = scheduleReadersCount;
        Readers.Reset(new NSchedulerQueue::TReader*[scheduleReadersCount]);
        Copy(readers, readers + scheduleReadersCount, Readers.Get());
    }

    void TBasicSchedulerThread::PrepareStart() {
        // Called after actor system is initialized, but before executor threads
        // are started, giving us a chance to update current timestamp with a
        // more recent value, taking initialization time into account. This is
        // safe to do, since scheduler thread is not started yet, so no other
        // threads are updating time concurrently.
        AtomicStore(CurrentTimestamp, TInstant::Now().MicroSeconds());
        AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds()));
    }

    // Launches CycleFunc() through a TLegacyFuture stored in MainCycle.
    void TBasicSchedulerThread::Start() {
        // NOTE(review): the template arguments of TLegacyFuture were missing from
        // the copy under review; <void, false> is assumed to match the declared
        // type of MainCycle - confirm against scheduler_basic.h.
        MainCycle.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TBasicSchedulerThread::CycleFunc, this)));
    }

    // Asks the scheduler loop to exit; it is checked once per iteration.
    void TBasicSchedulerThread::PrepareStop() {
        AtomicStore(&StopFlag, true);
    }

    // Waits for the scheduler loop to finish and releases the future.
    // Does nothing when there is no running cycle (Start() was never called or
    // Stop() was already called), instead of dereferencing an empty pointer.
    void TBasicSchedulerThread::Stop() {
        if (!MainCycle) {
            return;
        }
        MainCycle->Get();
        MainCycle.Destroy();
    }

}

#ifdef __linux__

namespace NActors {
    // Factory: on Linux a mock scheduler thread is returned when the
    // configuration selects the scheduler actor, otherwise the basic
    // thread-based scheduler. The caller receives a raw owning pointer.
    ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) {
        if (config.UseSchedulerActor) {
            return new TMockSchedulerThread();
        } else {
            return new TBasicSchedulerThread(config);
        }
    }

}

#else // __linux__

namespace NActors {
    // Factory: on non-Linux platforms the basic thread-based scheduler is
    // always used. The caller receives a raw owning pointer.
    ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) {
        return new TBasicSchedulerThread(config);
    }
}

#endif // __linux__