pool_stats_collector.h 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. #pragma once
  2. #include <library/cpp/actors/core/actor_bootstrapped.h>
  3. #include <library/cpp/actors/core/actorsystem.h>
  4. #include <library/cpp/actors/core/executor_thread.h>
  5. #include <library/cpp/actors/core/hfunc.h>
  6. #include <library/cpp/monlib/dynamic_counters/counters.h>
  7. #include <util/generic/vector.h>
  8. #include <util/generic/xrange.h>
  9. #include <util/string/printf.h>
  10. namespace NActors {
  11. // Periodically collects stats from executor threads and exposes them as mon counters
  12. class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
  13. private:
  14. struct THistogramCounters {
  15. void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) {
  16. for (size_t i = 0; (1ull<<i) <= maxVal; ++i) {
  17. TString bucketName = ToString(1ull<<i) + " " + unit;
  18. Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true));
  19. }
  20. Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true));
  21. }
  22. void Set(const TLogHistogram& data) {
  23. ui32 i = 0;
  24. for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
  25. *Buckets[i] = data.Buckets[i];
  26. ui64 last = 0;
  27. for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
  28. last += data.Buckets[i];
  29. *Buckets.back() = last;
  30. }
  31. void Set(const TLogHistogram& data, double factor) {
  32. ui32 i = 0;
  33. for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
  34. *Buckets[i] = data.Buckets[i]*factor;
  35. ui64 last = 0;
  36. for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
  37. last += data.Buckets[i];
  38. *Buckets.back() = last*factor;
  39. }
  40. private:
  41. TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets;
  42. };
  43. struct TActivityStats {
  44. void Init(NMonitoring::TDynamicCounterPtr group) {
  45. Group = group;
  46. ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount());
  47. ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());
  48. ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());
  49. ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount());
  50. StuckActorsByActivityBuckets.resize(GetActivityTypeCount());
  51. UsageByActivityBuckets.resize(GetActivityTypeCount());
  52. }
  53. void Set(const TExecutorThreadStats& stats) {
  54. for (ui32 i : xrange(stats.MaxActivityType())) {
  55. Y_VERIFY(i < GetActivityTypeCount());
  56. ui64 ticks = stats.ElapsedTicksByActivity[i];
  57. ui64 events = stats.ReceivedEventsByActivity[i];
  58. ui64 actors = stats.ActorsAliveByActivity[i];
  59. ui64 scheduled = stats.ScheduledEventsByActivity[i];
  60. ui64 stuck = stats.StuckActorsByActivity[i];
  61. if (!ActorsAliveByActivityBuckets[i]) {
  62. if (ticks || events || actors || scheduled) {
  63. InitCountersForActivity(i);
  64. } else {
  65. continue;
  66. }
  67. }
  68. *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000;
  69. *ReceivedEventsByActivityBuckets[i] = events;
  70. *ActorsAliveByActivityBuckets[i] = actors;
  71. *ScheduledEventsByActivityBuckets[i] = scheduled;
  72. *StuckActorsByActivityBuckets[i] = stuck;
  73. for (ui32 j = 0; j < 10; ++j) {
  74. *UsageByActivityBuckets[i][j] = stats.UsageByActivity[i][j];
  75. }
  76. }
  77. }
  78. private:
  79. void InitCountersForActivity(ui32 activityType) {
  80. Y_VERIFY(activityType < GetActivityTypeCount());
  81. auto bucketName = TString(GetActivityTypeName(activityType));
  82. ElapsedMicrosecByActivityBuckets[activityType] =
  83. Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true);
  84. ReceivedEventsByActivityBuckets[activityType] =
  85. Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true);
  86. ActorsAliveByActivityBuckets[activityType] =
  87. Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false);
  88. ScheduledEventsByActivityBuckets[activityType] =
  89. Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true);
  90. StuckActorsByActivityBuckets[activityType] =
  91. Group->GetSubgroup("sensor", "StuckActorsByActivity")->GetNamedCounter("activity", bucketName, false);
  92. for (ui32 i = 0; i < 10; ++i) {
  93. UsageByActivityBuckets[activityType][i] = Group->GetSubgroup("sensor", "UsageByActivity")->GetSubgroup("bin", ToString(i))->GetNamedCounter("activity", bucketName, false);
  94. }
  95. }
  96. private:
  97. NMonitoring::TDynamicCounterPtr Group;
  98. TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets;
  99. TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;
  100. TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;
  101. TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets;
  102. TVector<NMonitoring::TDynamicCounters::TCounterPtr> StuckActorsByActivityBuckets;
  103. TVector<std::array<NMonitoring::TDynamicCounters::TCounterPtr, 10>> UsageByActivityBuckets;
  104. };
  105. struct TExecutorPoolCounters {
  106. TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup;
  107. NMonitoring::TDynamicCounters::TCounterPtr SentEvents;
  108. NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents;
  109. NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents;
  110. NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents;
  111. NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors;
  112. NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation;
  113. NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec;
  114. NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
  115. NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec;
  116. NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations;
  117. NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive;
  118. NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes;
  119. NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption;
  120. NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime;
  121. NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount;
  122. NMonitoring::TDynamicCounters::TCounterPtr WrongWakenedThreadCount;
  123. NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCount;
  124. NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCount;
  125. NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCount;
  126. NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCount;
  127. NMonitoring::TDynamicCounters::TCounterPtr IsNeedy;
  128. NMonitoring::TDynamicCounters::TCounterPtr IsStarved;
  129. NMonitoring::TDynamicCounters::TCounterPtr IsHoggish;
  130. NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByNeedyState;
  131. NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByExchange;
  132. NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState;
  133. NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState;
  134. NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByExchange;
  135. NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions;
  136. NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu;
  137. NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
  138. NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
  139. NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
  140. THistogramCounters LegacyActivationTimeHistogram;
  141. NMonitoring::THistogramPtr ActivationTimeHistogram;
  142. THistogramCounters LegacyEventDeliveryTimeHistogram;
  143. NMonitoring::THistogramPtr EventDeliveryTimeHistogram;
  144. THistogramCounters LegacyEventProcessingCountHistogram;
  145. NMonitoring::THistogramPtr EventProcessingCountHistogram;
  146. THistogramCounters LegacyEventProcessingTimeHistogram;
  147. NMonitoring::THistogramPtr EventProcessingTimeHistogram;
  148. TActivityStats ActivityStats;
  149. NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime;
  150. double Usage = 0;
  151. double LastElapsedSeconds = 0;
  152. THPTimer UsageTimer;
  153. TString Name;
  154. ui32 Threads;
  155. void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) {
  156. LastElapsedSeconds = 0;
  157. Usage = 0;
  158. UsageTimer.Reset();
  159. Name = poolName;
  160. Threads = threads;
  161. PoolGroup = group->GetSubgroup("execpool", poolName);
  162. SentEvents = PoolGroup->GetCounter("SentEvents", true);
  163. ReceivedEvents = PoolGroup->GetCounter("ReceivedEvents", true);
  164. PreemptedEvents = PoolGroup->GetCounter("PreemptedEvents", true);
  165. NonDeliveredEvents = PoolGroup->GetCounter("NonDeliveredEvents", true);
  166. DestroyedActors = PoolGroup->GetCounter("DestroyedActors", true);
  167. CpuMicrosec = PoolGroup->GetCounter("CpuMicrosec", true);
  168. ElapsedMicrosec = PoolGroup->GetCounter("ElapsedMicrosec", true);
  169. ParkedMicrosec = PoolGroup->GetCounter("ParkedMicrosec", true);
  170. EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true);
  171. ActorRegistrations = PoolGroup->GetCounter("ActorRegistrations", true);
  172. ActorsAlive = PoolGroup->GetCounter("ActorsAlive", false);
  173. AllocatedMailboxes = PoolGroup->GetCounter("AllocatedMailboxes", false);
  174. MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true);
  175. MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true);
  176. MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true);
  177. WrongWakenedThreadCount = PoolGroup->GetCounter("WrongWakenedThreadCount", true);
  178. CurrentThreadCount = PoolGroup->GetCounter("CurrentThreadCount", false);
  179. PotentialMaxThreadCount = PoolGroup->GetCounter("PotentialMaxThreadCount", false);
  180. DefaultThreadCount = PoolGroup->GetCounter("DefaultThreadCount", false);
  181. MaxThreadCount = PoolGroup->GetCounter("MaxThreadCount", false);
  182. IsNeedy = PoolGroup->GetCounter("IsNeedy", false);
  183. IsStarved = PoolGroup->GetCounter("IsStarved", false);
  184. IsHoggish = PoolGroup->GetCounter("IsHoggish", false);
  185. IncreasingThreadsByNeedyState = PoolGroup->GetCounter("IncreasingThreadsByNeedyState", true);
  186. IncreasingThreadsByExchange = PoolGroup->GetCounter("IncreasingThreadsByExchange", true);
  187. DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true);
  188. DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true);
  189. DecreasingThreadsByExchange = PoolGroup->GetCounter("DecreasingThreadsByExchange", true);
  190. NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true);
  191. MaxConsumedCpu = PoolGroup->GetCounter("MaxConsumedCpuByPool", false);
  192. MinConsumedCpu = PoolGroup->GetCounter("MinConsumedCpuByPool", false);
  193. MaxBookedCpu = PoolGroup->GetCounter("MaxBookedCpuByPool", false);
  194. MinBookedCpu = PoolGroup->GetCounter("MinBookedCpuByPool", false);
  195. LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);
  196. ActivationTimeHistogram = PoolGroup->GetHistogram(
  197. "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
  198. LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000);
  199. EventDeliveryTimeHistogram = PoolGroup->GetHistogram(
  200. "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
  201. LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000);
  202. EventProcessingCountHistogram = PoolGroup->GetHistogram(
  203. "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1));
  204. LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000);
  205. EventProcessingTimeHistogram = PoolGroup->GetHistogram(
  206. "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
  207. ActivityStats.Init(PoolGroup.Get());
  208. MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true);
  209. }
  210. void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats, ui32 numThreads) {
  211. #ifdef ACTORSLIB_COLLECT_EXEC_STATS
  212. *SentEvents = stats.SentEvents;
  213. *ReceivedEvents = stats.ReceivedEvents;
  214. *PreemptedEvents = stats.PreemptedEvents;
  215. *NonDeliveredEvents = stats.NonDeliveredEvents;
  216. *DestroyedActors = stats.PoolDestroyedActors;
  217. *EmptyMailboxActivation = stats.EmptyMailboxActivation;
  218. *CpuMicrosec = stats.CpuUs;
  219. *ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000;
  220. *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000;
  221. *ActorRegistrations = stats.PoolActorRegistrations;
  222. *ActorsAlive = stats.PoolActorRegistrations - stats.PoolDestroyedActors;
  223. *AllocatedMailboxes = stats.PoolAllocatedMailboxes;
  224. *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption;
  225. *MailboxPushedOutByTime = stats.MailboxPushedOutByTime;
  226. *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;
  227. *WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount;
  228. *CurrentThreadCount = poolStats.CurrentThreadCount;
  229. *PotentialMaxThreadCount = poolStats.PotentialMaxThreadCount;
  230. *DefaultThreadCount = poolStats.DefaultThreadCount;
  231. *MaxThreadCount = poolStats.MaxThreadCount;
  232. *IsNeedy = poolStats.IsNeedy;
  233. *IsStarved = poolStats.IsStarved;
  234. *IsHoggish = poolStats.IsHoggish;
  235. *IncreasingThreadsByNeedyState = poolStats.IncreasingThreadsByNeedyState;
  236. *IncreasingThreadsByExchange = poolStats.IncreasingThreadsByExchange;
  237. *DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState;
  238. *DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState;
  239. *DecreasingThreadsByExchange = poolStats.DecreasingThreadsByExchange;
  240. *NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions;
  241. LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
  242. ActivationTimeHistogram->Reset();
  243. ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram);
  244. LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram);
  245. EventDeliveryTimeHistogram->Reset();
  246. EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram);
  247. LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram);
  248. EventProcessingCountHistogram->Reset();
  249. EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram);
  250. double toMicrosec = 1000000 / NHPTimer::GetClockRate();
  251. LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec);
  252. EventProcessingTimeHistogram->Reset();
  253. for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) {
  254. EventProcessingTimeHistogram->Collect(
  255. stats.EventProcessingTimeHistogram.UpperBound(i),
  256. stats.EventProcessingTimeHistogram.Value(i) * toMicrosec);
  257. }
  258. ActivityStats.Set(stats);
  259. *MaxUtilizationTime = poolStats.MaxUtilizationTime;
  260. double seconds = UsageTimer.PassedReset();
  261. // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916
  262. const double elapsed = NHPTimer::GetSeconds(stats.ElapsedTicks);
  263. const double currentUsage = numThreads > 0 ? ((elapsed - LastElapsedSeconds) / seconds / numThreads) : 0;
  264. LastElapsedSeconds = elapsed;
  265. // update usage factor according to smoothness
  266. const double smoothness = 0.5;
  267. Usage = currentUsage * smoothness + Usage * (1.0 - smoothness);
  268. #else
  269. Y_UNUSED(poolStats);
  270. Y_UNUSED(stats);
  271. Y_UNUSED(numThreads);
  272. #endif
  273. }
  274. };
  275. struct TActorSystemCounters {
  276. TIntrusivePtr<NMonitoring::TDynamicCounters> Group;
  277. NMonitoring::TDynamicCounters::TCounterPtr MaxConsumedCpu;
  278. NMonitoring::TDynamicCounters::TCounterPtr MinConsumedCpu;
  279. NMonitoring::TDynamicCounters::TCounterPtr MaxBookedCpu;
  280. NMonitoring::TDynamicCounters::TCounterPtr MinBookedCpu;
  281. void Init(NMonitoring::TDynamicCounters* group) {
  282. Group = group;
  283. MaxConsumedCpu = Group->GetCounter("MaxConsumedCpu", false);
  284. MinConsumedCpu = Group->GetCounter("MinConsumedCpu", false);
  285. MaxBookedCpu = Group->GetCounter("MaxBookedCpu", false);
  286. MinBookedCpu = Group->GetCounter("MinBookedCpu", false);
  287. }
  288. void Set(const THarmonizerStats& harmonizerStats) {
  289. #ifdef ACTORSLIB_COLLECT_EXEC_STATS
  290. *MaxConsumedCpu = harmonizerStats.MaxConsumedCpu;
  291. *MinConsumedCpu = harmonizerStats.MinConsumedCpu;
  292. *MaxBookedCpu = harmonizerStats.MaxBookedCpu;
  293. *MinBookedCpu = harmonizerStats.MinBookedCpu;
  294. #else
  295. Y_UNUSED(poolStats);
  296. Y_UNUSED(stats);
  297. Y_UNUSED(numThreads);
  298. #endif
  299. }
  300. };
  301. public:
  302. static constexpr IActor::EActivityType ActorActivityType() {
  303. return IActor::EActivityType::ACTORLIB_STATS;
  304. }
  305. TStatsCollectingActor(
  306. ui32 intervalSec,
  307. const TActorSystemSetup& setup,
  308. NMonitoring::TDynamicCounterPtr counters)
  309. : IntervalSec(intervalSec)
  310. , Counters(counters)
  311. {
  312. PoolCounters.resize(setup.GetExecutorsCount());
  313. for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
  314. PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId));
  315. }
  316. ActorSystemCounters.Init(Counters.Get());
  317. }
  318. void Bootstrap(const TActorContext& ctx) {
  319. ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());
  320. Become(&TThis::StateWork);
  321. }
  322. STFUNC(StateWork) {
  323. switch (ev->GetTypeRewrite()) {
  324. CFunc(TEvents::TSystem::Wakeup, Wakeup);
  325. }
  326. }
  327. private:
  328. virtual void OnWakeup(const TActorContext &ctx) {
  329. Y_UNUSED(ctx);
  330. }
  331. void Wakeup(const TActorContext &ctx) {
  332. for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
  333. TVector<TExecutorThreadStats> stats;
  334. TExecutorPoolStats poolStats;
  335. ctx.ExecutorThread.ActorSystem->GetPoolStats(poolId, poolStats, stats);
  336. SetAggregatedCounters(PoolCounters[poolId], poolStats, stats);
  337. }
  338. THarmonizerStats harmonizerStats = ctx.ExecutorThread.ActorSystem->GetHarmonizerStats();
  339. ActorSystemCounters.Set(harmonizerStats);
  340. OnWakeup(ctx);
  341. ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());
  342. }
  343. void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats) {
  344. // Sum all per-thread counters into the 0th element
  345. for (ui32 idx = 1; idx < stats.size(); ++idx) {
  346. stats[0].Aggregate(stats[idx]);
  347. }
  348. if (stats.size()) {
  349. poolCounters.Set(poolStats, stats[0], stats.size() - 1);
  350. }
  351. }
  352. protected:
  353. const ui32 IntervalSec;
  354. NMonitoring::TDynamicCounterPtr Counters;
  355. TVector<TExecutorPoolCounters> PoolCounters;
  356. TActorSystemCounters ActorSystemCounters;
  357. };
  358. } // NActors