Browse Source

Move implementation of pool stats collector to cpp files (#15510)

kruall 2 days ago
parent
commit
a2426cce07

+ 3 - 2
ydb/core/base/pool_stats_collector.cpp

@@ -8,6 +8,7 @@
 
 #include <ydb/library/actors/core/actor_bootstrapped.h>
 #include <ydb/library/actors/helpers/pool_stats_collector.h>
+#include <ydb/library/actors/helpers/collector_counters.h>
 
 #include <ydb/core/graph/api/service.h>
 #include <ydb/core/graph/api/events.h>
@@ -23,7 +24,7 @@ public:
         ::NMonitoring::TDynamicCounterPtr counters)
         : NActors::TStatsCollectingActor(intervalSec, setup, GetServiceCounters(counters, "utils"))
     {
-        MiniKQLPoolStats.Init(Counters.Get());
+        MiniKQLPoolStats.Init(counters.Get());
     }
 
 private:
@@ -57,7 +58,7 @@ private:
         auto systemUpdate = std::make_unique<NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate>();
         ui32 coresTotal = 0;
         double coresUsed = 0;
-        for (const auto& pool : PoolCounters) {
+        for (const auto& pool : GetPoolCounters()) {
             auto& pb = *systemUpdate->Record.AddPoolStats();
             pb.SetName(pool.Name);
             pb.SetUsage(pool.Usage);

+ 315 - 0
ydb/library/actors/helpers/collector_counters.cpp

@@ -0,0 +1,315 @@
+#include "collector_counters.h"
+
+namespace NActors {
+
+// THistogramCounters
+
+void THistogramCounters::Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) { // Registers one counter per power-of-two range plus a trailing INF counter.
+    for (size_t i = 0; (1ull<<i) <= maxVal; ++i) { // one bucket for every power of two that does not exceed maxVal
+        TString bucketName = ToString(1ull<<i) + " " + unit; // label is the power of two followed by the unit
+        Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true));
+    }
+    Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true)); // final counter; Set() stores the folded tail of the histogram here
+}
+
+void THistogramCounters::Set(const TLogHistogram& data) { // Publishes data.Buckets into the counters created by Init.
+    ui32 i = 0;
+    for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) // every counter except the last receives the source bucket with the same index
+        *Buckets[i] = data.Buckets[i];
+    ui64 last = 0;
+    for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) // the remaining source buckets are summed together
+        last += data.Buckets[i];
+    *Buckets.back() = last; // the sum goes to the last counter
+}
+
+void THistogramCounters::Set(const TLogHistogram& data, double factor) { // Same as the one-argument Set, but every published value is multiplied by factor.
+    ui32 i = 0;
+    for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i) // every counter except the last receives the scaled source bucket with the same index
+        *Buckets[i] = data.Buckets[i]*factor;
+    ui64 last = 0;
+    for (;i < Y_ARRAY_SIZE(data.Buckets); ++i) // the remaining source buckets are summed unscaled
+        last += data.Buckets[i];
+    *Buckets.back() = last*factor; // the factor is applied once to the sum
+}
+
+// TActivityStats
+
+void TActivityStats::Init(NMonitoring::TDynamicCounterPtr group) { // Remembers the counter group and sizes the per-activity tables.
+    Group = group;
+
+    CurrentActivationTimeByActivity.resize(GetActivityTypeCount()); // one slot per activity type; only resized here, counters are assigned in InitCountersForActivity
+    ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount());
+    ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());
+    ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());
+    ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount());
+    StuckActorsByActivityBuckets.resize(GetActivityTypeCount());
+    UsageByActivityBuckets.resize(GetActivityTypeCount());
+}
+
+void TActivityStats::Set(const TExecutorThreadStats& stats) { // Publishes per-activity values from stats, creating counters on first use.
+    for (ui32 i : xrange(stats.MaxActivityType())) { // walk every activity index reported by stats
+        Y_ABORT_UNLESS(i < GetActivityTypeCount());
+        ui64 ticks = stats.ElapsedTicksByActivity[i];
+        ui64 events = stats.ReceivedEventsByActivity[i];
+        ui64 actors = stats.ActorsAliveByActivity[i];
+        ui64 scheduled = stats.ScheduledEventsByActivity[i];
+        ui64 stuck = stats.StuckActorsByActivity[i];
+
+        if (!ActorsAliveByActivityBuckets[i]) { // counters for this activity have not been created yet
+            if (ticks || events || actors || scheduled) { // create them only once one of these values is non-zero (stuck is not consulted)
+                InitCountersForActivity(i);
+            } else {
+                continue;
+            }
+        }
+
+        *CurrentActivationTimeByActivity[i] = 0; // reset; may be overwritten by setActivationTime below
+        *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000; // seconds scaled to microseconds
+        *ReceivedEventsByActivityBuckets[i] = events;
+        *ActorsAliveByActivityBuckets[i] = actors;
+        *ScheduledEventsByActivityBuckets[i] = scheduled;
+        *StuckActorsByActivityBuckets[i] = stuck;
+
+        for (ui32 j = 0; j < 10; ++j) { // 10 usage bins per activity
+            *UsageByActivityBuckets[i][j] = stats.UsageByActivity[i][j];
+        }
+    }
+
+    auto setActivationTime = [&](TActivationTime activation) { // publishes one activation time, creating the activity counters on demand
+        if (!ActorsAliveByActivityBuckets[activation.LastActivity]) {
+            InitCountersForActivity(activation.LastActivity);
+        }
+        *CurrentActivationTimeByActivity[activation.LastActivity] = activation.TimeUs;
+    };
+    if (stats.CurrentActivationTime.TimeUs) { // published only when non-zero
+        setActivationTime(stats.CurrentActivationTime);
+    }
+    std::vector<TActivationTime> activationTimes = stats.AggregatedCurrentActivationTime; // local copy so it can be sorted
+    Sort(activationTimes.begin(), activationTimes.end(), [](auto &left, auto &right) { // order by activity, then by descending TimeUs within one activity
+        return left.LastActivity < right.LastActivity ||
+            left.LastActivity == right.LastActivity && left.TimeUs > right.TimeUs;
+    });
+    ui32 prevActivity = Max<ui32>();
+    for (auto &activationTime : activationTimes) {
+        if (activationTime.LastActivity == prevActivity) { // only the first entry per activity (the largest TimeUs after sorting) is published
+            continue;
+        }
+        setActivationTime(activationTime);
+        prevActivity = activationTime.LastActivity;
+    }
+}
+
+void TActivityStats::InitCountersForActivity(ui32 activityType) { // Creates all counters of one activity type under Group.
+    Y_ABORT_UNLESS(activityType < GetActivityTypeCount());
+
+    auto bucketName = TString(GetActivityTypeName(activityType)); // the activity name is used as the value of the activity label
+
+    CurrentActivationTimeByActivity[activityType] =
+        Group->GetSubgroup("sensor", "CurrentActivationTimeUsByActivity")->GetNamedCounter("activity", bucketName, false);
+    ElapsedMicrosecByActivityBuckets[activityType] =
+        Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true);
+    ReceivedEventsByActivityBuckets[activityType] =
+        Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true);
+    ActorsAliveByActivityBuckets[activityType] =
+        Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false); // Set() tests this slot to decide whether the activity is already initialised
+    ScheduledEventsByActivityBuckets[activityType] =
+        Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true);
+    StuckActorsByActivityBuckets[activityType] =
+        Group->GetSubgroup("sensor", "StuckActorsByActivity")->GetNamedCounter("activity", bucketName, false);
+
+    for (ui32 i = 0; i < 10; ++i) { // one counter per usage bin, nested under a bin subgroup named after the index
+        UsageByActivityBuckets[activityType][i] = Group->GetSubgroup("sensor", "UsageByActivity")->GetSubgroup("bin", ToString(i))->GetNamedCounter("activity", bucketName, false);
+    }
+}
+
+// TExecutorPoolCounters
+
+void TExecutorPoolCounters::Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) { // Resets usage state and creates all counters of one executor pool.
+    LastElapsedSeconds = 0;
+    Usage = 0;
+    UsageTimer.Reset();
+    Name = poolName;
+    Threads = threads; // all three thread values start from the configured thread count
+    LimitThreads = threads;
+    DefaultThreads = threads;
+
+    PoolGroup = group->GetSubgroup("execpool", poolName); // every counter below lives in this per-pool subgroup
+
+    SentEvents          = PoolGroup->GetCounter("SentEvents", true);
+    ReceivedEvents      = PoolGroup->GetCounter("ReceivedEvents", true);
+    PreemptedEvents     = PoolGroup->GetCounter("PreemptedEvents", true);
+    NonDeliveredEvents  = PoolGroup->GetCounter("NonDeliveredEvents", true);
+    DestroyedActors     = PoolGroup->GetCounter("DestroyedActors", true);
+    CpuMicrosec         = PoolGroup->GetCounter("CpuMicrosec", true);
+    ElapsedMicrosec     = PoolGroup->GetCounter("ElapsedMicrosec", true);
+    ParkedMicrosec      = PoolGroup->GetCounter("ParkedMicrosec", true);
+    EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true);
+    ActorRegistrations  = PoolGroup->GetCounter("ActorRegistrations", true);
+    ActorsAlive         = PoolGroup->GetCounter("ActorsAlive", false);
+    AllocatedMailboxes  = PoolGroup->GetCounter("AllocatedMailboxes", false);
+    MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true);
+    MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true);
+    MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true);
+    WrongWakenedThreadCount = PoolGroup->GetCounter("WrongWakenedThreadCount", true);
+    CurrentThreadCount = PoolGroup->GetCounter("CurrentThreadCount", false);
+    PotentialMaxThreadCount = PoolGroup->GetCounter("PotentialMaxThreadCount", false);
+    DefaultThreadCount = PoolGroup->GetCounter("DefaultThreadCount", false);
+    MaxThreadCount = PoolGroup->GetCounter("MaxThreadCount", false);
+
+    CurrentThreadCountPercent = PoolGroup->GetCounter("CurrentThreadCountPercent", false);
+    PotentialMaxThreadCountPercent  = PoolGroup->GetCounter("PotentialMaxThreadCountPercent", false);
+    PossibleMaxThreadCountPercent  = PoolGroup->GetCounter("PossibleMaxThreadCountPercent", false);
+    DefaultThreadCountPercent  = PoolGroup->GetCounter("DefaultThreadCountPercent", false);
+    MaxThreadCountPercent  = PoolGroup->GetCounter("MaxThreadCountPercent", false);
+
+    IsNeedy = PoolGroup->GetCounter("IsNeedy", false);
+    IsStarved = PoolGroup->GetCounter("IsStarved", false);
+    IsHoggish = PoolGroup->GetCounter("IsHoggish", false);
+    HasFullOwnSharedThread = PoolGroup->GetCounter("HasFullOwnSharedThread", false);
+    HasHalfOfOwnSharedThread = PoolGroup->GetCounter("HasHalfOfOwnSharedThread", false);
+    HasHalfOfOtherSharedThread = PoolGroup->GetCounter("HasHalfOfOtherSharedThread", false);
+    IncreasingThreadsByNeedyState = PoolGroup->GetCounter("IncreasingThreadsByNeedyState", true);
+    IncreasingThreadsByExchange = PoolGroup->GetCounter("IncreasingThreadsByExchange", true);
+    DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true);
+    DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true);
+    DecreasingThreadsByExchange = PoolGroup->GetCounter("DecreasingThreadsByExchange", true);
+    NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true);
+    SpinningTimeUs = PoolGroup->GetCounter("SpinningTimeUs", true);
+    SpinThresholdUs = PoolGroup->GetCounter("SpinThresholdUs", false);
+
+
+    LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000); // each metric gets a legacy per-bucket counter set and a histogram object
+    ActivationTimeHistogram = PoolGroup->GetHistogram(
+        "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
+    LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000);
+    EventDeliveryTimeHistogram = PoolGroup->GetHistogram(
+        "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
+    LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000); // NOTE(review): a count histogram labelled with the usec unit and a Us suffix - confirm before renaming, the names are published sensor labels
+    EventProcessingCountHistogram = PoolGroup->GetHistogram(
+        "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1));
+    LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000);
+    EventProcessingTimeHistogram = PoolGroup->GetHistogram(
+        "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
+
+    ActivityStats.Init(PoolGroup.Get()); // per-activity counters share the pool subgroup
+
+    MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true);
+}
+
+void TExecutorPoolCounters::Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats) { // Publishes one pool snapshot and updates the smoothed Usage value.
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+    double elapsedSeconds = ::NHPTimer::GetSeconds(stats.ElapsedTicks);
+    *SentEvents         = stats.SentEvents;
+    *ReceivedEvents     = stats.ReceivedEvents;
+    *PreemptedEvents     = stats.PreemptedEvents;
+    *NonDeliveredEvents = stats.NonDeliveredEvents;
+    *DestroyedActors    = stats.PoolDestroyedActors;
+    *EmptyMailboxActivation = stats.EmptyMailboxActivation;
+    *CpuMicrosec        = stats.CpuUs;
+    *ElapsedMicrosec    = elapsedSeconds*1000000; // seconds scaled to microseconds
+    *ParkedMicrosec     = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000;
+    *ActorRegistrations = stats.PoolActorRegistrations;
+    *ActorsAlive        = stats.PoolActorRegistrations - stats.PoolDestroyedActors; // alive = registered minus destroyed
+    *AllocatedMailboxes = stats.PoolAllocatedMailboxes;
+    *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption;
+    *MailboxPushedOutByTime = stats.MailboxPushedOutByTime;
+    *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;
+    *WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount;
+    *CurrentThreadCount = poolStats.CurrentThreadCount;
+    *PotentialMaxThreadCount = poolStats.PotentialMaxThreadCount;
+    *DefaultThreadCount = poolStats.DefaultThreadCount;
+    *MaxThreadCount = poolStats.MaxThreadCount;
+
+    *CurrentThreadCountPercent = poolStats.CurrentThreadCount * 100; // the Percent counters carry the thread counts multiplied by 100
+    *PotentialMaxThreadCountPercent = poolStats.PotentialMaxThreadCount * 100;
+    *PossibleMaxThreadCountPercent = poolStats.PotentialMaxThreadCount * 100; // NOTE(review): fed from PotentialMaxThreadCount, same as the line above - confirm this is intended
+    *DefaultThreadCountPercent = poolStats.DefaultThreadCount * 100;
+    *MaxThreadCountPercent = poolStats.MaxThreadCount * 100;
+
+    *IsNeedy = poolStats.IsNeedy;
+    *IsStarved = poolStats.IsStarved;
+    *IsHoggish = poolStats.IsHoggish;
+
+    *HasFullOwnSharedThread = poolStats.HasFullOwnSharedThread;
+    *HasHalfOfOwnSharedThread = poolStats.HasHalfOfOwnSharedThread;
+    *HasHalfOfOtherSharedThread = poolStats.HasHalfOfOtherSharedThread;
+    *IncreasingThreadsByNeedyState = poolStats.IncreasingThreadsByNeedyState;
+    *IncreasingThreadsByExchange = poolStats.IncreasingThreadsByExchange;
+    *DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState;
+    *DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState;
+    *DecreasingThreadsByExchange = poolStats.DecreasingThreadsByExchange;
+    *NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions;
+
+    *SpinningTimeUs = poolStats.SpinningTimeUs;
+    *SpinThresholdUs = poolStats.SpinThresholdUs;
+
+    LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
+    ActivationTimeHistogram->Reset(); // each histogram object is reset and refilled on every call
+    ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram);
+
+    LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram);
+    EventDeliveryTimeHistogram->Reset();
+    EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram);
+
+    LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram);
+    EventProcessingCountHistogram->Reset();
+    EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram);
+
+    double toMicrosec = 1000000 / NHPTimer::GetClockRate(); // presumably a ticks-to-microseconds factor - TODO confirm the unit of GetClockRate
+    LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec);
+    EventProcessingTimeHistogram->Reset();
+    for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) { // collected bucket by bucket so that each value can be scaled
+        EventProcessingTimeHistogram->Collect(
+            stats.EventProcessingTimeHistogram.UpperBound(i),
+            stats.EventProcessingTimeHistogram.Value(i) * toMicrosec);
+    }
+
+    ActivityStats.Set(stats);
+
+    *MaxUtilizationTime = poolStats.MaxUtilizationTime;
+
+    double seconds = UsageTimer.PassedReset(); // time since the previous call (or since Init); the timer restarts here
+
+    // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916
+    Threads = poolStats.CurrentThreadCount;
+    LimitThreads = poolStats.PotentialMaxThreadCount;
+    const double currentUsage = LimitThreads > 0 ? ((elapsedSeconds - LastElapsedSeconds) / seconds / LimitThreads) : 0; // elapsed-time delta per timer second per limit thread; 0 when the limit is 0
+
+    // update usage factor according to smoothness
+    const double smoothness = 0.5;
+    Usage = currentUsage * smoothness + Usage * (1.0 - smoothness); // equal-weight blend of the new sample and the previous value
+    LastElapsedSeconds = elapsedSeconds;
+#else
+    Y_UNUSED(stats); // stats collection is compiled out: nothing is published
+    Y_UNUSED(poolStats);
+#endif
+}
+
+// TActorSystemCounters
+
+void TActorSystemCounters::Init(NMonitoring::TDynamicCounters* group) { // Stores the group and creates the counters that Set() fills from harmonizer stats.
+    Group = group;
+
+    MaxUsedCpuPercent = Group->GetCounter("MaxUsedCpuPercent", false);
+    MinUsedCpuPercent = Group->GetCounter("MinUsedCpuPercent", false);
+    MaxElapsedCpuPercent = Group->GetCounter("MaxElapsedCpuPercent", false);
+    MinElapsedCpuPercent = Group->GetCounter("MinElapsedCpuPercent", false);
+    AvgAwakeningTimeNs = Group->GetCounter("AvgAwakeningTimeNs", false);
+    AvgWakingUpTimeNs = Group->GetCounter("AvgWakingUpTimeNs", false);
+}
+
+void TActorSystemCounters::Set(const THarmonizerStats& harmonizerStats) { // Publishes harmonizer statistics; does nothing when stats collection is compiled out.
+#ifdef ACTORSLIB_COLLECT_EXEC_STATS
+    *MaxUsedCpuPercent = harmonizerStats.MaxUsedCpu;
+    *MinUsedCpuPercent = harmonizerStats.MinUsedCpu;
+    *MaxElapsedCpuPercent = harmonizerStats.MaxElapsedCpu;
+    *MinElapsedCpuPercent = harmonizerStats.MinElapsedCpu;
+
+    *AvgAwakeningTimeNs = harmonizerStats.AvgAwakeningTimeUs * 1000; // source fields are named Us, counters Ns, hence the factor 1000
+    *AvgWakingUpTimeNs = harmonizerStats.AvgWakingUpTimeUs * 1000;
+#else
+    Y_UNUSED(harmonizerStats);
+#endif
+}
+
+} // NActors 

