scheduler_basic.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. #include "scheduler_basic.h"
  2. #include "scheduler_queue.h"
  3. #include "actor.h"
  4. #include <library/cpp/actors/util/datetime.h>
  5. #include <library/cpp/actors/util/thread.h>
  6. #ifdef BALLOC
  7. #include <library/cpp/balloc/optional/operators.h>
  8. #endif
  9. namespace NActors {
  10. struct TBasicSchedulerThread::TMonCounters {
  11. NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs;
  12. NMonitoring::TDynamicCounters::TCounterPtr QueueSize;
  13. NMonitoring::TDynamicCounters::TCounterPtr EventsSent;
  14. NMonitoring::TDynamicCounters::TCounterPtr EventsDropped;
  15. NMonitoring::TDynamicCounters::TCounterPtr EventsAdded;
  16. NMonitoring::TDynamicCounters::TCounterPtr Iterations;
  17. NMonitoring::TDynamicCounters::TCounterPtr Sleeps;
  18. NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
  19. TMonCounters(const NMonitoring::TDynamicCounterPtr& counters)
  20. : TimeDelayMs(counters->GetCounter("Scheduler/TimeDelayMs", false))
  21. , QueueSize(counters->GetCounter("Scheduler/QueueSize", false))
  22. , EventsSent(counters->GetCounter("Scheduler/EventsSent", true))
  23. , EventsDropped(counters->GetCounter("Scheduler/EventsDropped", true))
  24. , EventsAdded(counters->GetCounter("Scheduler/EventsAdded", true))
  25. , Iterations(counters->GetCounter("Scheduler/Iterations", true))
  26. , Sleeps(counters->GetCounter("Scheduler/Sleeps", true))
  27. , ElapsedMicrosec(counters->GetCounter("Scheduler/ElapsedMicrosec", true))
  28. { }
  29. };
  30. TBasicSchedulerThread::TBasicSchedulerThread(const TSchedulerConfig& config)
  31. : Config(config)
  32. , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr)
  33. , ActorSystem(nullptr)
  34. , CurrentTimestamp(nullptr)
  35. , CurrentMonotonic(nullptr)
  36. , TotalReaders(0)
  37. , StopFlag(false)
  38. , ScheduleMap(3600)
  39. {
  40. Y_ABORT_UNLESS(!Config.UseSchedulerActor, "Cannot create scheduler thread because Config.UseSchedulerActor# true");
  41. }
  42. TBasicSchedulerThread::~TBasicSchedulerThread() {
  43. Y_ABORT_UNLESS(!MainCycle);
  44. }
  45. void TBasicSchedulerThread::CycleFunc() {
  46. #ifdef BALLOC
  47. ThreadDisableBalloc();
  48. #endif
  49. ::SetCurrentThreadName("Scheduler");
  50. ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic);
  51. ui64 throttledMonotonic = currentMonotonic;
  52. ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold);
  53. TAutoPtr<TMomentMap> activeSec;
  54. NHPTimer::STime hpprev = GetCycleCountFast();
  55. ui64 nextTimestamp = TInstant::Now().MicroSeconds();
  56. ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
  57. while (!AtomicLoad(&StopFlag)) {
  58. {
  59. const ui64 delta = nextMonotonic - throttledMonotonic;
  60. const ui64 elapsedDelta = nextMonotonic - currentMonotonic;
  61. const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1));
  62. throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic;
  63. if (MonCounters) {
  64. *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000;
  65. }
  66. }
  67. AtomicStore(CurrentTimestamp, nextTimestamp);
  68. AtomicStore(CurrentMonotonic, nextMonotonic);
  69. currentMonotonic = nextMonotonic;
  70. if (MonCounters) {
  71. ++*MonCounters->Iterations;
  72. }
  73. bool somethingDone = false;
  74. // first step - send everything triggered on schedule
  75. ui64 eventsSent = 0;
  76. ui64 eventsDropped = 0;
  77. for (;;) {
  78. while (!!activeSec && !activeSec->empty()) {
  79. TMomentMap::iterator it = activeSec->begin();
  80. if (it->first <= throttledMonotonic) {
  81. if (NSchedulerQueue::TQueueType* q = it->second.Get()) {
  82. while (NSchedulerQueue::TEntry* x = q->Reader.Pop()) {
  83. somethingDone = true;
  84. Y_VERIFY_DEBUG(x->InstantMicroseconds <= activeTick);
  85. IEventHandle* ev = x->Ev;
  86. ISchedulerCookie* cookie = x->Cookie;
  87. // TODO: lazy send with backoff queue to not hang over contended mailboxes
  88. if (cookie) {
  89. if (cookie->Detach()) {
  90. ActorSystem->Send(ev);
  91. ++eventsSent;
  92. } else {
  93. delete ev;
  94. ++eventsDropped;
  95. }
  96. } else {
  97. ActorSystem->Send(ev);
  98. ++eventsSent;
  99. }
  100. }
  101. }
  102. activeSec->erase(it);
  103. } else
  104. break;
  105. }
  106. if (activeTick <= throttledMonotonic) {
  107. Y_VERIFY_DEBUG(!activeSec || activeSec->empty());
  108. activeSec.Destroy();
  109. activeTick += IntrasecondThreshold;
  110. TScheduleMap::iterator it = ScheduleMap.find(activeTick);
  111. if (it != ScheduleMap.end()) {
  112. activeSec = it->second;
  113. ScheduleMap.erase(it);
  114. }
  115. continue;
  116. }
  117. // ok, if we are here - then nothing is ready, so send step complete
  118. break;
  119. }
  120. // second step - collect everything from queues
  121. ui64 eventsAdded = 0;
  122. for (ui32 i = 0; i != TotalReaders; ++i) {
  123. while (NSchedulerQueue::TEntry* x = Readers[i]->Pop()) {
  124. somethingDone = true;
  125. const ui64 instant = AlignUp<ui64>(x->InstantMicroseconds, Config.ResolutionMicroseconds);
  126. IEventHandle* const ev = x->Ev;
  127. ISchedulerCookie* const cookie = x->Cookie;
  128. // check is cookie still valid? looks like it will hurt performance w/o sagnificant memory save
  129. if (instant <= activeTick) {
  130. if (!activeSec)
  131. activeSec.Reset(new TMomentMap());
  132. TAutoPtr<NSchedulerQueue::TQueueType>& queue = (*activeSec)[instant];
  133. if (!queue)
  134. queue.Reset(new NSchedulerQueue::TQueueType());
  135. queue->Writer.Push(instant, ev, cookie);
  136. } else {
  137. const ui64 intrasecond = AlignUp<ui64>(instant, IntrasecondThreshold);
  138. TAutoPtr<TMomentMap>& msec = ScheduleMap[intrasecond];
  139. if (!msec)
  140. msec.Reset(new TMomentMap());
  141. TAutoPtr<NSchedulerQueue::TQueueType>& queue = (*msec)[instant];
  142. if (!queue)
  143. queue.Reset(new NSchedulerQueue::TQueueType());
  144. queue->Writer.Push(instant, ev, cookie);
  145. }
  146. ++eventsAdded;
  147. }
  148. }
  149. NHPTimer::STime hpnow = GetCycleCountFast();
  150. if (MonCounters) {
  151. *MonCounters->QueueSize -= eventsSent + eventsDropped;
  152. *MonCounters->QueueSize += eventsAdded;
  153. *MonCounters->EventsSent += eventsSent;
  154. *MonCounters->EventsDropped += eventsDropped;
  155. *MonCounters->EventsAdded += eventsAdded;
  156. *MonCounters->ElapsedMicrosec += NHPTimer::GetSeconds(hpnow - hpprev) * 1000000;
  157. }
  158. hpprev = hpnow;
  159. nextTimestamp = TInstant::Now().MicroSeconds();
  160. nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
  161. // ok complete, if nothing left - sleep
  162. if (!somethingDone) {
  163. const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds);
  164. if (nextMonotonic >= nextInstant) // already in next time-slice
  165. continue;
  166. const ui64 delta = nextInstant - nextMonotonic;
  167. if (delta < Config.SpinThreshold) // not so much time left, just spin
  168. continue;
  169. if (MonCounters) {
  170. ++*MonCounters->Sleeps;
  171. }
  172. NanoSleep(delta * 1000); // ok, looks like we should sleep a bit.
  173. // Don't count sleep in elapsed microseconds
  174. hpprev = GetCycleCountFast();
  175. nextTimestamp = TInstant::Now().MicroSeconds();
  176. nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
  177. }
  178. }
  179. // ok, die!
  180. }
  181. void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) {
  182. ActorSystem = actorSystem;
  183. CurrentTimestamp = currentTimestamp;
  184. CurrentMonotonic = currentMonotonic;
  185. *CurrentTimestamp = TInstant::Now().MicroSeconds();
  186. *CurrentMonotonic = GetMonotonicMicroSeconds();
  187. }
  188. void TBasicSchedulerThread::PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) {
  189. Y_ABORT_UNLESS(scheduleReadersCount > 0);
  190. TotalReaders = scheduleReadersCount;
  191. Readers.Reset(new NSchedulerQueue::TReader*[scheduleReadersCount]);
  192. Copy(readers, readers + scheduleReadersCount, Readers.Get());
  193. }
  194. void TBasicSchedulerThread::PrepareStart() {
  195. // Called after actor system is initialized, but before executor threads
  196. // are started, giving us a chance to update current timestamp with a
  197. // more recent value, taking initialization time into account. This is
  198. // safe to do, since scheduler thread is not started yet, so no other
  199. // threads are updating time concurrently.
  200. AtomicStore(CurrentTimestamp, TInstant::Now().MicroSeconds());
  201. AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds()));
  202. }
  203. void TBasicSchedulerThread::Start() {
  204. MainCycle.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TBasicSchedulerThread::CycleFunc, this)));
  205. }
  206. void TBasicSchedulerThread::PrepareStop() {
  207. AtomicStore(&StopFlag, true);
  208. }
  209. void TBasicSchedulerThread::Stop() {
  210. MainCycle->Get();
  211. MainCycle.Destroy();
  212. }
  213. }
  214. #ifdef __linux__
  215. namespace NActors {
  216. ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) {
  217. if (config.UseSchedulerActor) {
  218. return new TMockSchedulerThread();
  219. } else {
  220. return new TBasicSchedulerThread(config);
  221. }
  222. }
  223. }
  224. #else // __linux__
  225. namespace NActors {
  226. ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) {
  227. return new TBasicSchedulerThread(config);
  228. }
  229. }
  230. #endif // __linux__