io_dispatcher.cpp 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. #include "io_dispatcher.h"
  2. #include "actor_bootstrapped.h"
  3. #include "hfunc.h"
  4. #include <util/system/mutex.h>
  5. #include <util/system/condvar.h>
  6. #include <util/system/thread.h>
  7. #include <map>
  8. #include <list>
  9. namespace NActors {
  10. class TIoDispatcherActor : public TActorBootstrapped<TIoDispatcherActor> {
  11. enum {
  12. EvNotifyThreadStopped = EventSpaceBegin(TEvents::ES_PRIVATE),
  13. };
  14. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  15. // IO task queue
  16. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  17. class TTask {
  18. TInstant Timestamp;
  19. std::function<void()> Callback;
  20. public:
  21. TTask(TInstant timestamp, TEvInvokeQuery *ev)
  22. : Timestamp(timestamp)
  23. , Callback(std::move(ev->Callback))
  24. {}
  25. void Execute() {
  26. Callback();
  27. }
  28. TInstant GetTimestamp() const {
  29. return Timestamp;
  30. }
  31. };
  32. class TTaskQueue {
  33. std::list<TTask> Tasks;
  34. TMutex Mutex;
  35. TCondVar CondVar;
  36. size_t NumThreadsToStop = 0;
  37. public:
  38. void Enqueue(TInstant timestamp, TEvInvokeQuery *ev) {
  39. std::list<TTask> list;
  40. list.emplace_back(timestamp, ev);
  41. with_lock (Mutex) {
  42. Tasks.splice(Tasks.end(), std::move(list));
  43. }
  44. CondVar.Signal();
  45. }
  46. bool Dequeue(std::list<TTask>& list, bool *sendNotify) {
  47. with_lock (Mutex) {
  48. CondVar.Wait(Mutex, [&] { return NumThreadsToStop || !Tasks.empty(); });
  49. if (NumThreadsToStop) {
  50. *sendNotify = NumThreadsToStop != Max<size_t>();
  51. if (*sendNotify) {
  52. --NumThreadsToStop;
  53. }
  54. return false;
  55. } else {
  56. list.splice(list.end(), Tasks, Tasks.begin());
  57. return true;
  58. }
  59. }
  60. }
  61. void Stop() {
  62. with_lock (Mutex) {
  63. NumThreadsToStop = Max<size_t>();
  64. }
  65. CondVar.BroadCast();
  66. }
  67. void StopOne() {
  68. with_lock (Mutex) {
  69. ++NumThreadsToStop;
  70. Y_VERIFY(NumThreadsToStop);
  71. }
  72. CondVar.Signal();
  73. }
  74. std::optional<TInstant> GetEarliestTaskTimestamp() {
  75. with_lock (Mutex) {
  76. return Tasks.empty() ? std::nullopt : std::make_optional(Tasks.front().GetTimestamp());
  77. }
  78. }
  79. };
  80. TTaskQueue TaskQueue;
  81. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  82. // IO dispatcher threads
  83. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  84. class TThread : public ISimpleThread {
  85. TIoDispatcherActor& Actor;
  86. TActorSystem* const ActorSystem;
  87. public:
  88. TThread(TIoDispatcherActor& actor, TActorSystem *actorSystem)
  89. : Actor(actor)
  90. , ActorSystem(actorSystem)
  91. {
  92. Start();
  93. }
  94. void *ThreadProc() override {
  95. SetCurrentThreadName("kikimr IO");
  96. for (;;) {
  97. std::list<TTask> tasks;
  98. bool sendNotify;
  99. if (!Actor.TaskQueue.Dequeue(tasks, &sendNotify)) {
  100. if (sendNotify) {
  101. ActorSystem->Send(new IEventHandle(EvNotifyThreadStopped, 0, Actor.SelfId(), TActorId(),
  102. nullptr, TThread::CurrentThreadId()));
  103. }
  104. break;
  105. }
  106. for (TTask& task : tasks) {
  107. task.Execute();
  108. ++*Actor.TasksCompleted;
  109. }
  110. }
  111. return nullptr;
  112. }
  113. };
  114. static constexpr size_t MinThreadCount = 4;
  115. static constexpr size_t MaxThreadCount = 64;
  116. std::map<TThread::TId, std::unique_ptr<TThread>> Threads;
  117. size_t NumRunningThreads = 0;
  118. void StartThread() {
  119. auto thread = std::make_unique<TThread>(*this, TlsActivationContext->ExecutorThread.ActorSystem);
  120. const TThread::TId id = thread->Id();
  121. Threads.emplace(id, std::move(thread));
  122. *NumThreads = ++NumRunningThreads;
  123. ++*ThreadsStarted;
  124. }
  125. void StopThread() {
  126. Y_VERIFY(Threads.size());
  127. TaskQueue.StopOne();
  128. *NumThreads = --NumRunningThreads;
  129. ++*ThreadsStopped;
  130. }
  131. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  132. // Counters
  133. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  134. NMonitoring::TDynamicCounters::TCounterPtr NumThreads;
  135. NMonitoring::TDynamicCounters::TCounterPtr TasksAdded;
  136. NMonitoring::TDynamicCounters::TCounterPtr TasksCompleted;
  137. NMonitoring::TDynamicCounters::TCounterPtr ThreadsStarted;
  138. NMonitoring::TDynamicCounters::TCounterPtr ThreadsStopped;
  139. public:
  140. TIoDispatcherActor(const NMonitoring::TDynamicCounterPtr& counters)
  141. : NumThreads(counters->GetCounter("NumThreads"))
  142. , TasksAdded(counters->GetCounter("TasksAdded", true))
  143. , TasksCompleted(counters->GetCounter("TasksCompleted", true))
  144. , ThreadsStarted(counters->GetCounter("ThreadsStarted", true))
  145. , ThreadsStopped(counters->GetCounter("ThreadsStopped", true))
  146. {}
  147. ~TIoDispatcherActor() override {
  148. TaskQueue.Stop();
  149. }
  150. void Bootstrap() {
  151. while (NumRunningThreads < MinThreadCount) {
  152. StartThread();
  153. }
  154. HandleWakeup();
  155. Become(&TThis::StateFunc);
  156. }
  157. void HandleThreadStopped(TAutoPtr<IEventHandle> ev) {
  158. auto it = Threads.find(ev->Cookie);
  159. Y_VERIFY(it != Threads.end());
  160. it->second->Join();
  161. Threads.erase(it);
  162. }
  163. void Handle(TEvInvokeQuery::TPtr ev) {
  164. ++*TasksAdded;
  165. TaskQueue.Enqueue(TActivationContext::Now(), ev->Get());
  166. }
  167. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  168. // Thread usage counter logic
  169. ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  170. std::optional<TInstant> IdleTimestamp;
  171. static constexpr TDuration ThreadStartTime = TDuration::MilliSeconds(500);
  172. static constexpr TDuration ThreadStopTime = TDuration::MilliSeconds(500);
  173. void HandleWakeup() {
  174. const TInstant now = TActivationContext::Now();
  175. std::optional<TInstant> earliest = TaskQueue.GetEarliestTaskTimestamp();
  176. if (earliest) {
  177. if (now >= *earliest + ThreadStartTime && NumRunningThreads < MaxThreadCount) {
  178. StartThread();
  179. }
  180. IdleTimestamp.reset();
  181. } else if (!IdleTimestamp) {
  182. IdleTimestamp = now;
  183. } else if (now >= *IdleTimestamp + ThreadStopTime) {
  184. IdleTimestamp.reset();
  185. if (NumRunningThreads > MinThreadCount) {
  186. StopThread();
  187. }
  188. }
  189. Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup);
  190. }
  191. STRICT_STFUNC(StateFunc, {
  192. fFunc(EvNotifyThreadStopped, HandleThreadStopped);
  193. hFunc(TEvInvokeQuery, Handle);
  194. cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
  195. cFunc(TEvents::TSystem::Poison, PassAway);
  196. })
  197. };
  198. IActor *CreateIoDispatcherActor(const NMonitoring::TDynamicCounterPtr& counters) {
  199. return new TIoDispatcherActor(counters);
  200. }
  201. } // NActors