+ 125 - 0
ydb/library/actors/helpers/collector_counters.h

@@ -0,0 +1,125 @@
+#pragma once
+
+#include <ydb/library/actors/core/actorsystem.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <util/generic/vector.h>
+#include <util/generic/xrange.h>
+#include <util/string/printf.h>
+
+namespace NActors {
+
+struct THistogramCounters { // A histogram exposed as a list of individual range counters.
+    void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal); // creates one counter per power of two up to maxVal, plus INF
+    void Set(const TLogHistogram& data); // copies data.Buckets into the counters, folding the tail into the last one
+    void Set(const TLogHistogram& data, double factor); // same, with every published value multiplied by factor
+
+private:
+    TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets; // filled by Init; the last element is the INF counter
+};
+
+struct TActivityStats { // Per-activity-type counters of one executor pool.
+    void Init(NMonitoring::TDynamicCounterPtr group); // stores the group and sizes the tables below
+    void Set(const TExecutorThreadStats& stats); // publishes per-activity values, creating counters on first use
+
+private:
+    void InitCountersForActivity(ui32 activityType); // creates every counter of one activity type
+
+private:
+    NMonitoring::TDynamicCounterPtr Group;
+
+    TVector<NMonitoring::TDynamicCounters::TCounterPtr> CurrentActivationTimeByActivity; // all tables are indexed by activity type
+    TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets;
+    TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;
+    TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;
+    TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets;
+    TVector<NMonitoring::TDynamicCounters::TCounterPtr> StuckActorsByActivityBuckets;
+    TVector<std::array<NMonitoring::TDynamicCounters::TCounterPtr, 10>> UsageByActivityBuckets; // 10 usage bins per activity type
+};
+
+struct TExecutorPoolCounters { // Counters and usage state of a single executor pool.
+    TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup; // per-pool subgroup obtained in Init
+
+    NMonitoring::TDynamicCounters::TCounterPtr SentEvents;
+    NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents;
+    NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents;
+    NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents;
+    NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors;
+    NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation;
+    NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec;
+    NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
+    NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec;
+    NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations;
+    NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive;
+    NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes;
+    NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption;
+    NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime;
+    NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount;
+    NMonitoring::TDynamicCounters::TCounterPtr WrongWakenedThreadCount;
+    NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCount;
+    NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCount;
+    NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCount;
+    NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCount;
+    NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCountPercent;
+    NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCountPercent;
+    NMonitoring::TDynamicCounters::TCounterPtr PossibleMaxThreadCountPercent;
+    NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCountPercent;
+    NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCountPercent;
+    NMonitoring::TDynamicCounters::TCounterPtr IsNeedy;
+    NMonitoring::TDynamicCounters::TCounterPtr IsStarved;
+    NMonitoring::TDynamicCounters::TCounterPtr IsHoggish;
+    NMonitoring::TDynamicCounters::TCounterPtr HasFullOwnSharedThread;
+    NMonitoring::TDynamicCounters::TCounterPtr HasHalfOfOwnSharedThread;
+    NMonitoring::TDynamicCounters::TCounterPtr HasHalfOfOtherSharedThread;
+    NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByNeedyState;
+    NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByExchange;
+    NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState;
+    NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState;
+    NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByExchange;
+    NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions;
+    NMonitoring::TDynamicCounters::TCounterPtr SpinningTimeUs;
+    NMonitoring::TDynamicCounters::TCounterPtr SpinThresholdUs;
+
+
+    THistogramCounters LegacyActivationTimeHistogram; // each metric is kept twice: legacy range counters and a histogram object
+    NMonitoring::THistogramPtr ActivationTimeHistogram;
+    THistogramCounters LegacyEventDeliveryTimeHistogram;
+    NMonitoring::THistogramPtr EventDeliveryTimeHistogram;
+    THistogramCounters LegacyEventProcessingCountHistogram;
+    NMonitoring::THistogramPtr EventProcessingCountHistogram;
+    THistogramCounters LegacyEventProcessingTimeHistogram;
+    NMonitoring::THistogramPtr EventProcessingTimeHistogram;
+
+    TActivityStats ActivityStats;
+    NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime;
+
+    double Usage = 0; // smoothed usage value maintained by Set
+    double LastElapsedSeconds = 0; // elapsed seconds seen by the previous Set call
+    THPTimer UsageTimer; // measures the time between Set calls
+    TString Name; // pool name passed to Init
+    double Threads; // configured thread count after Init, CurrentThreadCount after Set
+    double LimitThreads; // configured thread count after Init, PotentialMaxThreadCount after Set
+    double DefaultThreads; // configured thread count, assigned in Init
+
+    void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads); // creates the counters and resets the usage state
+    void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats); // publishes one snapshot and updates Usage
+};
+
+struct TActorSystemCounters { // Actor-system-wide counters filled from harmonizer statistics.
+    TIntrusivePtr<NMonitoring::TDynamicCounters> Group; // group passed to Init; all counters below are created in it
+
+    NMonitoring::TDynamicCounters::TCounterPtr MaxUsedCpuPercent;
+    NMonitoring::TDynamicCounters::TCounterPtr MinUsedCpuPercent;
+    NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedCpuPercent;
+    NMonitoring::TDynamicCounters::TCounterPtr MinElapsedCpuPercent;
+
+    NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeNs;
+    NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeNs;
+
+
+    void Init(NMonitoring::TDynamicCounters* group); // creates the counters
+    void Set(const THarmonizerStats& harmonizerStats); // publishes one harmonizer snapshot
+};
+
+} // NActors

