scheduler_basic.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. #include "scheduler_basic.h"
  2. #include "scheduler_queue.h"
  3. #include <library/cpp/actors/util/datetime.h>
  4. #include <library/cpp/actors/util/thread.h>
  5. #ifdef BALLOC
  6. #include <library/cpp/balloc/optional/operators.h>
  7. #endif
  8. namespace NActors {
  9. struct TBasicSchedulerThread::TMonCounters {
  10. NMonitoring::TDynamicCounters::TCounterPtr TimeDelayMs;
  11. NMonitoring::TDynamicCounters::TCounterPtr QueueSize;
  12. NMonitoring::TDynamicCounters::TCounterPtr EventsSent;
  13. NMonitoring::TDynamicCounters::TCounterPtr EventsDropped;
  14. NMonitoring::TDynamicCounters::TCounterPtr EventsAdded;
  15. NMonitoring::TDynamicCounters::TCounterPtr Iterations;
  16. NMonitoring::TDynamicCounters::TCounterPtr Sleeps;
  17. NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
  18. TMonCounters(const NMonitoring::TDynamicCounterPtr& counters)
  19. : TimeDelayMs(counters->GetCounter("Scheduler/TimeDelayMs", false))
  20. , QueueSize(counters->GetCounter("Scheduler/QueueSize", false))
  21. , EventsSent(counters->GetCounter("Scheduler/EventsSent", true))
  22. , EventsDropped(counters->GetCounter("Scheduler/EventsDropped", true))
  23. , EventsAdded(counters->GetCounter("Scheduler/EventsAdded", true))
  24. , Iterations(counters->GetCounter("Scheduler/Iterations", true))
  25. , Sleeps(counters->GetCounter("Scheduler/Sleeps", true))
  26. , ElapsedMicrosec(counters->GetCounter("Scheduler/ElapsedMicrosec", true))
  27. { }
  28. };
  29. TBasicSchedulerThread::TBasicSchedulerThread(const TSchedulerConfig& config)
  30. : Config(config)
  31. , MonCounters(Config.MonCounters ? new TMonCounters(Config.MonCounters) : nullptr)
  32. , ActorSystem(nullptr)
  33. , CurrentTimestamp(nullptr)
  34. , CurrentMonotonic(nullptr)
  35. , TotalReaders(0)
  36. , StopFlag(false)
  37. , ScheduleMap(3600)
  38. {
  39. Y_VERIFY(!Config.UseSchedulerActor, "Cannot create scheduler thread because Config.UseSchedulerActor# true");
  40. }
  41. TBasicSchedulerThread::~TBasicSchedulerThread() {
  42. Y_VERIFY(!MainCycle);
  43. }
  44. void TBasicSchedulerThread::CycleFunc() {
  45. #ifdef BALLOC
  46. ThreadDisableBalloc();
  47. #endif
  48. ::SetCurrentThreadName("Scheduler");
  49. ui64 currentMonotonic = RelaxedLoad(CurrentMonotonic);
  50. ui64 throttledMonotonic = currentMonotonic;
  51. ui64 activeTick = AlignUp<ui64>(throttledMonotonic, IntrasecondThreshold);
  52. TAutoPtr<TMomentMap> activeSec;
  53. NHPTimer::STime hpprev = GetCycleCountFast();
  54. ui64 nextTimestamp = TInstant::Now().MicroSeconds();
  55. ui64 nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
  56. while (!AtomicLoad(&StopFlag)) {
  57. {
  58. const ui64 delta = nextMonotonic - throttledMonotonic;
  59. const ui64 elapsedDelta = nextMonotonic - currentMonotonic;
  60. const ui64 threshold = Max(Min(Config.ProgressThreshold, 2 * elapsedDelta), ui64(1));
  61. throttledMonotonic = (delta > threshold) ? throttledMonotonic + threshold : nextMonotonic;
  62. if (MonCounters) {
  63. *MonCounters->TimeDelayMs = (nextMonotonic - throttledMonotonic) / 1000;
  64. }
  65. }
  66. AtomicStore(CurrentTimestamp, nextTimestamp);
  67. AtomicStore(CurrentMonotonic, nextMonotonic);
  68. currentMonotonic = nextMonotonic;
  69. if (MonCounters) {
  70. ++*MonCounters->Iterations;
  71. }
  72. bool somethingDone = false;
  73. // first step - send everything triggered on schedule
  74. ui64 eventsSent = 0;
  75. ui64 eventsDropped = 0;
  76. for (;;) {
  77. while (!!activeSec && !activeSec->empty()) {
  78. TMomentMap::iterator it = activeSec->begin();
  79. if (it->first <= throttledMonotonic) {
  80. if (NSchedulerQueue::TQueueType* q = it->second.Get()) {
  81. while (NSchedulerQueue::TEntry* x = q->Reader.Pop()) {
  82. somethingDone = true;
  83. Y_VERIFY_DEBUG(x->InstantMicroseconds <= activeTick);
  84. IEventHandle* ev = x->Ev;
  85. ISchedulerCookie* cookie = x->Cookie;
  86. // TODO: lazy send with backoff queue to not hang over contended mailboxes
  87. if (cookie) {
  88. if (cookie->Detach()) {
  89. ActorSystem->Send(ev);
  90. ++eventsSent;
  91. } else {
  92. delete ev;
  93. ++eventsDropped;
  94. }
  95. } else {
  96. ActorSystem->Send(ev);
  97. ++eventsSent;
  98. }
  99. }
  100. }
  101. activeSec->erase(it);
  102. } else
  103. break;
  104. }
  105. if (activeTick <= throttledMonotonic) {
  106. Y_VERIFY_DEBUG(!activeSec || activeSec->empty());
  107. activeSec.Destroy();
  108. activeTick += IntrasecondThreshold;
  109. TScheduleMap::iterator it = ScheduleMap.find(activeTick);
  110. if (it != ScheduleMap.end()) {
  111. activeSec = it->second;
  112. ScheduleMap.erase(it);
  113. }
  114. continue;
  115. }
  116. // ok, if we are here - then nothing is ready, so send step complete
  117. break;
  118. }
  119. // second step - collect everything from queues
  120. ui64 eventsAdded = 0;
  121. for (ui32 i = 0; i != TotalReaders; ++i) {
  122. while (NSchedulerQueue::TEntry* x = Readers[i]->Pop()) {
  123. somethingDone = true;
  124. const ui64 instant = AlignUp<ui64>(x->InstantMicroseconds, Config.ResolutionMicroseconds);
  125. IEventHandle* const ev = x->Ev;
  126. ISchedulerCookie* const cookie = x->Cookie;
  127. // check is cookie still valid? looks like it will hurt performance w/o sagnificant memory save
  128. if (instant <= activeTick) {
  129. if (!activeSec)
  130. activeSec.Reset(new TMomentMap());
  131. TAutoPtr<NSchedulerQueue::TQueueType>& queue = (*activeSec)[instant];
  132. if (!queue)
  133. queue.Reset(new NSchedulerQueue::TQueueType());
  134. queue->Writer.Push(instant, ev, cookie);
  135. } else {
  136. const ui64 intrasecond = AlignUp<ui64>(instant, IntrasecondThreshold);
  137. TAutoPtr<TMomentMap>& msec = ScheduleMap[intrasecond];
  138. if (!msec)
  139. msec.Reset(new TMomentMap());
  140. TAutoPtr<NSchedulerQueue::TQueueType>& queue = (*msec)[instant];
  141. if (!queue)
  142. queue.Reset(new NSchedulerQueue::TQueueType());
  143. queue->Writer.Push(instant, ev, cookie);
  144. }
  145. ++eventsAdded;
  146. }
  147. }
  148. NHPTimer::STime hpnow = GetCycleCountFast();
  149. if (MonCounters) {
  150. *MonCounters->QueueSize -= eventsSent + eventsDropped;
  151. *MonCounters->QueueSize += eventsAdded;
  152. *MonCounters->EventsSent += eventsSent;
  153. *MonCounters->EventsDropped += eventsDropped;
  154. *MonCounters->EventsAdded += eventsAdded;
  155. *MonCounters->ElapsedMicrosec += NHPTimer::GetSeconds(hpnow - hpprev) * 1000000;
  156. }
  157. hpprev = hpnow;
  158. nextTimestamp = TInstant::Now().MicroSeconds();
  159. nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
  160. // ok complete, if nothing left - sleep
  161. if (!somethingDone) {
  162. const ui64 nextInstant = AlignDown<ui64>(throttledMonotonic + Config.ResolutionMicroseconds, Config.ResolutionMicroseconds);
  163. if (nextMonotonic >= nextInstant) // already in next time-slice
  164. continue;
  165. const ui64 delta = nextInstant - nextMonotonic;
  166. if (delta < Config.SpinThreshold) // not so much time left, just spin
  167. continue;
  168. if (MonCounters) {
  169. ++*MonCounters->Sleeps;
  170. }
  171. NanoSleep(delta * 1000); // ok, looks like we should sleep a bit.
  172. // Don't count sleep in elapsed microseconds
  173. hpprev = GetCycleCountFast();
  174. nextTimestamp = TInstant::Now().MicroSeconds();
  175. nextMonotonic = Max(currentMonotonic, GetMonotonicMicroSeconds());
  176. }
  177. }
  178. // ok, die!
  179. }
  180. void TBasicSchedulerThread::Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) {
  181. ActorSystem = actorSystem;
  182. CurrentTimestamp = currentTimestamp;
  183. CurrentMonotonic = currentMonotonic;
  184. *CurrentTimestamp = TInstant::Now().MicroSeconds();
  185. *CurrentMonotonic = GetMonotonicMicroSeconds();
  186. }
  187. void TBasicSchedulerThread::PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) {
  188. Y_VERIFY(scheduleReadersCount > 0);
  189. TotalReaders = scheduleReadersCount;
  190. Readers.Reset(new NSchedulerQueue::TReader*[scheduleReadersCount]);
  191. Copy(readers, readers + scheduleReadersCount, Readers.Get());
  192. }
  193. void TBasicSchedulerThread::PrepareStart() {
  194. // Called after actor system is initialized, but before executor threads
  195. // are started, giving us a chance to update current timestamp with a
  196. // more recent value, taking initialization time into account. This is
  197. // safe to do, since scheduler thread is not started yet, so no other
  198. // threads are updating time concurrently.
  199. AtomicStore(CurrentTimestamp, TInstant::Now().MicroSeconds());
  200. AtomicStore(CurrentMonotonic, Max(RelaxedLoad(CurrentMonotonic), GetMonotonicMicroSeconds()));
  201. }
  202. void TBasicSchedulerThread::Start() {
  203. MainCycle.Reset(new NThreading::TLegacyFuture<void, false>(std::bind(&TBasicSchedulerThread::CycleFunc, this)));
  204. }
  205. void TBasicSchedulerThread::PrepareStop() {
  206. AtomicStore(&StopFlag, true);
  207. }
  208. void TBasicSchedulerThread::Stop() {
  209. MainCycle->Get();
  210. MainCycle.Destroy();
  211. }
  212. }
  213. #ifdef __linux__
  214. namespace NActors {
  215. ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) {
  216. if (config.UseSchedulerActor) {
  217. return new TMockSchedulerThread();
  218. } else {
  219. return new TBasicSchedulerThread(config);
  220. }
  221. }
  222. }
  223. #else // __linux__
  224. namespace NActors {
  225. ISchedulerThread* CreateSchedulerThread(const TSchedulerConfig& config) {
  226. return new TBasicSchedulerThread(config);
  227. }
  228. }
  229. #endif // __linux__