+ 111 - 0
ydb/library/actors/helpers/pool_stats_collector.cpp

@@ -0,0 +1,111 @@
+#include "pool_stats_collector.h"
+#include "collector_counters.h"
+
+#include <ydb/library/actors/core/actorsystem.h>
+#include <util/generic/vector.h>
+
+namespace NActors {
+
+class TStatsCollectingActor::TImpl { // Private implementation of TStatsCollectingActor: owns the counters and runs the collection cycle.
+private:
+    friend class TStatsCollectingActor; // the actor reads PoolCounters and ActorSystemCounters through its getters
+
+    const ui32 IntervalSec; // length of one collection cycle, in seconds
+    TInstant StartOfCollecting; // time at which the current cycle started (set on tag 0)
+    NMonitoring::TDynamicCounterPtr Counters;
+
+    TVector<TExecutorPoolCounters> PoolCounters; // one entry per executor pool, indexed by pool id
+    TActorSystemCounters ActorSystemCounters;
+
+public:
+    TImpl(ui32 intervalSec, const TActorSystemSetup& setup, NMonitoring::TDynamicCounterPtr counters)
+        : IntervalSec(intervalSec)
+        , Counters(counters)
+    {
+        PoolCounters.resize(setup.GetExecutorsCount());
+        for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
+            PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId));
+        }
+        ActorSystemCounters.Init(Counters.Get());
+    }
+
+    void Bootstrap(const TActorContext& ctx, TStatsCollectingActor* actor) { // schedules the first wakeup and switches the actor to StateWork
+        ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());
+        actor->Become(&TStatsCollectingActor::StateWork);
+    }
+
+    void Wakeup(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx, TStatsCollectingActor* actor) { // one step of the cycle; the event tag selects the step
+        auto *event = ev->Get();
+        if (event->Tag == 0) { // tag 0 marks the start of a cycle
+            StartOfCollecting = ctx.Now();
+        }
+        if (event->Tag < PoolCounters.size()) { // tags below the pool count mean: collect the pool with that id
+            ui16 poolId = event->Tag;
+            TVector<TExecutorThreadStats> stats;
+            TVector<TExecutorThreadStats> sharedStats;
+            TExecutorPoolStats poolStats;
+            ctx.ActorSystem()->GetPoolStats(poolId, poolStats, stats, sharedStats);
+            SetAggregatedCounters(PoolCounters[poolId], poolStats, stats, sharedStats);
+            ctx.Schedule(TDuration::MilliSeconds(1), new TEvents::TEvWakeup(poolId + 1)); // the next step is scheduled 1 ms later
+            return;
+        }
+        THarmonizerStats harmonizerStats = ctx.ActorSystem()->GetHarmonizerStats(); // reached once the tag has passed the last pool
+        ActorSystemCounters.Set(harmonizerStats);
+        actor->OnWakeup(ctx);
+        ctx.Schedule(TDuration::Seconds(IntervalSec) - (ctx.Now() - StartOfCollecting), new TEvents::TEvWakeup(0)); // the delay is IntervalSec minus the time already spent in this cycle
+    }
+
+    void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats, TVector<TExecutorThreadStats>& sharedStats) {
+        // Sum all per-thread counters (own and shared threads) into a single aggregate
+        TExecutorThreadStats aggregated;
+        for (ui32 idx = 0; idx < stats.size(); ++idx) {
+            aggregated.Aggregate(stats[idx]);
+        }
+        for (ui32 idx = 0; idx < sharedStats.size(); ++idx) {
+            aggregated.Aggregate(sharedStats[idx]);
+        }
+        if (stats.size()) { // nothing is published when stats is empty, even if sharedStats is not
+            poolCounters.Set(poolStats, aggregated);
+        }
+    }
+};
+
+// Implementation of TStatsCollectingActor methods
+
+TStatsCollectingActor::TStatsCollectingActor( // Forwards all arguments to a newly created TImpl.
+        ui32 intervalSec,
+        const TActorSystemSetup& setup,
+        NMonitoring::TDynamicCounterPtr counters)
+    : Impl(std::make_unique<TImpl>(intervalSec, setup, counters))
+{
+}
+
+TStatsCollectingActor::~TStatsCollectingActor() = default; // defaulted here, after TImpl has been defined in this file
+
+void TStatsCollectingActor::Bootstrap(const TActorContext& ctx) { // Delegates to TImpl::Bootstrap.
+    Impl->Bootstrap(ctx, this);
+}
+
+void TStatsCollectingActor::OnWakeup(const TActorContext &ctx) { // Called by TImpl::Wakeup at the end of each cycle; this implementation does nothing.
+    Y_UNUSED(ctx);
+}
+
+STFUNC(TStatsCollectingActor::StateWork) { // State function installed by Bootstrap.
+    switch (ev->GetTypeRewrite()) {
+        HFunc(TEvents::TEvWakeup, Wakeup); // TEvWakeup is the only event type handled here
+    }
+}
+
+void TStatsCollectingActor::Wakeup(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx) { // Delegates to TImpl::Wakeup.
+    Impl->Wakeup(ev, ctx, this);
+}
+
+const TVector<TExecutorPoolCounters>& TStatsCollectingActor::GetPoolCounters() const { // Read-only access to the per-pool counters held by TImpl.
+    return Impl->PoolCounters;
+}
+
+const TActorSystemCounters& TStatsCollectingActor::GetActorSystemCounters() const { // Read-only access to the actor-system counters held by TImpl.
+    return Impl->ActorSystemCounters;  
+}
+
+} // NActors

+ 20 - 468
ydb/library/actors/helpers/pool_stats_collector.h

@@ -1,418 +1,20 @@
 #pragma once
 
 #include <ydb/library/actors/core/actor_bootstrapped.h>
-#include <ydb/library/actors/core/actorsystem.h>
 #include <ydb/library/actors/core/hfunc.h>
 #include <library/cpp/monlib/dynamic_counters/counters.h>
 
-#include <util/generic/vector.h>
-#include <util/generic/xrange.h>
-#include <util/string/printf.h>
+#include <memory>
 
 namespace NActors {
 
-// Periodically collects stats from executor threads and exposes them as mon counters
-class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
-private:
-    struct THistogramCounters {
-        void Init(NMonitoring::TDynamicCounters* group, const TString& baseName, const TString& unit, ui64 maxVal) {
-            for (size_t i = 0; (1ull<<i) <= maxVal; ++i) {
-                TString bucketName = ToString(1ull<<i) + " " + unit;
-                Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", bucketName, true));
-            }
-            Buckets.push_back(group->GetSubgroup("sensor", baseName)->GetNamedCounter("range", "INF", true));
-        }
-
-        void Set(const TLogHistogram& data) {
-            ui32 i = 0;
-            for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
-                *Buckets[i] = data.Buckets[i];
-            ui64 last = 0;
-            for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
-                last += data.Buckets[i];
-            *Buckets.back() = last;
-        }
-
-        void Set(const TLogHistogram& data, double factor) {
-            ui32 i = 0;
-            for (;i < Y_ARRAY_SIZE(data.Buckets) && i < Buckets.size()-1; ++i)
-                *Buckets[i] = data.Buckets[i]*factor;
-            ui64 last = 0;
-            for (;i < Y_ARRAY_SIZE(data.Buckets); ++i)
-                last += data.Buckets[i];
-            *Buckets.back() = last*factor;
-        }
-
-    private:
-        TVector<NMonitoring::TDynamicCounters::TCounterPtr> Buckets;
-    };
-
-    struct TActivityStats {
-        void Init(NMonitoring::TDynamicCounterPtr group) {
-            Group = group;
-
-            CurrentActivationTimeByActivity.resize(GetActivityTypeCount());
-            ElapsedMicrosecByActivityBuckets.resize(GetActivityTypeCount());
-            ReceivedEventsByActivityBuckets.resize(GetActivityTypeCount());
-            ActorsAliveByActivityBuckets.resize(GetActivityTypeCount());
-            ScheduledEventsByActivityBuckets.resize(GetActivityTypeCount());
-            StuckActorsByActivityBuckets.resize(GetActivityTypeCount());
-            UsageByActivityBuckets.resize(GetActivityTypeCount());
-        }
-
-        void Set(const TExecutorThreadStats& stats) {
-            for (ui32 i : xrange(stats.MaxActivityType())) {
-                Y_ABORT_UNLESS(i < GetActivityTypeCount());
-                ui64 ticks = stats.ElapsedTicksByActivity[i];
-                ui64 events = stats.ReceivedEventsByActivity[i];
-                ui64 actors = stats.ActorsAliveByActivity[i];
-                ui64 scheduled = stats.ScheduledEventsByActivity[i];
-                ui64 stuck = stats.StuckActorsByActivity[i];
-
-                if (!ActorsAliveByActivityBuckets[i]) {
-                    if (ticks || events || actors || scheduled) {
-                        InitCountersForActivity(i);
-                    } else {
-                        continue;
-                    }
-                }
-
-                *CurrentActivationTimeByActivity[i] = 0;
-                *ElapsedMicrosecByActivityBuckets[i] = ::NHPTimer::GetSeconds(ticks)*1000000;
-                *ReceivedEventsByActivityBuckets[i] = events;
-                *ActorsAliveByActivityBuckets[i] = actors;
-                *ScheduledEventsByActivityBuckets[i] = scheduled;
-                *StuckActorsByActivityBuckets[i] = stuck;
-
-                for (ui32 j = 0; j < 10; ++j) {
-                    *UsageByActivityBuckets[i][j] = stats.UsageByActivity[i][j];
-                }
-            }
-
-            auto setActivationTime = [&](TActivationTime activation) {
-                if (!ActorsAliveByActivityBuckets[activation.LastActivity]) {
-                    InitCountersForActivity(activation.LastActivity);
-                }
-                *CurrentActivationTimeByActivity[activation.LastActivity] = activation.TimeUs;
-            };
-            if (stats.CurrentActivationTime.TimeUs) {
-                setActivationTime(stats.CurrentActivationTime);
-            }
-            std::vector<TActivationTime> activationTimes = stats.AggregatedCurrentActivationTime;
-            Sort(activationTimes.begin(), activationTimes.end(), [](auto &left, auto &right) {
-                return left.LastActivity < right.LastActivity ||
-                    left.LastActivity == right.LastActivity && left.TimeUs > right.TimeUs;
-            });
-            ui32 prevActivity = Max<ui32>();
-            for (auto &activationTime : activationTimes) {
-                if (activationTime.LastActivity == prevActivity) {
-                    continue;
-                }
-                setActivationTime(activationTime);
-                prevActivity = activationTime.LastActivity;
-            }
-        }
-
-    private:
-        void InitCountersForActivity(ui32 activityType) {
-            Y_ABORT_UNLESS(activityType < GetActivityTypeCount());
-
-            auto bucketName = TString(GetActivityTypeName(activityType));
-
-            CurrentActivationTimeByActivity[activityType] =
-                Group->GetSubgroup("sensor", "CurrentActivationTimeUsByActivity")->GetNamedCounter("activity", bucketName, false);
-            ElapsedMicrosecByActivityBuckets[activityType] =
-                Group->GetSubgroup("sensor", "ElapsedMicrosecByActivity")->GetNamedCounter("activity", bucketName, true);
-            ReceivedEventsByActivityBuckets[activityType] =
-                Group->GetSubgroup("sensor", "ReceivedEventsByActivity")->GetNamedCounter("activity", bucketName, true);
-            ActorsAliveByActivityBuckets[activityType] =
-                Group->GetSubgroup("sensor", "ActorsAliveByActivity")->GetNamedCounter("activity", bucketName, false);
-            ScheduledEventsByActivityBuckets[activityType] =
-                Group->GetSubgroup("sensor", "ScheduledEventsByActivity")->GetNamedCounter("activity", bucketName, true);
-            StuckActorsByActivityBuckets[activityType] =
-                Group->GetSubgroup("sensor", "StuckActorsByActivity")->GetNamedCounter("activity", bucketName, false);
-
-            for (ui32 i = 0; i < 10; ++i) {
-                UsageByActivityBuckets[activityType][i] = Group->GetSubgroup("sensor", "UsageByActivity")->GetSubgroup("bin", ToString(i))->GetNamedCounter("activity", bucketName, false);
-            }
-        }
-
-    private:
-        NMonitoring::TDynamicCounterPtr Group;
-
-        TVector<NMonitoring::TDynamicCounters::TCounterPtr> CurrentActivationTimeByActivity;
-        TVector<NMonitoring::TDynamicCounters::TCounterPtr> ElapsedMicrosecByActivityBuckets;
-        TVector<NMonitoring::TDynamicCounters::TCounterPtr> ReceivedEventsByActivityBuckets;
-        TVector<NMonitoring::TDynamicCounters::TCounterPtr> ActorsAliveByActivityBuckets;
-        TVector<NMonitoring::TDynamicCounters::TCounterPtr> ScheduledEventsByActivityBuckets;
-        TVector<NMonitoring::TDynamicCounters::TCounterPtr> StuckActorsByActivityBuckets;
-        TVector<std::array<NMonitoring::TDynamicCounters::TCounterPtr, 10>> UsageByActivityBuckets;
-    };
-
-    struct TExecutorPoolCounters {
-        TIntrusivePtr<NMonitoring::TDynamicCounters> PoolGroup;
-
-        NMonitoring::TDynamicCounters::TCounterPtr SentEvents;
-        NMonitoring::TDynamicCounters::TCounterPtr ReceivedEvents;
-        NMonitoring::TDynamicCounters::TCounterPtr PreemptedEvents;
-        NMonitoring::TDynamicCounters::TCounterPtr NonDeliveredEvents;
-        NMonitoring::TDynamicCounters::TCounterPtr DestroyedActors;
-        NMonitoring::TDynamicCounters::TCounterPtr EmptyMailboxActivation;
-        NMonitoring::TDynamicCounters::TCounterPtr CpuMicrosec;
-        NMonitoring::TDynamicCounters::TCounterPtr ElapsedMicrosec;
-        NMonitoring::TDynamicCounters::TCounterPtr ParkedMicrosec;
-        NMonitoring::TDynamicCounters::TCounterPtr ActorRegistrations;
-        NMonitoring::TDynamicCounters::TCounterPtr ActorsAlive;
-        NMonitoring::TDynamicCounters::TCounterPtr AllocatedMailboxes;
-        NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutBySoftPreemption;
-        NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByTime;
-        NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount;
-        NMonitoring::TDynamicCounters::TCounterPtr WrongWakenedThreadCount;
-        NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCount;
-        NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCount;
-        NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCount;
-        NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCount;
-        NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCountPercent;
-        NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCountPercent;
-        NMonitoring::TDynamicCounters::TCounterPtr PossibleMaxThreadCountPercent;
-        NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCountPercent;
-        NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCountPercent;
-        NMonitoring::TDynamicCounters::TCounterPtr IsNeedy;
-        NMonitoring::TDynamicCounters::TCounterPtr IsStarved;
-        NMonitoring::TDynamicCounters::TCounterPtr IsHoggish;
-        NMonitoring::TDynamicCounters::TCounterPtr HasFullOwnSharedThread;
-        NMonitoring::TDynamicCounters::TCounterPtr HasHalfOfOwnSharedThread;
-        NMonitoring::TDynamicCounters::TCounterPtr HasHalfOfOtherSharedThread;
-        NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByNeedyState;
-        NMonitoring::TDynamicCounters::TCounterPtr IncreasingThreadsByExchange;
-        NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByStarvedState;
-        NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByHoggishState;
-        NMonitoring::TDynamicCounters::TCounterPtr DecreasingThreadsByExchange;
-        NMonitoring::TDynamicCounters::TCounterPtr NotEnoughCpuExecutions;
-        NMonitoring::TDynamicCounters::TCounterPtr SpinningTimeUs;
-        NMonitoring::TDynamicCounters::TCounterPtr SpinThresholdUs;
-
-
-        THistogramCounters LegacyActivationTimeHistogram;
-        NMonitoring::THistogramPtr ActivationTimeHistogram;
-        THistogramCounters LegacyEventDeliveryTimeHistogram;
-        NMonitoring::THistogramPtr EventDeliveryTimeHistogram;
-        THistogramCounters LegacyEventProcessingCountHistogram;
-        NMonitoring::THistogramPtr EventProcessingCountHistogram;
-        THistogramCounters LegacyEventProcessingTimeHistogram;
-        NMonitoring::THistogramPtr EventProcessingTimeHistogram;
-
-        TActivityStats ActivityStats;
-        NMonitoring::TDynamicCounters::TCounterPtr MaxUtilizationTime;
-
-        double Usage = 0;
-        double LastElapsedSeconds = 0;
-        THPTimer UsageTimer;
-        TString Name;
-        double Threads;
-        double LimitThreads;
-        double DefaultThreads;
-
-        void Init(NMonitoring::TDynamicCounters* group, const TString& poolName, ui32 threads) {
-            LastElapsedSeconds = 0;
-            Usage = 0;
-            UsageTimer.Reset();
-            Name = poolName;
-            Threads = threads;
-            LimitThreads = threads;
-            DefaultThreads = threads;
-
-            PoolGroup = group->GetSubgroup("execpool", poolName);
-
-            SentEvents          = PoolGroup->GetCounter("SentEvents", true);
-            ReceivedEvents      = PoolGroup->GetCounter("ReceivedEvents", true);
-            PreemptedEvents     = PoolGroup->GetCounter("PreemptedEvents", true);
-            NonDeliveredEvents  = PoolGroup->GetCounter("NonDeliveredEvents", true);
-            DestroyedActors     = PoolGroup->GetCounter("DestroyedActors", true);
-            CpuMicrosec         = PoolGroup->GetCounter("CpuMicrosec", true);
-            ElapsedMicrosec     = PoolGroup->GetCounter("ElapsedMicrosec", true);
-            ParkedMicrosec      = PoolGroup->GetCounter("ParkedMicrosec", true);
-            EmptyMailboxActivation = PoolGroup->GetCounter("EmptyMailboxActivation", true);
-            ActorRegistrations  = PoolGroup->GetCounter("ActorRegistrations", true);
-            ActorsAlive         = PoolGroup->GetCounter("ActorsAlive", false);
-            AllocatedMailboxes  = PoolGroup->GetCounter("AllocatedMailboxes", false);
-            MailboxPushedOutBySoftPreemption = PoolGroup->GetCounter("MailboxPushedOutBySoftPreemption", true);
-            MailboxPushedOutByTime = PoolGroup->GetCounter("MailboxPushedOutByTime", true);
-            MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true);
-            WrongWakenedThreadCount = PoolGroup->GetCounter("WrongWakenedThreadCount", true);
-            CurrentThreadCount = PoolGroup->GetCounter("CurrentThreadCount", false);
-            PotentialMaxThreadCount = PoolGroup->GetCounter("PotentialMaxThreadCount", false);
-            DefaultThreadCount = PoolGroup->GetCounter("DefaultThreadCount", false);
-            MaxThreadCount = PoolGroup->GetCounter("MaxThreadCount", false);
-
-            CurrentThreadCountPercent = PoolGroup->GetCounter("CurrentThreadCountPercent", false);
-            PotentialMaxThreadCountPercent  = PoolGroup->GetCounter("PotentialMaxThreadCountPercent", false);
-            PossibleMaxThreadCountPercent  = PoolGroup->GetCounter("PossibleMaxThreadCountPercent", false);
-            DefaultThreadCountPercent  = PoolGroup->GetCounter("DefaultThreadCountPercent", false);
-            MaxThreadCountPercent  = PoolGroup->GetCounter("MaxThreadCountPercent", false);
-
-            IsNeedy = PoolGroup->GetCounter("IsNeedy", false);
-            IsStarved = PoolGroup->GetCounter("IsStarved", false);
-            IsHoggish = PoolGroup->GetCounter("IsHoggish", false);
-            HasFullOwnSharedThread = PoolGroup->GetCounter("HasFullOwnSharedThread", false);
-            HasHalfOfOwnSharedThread = PoolGroup->GetCounter("HasHalfOfOwnSharedThread", false);
-            HasHalfOfOtherSharedThread = PoolGroup->GetCounter("HasHalfOfOtherSharedThread", false);
-            IncreasingThreadsByNeedyState = PoolGroup->GetCounter("IncreasingThreadsByNeedyState", true);
-            IncreasingThreadsByExchange = PoolGroup->GetCounter("IncreasingThreadsByExchange", true);
-            DecreasingThreadsByStarvedState = PoolGroup->GetCounter("DecreasingThreadsByStarvedState", true);
-            DecreasingThreadsByHoggishState = PoolGroup->GetCounter("DecreasingThreadsByHoggishState", true);
-            DecreasingThreadsByExchange = PoolGroup->GetCounter("DecreasingThreadsByExchange", true);
-            NotEnoughCpuExecutions = PoolGroup->GetCounter("NotEnoughCpuExecutions", true);
-            SpinningTimeUs = PoolGroup->GetCounter("SpinningTimeUs", true);
-            SpinThresholdUs = PoolGroup->GetCounter("SpinThresholdUs", false);
-
-
-            LegacyActivationTimeHistogram.Init(PoolGroup.Get(), "ActivationTime", "usec", 5*1000*1000);
-            ActivationTimeHistogram = PoolGroup->GetHistogram(
-                "ActivationTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
-            LegacyEventDeliveryTimeHistogram.Init(PoolGroup.Get(), "EventDeliveryTime", "usec", 5*1000*1000);
-            EventDeliveryTimeHistogram = PoolGroup->GetHistogram(
-                "EventDeliveryTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
-            LegacyEventProcessingCountHistogram.Init(PoolGroup.Get(), "EventProcessingCount", "usec", 5*1000*1000);
-            EventProcessingCountHistogram = PoolGroup->GetHistogram(
-                "EventProcessingCountUs", NMonitoring::ExponentialHistogram(24, 2, 1));
-            LegacyEventProcessingTimeHistogram.Init(PoolGroup.Get(), "EventProcessingTime", "usec", 5*1000*1000);
-            EventProcessingTimeHistogram = PoolGroup->GetHistogram(
-                "EventProcessingTimeUs", NMonitoring::ExponentialHistogram(24, 2, 1));
-
-            ActivityStats.Init(PoolGroup.Get());
-
-            MaxUtilizationTime = PoolGroup->GetCounter("MaxUtilizationTime", true);
-        }
-
-        void Set(const TExecutorPoolStats& poolStats, const TExecutorThreadStats& stats) {
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
-            double elapsedSeconds = ::NHPTimer::GetSeconds(stats.ElapsedTicks);
-            *SentEvents         = stats.SentEvents;
-            *ReceivedEvents     = stats.ReceivedEvents;
-            *PreemptedEvents     = stats.PreemptedEvents;
-            *NonDeliveredEvents = stats.NonDeliveredEvents;
-            *DestroyedActors    = stats.PoolDestroyedActors;
-            *EmptyMailboxActivation = stats.EmptyMailboxActivation;
-            *CpuMicrosec        = stats.CpuUs;
-            *ElapsedMicrosec    = elapsedSeconds*1000000;
-            *ParkedMicrosec     = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000;
-            *ActorRegistrations = stats.PoolActorRegistrations;
-            *ActorsAlive        = stats.PoolActorRegistrations - stats.PoolDestroyedActors;
-            *AllocatedMailboxes = stats.PoolAllocatedMailboxes;
-            *MailboxPushedOutBySoftPreemption = stats.MailboxPushedOutBySoftPreemption;
-            *MailboxPushedOutByTime = stats.MailboxPushedOutByTime;
-            *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;
-            *WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount;
-            *CurrentThreadCount = poolStats.CurrentThreadCount;
-            *PotentialMaxThreadCount = poolStats.PotentialMaxThreadCount;
-            *DefaultThreadCount = poolStats.DefaultThreadCount;
-            *MaxThreadCount = poolStats.MaxThreadCount;
+struct TActorSystemSetup;
+struct TExecutorPoolCounters;
+struct TActorSystemCounters;
 
-            *CurrentThreadCountPercent = poolStats.CurrentThreadCount * 100;
-            *PotentialMaxThreadCountPercent = poolStats.PotentialMaxThreadCount * 100;
-            *PossibleMaxThreadCountPercent = poolStats.PotentialMaxThreadCount * 100;
-            *DefaultThreadCountPercent = poolStats.DefaultThreadCount * 100;
-            *MaxThreadCountPercent = poolStats.MaxThreadCount * 100;
-
-            *IsNeedy = poolStats.IsNeedy;
-            *IsStarved = poolStats.IsStarved;
-            *IsHoggish = poolStats.IsHoggish;
-
-            *HasFullOwnSharedThread = poolStats.HasFullOwnSharedThread;
-            *HasHalfOfOwnSharedThread = poolStats.HasHalfOfOwnSharedThread;
-            *HasHalfOfOtherSharedThread = poolStats.HasHalfOfOtherSharedThread;
-            *IncreasingThreadsByNeedyState = poolStats.IncreasingThreadsByNeedyState;
-            *IncreasingThreadsByExchange = poolStats.IncreasingThreadsByExchange;
-            *DecreasingThreadsByStarvedState = poolStats.DecreasingThreadsByStarvedState;
-            *DecreasingThreadsByHoggishState = poolStats.DecreasingThreadsByHoggishState;
-            *DecreasingThreadsByExchange = poolStats.DecreasingThreadsByExchange;
-            *NotEnoughCpuExecutions = stats.NotEnoughCpuExecutions;
-
-            *SpinningTimeUs = poolStats.SpinningTimeUs;
-            *SpinThresholdUs = poolStats.SpinThresholdUs;
-
-            LegacyActivationTimeHistogram.Set(stats.ActivationTimeHistogram);
-            ActivationTimeHistogram->Reset();
-            ActivationTimeHistogram->Collect(stats.ActivationTimeHistogram);
-
-            LegacyEventDeliveryTimeHistogram.Set(stats.EventDeliveryTimeHistogram);
-            EventDeliveryTimeHistogram->Reset();
-            EventDeliveryTimeHistogram->Collect(stats.EventDeliveryTimeHistogram);
-
-            LegacyEventProcessingCountHistogram.Set(stats.EventProcessingCountHistogram);
-            EventProcessingCountHistogram->Reset();
-            EventProcessingCountHistogram->Collect(stats.EventProcessingCountHistogram);
-
-            double toMicrosec = 1000000 / NHPTimer::GetClockRate();
-            LegacyEventProcessingTimeHistogram.Set(stats.EventProcessingTimeHistogram, toMicrosec);
-            EventProcessingTimeHistogram->Reset();
-            for (ui32 i = 0; i < stats.EventProcessingTimeHistogram.Count(); ++i) {
-                EventProcessingTimeHistogram->Collect(
-                    stats.EventProcessingTimeHistogram.UpperBound(i),
-                    stats.EventProcessingTimeHistogram.Value(i) * toMicrosec);
-            }
-
-            ActivityStats.Set(stats);
-
-            *MaxUtilizationTime = poolStats.MaxUtilizationTime;
-
-            double seconds = UsageTimer.PassedReset();
-
-            // TODO[serxa]: It doesn't account for contention. Use 1 - parkedTicksDelta / seconds / numThreads KIKIMR-11916
-            Threads = poolStats.CurrentThreadCount;
-            LimitThreads = poolStats.PotentialMaxThreadCount;
-            const double currentUsage = LimitThreads > 0 ? ((elapsedSeconds - LastElapsedSeconds) / seconds / LimitThreads) : 0;
-
-            // update usage factor according to smoothness
-            const double smoothness = 0.5;
-            Usage = currentUsage * smoothness + Usage * (1.0 - smoothness);
-            LastElapsedSeconds = elapsedSeconds;
-#else
-            Y_UNUSED(stats);
-#endif
-        }
-    };
-
-    struct TActorSystemCounters {
-        TIntrusivePtr<NMonitoring::TDynamicCounters> Group;
-
-        NMonitoring::TDynamicCounters::TCounterPtr MaxUsedCpuPercent;
-        NMonitoring::TDynamicCounters::TCounterPtr MinUsedCpuPercent;
-        NMonitoring::TDynamicCounters::TCounterPtr MaxElapsedCpuPercent;
-        NMonitoring::TDynamicCounters::TCounterPtr MinElapsedCpuPercent;
-
-        NMonitoring::TDynamicCounters::TCounterPtr AvgAwakeningTimeNs;
-        NMonitoring::TDynamicCounters::TCounterPtr AvgWakingUpTimeNs;
-
-
-        void Init(NMonitoring::TDynamicCounters* group) {
-            Group = group;
-
-            MaxUsedCpuPercent = Group->GetCounter("MaxUsedCpuPercent", false);
-            MinUsedCpuPercent = Group->GetCounter("MinUsedCpuPercent", false);
-            MaxElapsedCpuPercent = Group->GetCounter("MaxElapsedCpuPercent", false);
-            MinElapsedCpuPercent = Group->GetCounter("MinElapsedCpuPercent", false);
-            AvgAwakeningTimeNs = Group->GetCounter("AvgAwakeningTimeNs", false);
-            AvgWakingUpTimeNs = Group->GetCounter("AvgWakingUpTimeNs", false);
-        }
-
-        void Set(const THarmonizerStats& harmonizerStats) {
-#ifdef ACTORSLIB_COLLECT_EXEC_STATS
-            *MaxUsedCpuPercent = harmonizerStats.MaxUsedCpu;
-            *MinUsedCpuPercent = harmonizerStats.MinUsedCpu;
-            *MaxElapsedCpuPercent = harmonizerStats.MaxElapsedCpu;
-            *MinElapsedCpuPercent = harmonizerStats.MinElapsedCpu;
-
-            *AvgAwakeningTimeNs = harmonizerStats.AvgAwakeningTimeUs * 1000;
-            *AvgWakingUpTimeNs = harmonizerStats.AvgWakingUpTimeUs * 1000;
-#else
-            Y_UNUSED(harmonizerStats);
-#endif
-        }
-
-    };
 
+// Periodically collects stats from executor threads and exposes them as mon counters
+class TStatsCollectingActor : public TActorBootstrapped<TStatsCollectingActor> {
 public:
     static constexpr IActor::EActivityType ActorActivityType() {
         return IActor::EActivityType::ACTORLIB_STATS;
@@ -421,75 +23,25 @@ public:
     TStatsCollectingActor(
             ui32 intervalSec,
             const TActorSystemSetup& setup,
-            NMonitoring::TDynamicCounterPtr counters)
-        : IntervalSec(intervalSec)
-        , Counters(counters)
-    {
-        PoolCounters.resize(setup.GetExecutorsCount());
-        for (size_t poolId = 0; poolId < PoolCounters.size(); ++poolId) {
-            PoolCounters[poolId].Init(Counters.Get(), setup.GetPoolName(poolId), setup.GetThreads(poolId));
-        }
-        ActorSystemCounters.Init(Counters.Get());
-    }
+            NMonitoring::TDynamicCounterPtr counters);
+    
+    ~TStatsCollectingActor();
 
-    void Bootstrap(const TActorContext& ctx) {
-        ctx.Schedule(TDuration::Seconds(IntervalSec), new TEvents::TEvWakeup());
-        Become(&TThis::StateWork);
-    }
+    void Bootstrap(const TActorContext& ctx);
 
-    STFUNC(StateWork) {
-        switch (ev->GetTypeRewrite()) {
-            HFunc(TEvents::TEvWakeup, Wakeup);
-        }
-    }
-
-private:
-    virtual void OnWakeup(const TActorContext &ctx) {
-        Y_UNUSED(ctx);
-    }
-
-    void Wakeup(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx) {
-        auto *event = ev->Get();
-        if (event->Tag == 0) {
-            StartOfCollecting = ctx.Now();
-        }
-        if (event->Tag < PoolCounters.size()) {
-            ui16 poolId = event->Tag;
-            TVector<TExecutorThreadStats> stats;
-            TVector<TExecutorThreadStats> sharedStats;
-            TExecutorPoolStats poolStats;
-            ctx.ActorSystem()->GetPoolStats(poolId, poolStats, stats, sharedStats);
-            SetAggregatedCounters(PoolCounters[poolId], poolStats, stats, sharedStats);
-            ctx.Schedule(TDuration::MilliSeconds(1), new TEvents::TEvWakeup(poolId + 1));
-            return;
-        }
-        THarmonizerStats harmonizerStats = ctx.ActorSystem()->GetHarmonizerStats();
-        ActorSystemCounters.Set(harmonizerStats);
-        OnWakeup(ctx);
-        ctx.Schedule(TDuration::Seconds(IntervalSec) - (ctx.Now() - StartOfCollecting), new TEvents::TEvWakeup(0));
-    }
-
-    void SetAggregatedCounters(TExecutorPoolCounters& poolCounters, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& stats, TVector<TExecutorThreadStats>& sharedStats) {
-        // Sum all per-thread counters into the 0th element
-        TExecutorThreadStats aggregated;
-        for (ui32 idx = 0; idx < stats.size(); ++idx) {
-            aggregated.Aggregate(stats[idx]);
-        }
-        for (ui32 idx = 0; idx < sharedStats.size(); ++idx) {
-            aggregated.Aggregate(sharedStats[idx]);
-        }
-        if (stats.size()) {
-            poolCounters.Set(poolStats, aggregated);
-        }
-    }
+    STFUNC(StateWork);
 
 protected:
-    const ui32 IntervalSec;
-    TInstant StartOfCollecting;
-    NMonitoring::TDynamicCounterPtr Counters;
+    virtual void OnWakeup(const TActorContext &ctx);
 
-    TVector<TExecutorPoolCounters> PoolCounters;
-    TActorSystemCounters ActorSystemCounters;
+    // Read-only access for subclasses to the counters kept by the implementation.
+    // TExecutorPoolCounters and TActorSystemCounters are only forward-declared in
+    // this header, so code that uses the results needs their full definitions.
+    const TVector<TExecutorPoolCounters>& GetPoolCounters() const;
+    const TActorSystemCounters& GetActorSystemCounters() const;
+
+private:
+    void Wakeup(TEvents::TEvWakeup::TPtr &ev, const TActorContext &ctx);
+    
+    // TImpl is only forward-declared here and held through a pointer; its
+    // definition is not part of this header.
+    class TImpl;
+    std::unique_ptr<TImpl> Impl;
 };
 
 } // NActors

+ 2 - 0
ydb/library/actors/helpers/ya.make

@@ -3,8 +3,10 @@ LIBRARY()
 SRCS(
     activeactors.cpp
     activeactors.h
+    collector_counters.cpp
     future_callback.h
     mon_histogram_helper.h
+    pool_stats_collector.cpp
     selfping_actor.cpp
 )