Browse Source

Restoring authorship annotation for <serxa@yandex-team.ru>. Commit 2 of 2.

serxa 3 years ago
parent
commit
e5d4696304

+ 2 - 2
library/cpp/actors/core/actor.cpp

@@ -1,7 +1,7 @@
 #include "actor.h"
 #include "executor_thread.h"
 #include "mailbox.h"
-#include <library/cpp/actors/util/datetime.h> 
+#include <library/cpp/actors/util/datetime.h>
 
 namespace NActors {
     Y_POD_THREAD(TActivationContext*)
@@ -88,7 +88,7 @@ namespace NActors {
     }
 
     i64 TActivationContext::GetCurrentEventTicks() {
-        return GetCycleCountFast() - TlsActivationContext->EventStart; 
+        return GetCycleCountFast() - TlsActivationContext->EventStart;
     }
 
     double TActivationContext::GetCurrentEventTicksAsSeconds() {

+ 236 - 236
library/cpp/actors/core/actor_ut.cpp

@@ -35,7 +35,7 @@ struct TTestEndDecorator : TDecorator {
 };
 
 Y_UNIT_TEST_SUITE(ActorBenchmark) {
-    static constexpr bool DefaultNoRealtime = true; 
+    static constexpr bool DefaultNoRealtime = true;
     static constexpr ui32 DefaultSpinThreshold = 1000000;
     static constexpr ui32 TotalEventsAmount = 1000;
 
@@ -43,49 +43,49 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
     public:
         TDummyActor() : TActor<TDummyActor>(&TDummyActor::StateFunc) {}
         STFUNC(StateFunc) {
-            (void)ev; 
+            (void)ev;
             (void)ctx;
         }
     };
 
-    enum ERole { 
-        Leader, 
-        Follower 
-    }; 
- 
+    enum ERole {
+        Leader,
+        Follower
+    };
+
     class TSendReceiveActor : public TActorBootstrapped<TSendReceiveActor> {
     public:
         static constexpr auto ActorActivityType() {
             return ACTORLIB_COMMON;
         }
 
-        TSendReceiveActor(double* elapsedTime, TActorId receiver, bool allocation, ERole role, ui32 neighbours = 0) 
-            : EventsCounter(TotalEventsAmount) 
-            , ElapsedTime(elapsedTime) 
-            , Receiver(receiver) 
-            , AllocatesMemory(allocation) 
-            , Role(role) 
-            , MailboxNeighboursCount(neighbours) 
+        TSendReceiveActor(double* elapsedTime, TActorId receiver, bool allocation, ERole role, ui32 neighbours = 0)
+            : EventsCounter(TotalEventsAmount)
+            , ElapsedTime(elapsedTime)
+            , Receiver(receiver)
+            , AllocatesMemory(allocation)
+            , Role(role)
+            , MailboxNeighboursCount(neighbours)
         {}
 
         void Bootstrap(const TActorContext &ctx) {
             if (!Receiver) {
                 this->Receiver = SelfId();
-            } else { 
-                EventsCounter /= 2; // We want to measure CPU requirement for one-way send 
+            } else {
+                EventsCounter /= 2; // We want to measure CPU requirement for one-way send
             }
             Timer.Reset();
             Become(&TThis::StateFunc);
             for (ui32 i = 0; i < MailboxNeighboursCount; ++i) {
                 ctx.RegisterWithSameMailbox(new TDummyActor());
             }
-            if (Role == Leader) { 
-                Send(Receiver, new TEvents::TEvPing()); 
-            } 
+            if (Role == Leader) {
+                Send(Receiver, new TEvents::TEvPing());
+            }
         }
 
         STATEFN(StateFunc) {
-            if (EventsCounter == 0 && ElapsedTime != nullptr) { 
+            if (EventsCounter == 0 && ElapsedTime != nullptr) {
                 *ElapsedTime = Timer.Passed() / TotalEventsAmount;
                 PassAway();
             }
@@ -97,91 +97,91 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
                 ev->DropRewrite();
                 TActivationContext::Send(ev.Release());
             }
-            EventsCounter--; 
+            EventsCounter--;
         }
 
     private:
         THPTimer Timer;
-        ui64 EventsCounter; 
+        ui64 EventsCounter;
         double* ElapsedTime;
         TActorId Receiver;
         bool AllocatesMemory;
-        ERole Role; 
+        ERole Role;
         ui32 MailboxNeighboursCount;
     };
 
-    void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent) { 
-        TBasicExecutorPoolConfig basic; 
-        basic.PoolId = setup->GetExecutorsCount(); 
-        basic.PoolName = TStringBuilder() << "b" << basic.PoolId; 
-        basic.Threads = threads; 
-        basic.SpinThreshold = DefaultSpinThreshold; 
-        basic.TimePerMailbox = TDuration::Hours(1); 
-        if (activateEveryEvent) { 
-            basic.EventsPerMailbox = 1; 
-        } else { 
-            basic.EventsPerMailbox = Max<ui32>(); 
+    void AddBasicPool(THolder<TActorSystemSetup>& setup, ui32 threads, bool activateEveryEvent) {
+        TBasicExecutorPoolConfig basic;
+        basic.PoolId = setup->GetExecutorsCount();
+        basic.PoolName = TStringBuilder() << "b" << basic.PoolId;
+        basic.Threads = threads;
+        basic.SpinThreshold = DefaultSpinThreshold;
+        basic.TimePerMailbox = TDuration::Hours(1);
+        if (activateEveryEvent) {
+            basic.EventsPerMailbox = 1;
+        } else {
+            basic.EventsPerMailbox = Max<ui32>();
+        }
+        setup->CpuManager.Basic.emplace_back(std::move(basic));
+    }
+
+    void AddUnitedPool(THolder<TActorSystemSetup>& setup, ui32 concurrency, bool activateEveryEvent) {
+        TUnitedExecutorPoolConfig united;
+        united.PoolId = setup->GetExecutorsCount();
+        united.PoolName = TStringBuilder() << "u" << united.PoolId;
+        united.Concurrency = concurrency;
+        united.TimePerMailbox = TDuration::Hours(1);
+        if (activateEveryEvent) {
+            united.EventsPerMailbox = 1;
+        } else {
+            united.EventsPerMailbox = Max<ui32>();
+        }
+        setup->CpuManager.United.emplace_back(std::move(united));
+    }
+
+    THolder<TActorSystemSetup> GetActorSystemSetup(ui32 unitedCpuCount, bool preemption) {
+        auto setup = MakeHolder<NActors::TActorSystemSetup>();
+        setup->NodeId = 1;
+        setup->CpuManager.UnitedWorkers.CpuCount = unitedCpuCount;
+        setup->CpuManager.UnitedWorkers.SpinThresholdUs = DefaultSpinThreshold;
+        setup->CpuManager.UnitedWorkers.NoRealtime = DefaultNoRealtime;
+        if (preemption) {
+            setup->CpuManager.UnitedWorkers.PoolLimitUs = 500;
+            setup->CpuManager.UnitedWorkers.EventLimitUs = 100;
+            setup->CpuManager.UnitedWorkers.LimitPrecisionUs = 100;
+        } else {
+            setup->CpuManager.UnitedWorkers.PoolLimitUs = 100'000'000'000;
+            setup->CpuManager.UnitedWorkers.EventLimitUs = 10'000'000'000;
+            setup->CpuManager.UnitedWorkers.LimitPrecisionUs = 10'000'000'000;
+        }
+        setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0));
+        return setup;
+    }
+
+    enum class EPoolType {
+        Basic,
+        United
+    };
+
+    THolder<TActorSystemSetup> InitActorSystemSetup(EPoolType poolType, ui32 poolsCount, ui32 threads, bool activateEveryEvent, bool preemption) {
+        if (poolType == EPoolType::Basic) {
+            THolder<TActorSystemSetup> setup = GetActorSystemSetup(0, false);
+            for (ui32 i = 0; i < poolsCount; ++i) {
+                AddBasicPool(setup, threads, activateEveryEvent);
+            }
+            return setup;
+        } else if (poolType == EPoolType::United) {
+            THolder<TActorSystemSetup> setup = GetActorSystemSetup(poolsCount * threads, preemption);
+            for (ui32 i = 0; i < poolsCount; ++i) {
+                AddUnitedPool(setup, threads, activateEveryEvent);
+            }
+            return setup;
         }
-        setup->CpuManager.Basic.emplace_back(std::move(basic)); 
-    } 
-
-    void AddUnitedPool(THolder<TActorSystemSetup>& setup, ui32 concurrency, bool activateEveryEvent) { 
-        TUnitedExecutorPoolConfig united; 
-        united.PoolId = setup->GetExecutorsCount(); 
-        united.PoolName = TStringBuilder() << "u" << united.PoolId; 
-        united.Concurrency = concurrency; 
-        united.TimePerMailbox = TDuration::Hours(1); 
-        if (activateEveryEvent) { 
-            united.EventsPerMailbox = 1; 
-        } else { 
-            united.EventsPerMailbox = Max<ui32>(); 
-        } 
-        setup->CpuManager.United.emplace_back(std::move(united)); 
-    } 
- 
-    THolder<TActorSystemSetup> GetActorSystemSetup(ui32 unitedCpuCount, bool preemption) { 
-        auto setup = MakeHolder<NActors::TActorSystemSetup>(); 
-        setup->NodeId = 1; 
-        setup->CpuManager.UnitedWorkers.CpuCount = unitedCpuCount; 
-        setup->CpuManager.UnitedWorkers.SpinThresholdUs = DefaultSpinThreshold; 
-        setup->CpuManager.UnitedWorkers.NoRealtime = DefaultNoRealtime; 
-        if (preemption) { 
-            setup->CpuManager.UnitedWorkers.PoolLimitUs = 500; 
-            setup->CpuManager.UnitedWorkers.EventLimitUs = 100; 
-            setup->CpuManager.UnitedWorkers.LimitPrecisionUs = 100; 
-        } else { 
-            setup->CpuManager.UnitedWorkers.PoolLimitUs = 100'000'000'000; 
-            setup->CpuManager.UnitedWorkers.EventLimitUs = 10'000'000'000; 
-            setup->CpuManager.UnitedWorkers.LimitPrecisionUs = 10'000'000'000; 
-        } 
-        setup->Scheduler = new TBasicSchedulerThread(NActors::TSchedulerConfig(512, 0)); 
-        return setup; 
-    } 
- 
-    enum class EPoolType { 
-        Basic, 
-        United 
-    }; 
- 
-    THolder<TActorSystemSetup> InitActorSystemSetup(EPoolType poolType, ui32 poolsCount, ui32 threads, bool activateEveryEvent, bool preemption) { 
-        if (poolType == EPoolType::Basic) { 
-            THolder<TActorSystemSetup> setup = GetActorSystemSetup(0, false); 
-            for (ui32 i = 0; i < poolsCount; ++i) { 
-                AddBasicPool(setup, threads, activateEveryEvent); 
-            } 
-            return setup; 
-        } else if (poolType == EPoolType::United) { 
-            THolder<TActorSystemSetup> setup = GetActorSystemSetup(poolsCount * threads, preemption); 
-            for (ui32 i = 0; i < poolsCount; ++i) { 
-                AddUnitedPool(setup, threads, activateEveryEvent); 
-            } 
-            return setup; 
-        } 
-        Y_FAIL(); 
-    } 
- 
-    double BenchSendReceive(bool allocation, NActors::TMailboxType::EType mType, EPoolType poolType) { 
-        THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false, false); 
+        Y_FAIL();
+    }
+
+    double BenchSendReceive(bool allocation, NActors::TMailboxType::EType mType, EPoolType poolType) {
+        THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false, false);
         TActorSystem actorSystem(setup);
         actorSystem.Start();
 
@@ -197,86 +197,86 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
         pad.Park();
         actorSystem.Stop();
 
-        return 1e9 * elapsedTime; 
+        return 1e9 * elapsedTime;
     }
 
-    double BenchSendActivateReceive(ui32 poolsCount, ui32 threads, bool allocation, EPoolType poolType) { 
-        THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, poolsCount, threads, true, false); 
-        TActorSystem actorSystem(setup); 
+    double BenchSendActivateReceive(ui32 poolsCount, ui32 threads, bool allocation, EPoolType poolType) {
+        THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, poolsCount, threads, true, false);
+        TActorSystem actorSystem(setup);
         actorSystem.Start();
 
         TThreadParkPad pad;
         TAtomic actorsAlive = 0;
         double elapsedTime = 0;
-        ui32 followerPoolId = 0; 
+        ui32 followerPoolId = 0;
 
-        ui32 leaderPoolId = poolsCount == 1 ? 0 : 1; 
-        TActorId followerId = actorSystem.Register( 
-            new TSendReceiveActor(nullptr, {}, allocation, Follower), TMailboxType::HTSwap, followerPoolId); 
+        ui32 leaderPoolId = poolsCount == 1 ? 0 : 1;
+        TActorId followerId = actorSystem.Register(
+            new TSendReceiveActor(nullptr, {}, allocation, Follower), TMailboxType::HTSwap, followerPoolId);
         THolder<IActor> leader{
             new TTestEndDecorator(THolder(
                 new TSendReceiveActor(&elapsedTime, followerId, allocation, Leader)), &pad, &actorsAlive)};
-        actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId); 
- 
+        actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
+
         pad.Park();
         actorSystem.Stop();
 
-        return 1e9 * elapsedTime; 
+        return 1e9 * elapsedTime;
     }
 
-    double BenchSendActivateReceiveWithMailboxNeighbours(ui32 MailboxNeighbourActors, EPoolType poolType) { 
-        THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false, false); 
-        TActorSystem actorSystem(setup); 
+    double BenchSendActivateReceiveWithMailboxNeighbours(ui32 MailboxNeighbourActors, EPoolType poolType) {
+        THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, 1, false, false);
+        TActorSystem actorSystem(setup);
         actorSystem.Start();
 
         TThreadParkPad pad;
         TAtomic actorsAlive = 0;
         double elapsedTime = 0;
 
-        TActorId followerId = actorSystem.Register( 
-            new TSendReceiveActor(nullptr, {}, false, Follower, MailboxNeighbourActors), TMailboxType::HTSwap); 
+        TActorId followerId = actorSystem.Register(
+            new TSendReceiveActor(nullptr, {}, false, Follower, MailboxNeighbourActors), TMailboxType::HTSwap);
         THolder<IActor> leader{
             new TTestEndDecorator(THolder(
                 new TSendReceiveActor(&elapsedTime, followerId, false, Leader, MailboxNeighbourActors)), &pad, &actorsAlive)};
-        actorSystem.Register(leader.Release(), TMailboxType::HTSwap); 
+        actorSystem.Register(leader.Release(), TMailboxType::HTSwap);
 
         pad.Park();
         actorSystem.Stop();
 
-        return 1e9 * elapsedTime; 
+        return 1e9 * elapsedTime;
     }
 
-    double BenchContentedThreads(ui32 threads, ui32 actorsPairsCount, EPoolType poolType) { 
-        THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, threads, true, false); 
-        TActorSystem actorSystem(setup); 
+    double BenchContentedThreads(ui32 threads, ui32 actorsPairsCount, EPoolType poolType) {
+        THolder<TActorSystemSetup> setup = InitActorSystemSetup(poolType, 1, threads, true, false);
+        TActorSystem actorSystem(setup);
         actorSystem.Start();
 
         TThreadParkPad pad;
         TAtomic actorsAlive = 0;
         THPTimer Timer;
 
-        TVector<double> dummy(actorsPairsCount); 
+        TVector<double> dummy(actorsPairsCount);
         Timer.Reset();
         for (ui32 i = 0; i < actorsPairsCount; ++i) {
-            ui32 followerPoolId = 0; 
-            ui32 leaderPoolId = 0; 
-            TActorId followerId = actorSystem.Register( 
-                new TSendReceiveActor(nullptr, {}, true, Follower), TMailboxType::HTSwap, followerPoolId); 
+            ui32 followerPoolId = 0;
+            ui32 leaderPoolId = 0;
+            TActorId followerId = actorSystem.Register(
+                new TSendReceiveActor(nullptr, {}, true, Follower), TMailboxType::HTSwap, followerPoolId);
             THolder<IActor> leader{
                 new TTestEndDecorator(THolder(
                     new TSendReceiveActor(&dummy[i], followerId, true, Leader)), &pad, &actorsAlive)};
-            actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId); 
+            actorSystem.Register(leader.Release(), TMailboxType::HTSwap, leaderPoolId);
         }
 
         pad.Park();
         auto elapsedTime = Timer.Passed() / TotalEventsAmount;
         actorSystem.Stop();
 
-        return 1e9 * elapsedTime; 
+        return 1e9 * elapsedTime;
     }
 
     auto Mean(const TVector<double>& data) {
-        return Accumulate(data.begin(), data.end(), 0.0) / data.size(); 
+        return Accumulate(data.begin(), data.end(), 0.0) / data.size();
     }
 
     auto Deviation(const TVector<double>& data) {
@@ -285,19 +285,19 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
         for (const auto& x : data) {
             deviation += (x - mean) * (x - mean);
         }
-        return std::sqrt(deviation / data.size()); 
+        return std::sqrt(deviation / data.size());
     }
 
     struct TStats {
         double Mean;
         double Deviation;
         TString ToString() {
-            return TStringBuilder() << Mean << " ± " << Deviation << " ns " << std::ceil(Deviation / Mean * 1000) / 10.0 << "%"; 
+            return TStringBuilder() << Mean << " ± " << Deviation << " ns " << std::ceil(Deviation / Mean * 1000) / 10.0 << "%";
         }
     };
 
     template <typename Func>
-    TStats CountStats(Func func, ui32 itersCount = 5) { 
+    TStats CountStats(Func func, ui32 itersCount = 5) {
         TVector<double> elapsedTimes;
         for (ui32 i = 0; i < itersCount; ++i) {
             auto elapsedTime = func();
@@ -314,171 +314,171 @@ Y_UNIT_TEST_SUITE(ActorBenchmark) {
         TMailboxType::TinyReadAsFilled
     };
 
-    Y_UNIT_TEST(SendReceive1Pool1ThreadAlloc) { 
+    Y_UNIT_TEST(SendReceive1Pool1ThreadAlloc) {
+        for (const auto& mType : MailboxTypes) {
+            auto stats = CountStats([mType] {
+                return BenchSendReceive(true, mType, EPoolType::Basic);
+            });
+            Cerr << stats.ToString() << " " << mType << Endl;
+        }
+    }
+
+    Y_UNIT_TEST(SendReceive1Pool1ThreadAllocUnited) {
         for (const auto& mType : MailboxTypes) {
             auto stats = CountStats([mType] {
-                return BenchSendReceive(true, mType, EPoolType::Basic); 
+                return BenchSendReceive(true, mType, EPoolType::United);
             });
             Cerr << stats.ToString() << " " << mType << Endl;
         }
     }
 
-    Y_UNIT_TEST(SendReceive1Pool1ThreadAllocUnited) { 
-        for (const auto& mType : MailboxTypes) { 
-            auto stats = CountStats([mType] { 
-                return BenchSendReceive(true, mType, EPoolType::United); 
-            }); 
-            Cerr << stats.ToString() << " " << mType << Endl; 
-        } 
-    } 
- 
     Y_UNIT_TEST(SendReceive1Pool1ThreadNoAlloc) {
         for (const auto& mType : MailboxTypes) {
             auto stats = CountStats([mType] {
-                return BenchSendReceive(false, mType, EPoolType::Basic); 
+                return BenchSendReceive(false, mType, EPoolType::Basic);
             });
             Cerr << stats.ToString() << " " << mType << Endl;
         }
     }
- 
-    Y_UNIT_TEST(SendReceive1Pool1ThreadNoAllocUnited) { 
-        for (const auto& mType : MailboxTypes) { 
-            auto stats = CountStats([mType] { 
-                return BenchSendReceive(false, mType, EPoolType::United); 
-            }); 
-            Cerr << stats.ToString() << " " << mType << Endl; 
-        } 
-    } 
- 
+
+    Y_UNIT_TEST(SendReceive1Pool1ThreadNoAllocUnited) {
+        for (const auto& mType : MailboxTypes) {
+            auto stats = CountStats([mType] {
+                return BenchSendReceive(false, mType, EPoolType::United);
+            });
+            Cerr << stats.ToString() << " " << mType << Endl;
+        }
+    }
+
     Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAlloc) {
         auto stats = CountStats([] {
-            return BenchSendActivateReceive(1, 1, true, EPoolType::Basic); 
+            return BenchSendActivateReceive(1, 1, true, EPoolType::Basic);
+        });
+        Cerr << stats.ToString() << Endl;
+    }
+
+    Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAllocUnited) {
+        auto stats = CountStats([] {
+            return BenchSendActivateReceive(1, 1, true, EPoolType::United);
         });
         Cerr << stats.ToString() << Endl;
     }
 
-    Y_UNIT_TEST(SendActivateReceive1Pool1ThreadAllocUnited) { 
-        auto stats = CountStats([] { 
-            return BenchSendActivateReceive(1, 1, true, EPoolType::United); 
-        }); 
-        Cerr << stats.ToString() << Endl; 
-    } 
- 
     Y_UNIT_TEST(SendActivateReceive1Pool1ThreadNoAlloc) {
         auto stats = CountStats([] {
-            return BenchSendActivateReceive(1, 1, false, EPoolType::Basic); 
+            return BenchSendActivateReceive(1, 1, false, EPoolType::Basic);
+        });
+        Cerr << stats.ToString() << Endl;
+    }
+
+    Y_UNIT_TEST(SendActivateReceive1Pool1ThreadNoAllocUnited) {
+        auto stats = CountStats([] {
+            return BenchSendActivateReceive(1, 1, false, EPoolType::United);
         });
         Cerr << stats.ToString() << Endl;
     }
 
-    Y_UNIT_TEST(SendActivateReceive1Pool1ThreadNoAllocUnited) { 
-        auto stats = CountStats([] { 
-            return BenchSendActivateReceive(1, 1, false, EPoolType::United); 
-        }); 
-        Cerr << stats.ToString() << Endl; 
-    } 
- 
     Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsAlloc) {
         auto stats = CountStats([] {
-            return BenchSendActivateReceive(1, 2, true, EPoolType::Basic); 
+            return BenchSendActivateReceive(1, 2, true, EPoolType::Basic);
+        });
+        Cerr << stats.ToString() << Endl;
+    }
+
+    Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsAllocUnited) {
+        auto stats = CountStats([] {
+            return BenchSendActivateReceive(1, 2, true, EPoolType::United);
         });
         Cerr << stats.ToString() << Endl;
     }
 
-    Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsAllocUnited) { 
-        auto stats = CountStats([] { 
-            return BenchSendActivateReceive(1, 2, true, EPoolType::United); 
-        }); 
-        Cerr << stats.ToString() << Endl; 
-    } 
- 
     Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsNoAlloc) {
         auto stats = CountStats([] {
-            return BenchSendActivateReceive(1, 2, false, EPoolType::Basic); 
+            return BenchSendActivateReceive(1, 2, false, EPoolType::Basic);
+        });
+        Cerr << stats.ToString() << Endl;
+    }
+
+    Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsNoAllocUnited) {
+        auto stats = CountStats([] {
+            return BenchSendActivateReceive(1, 2, false, EPoolType::United);
+        });
+        Cerr << stats.ToString() << Endl;
+    }
+
+    Y_UNIT_TEST(SendActivateReceive2Pool1ThreadAlloc) {
+        auto stats = CountStats([] {
+            return BenchSendActivateReceive(2, 1, true, EPoolType::Basic);
+        });
+        Cerr << stats.ToString() << Endl;
+    }
+
+    Y_UNIT_TEST(SendActivateReceive2Pool1ThreadAllocUnited) {
+        auto stats = CountStats([] {
+            return BenchSendActivateReceive(2, 1, true, EPoolType::United);
         });
         Cerr << stats.ToString() << Endl;
     }
 
-    Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsNoAllocUnited) { 
+    Y_UNIT_TEST(SendActivateReceive2Pool1ThreadNoAlloc) {
         auto stats = CountStats([] {
-            return BenchSendActivateReceive(1, 2, false, EPoolType::United); 
+            return BenchSendActivateReceive(2, 1, false, EPoolType::Basic);
         });
         Cerr << stats.ToString() << Endl;
     }
 
-    Y_UNIT_TEST(SendActivateReceive2Pool1ThreadAlloc) { 
+    Y_UNIT_TEST(SendActivateReceive2Pool1ThreadNoAllocUnited) {
         auto stats = CountStats([] {
-            return BenchSendActivateReceive(2, 1, true, EPoolType::Basic); 
+            return BenchSendActivateReceive(2, 1, false, EPoolType::United);
         });
         Cerr << stats.ToString() << Endl;
     }
 
-    Y_UNIT_TEST(SendActivateReceive2Pool1ThreadAllocUnited) { 
-        auto stats = CountStats([] { 
-            return BenchSendActivateReceive(2, 1, true, EPoolType::United); 
-        }); 
-        Cerr << stats.ToString() << Endl; 
-    }
-
-    Y_UNIT_TEST(SendActivateReceive2Pool1ThreadNoAlloc) { 
-        auto stats = CountStats([] { 
-            return BenchSendActivateReceive(2, 1, false, EPoolType::Basic); 
-        }); 
-        Cerr << stats.ToString() << Endl; 
-    }
-
-    Y_UNIT_TEST(SendActivateReceive2Pool1ThreadNoAllocUnited) { 
-        auto stats = CountStats([] { 
-            return BenchSendActivateReceive(2, 1, false, EPoolType::United); 
-        }); 
-        Cerr << stats.ToString() << Endl; 
-    }
-
-    void RunBenchContentedThreads(ui32 threads, EPoolType poolType) { 
-        for (ui32 actorPairs = 1; actorPairs <= 2 * threads; actorPairs++) { 
-            auto stats = CountStats([threads, actorPairs, poolType] { 
-                return BenchContentedThreads(threads, actorPairs, poolType); 
-            }); 
-            Cerr << stats.ToString() << " actorPairs: " << actorPairs << Endl; 
-        } 
-    }
-
-    Y_UNIT_TEST(SendActivateReceive1Pool1Threads)       { RunBenchContentedThreads(1, EPoolType::Basic);  } 
-    Y_UNIT_TEST(SendActivateReceive1Pool1ThreadsUnited) { RunBenchContentedThreads(1, EPoolType::United); } 
-    Y_UNIT_TEST(SendActivateReceive1Pool2Threads)       { RunBenchContentedThreads(2, EPoolType::Basic);  } 
-    Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsUnited) { RunBenchContentedThreads(2, EPoolType::United); } 
-    Y_UNIT_TEST(SendActivateReceive1Pool3Threads)       { RunBenchContentedThreads(3, EPoolType::Basic);  } 
-    Y_UNIT_TEST(SendActivateReceive1Pool3ThreadsUnited) { RunBenchContentedThreads(3, EPoolType::United); } 
-    Y_UNIT_TEST(SendActivateReceive1Pool4Threads)       { RunBenchContentedThreads(4, EPoolType::Basic);  } 
-    Y_UNIT_TEST(SendActivateReceive1Pool4ThreadsUnited) { RunBenchContentedThreads(4, EPoolType::United); } 
-    Y_UNIT_TEST(SendActivateReceive1Pool5Threads)       { RunBenchContentedThreads(5, EPoolType::Basic);  } 
-    Y_UNIT_TEST(SendActivateReceive1Pool5ThreadsUnited) { RunBenchContentedThreads(5, EPoolType::United); } 
-    Y_UNIT_TEST(SendActivateReceive1Pool6Threads)       { RunBenchContentedThreads(6, EPoolType::Basic);  } 
-    Y_UNIT_TEST(SendActivateReceive1Pool6ThreadsUnited) { RunBenchContentedThreads(6, EPoolType::United); } 
-    Y_UNIT_TEST(SendActivateReceive1Pool7Threads)       { RunBenchContentedThreads(7, EPoolType::Basic);  } 
-    Y_UNIT_TEST(SendActivateReceive1Pool7ThreadsUnited) { RunBenchContentedThreads(7, EPoolType::United); } 
-    Y_UNIT_TEST(SendActivateReceive1Pool8Threads)       { RunBenchContentedThreads(8, EPoolType::Basic);  } 
-    Y_UNIT_TEST(SendActivateReceive1Pool8ThreadsUnited) { RunBenchContentedThreads(8, EPoolType::United); } 
- 
+    void RunBenchContentedThreads(ui32 threads, EPoolType poolType) {
+        for (ui32 actorPairs = 1; actorPairs <= 2 * threads; actorPairs++) {
+            auto stats = CountStats([threads, actorPairs, poolType] {
+                return BenchContentedThreads(threads, actorPairs, poolType);
+            });
+            Cerr << stats.ToString() << " actorPairs: " << actorPairs << Endl;
+        }
+    }
+
+    Y_UNIT_TEST(SendActivateReceive1Pool1Threads)       { RunBenchContentedThreads(1, EPoolType::Basic);  }
+    Y_UNIT_TEST(SendActivateReceive1Pool1ThreadsUnited) { RunBenchContentedThreads(1, EPoolType::United); }
+    Y_UNIT_TEST(SendActivateReceive1Pool2Threads)       { RunBenchContentedThreads(2, EPoolType::Basic);  }
+    Y_UNIT_TEST(SendActivateReceive1Pool2ThreadsUnited) { RunBenchContentedThreads(2, EPoolType::United); }
+    Y_UNIT_TEST(SendActivateReceive1Pool3Threads)       { RunBenchContentedThreads(3, EPoolType::Basic);  }
+    Y_UNIT_TEST(SendActivateReceive1Pool3ThreadsUnited) { RunBenchContentedThreads(3, EPoolType::United); }
+    Y_UNIT_TEST(SendActivateReceive1Pool4Threads)       { RunBenchContentedThreads(4, EPoolType::Basic);  }
+    Y_UNIT_TEST(SendActivateReceive1Pool4ThreadsUnited) { RunBenchContentedThreads(4, EPoolType::United); }
+    Y_UNIT_TEST(SendActivateReceive1Pool5Threads)       { RunBenchContentedThreads(5, EPoolType::Basic);  }
+    Y_UNIT_TEST(SendActivateReceive1Pool5ThreadsUnited) { RunBenchContentedThreads(5, EPoolType::United); }
+    Y_UNIT_TEST(SendActivateReceive1Pool6Threads)       { RunBenchContentedThreads(6, EPoolType::Basic);  }
+    Y_UNIT_TEST(SendActivateReceive1Pool6ThreadsUnited) { RunBenchContentedThreads(6, EPoolType::United); }
+    Y_UNIT_TEST(SendActivateReceive1Pool7Threads)       { RunBenchContentedThreads(7, EPoolType::Basic);  }
+    Y_UNIT_TEST(SendActivateReceive1Pool7ThreadsUnited) { RunBenchContentedThreads(7, EPoolType::United); }
+    Y_UNIT_TEST(SendActivateReceive1Pool8Threads)       { RunBenchContentedThreads(8, EPoolType::Basic);  }
+    Y_UNIT_TEST(SendActivateReceive1Pool8ThreadsUnited) { RunBenchContentedThreads(8, EPoolType::United); }
+
     Y_UNIT_TEST(SendActivateReceiveWithMailboxNeighbours) {
         TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256};
         for (const auto& neighbour : NeighbourActors) {
             auto stats = CountStats([neighbour] {
-                return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic); 
+                return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::Basic);
+            });
+            Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl;
+        }
+    }
+
+    Y_UNIT_TEST(SendActivateReceiveWithMailboxNeighboursUnited) {
+        TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256};
+        for (const auto& neighbour : NeighbourActors) {
+            auto stats = CountStats([neighbour] {
+                return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United);
             });
-            Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl; 
+            Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl;
         }
     }
- 
-    Y_UNIT_TEST(SendActivateReceiveWithMailboxNeighboursUnited) { 
-        TVector<ui32> NeighbourActors = {0, 1, 2, 3, 4, 5, 6, 7, 8, 16, 32, 64, 128, 256}; 
-        for (const auto& neighbour : NeighbourActors) { 
-            auto stats = CountStats([neighbour] { 
-                return BenchSendActivateReceiveWithMailboxNeighbours(neighbour, EPoolType::United); 
-            }); 
-            Cerr << stats.ToString() << " neighbourActors: " << neighbour << Endl; 
-        } 
-    } 
 }
 
 Y_UNIT_TEST_SUITE(TestDecorator) {

+ 1 - 1
library/cpp/actors/core/actorid.h

@@ -180,7 +180,7 @@ namespace NActors {
     };
 
     static_assert(sizeof(TActorId) == 16, "expect sizeof(TActorId) == 16");
-    static_assert(MaxPools < TActorId::MaxPoolID); // current implementation of united pool has limit MaxPools on pool id 
+    static_assert(MaxPools < TActorId::MaxPoolID); // current implementation of united pool has limit MaxPools on pool id
 }
 
 template <>

+ 20 - 20
library/cpp/actors/core/actorsystem.cpp

@@ -1,7 +1,7 @@
 #include "defs.h"
 #include "actorsystem.h"
 #include "callstack.h"
-#include "cpu_manager.h" 
+#include "cpu_manager.h"
 #include "mailbox.h"
 #include "events.h"
 #include "interconnect.h"
@@ -9,10 +9,10 @@
 #include "scheduler_queue.h"
 #include "scheduler_actor.h"
 #include "log.h"
-#include "probes.h" 
+#include "probes.h"
 #include "ask.h"
 #include <library/cpp/actors/util/affinity.h>
-#include <library/cpp/actors/util/datetime.h> 
+#include <library/cpp/actors/util/datetime.h>
 #include <util/generic/hash.h>
 #include <util/system/rwlock.h>
 #include <util/random/random.h>
@@ -38,8 +38,8 @@ namespace NActors {
     TActorSystem::TActorSystem(THolder<TActorSystemSetup>& setup, void* appData,
                                TIntrusivePtr<NLog::TSettings> loggerSettings)
         : NodeId(setup->NodeId)
-        , CpuManager(new TCpuManager(setup)) 
-        , ExecutorPoolCount(CpuManager->GetExecutorsCount()) 
+        , CpuManager(new TCpuManager(setup))
+        , ExecutorPoolCount(CpuManager->GetExecutorsCount())
         , Scheduler(setup->Scheduler)
         , InterconnectCount((ui32)setup->Interconnect.ProxyActors.size())
         , CurrentTimestamp(0)
@@ -105,10 +105,10 @@ namespace NActors {
         Y_VERIFY_DEBUG(recipient == ev->GetRecipientRewrite());
         const ui32 recpPool = recipient.PoolID();
         if (recipient && recpPool < ExecutorPoolCount) {
-            if (CpuManager->GetExecutorPool(recpPool)->Send(ev)) { 
+            if (CpuManager->GetExecutorPool(recpPool)->Send(ev)) {
                 return true;
-            } 
-        } 
+            }
+        }
 
         Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown));
         return false;
@@ -142,7 +142,7 @@ namespace NActors {
                                     const TActorId& parentId) {
         Y_VERIFY(executorPool < ExecutorPoolCount, "executorPool# %" PRIu32 ", ExecutorPoolCount# %" PRIu32,
                  (ui32)executorPool, (ui32)ExecutorPoolCount);
-        return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId); 
+        return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
     }
 
     NThreading::TFuture<THolder<IEventBase>> TActorSystem::AskGeneric(TMaybe<ui32> expectedEventType,
@@ -199,20 +199,20 @@ namespace NActors {
         return ServiceMap->RegisterLocalService(serviceId, actorId);
     }
 
-    void TActorSystem::GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const { 
-        CpuManager->GetPoolStats(poolId, poolStats, statsCopy); 
-    } 
- 
+    void TActorSystem::GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
+        CpuManager->GetPoolStats(poolId, poolStats, statsCopy);
+    }
+
     void TActorSystem::Start() {
         Y_VERIFY(StartExecuted == false);
         StartExecuted = true;
 
-        ScheduleQueue.Reset(new NSchedulerQueue::TQueueType()); 
+        ScheduleQueue.Reset(new NSchedulerQueue::TQueueType());
         TVector<NSchedulerQueue::TReader*> scheduleReaders;
         scheduleReaders.push_back(&ScheduleQueue->Reader);
-        CpuManager->PrepareStart(scheduleReaders, this); 
+        CpuManager->PrepareStart(scheduleReaders, this);
         Scheduler->Prepare(this, &CurrentTimestamp, &CurrentMonotonic);
-        Scheduler->PrepareSchedules(&scheduleReaders.front(), (ui32)scheduleReaders.size()); 
+        Scheduler->PrepareSchedules(&scheduleReaders.front(), (ui32)scheduleReaders.size());
 
         // setup interconnect proxies
         {
@@ -243,7 +243,7 @@ namespace NActors {
         SystemSetup.Destroy();
 
         Scheduler->PrepareStart();
-        CpuManager->Start(); 
+        CpuManager->Start();
         Send(MakeSchedulerActorId(), new TEvSchedulerInitialize(scheduleReaders, &CurrentTimestamp, &CurrentMonotonic));
         Scheduler->Start();
     }
@@ -259,9 +259,9 @@ namespace NActors {
         }
 
         Scheduler->PrepareStop();
-        CpuManager->PrepareStop(); 
+        CpuManager->PrepareStop();
         Scheduler->Stop();
-        CpuManager->Shutdown(); 
+        CpuManager->Shutdown();
     }
 
     void TActorSystem::Cleanup() {
@@ -269,7 +269,7 @@ namespace NActors {
         if (CleanupExecuted || !StartExecuted)
             return;
         CleanupExecuted = true;
-        CpuManager->Cleanup(); 
+        CpuManager->Cleanup();
         Scheduler.Destroy();
     }
 

+ 42 - 42
library/cpp/actors/core/actorsystem.h

@@ -1,27 +1,27 @@
 #pragma once
 
 #include "defs.h"
- 
-#include "actor.h" 
-#include "balancer.h" 
-#include "config.h" 
+
+#include "actor.h"
+#include "balancer.h"
+#include "config.h"
 #include "event.h"
 #include "log_settings.h"
 #include "scheduler_cookie.h"
 #include "mon_stats.h"
- 
+
 #include <library/cpp/threading/future/future.h>
 #include <library/cpp/actors/util/ticket_lock.h>
- 
+
 #include <util/generic/vector.h>
 #include <util/datetime/base.h>
 #include <util/system/mutex.h>
 
 namespace NActors {
     class TActorSystem;
-    class TCpuManager; 
+    class TCpuManager;
     class IExecutorPool;
-    struct TWorkerContext; 
+    struct TWorkerContext;
 
     inline TActorId MakeInterconnectProxyId(ui32 destNodeId) {
         char data[12];
@@ -62,9 +62,9 @@ namespace NActors {
         virtual ~IExecutorPool() {
         }
 
-        // for workers 
-        virtual ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) = 0; 
-        virtual void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) = 0; 
+        // for workers
+        virtual ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) = 0;
+        virtual void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) = 0;
 
         /**
          * Schedule one-shot event that will be send at given time point in the future.
@@ -72,9 +72,9 @@ namespace NActors {
          * @param deadline   the wallclock time point in future when event must be send
          * @param ev         the event to send
          * @param cookie     cookie that will be piggybacked with event
-         * @param workerId   index of thread which will perform event dispatching 
+         * @param workerId   index of thread which will perform event dispatching
          */
-        virtual void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; 
+        virtual void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
 
         /**
          * Schedule one-shot event that will be send at given time point in the future.
@@ -82,9 +82,9 @@ namespace NActors {
          * @param deadline   the monotonic time point in future when event must be send
          * @param ev         the event to send
          * @param cookie     cookie that will be piggybacked with event
-         * @param workerId   index of thread which will perform event dispatching 
+         * @param workerId   index of thread which will perform event dispatching
          */
-        virtual void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; 
+        virtual void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
 
         /**
          * Schedule one-shot event that will be send after given delay.
@@ -92,9 +92,9 @@ namespace NActors {
          * @param delta      the time from now to delay event sending
          * @param ev         the event to send
          * @param cookie     cookie that will be piggybacked with event
-         * @param workerId   index of thread which will perform event dispatching 
+         * @param workerId   index of thread which will perform event dispatching
          */
-        virtual void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0; 
+        virtual void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
 
         // for actorsystem
         virtual bool Send(TAutoPtr<IEventHandle>& ev) = 0;
@@ -104,7 +104,7 @@ namespace NActors {
         virtual TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) = 0;
 
         // lifecycle stuff
-        virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0; 
+        virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0;
         virtual void Start() = 0;
         virtual void PrepareStop() = 0;
         virtual void Shutdown() = 0;
@@ -115,7 +115,7 @@ namespace NActors {
             Y_UNUSED(poolStats);
             Y_UNUSED(statsCopy);
         }
- 
+
         virtual TString GetName() const {
             return TString();
         }
@@ -127,7 +127,7 @@ namespace NActors {
         // generic
         virtual TAffinity* Affinity() const = 0;
 
-        virtual void SetRealTimeMode() const {} 
+        virtual void SetRealTimeMode() const {}
     };
 
     // could be proxy to in-pool schedulers (for NUMA-aware executors)
@@ -137,7 +137,7 @@ namespace NActors {
         }
 
         virtual void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) = 0;
-        virtual void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) = 0; 
+        virtual void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) = 0;
         virtual void PrepareStart() { /* empty */ }
         virtual void Start() = 0;
         virtual void PrepareStop() = 0;
@@ -180,14 +180,14 @@ namespace NActors {
     struct TActorSystemSetup {
         ui32 NodeId = 0;
 
-        // Either Executors or CpuManager must be initialized 
+        // Either Executors or CpuManager must be initialized
         ui32 ExecutorsCount = 0;
         TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
- 
-        TAutoPtr<IBalancer> Balancer; // main implementation will be implicitly created if not set 
- 
-        TCpuManagerConfig CpuManager; 
- 
+
+        TAutoPtr<IBalancer> Balancer; // main implementation will be implicitly created if not set
+
+        TCpuManagerConfig CpuManager;
+
         TAutoPtr<ISchedulerThread> Scheduler;
         ui32 MaxActivityType = 5; // for default entries
 
@@ -195,18 +195,18 @@ namespace NActors {
 
         using TLocalServices = TVector<std::pair<TActorId, TActorSetupCmd>>;
         TLocalServices LocalServices;
- 
-        ui32 GetExecutorsCount() const { 
-            return Executors ? ExecutorsCount : CpuManager.GetExecutorsCount(); 
-        } 
- 
-        TString GetPoolName(ui32 poolId) const { 
-            return Executors ? Executors[poolId]->GetName() : CpuManager.GetPoolName(poolId); 
-        } 
- 
-        ui32 GetThreads(ui32 poolId) const { 
-            return Executors ? Executors[poolId]->GetThreads() : CpuManager.GetThreads(poolId); 
-        } 
+
+        ui32 GetExecutorsCount() const {
+            return Executors ? ExecutorsCount : CpuManager.GetExecutorsCount();
+        }
+
+        TString GetPoolName(ui32 poolId) const {
+            return Executors ? Executors[poolId]->GetName() : CpuManager.GetPoolName(poolId);
+        }
+
+        ui32 GetThreads(ui32 poolId) const {
+            return Executors ? Executors[poolId]->GetThreads() : CpuManager.GetThreads(poolId);
+        }
     };
 
     class TActorSystem : TNonCopyable {
@@ -214,9 +214,9 @@ namespace NActors {
 
     public:
         const ui32 NodeId;
- 
+
     private:
-        THolder<TCpuManager> CpuManager; 
+        THolder<TCpuManager> CpuManager;
         const ui32 ExecutorPoolCount;
 
         TAutoPtr<ISchedulerThread> Scheduler;
@@ -353,7 +353,7 @@ namespace NActors {
             return LoggerSettings0.Get();
         }
 
-        void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const; 
+        void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const;
 
         void DeferPreStop(std::function<void()> fn) {
             DeferredPreStop.push_back(std::move(fn));

+ 293 - 293
library/cpp/actors/core/balancer.cpp

@@ -1,293 +1,293 @@
-#include "balancer.h" 
- 
-#include "probes.h" 
- 
-#include <library/cpp/actors/util/intrinsics.h> 
-#include <library/cpp/actors/util/datetime.h> 
- 
-#include <util/system/spinlock.h> 
- 
-#include <algorithm> 
- 
-namespace NActors { 
-    LWTRACE_USING(ACTORLIB_PROVIDER); 
- 
-    // Describes balancing-related state of pool, the most notable is `Importance` to add new cpu 
-    struct TLevel { 
-        // Balancer will try to give more cpu to overloaded pools 
-        enum ELoadClass { 
-            Underloaded = 0, 
-            Moderate = 1, 
-            Overloaded = 2, 
-        }; 
- 
-        double ScaleFactor; 
-        ELoadClass LoadClass; 
-        ui64 Importance; // pool with lower importance is allowed to pass cpu to pool with higher, but the opposite is forbidden 
- 
-        TLevel() {} 
- 
-        TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle) { 
-            ScaleFactor = double(currentCpus) / cfg.Cpus; 
-            if (cpuIdle > 1.3) { // TODO: add a better underload criterion, based on estimated latency w/o 1 cpu 
-                LoadClass = Underloaded; 
-            } else if (cpuIdle < 0.2) { // TODO: add a better overload criterion, based on latency 
-                LoadClass = Overloaded; 
-            } else { 
-                LoadClass = Moderate; 
-            } 
-            Importance = MakeImportance(LoadClass, cfg.Priority, ScaleFactor, cpuIdle, poolId); 
-        } 
- 
-    private: 
-        // Importance is simple ui64 value (from highest to lowest): 
-        //   2 Bits: LoadClass 
-        //   8 Bits: Priority 
-        //  10 Bits: -ScaleFactor (for max-min fairness with weights equal to TBalancingConfig::Cpus) 
-        //  10 Bits: -CpuIdle 
-        //   6 Bits: PoolId 
-        static ui64 MakeImportance(ELoadClass load, ui8 priority, double scaleFactor, double cpuIdle, TPoolId poolId) { 
-            ui64 idle = std::clamp<i64>(1024 - cpuIdle * 512, 0, 1023); 
-            ui64 scale = std::clamp<i64>(1024 - scaleFactor * 32, 0, 1023); 
- 
-            Y_VERIFY(ui64(load)     < (1ull << 2ull)); 
-            Y_VERIFY(ui64(priority) < (1ull << 8ull)); 
-            Y_VERIFY(ui64(scale)    < (1ull << 10ull)); 
-            Y_VERIFY(ui64(idle)     < (1ull << 10ull)); 
-            Y_VERIFY(ui64(poolId)   < (1ull << 6ull)); 
- 
-            static_assert(ui64(MaxPools) <= (1ull << 6ull)); 
- 
-            ui64 importance = 
-                (ui64(load)     << ui64(6 + 10 + 10 + 8)) | 
-                (ui64(priority) << ui64(6 + 10 + 10)) | 
-                (ui64(scale)    << ui64(6 + 10)) | 
-                (ui64(idle)     << ui64(6)) | 
-                ui64(poolId); 
-            return importance; 
-        } 
-    }; 
- 
-    // Main balancer implemenation 
-    class TBalancer: public IBalancer { 
-    private: 
-        struct TCpu; 
-        struct TPool; 
- 
-        bool Disabled = true; 
-        TSpinLock Lock; 
-        ui64 NextBalanceTs; 
-        TVector<TCpu> Cpus; // Indexed by CpuId, can have gaps 
-        TVector<TPool> Pools; // Indexed by PoolId, can have gaps 
-        TBalancerConfig Config; 
- 
-    public: 
-        // Setup 
-        TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts); 
-        bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override; 
-        ~TBalancer(); 
- 
-        // Balancing 
-        bool TryLock(ui64 ts) override; 
-        void SetPoolStats(TPoolId pool, const TBalancerStats& stats) override; 
-        void Balance() override; 
-        void Unlock() override; 
- 
-    private: 
-        void MoveCpu(TPool& from, TPool& to); 
-    }; 
- 
-    struct TBalancer::TPool { 
-        TBalancingConfig Config; 
-        TPoolId PoolId; 
-        TString PoolName; 
- 
-        // Input data for balancing 
-        TBalancerStats Prev; 
-        TBalancerStats Next; 
- 
-        // Derived stats 
-        double CpuLoad; 
-        double CpuIdle; 
- 
-        // Classification 
-        // NOTE: We want to avoid passing cpu back and forth, so we must consider not only current level, 
-        // NOTE: but expected levels after movements also 
-        TLevel CurLevel; // Level with current amount of cpu 
-        TLevel AddLevel; // Level after one cpu acception 
-        TLevel SubLevel; // Level after one cpu donation 
- 
-        // Balancing state 
-        ui64 CurrentCpus = 0; // Total number of cpus assigned for this pool (zero means pools is not balanced) 
-        ui64 PrevCpus = 0; // Cpus in last period 
- 
-        explicit TPool(const TBalancingConfig& cfg = {}) 
-            : Config(cfg) 
-        {} 
- 
-        void Configure(const TBalancingConfig& cfg, const TString& poolName) { 
-            Config = cfg; 
-            // Enforce constraints 
-            Config.MinCpus = std::clamp<ui32>(Config.MinCpus, 1, Config.Cpus); 
-            Config.MaxCpus = Max<ui32>(Config.MaxCpus, Config.Cpus); 
-            PoolName = poolName; 
-        } 
-    }; 
- 
-    struct TBalancer::TCpu { 
-        TCpuState* State = nullptr; // Cpu state, nullptr means cpu is not used (gap) 
-        TCpuAllocation Alloc; 
-        TPoolId Current; 
-        TPoolId Assigned; 
-    }; 
- 
-    TBalancer::TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) 
-        : NextBalanceTs(ts) 
-        , Config(config) 
-    { 
-        for (TPoolId pool = 0; pool < MaxPools; pool++) { 
-            Pools.emplace_back(); 
-            Pools.back().PoolId = pool; 
-        } 
-        for (const TUnitedExecutorPoolConfig& united : unitedPools) { 
-            Pools[united.PoolId].Configure(united.Balancing, united.PoolName); 
-        } 
-    } 
- 
-    TBalancer::~TBalancer() { 
-    } 
- 
-    bool TBalancer::AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* state) { 
-        // Setup 
-        TCpuId cpuId = cpuAlloc.CpuId; 
-        if (Cpus.size() <= cpuId) { 
-            Cpus.resize(cpuId + 1); 
-        } 
-        TCpu& cpu = Cpus[cpuId]; 
-        cpu.State = state; 
-        cpu.Alloc = cpuAlloc; 
- 
-        // Fill every pool with cpus up to TBalancingConfig::Cpus 
-        TPoolId pool = 0; 
-        for (TPool& p : Pools) { 
-            if (p.CurrentCpus < p.Config.Cpus) { 
-                p.CurrentCpus++; 
-                break; 
-            } 
-            pool++; 
-        } 
-        if (pool != MaxPools) { // cpu under balancer control 
-            state->SwitchPool(pool); 
-            state->AssignPool(pool); 
-            Disabled = false; 
-            return true; 
-        } 
-        return false; // non-balanced cpu 
-    } 
- 
-    bool TBalancer::TryLock(ui64 ts) { 
-        if (!Disabled && NextBalanceTs < ts && Lock.TryAcquire()) { 
-            NextBalanceTs = ts + Us2Ts(Config.PeriodUs); 
-            return true; 
-        } 
-        return false; 
-    } 
- 
-    void TBalancer::SetPoolStats(TPoolId pool, const TBalancerStats& stats) { 
-        Y_VERIFY(pool < MaxPools); 
-        TPool& p = Pools[pool]; 
-        p.Prev = p.Next; 
-        p.Next = stats; 
-    } 
- 
-    void TBalancer::Balance() { 
-        // Update every cpu state 
-        for (TCpu& cpu : Cpus) { 
-            if (cpu.State) { 
-                cpu.State->Load(cpu.Assigned, cpu.Current); 
-                if (cpu.Current < MaxPools && cpu.Current != cpu.Assigned) { 
-                    return; // previous movement has not been applied yet, wait 
-                } 
-            } 
-        } 
- 
-        // Process stats, classify and compute pool importance 
-        TStackVec<TPool*, MaxPools> order; 
-        for (TPool& pool : Pools) { 
-            if (pool.Config.Cpus == 0) { 
-                continue; // skip gaps (non-existent or non-united pools) 
-            } 
-            if (pool.Prev.Ts == 0 || pool.Prev.Ts >= pool.Next.Ts) { 
-                return; // invalid stats 
-            } 
- 
-            // Compute derived stats 
-            pool.CpuLoad = (pool.Next.CpuUs - pool.Prev.CpuUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts); 
-            if (pool.Prev.IdleUs == ui64(-1) || pool.Next.IdleUs == ui64(-1)) { 
-                pool.CpuIdle = pool.CurrentCpus - pool.CpuLoad; // for tests 
-            } else { 
-                pool.CpuIdle = (pool.Next.IdleUs - pool.Prev.IdleUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts); 
-            } 
- 
-            // Compute levels 
-            pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle); 
-            pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle); // we expect taken cpu to became utilized 
-            pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1); 
- 
-            // Prepare for balancing 
-            pool.PrevCpus = pool.CurrentCpus; 
-            order.push_back(&pool); 
-        } 
- 
-        // Sort pools by importance 
-        std::sort(order.begin(), order.end(), [] (TPool* l, TPool* r) {return l->CurLevel.Importance < r->CurLevel.Importance; }); 
-        for (TPool* pool : order) { 
-            LWPROBE(PoolStats, pool->PoolId, pool->PoolName, pool->CurrentCpus, pool->CurLevel.LoadClass, pool->Config.Priority, pool->CurLevel.ScaleFactor, pool->CpuIdle, pool->CpuLoad, pool->CurLevel.Importance, pool->AddLevel.Importance, pool->SubLevel.Importance); 
-        } 
- 
-        // Move cpus from lower importance to higher importance pools 
-        for (auto toIter = order.rbegin(); toIter != order.rend(); ++toIter) { 
-            TPool& to = **toIter; 
-            if (to.CurLevel.LoadClass == TLevel::Overloaded && // if pool is overloaded 
-                to.CurrentCpus < to.Config.MaxCpus) // and constraints would not be violated 
-            { 
-                for (auto fromIter = order.begin(); (*fromIter)->CurLevel.Importance < to.CurLevel.Importance; ++fromIter) { 
-                    TPool& from = **fromIter; 
-                    if (from.CurrentCpus == from.PrevCpus && // if not balanced yet 
-                        from.CurrentCpus > from.Config.MinCpus && // and constraints would not be violated 
-                        from.SubLevel.Importance < to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement 
-                    { 
-                        MoveCpu(from, to); 
-                        from.CurrentCpus--; 
-                        to.CurrentCpus++; 
-                        break; 
-                    } 
-                } 
-            } 
-        } 
-    } 
- 
-    void TBalancer::MoveCpu(TBalancer::TPool& from, TBalancer::TPool& to) { 
-        for (auto ci = Cpus.rbegin(), ce = Cpus.rend(); ci != ce; ci++) { 
-            TCpu& cpu = *ci; 
-            if (!cpu.State) { 
-                continue; 
-            } 
-            if (cpu.Assigned == from.PoolId) { 
-                cpu.State->AssignPool(to.PoolId); 
-                cpu.Assigned = to.PoolId; 
-                LWPROBE(MoveCpu, from.PoolId, to.PoolId, from.PoolName, to.PoolName, cpu.Alloc.CpuId); 
-                return; 
-            } 
-        } 
-        Y_FAIL(); 
-    } 
- 
-    void TBalancer::Unlock() { 
-        Lock.Release(); 
-    } 
- 
-    IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) { 
-        return new TBalancer(config, unitedPools, ts); 
-    } 
-} 
+#include "balancer.h"
+
+#include "probes.h"
+
+#include <library/cpp/actors/util/intrinsics.h>
+#include <library/cpp/actors/util/datetime.h>
+
+#include <util/system/spinlock.h>
+
+#include <algorithm>
+
+namespace NActors {
+    LWTRACE_USING(ACTORLIB_PROVIDER);
+
+    // Describes balancing-related state of pool, the most notable is `Importance` to add new cpu
+    struct TLevel {
+        // Balancer will try to give more cpu to overloaded pools
+        enum ELoadClass {
+            Underloaded = 0,
+            Moderate = 1,
+            Overloaded = 2,
+        };
+
+        double ScaleFactor;
+        ELoadClass LoadClass;
+        ui64 Importance; // pool with lower importance is allowed to pass cpu to pool with higher, but the opposite is forbidden
+
+        TLevel() {}
+
+        TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle) {
+            ScaleFactor = double(currentCpus) / cfg.Cpus;
+            if (cpuIdle > 1.3) { // TODO: add a better underload criterion, based on estimated latency w/o 1 cpu
+                LoadClass = Underloaded;
+            } else if (cpuIdle < 0.2) { // TODO: add a better overload criterion, based on latency
+                LoadClass = Overloaded;
+            } else {
+                LoadClass = Moderate;
+            }
+            Importance = MakeImportance(LoadClass, cfg.Priority, ScaleFactor, cpuIdle, poolId);
+        }
+
+    private:
+        // Importance is simple ui64 value (from highest to lowest):
+        //   2 Bits: LoadClass
+        //   8 Bits: Priority
+        //  10 Bits: -ScaleFactor (for max-min fairness with weights equal to TBalancingConfig::Cpus)
+        //  10 Bits: -CpuIdle
+        //   6 Bits: PoolId
+        static ui64 MakeImportance(ELoadClass load, ui8 priority, double scaleFactor, double cpuIdle, TPoolId poolId) {
+            ui64 idle = std::clamp<i64>(1024 - cpuIdle * 512, 0, 1023);
+            ui64 scale = std::clamp<i64>(1024 - scaleFactor * 32, 0, 1023);
+
+            Y_VERIFY(ui64(load)     < (1ull << 2ull));
+            Y_VERIFY(ui64(priority) < (1ull << 8ull));
+            Y_VERIFY(ui64(scale)    < (1ull << 10ull));
+            Y_VERIFY(ui64(idle)     < (1ull << 10ull));
+            Y_VERIFY(ui64(poolId)   < (1ull << 6ull));
+
+            static_assert(ui64(MaxPools) <= (1ull << 6ull));
+
+            ui64 importance =
+                (ui64(load)     << ui64(6 + 10 + 10 + 8)) |
+                (ui64(priority) << ui64(6 + 10 + 10)) |
+                (ui64(scale)    << ui64(6 + 10)) |
+                (ui64(idle)     << ui64(6)) |
+                ui64(poolId);
+            return importance;
+        }
+    };
+
+    // Main balancer implemenation
+    class TBalancer: public IBalancer {
+    private:
+        struct TCpu;
+        struct TPool;
+
+        bool Disabled = true;
+        TSpinLock Lock;
+        ui64 NextBalanceTs;
+        TVector<TCpu> Cpus; // Indexed by CpuId, can have gaps
+        TVector<TPool> Pools; // Indexed by PoolId, can have gaps
+        TBalancerConfig Config;
+
+    public:
+        // Setup
+        TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts);
+        bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override;
+        ~TBalancer();
+
+        // Balancing
+        bool TryLock(ui64 ts) override;
+        void SetPoolStats(TPoolId pool, const TBalancerStats& stats) override;
+        void Balance() override;
+        void Unlock() override;
+
+    private:
+        void MoveCpu(TPool& from, TPool& to);
+    };
+
+    struct TBalancer::TPool {
+        TBalancingConfig Config;
+        TPoolId PoolId;
+        TString PoolName;
+
+        // Input data for balancing
+        TBalancerStats Prev;
+        TBalancerStats Next;
+
+        // Derived stats
+        double CpuLoad;
+        double CpuIdle;
+
+        // Classification
+        // NOTE: We want to avoid passing cpu back and forth, so we must consider not only current level,
+        // NOTE: but expected levels after movements also
+        TLevel CurLevel; // Level with current amount of cpu
+        TLevel AddLevel; // Level after one cpu acception
+        TLevel SubLevel; // Level after one cpu donation
+
+        // Balancing state
+        ui64 CurrentCpus = 0; // Total number of cpus assigned for this pool (zero means pools is not balanced)
+        ui64 PrevCpus = 0; // Cpus in last period
+
+        explicit TPool(const TBalancingConfig& cfg = {})
+            : Config(cfg)
+        {}
+
+        void Configure(const TBalancingConfig& cfg, const TString& poolName) {
+            Config = cfg;
+            // Enforce constraints
+            Config.MinCpus = std::clamp<ui32>(Config.MinCpus, 1, Config.Cpus);
+            Config.MaxCpus = Max<ui32>(Config.MaxCpus, Config.Cpus);
+            PoolName = poolName;
+        }
+    };
+
+    struct TBalancer::TCpu {
+        TCpuState* State = nullptr; // Cpu state, nullptr means cpu is not used (gap)
+        TCpuAllocation Alloc;
+        TPoolId Current;
+        TPoolId Assigned;
+    };
+
+    TBalancer::TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts)
+        : NextBalanceTs(ts)
+        , Config(config)
+    {
+        for (TPoolId pool = 0; pool < MaxPools; pool++) {
+            Pools.emplace_back();
+            Pools.back().PoolId = pool;
+        }
+        for (const TUnitedExecutorPoolConfig& united : unitedPools) {
+            Pools[united.PoolId].Configure(united.Balancing, united.PoolName);
+        }
+    }
+
+    TBalancer::~TBalancer() {
+    }
+
+    bool TBalancer::AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* state) {
+        // Setup
+        TCpuId cpuId = cpuAlloc.CpuId;
+        if (Cpus.size() <= cpuId) {
+            Cpus.resize(cpuId + 1);
+        }
+        TCpu& cpu = Cpus[cpuId];
+        cpu.State = state;
+        cpu.Alloc = cpuAlloc;
+
+        // Fill every pool with cpus up to TBalancingConfig::Cpus
+        TPoolId pool = 0;
+        for (TPool& p : Pools) {
+            if (p.CurrentCpus < p.Config.Cpus) {
+                p.CurrentCpus++;
+                break;
+            }
+            pool++;
+        }
+        if (pool != MaxPools) { // cpu under balancer control
+            state->SwitchPool(pool);
+            state->AssignPool(pool);
+            Disabled = false;
+            return true;
+        }
+        return false; // non-balanced cpu
+    }
+
+    bool TBalancer::TryLock(ui64 ts) {
+        if (!Disabled && NextBalanceTs < ts && Lock.TryAcquire()) {
+            NextBalanceTs = ts + Us2Ts(Config.PeriodUs);
+            return true;
+        }
+        return false;
+    }
+
+    void TBalancer::SetPoolStats(TPoolId pool, const TBalancerStats& stats) {
+        Y_VERIFY(pool < MaxPools);
+        TPool& p = Pools[pool];
+        p.Prev = p.Next;
+        p.Next = stats;
+    }
+
+    void TBalancer::Balance() {
+        // Update every cpu state
+        for (TCpu& cpu : Cpus) {
+            if (cpu.State) {
+                cpu.State->Load(cpu.Assigned, cpu.Current);
+                if (cpu.Current < MaxPools && cpu.Current != cpu.Assigned) {
+                    return; // previous movement has not been applied yet, wait
+                }
+            }
+        }
+
+        // Process stats, classify and compute pool importance
+        TStackVec<TPool*, MaxPools> order;
+        for (TPool& pool : Pools) {
+            if (pool.Config.Cpus == 0) {
+                continue; // skip gaps (non-existent or non-united pools)
+            }
+            if (pool.Prev.Ts == 0 || pool.Prev.Ts >= pool.Next.Ts) {
+                return; // invalid stats
+            }
+
+            // Compute derived stats
+            pool.CpuLoad = (pool.Next.CpuUs - pool.Prev.CpuUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts);
+            if (pool.Prev.IdleUs == ui64(-1) || pool.Next.IdleUs == ui64(-1)) {
+                pool.CpuIdle = pool.CurrentCpus - pool.CpuLoad; // for tests
+            } else {
+                pool.CpuIdle = (pool.Next.IdleUs - pool.Prev.IdleUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts);
+            }
+
+            // Compute levels
+            pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle);
+            pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle); // we expect taken cpu to became utilized
+            pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1);
+
+            // Prepare for balancing
+            pool.PrevCpus = pool.CurrentCpus;
+            order.push_back(&pool);
+        }
+
+        // Sort pools by importance
+        std::sort(order.begin(), order.end(), [] (TPool* l, TPool* r) {return l->CurLevel.Importance < r->CurLevel.Importance; });
+        for (TPool* pool : order) {
+            LWPROBE(PoolStats, pool->PoolId, pool->PoolName, pool->CurrentCpus, pool->CurLevel.LoadClass, pool->Config.Priority, pool->CurLevel.ScaleFactor, pool->CpuIdle, pool->CpuLoad, pool->CurLevel.Importance, pool->AddLevel.Importance, pool->SubLevel.Importance);
+        }
+
+        // Move cpus from lower importance to higher importance pools
+        for (auto toIter = order.rbegin(); toIter != order.rend(); ++toIter) {
+            TPool& to = **toIter;
+            if (to.CurLevel.LoadClass == TLevel::Overloaded && // if pool is overloaded
+                to.CurrentCpus < to.Config.MaxCpus) // and constraints would not be violated
+            {
+                for (auto fromIter = order.begin(); (*fromIter)->CurLevel.Importance < to.CurLevel.Importance; ++fromIter) {
+                    TPool& from = **fromIter;
+                    if (from.CurrentCpus == from.PrevCpus && // if not balanced yet
+                        from.CurrentCpus > from.Config.MinCpus && // and constraints would not be violated
+                        from.SubLevel.Importance < to.AddLevel.Importance) // and which of two pools is more important would not change after cpu movement
+                    {
+                        MoveCpu(from, to);
+                        from.CurrentCpus--;
+                        to.CurrentCpus++;
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    void TBalancer::MoveCpu(TBalancer::TPool& from, TBalancer::TPool& to) {
+        for (auto ci = Cpus.rbegin(), ce = Cpus.rend(); ci != ce; ci++) {
+            TCpu& cpu = *ci;
+            if (!cpu.State) {
+                continue;
+            }
+            if (cpu.Assigned == from.PoolId) {
+                cpu.State->AssignPool(to.PoolId);
+                cpu.Assigned = to.PoolId;
+                LWPROBE(MoveCpu, from.PoolId, to.PoolId, from.PoolName, to.PoolName, cpu.Alloc.CpuId);
+                return;
+            }
+        }
+        Y_FAIL();
+    }
+
+    void TBalancer::Unlock() {
+        Lock.Release();
+    }
+
+    IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) {
+        return new TBalancer(config, unitedPools, ts);
+    }
+}

+ 27 - 27
library/cpp/actors/core/balancer.h

@@ -1,27 +1,27 @@
-#pragma once 
- 
-#include "defs.h" 
-#include "config.h" 
-#include "cpu_state.h" 
- 
-namespace NActors { 
-    // Per-pool statistics used by balancer 
-    struct TBalancerStats { 
-        ui64 Ts = 0; // Measurement timestamp 
-        ui64 CpuUs = 0; // Total cpu microseconds consumed by pool on all cpus since start 
-        ui64 IdleUs = ui64(-1); // Total cpu microseconds in spinning or waiting on futex 
-    }; 
- 
-    // Pool cpu balancer 
-    struct IBalancer { 
-        virtual ~IBalancer() {} 
-        virtual bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) = 0; 
-        virtual bool TryLock(ui64 ts) = 0; 
-        virtual void SetPoolStats(TPoolId pool, const TBalancerStats& stats) = 0; 
-        virtual void Balance() = 0; 
-        virtual void Unlock() = 0; 
-        // TODO: add method for reconfiguration on fly 
-    }; 
- 
-    IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts); 
-} 
+#pragma once
+
+#include "defs.h"
+#include "config.h"
+#include "cpu_state.h"
+
+namespace NActors {
+    // Per-pool statistics used by balancer
+    struct TBalancerStats {
+        ui64 Ts = 0; // Measurement timestamp
+        ui64 CpuUs = 0; // Total cpu microseconds consumed by pool on all cpus since start
+        ui64 IdleUs = ui64(-1); // Total cpu microseconds in spinning or waiting on futex
+    };
+
+    // Pool cpu balancer
+    struct IBalancer {
+        virtual ~IBalancer() {}
+        virtual bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) = 0;
+        virtual bool TryLock(ui64 ts) = 0;
+        virtual void SetPoolStats(TPoolId pool, const TBalancerStats& stats) = 0;
+        virtual void Balance() = 0;
+        virtual void Unlock() = 0;
+        // TODO: add method for reconfiguration on fly
+    };
+
+    IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts);
+}

+ 225 - 225
library/cpp/actors/core/balancer_ut.cpp

@@ -1,225 +1,225 @@
-#include "balancer.h" 
- 
-#include <library/cpp/actors/util/datetime.h> 
-#include <library/cpp/lwtrace/all.h> 
-#include <library/cpp/testing/unittest/registar.h> 
- 
-#include <util/stream/str.h> 
- 
-using namespace NActors; 
- 
-//////////////////////////////////////////////////////////////////////////////// 
- 
-Y_UNIT_TEST_SUITE(PoolCpuBalancer) { 
-    struct TTest { 
-        TCpuManagerConfig Config; 
-        TCpuMask Available; 
-        THolder<IBalancer> Balancer; 
-        TVector<TCpuState> CpuStates; 
-        TVector<ui64> CpuUs; 
-        ui64 Now = 0; 
- 
-        void SetCpuCount(size_t count) { 
-            Config.UnitedWorkers.CpuCount = count; 
-            for (TCpuId cpuId = 0; cpuId < count; cpuId++) { 
-                Available.Set(cpuId); 
-            } 
-        } 
- 
-        void AddPool(ui32 minCpus, ui32 cpus, ui32 maxCpus, ui8 priority = 0) { 
-            TUnitedExecutorPoolConfig u; 
-            u.PoolId = TPoolId(Config.United.size()); 
-            u.Balancing.Cpus = cpus; 
-            u.Balancing.MinCpus = minCpus; 
-            u.Balancing.MaxCpus = maxCpus; 
-            u.Balancing.Priority = priority; 
-            Config.United.push_back(u); 
-        } 
- 
-        void Start() { 
-            TCpuAllocationConfig allocation(Available, Config); 
-            Balancer.Reset(MakeBalancer(Config.UnitedWorkers.Balancer, Config.United, 0)); 
-            CpuStates.resize(allocation.Items.size()); // do not resize it later to avoid dangling pointers 
-            CpuUs.resize(CpuStates.size()); 
-            for (const TCpuAllocation& cpuAlloc : allocation.Items) { 
-                bool added = Balancer->AddCpu(cpuAlloc, &CpuStates[cpuAlloc.CpuId]); 
-                UNIT_ASSERT(added); 
-            } 
-        } 
- 
-        void Balance(ui64 deltaTs, const TVector<ui64>& cpuUs) { 
-            Now += deltaTs; 
-            ui64 ts = Now; 
-            if (Balancer->TryLock(ts)) { 
-                for (TPoolId pool = 0; pool < cpuUs.size(); pool++) { 
-                    CpuUs[pool] += cpuUs[pool]; 
-                    TBalancerStats stats; 
-                    stats.Ts = ts; 
-                    stats.CpuUs = CpuUs[pool]; 
-                    Balancer->SetPoolStats(pool, stats); 
-                } 
-                Balancer->Balance(); 
-                Balancer->Unlock(); 
-            } 
-        } 
- 
-        void ApplyMovements() { 
-            for (TCpuState& state : CpuStates) { 
-                TPoolId current; 
-                TPoolId assigned; 
-                state.Load(assigned, current); 
-                state.SwitchPool(assigned); 
-            } 
-        } 
- 
-        static TString ToStr(const TVector<ui64>& values) { 
-            TStringStream ss; 
-            ss << "{"; 
-            for (auto v : values) { 
-                ss << " " << v; 
-            } 
-            ss << " }"; 
-            return ss.Str(); 
-        } 
- 
-        void AssertPoolsCurrentCpus(const TVector<ui64>& cpuRequired) { 
-            TVector<ui64> cpuCurrent; 
-            cpuCurrent.resize(cpuRequired.size()); 
-            for (TCpuState& state : CpuStates) { 
-                TPoolId current; 
-                TPoolId assigned; 
-                state.Load(assigned, current); 
-                cpuCurrent[current]++; 
-            } 
-            for (TPoolId pool = 0; pool < cpuRequired.size(); pool++) { 
-                UNIT_ASSERT_C(cpuCurrent[pool] == cpuRequired[pool], 
-                    "cpu distribution mismatch, required " << ToStr(cpuRequired) << " but got " << ToStr(cpuCurrent)); 
-            } 
-        } 
-    }; 
- 
-    Y_UNIT_TEST(StartLwtrace) { 
-        NLWTrace::StartLwtraceFromEnv(); 
-    } 
- 
-    Y_UNIT_TEST(AllOverloaded) { 
-        TTest t; 
-        int cpus = 10; 
-        t.SetCpuCount(cpus); 
-        t.AddPool(1, 1, 10); // pool=0 
-        t.AddPool(1, 2, 10); // pool=1 
-        t.AddPool(1, 3, 10); // pool=2 
-        t.AddPool(1, 4, 10); // pool=2 
-        t.Start(); 
-        ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs); 
-        ui64 totalCpuUs = cpus * Ts2Us(dts); // pretend every pool has consumed as whole actorsystem, overload 
-        for (int i = 0; i < cpus; i++) { 
-            t.Balance(dts, {totalCpuUs, totalCpuUs, totalCpuUs, totalCpuUs}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({1, 2, 3, 4}); 
-    } 
- 
-    Y_UNIT_TEST(OneOverloaded) { 
-        TTest t; 
-        int cpus = 10; 
-        t.SetCpuCount(cpus); 
-        t.AddPool(1, 1, 10); // pool=0 
-        t.AddPool(1, 2, 10); // pool=1 
-        t.AddPool(1, 3, 10); // pool=2 
-        t.AddPool(1, 4, 10); // pool=2 
-        t.Start(); 
-        ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs); 
-        ui64 totalCpuUs = cpus * Ts2Us(dts); 
-        for (int i = 0; i < cpus; i++) { 
-            t.Balance(dts, {totalCpuUs, 0, 0, 0}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({7, 1, 1, 1}); 
-        for (int i = 0; i < cpus; i++) { 
-            t.Balance(dts, {0, totalCpuUs, 0, 0}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({1, 7, 1, 1}); 
-        for (int i = 0; i < cpus; i++) { 
-            t.Balance(dts, {0, 0, totalCpuUs, 0}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({1, 1, 7, 1}); 
-        for (int i = 0; i < cpus; i++) { 
-            t.Balance(dts, {0, 0, 0, totalCpuUs}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({1, 1, 1, 7}); 
-    } 
- 
-    Y_UNIT_TEST(TwoOverloadedFairness) { 
-        TTest t; 
-        int cpus = 10; 
-        t.SetCpuCount(cpus); 
-        t.AddPool(1, 1, 10); // pool=0 
-        t.AddPool(1, 2, 10); // pool=1 
-        t.AddPool(1, 3, 10); // pool=2 
-        t.AddPool(1, 4, 10); // pool=2 
-        t.Start(); 
-        ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs); 
-        ui64 totalCpuUs = cpus * Ts2Us(dts); 
-        for (int i = 0; i < cpus; i++) { 
-            t.Balance(dts, {totalCpuUs, totalCpuUs, 0, 0}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({3, 5, 1, 1}); 
-        for (int i = 0; i < cpus; i++) { 
-            t.Balance(dts, {totalCpuUs, 0, totalCpuUs, 0}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({2, 1, 6, 1}); 
-        for (int i = 0; i < cpus; i++) { 
-            t.Balance(dts, {totalCpuUs, 0, 0, totalCpuUs}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({2, 1, 1, 6}); 
-        for (int i = 0; i < cpus; i++) { 
-            t.Balance(dts, {0, totalCpuUs, totalCpuUs, 0}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({1, 3, 5, 1}); 
-        for (int i = 0; i < cpus; i++) { 
-            t.Balance(dts, {0, totalCpuUs, 0, totalCpuUs}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({1, 3, 1, 5}); 
-        for (int i = 0; i < cpus; i++) { 
-            t.Balance(dts, {0, 0, totalCpuUs, totalCpuUs}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({1, 1, 3, 5}); 
-    } 
- 
-    Y_UNIT_TEST(TwoOverloadedPriority) { 
-        TTest t; 
-        int cpus = 20; 
-        t.SetCpuCount(cpus); 
-        t.AddPool(1, 5, 20, 0); // pool=0 
-        t.AddPool(1, 5, 20, 1); // pool=1 
-        t.AddPool(1, 5, 20, 2); // pool=2 
-        t.AddPool(1, 5, 20, 3); // pool=3 
-        t.Start(); 
-        ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs); 
-        ui64 mErlang = Ts2Us(dts) / 1000; 
-        for (int i = 0; i < cpus; i++) { 
-            t.Balance(dts, {20000 * mErlang, 2500 * mErlang, 4500 * mErlang, 9500 * mErlang}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({2, 3, 5, 10}); 
-        t.Balance(dts, {20000 * mErlang, 2500 * mErlang, 4500 * mErlang, 8500 * mErlang}); 
-        t.ApplyMovements(); 
-        t.AssertPoolsCurrentCpus({3, 3, 5, 9}); 
-        // NOTE: this operation require one move, but we do not make global analysis, so multiple steps (1->2 & 0->1) are required (can be optimized later) 
-        for (int i = 0; i < 3; i++) { 
-            t.Balance(dts, {20000 * mErlang, 2500 * mErlang, 5500 * mErlang, 8500 * mErlang}); 
-            t.ApplyMovements(); 
-        } 
-        t.AssertPoolsCurrentCpus({2, 3, 6, 9}); 
-    } 
-} 
+#include "balancer.h"
+
+#include <library/cpp/actors/util/datetime.h>
+#include <library/cpp/lwtrace/all.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/stream/str.h>
+
+using namespace NActors;
+
+////////////////////////////////////////////////////////////////////////////////
+
+Y_UNIT_TEST_SUITE(PoolCpuBalancer) {
+    struct TTest {
+        TCpuManagerConfig Config;
+        TCpuMask Available;
+        THolder<IBalancer> Balancer;
+        TVector<TCpuState> CpuStates;
+        TVector<ui64> CpuUs;
+        ui64 Now = 0;
+
+        void SetCpuCount(size_t count) {
+            Config.UnitedWorkers.CpuCount = count;
+            for (TCpuId cpuId = 0; cpuId < count; cpuId++) {
+                Available.Set(cpuId);
+            }
+        }
+
+        void AddPool(ui32 minCpus, ui32 cpus, ui32 maxCpus, ui8 priority = 0) {
+            TUnitedExecutorPoolConfig u;
+            u.PoolId = TPoolId(Config.United.size());
+            u.Balancing.Cpus = cpus;
+            u.Balancing.MinCpus = minCpus;
+            u.Balancing.MaxCpus = maxCpus;
+            u.Balancing.Priority = priority;
+            Config.United.push_back(u);
+        }
+
+        void Start() {
+            TCpuAllocationConfig allocation(Available, Config);
+            Balancer.Reset(MakeBalancer(Config.UnitedWorkers.Balancer, Config.United, 0));
+            CpuStates.resize(allocation.Items.size()); // do not resize it later to avoid dangling pointers
+            CpuUs.resize(CpuStates.size());
+            for (const TCpuAllocation& cpuAlloc : allocation.Items) {
+                bool added = Balancer->AddCpu(cpuAlloc, &CpuStates[cpuAlloc.CpuId]);
+                UNIT_ASSERT(added);
+            }
+        }
+
+        void Balance(ui64 deltaTs, const TVector<ui64>& cpuUs) {
+            Now += deltaTs;
+            ui64 ts = Now;
+            if (Balancer->TryLock(ts)) {
+                for (TPoolId pool = 0; pool < cpuUs.size(); pool++) {
+                    CpuUs[pool] += cpuUs[pool];
+                    TBalancerStats stats;
+                    stats.Ts = ts;
+                    stats.CpuUs = CpuUs[pool];
+                    Balancer->SetPoolStats(pool, stats);
+                }
+                Balancer->Balance();
+                Balancer->Unlock();
+            }
+        }
+
+        void ApplyMovements() {
+            for (TCpuState& state : CpuStates) {
+                TPoolId current;
+                TPoolId assigned;
+                state.Load(assigned, current);
+                state.SwitchPool(assigned);
+            }
+        }
+
+        static TString ToStr(const TVector<ui64>& values) {
+            TStringStream ss;
+            ss << "{";
+            for (auto v : values) {
+                ss << " " << v;
+            }
+            ss << " }";
+            return ss.Str();
+        }
+
+        void AssertPoolsCurrentCpus(const TVector<ui64>& cpuRequired) {
+            TVector<ui64> cpuCurrent;
+            cpuCurrent.resize(cpuRequired.size());
+            for (TCpuState& state : CpuStates) {
+                TPoolId current;
+                TPoolId assigned;
+                state.Load(assigned, current);
+                cpuCurrent[current]++;
+            }
+            for (TPoolId pool = 0; pool < cpuRequired.size(); pool++) {
+                UNIT_ASSERT_C(cpuCurrent[pool] == cpuRequired[pool],
+                    "cpu distribution mismatch, required " << ToStr(cpuRequired) << " but got " << ToStr(cpuCurrent));
+            }
+        }
+    };
+
+    Y_UNIT_TEST(StartLwtrace) {
+        NLWTrace::StartLwtraceFromEnv();
+    }
+
+    Y_UNIT_TEST(AllOverloaded) {
+        TTest t;
+        int cpus = 10;
+        t.SetCpuCount(cpus);
+        t.AddPool(1, 1, 10); // pool=0
+        t.AddPool(1, 2, 10); // pool=1
+        t.AddPool(1, 3, 10); // pool=2
+        t.AddPool(1, 4, 10); // pool=2
+        t.Start();
+        ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs);
+        ui64 totalCpuUs = cpus * Ts2Us(dts); // pretend every pool has consumed as whole actorsystem, overload
+        for (int i = 0; i < cpus; i++) {
+            t.Balance(dts, {totalCpuUs, totalCpuUs, totalCpuUs, totalCpuUs});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({1, 2, 3, 4});
+    }
+
+    Y_UNIT_TEST(OneOverloaded) {
+        TTest t;
+        int cpus = 10;
+        t.SetCpuCount(cpus);
+        t.AddPool(1, 1, 10); // pool=0
+        t.AddPool(1, 2, 10); // pool=1
+        t.AddPool(1, 3, 10); // pool=2
+        t.AddPool(1, 4, 10); // pool=2
+        t.Start();
+        ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs);
+        ui64 totalCpuUs = cpus * Ts2Us(dts);
+        for (int i = 0; i < cpus; i++) {
+            t.Balance(dts, {totalCpuUs, 0, 0, 0});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({7, 1, 1, 1});
+        for (int i = 0; i < cpus; i++) {
+            t.Balance(dts, {0, totalCpuUs, 0, 0});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({1, 7, 1, 1});
+        for (int i = 0; i < cpus; i++) {
+            t.Balance(dts, {0, 0, totalCpuUs, 0});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({1, 1, 7, 1});
+        for (int i = 0; i < cpus; i++) {
+            t.Balance(dts, {0, 0, 0, totalCpuUs});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({1, 1, 1, 7});
+    }
+
+    Y_UNIT_TEST(TwoOverloadedFairness) {
+        TTest t;
+        int cpus = 10;
+        t.SetCpuCount(cpus);
+        t.AddPool(1, 1, 10); // pool=0
+        t.AddPool(1, 2, 10); // pool=1
+        t.AddPool(1, 3, 10); // pool=2
+        t.AddPool(1, 4, 10); // pool=2
+        t.Start();
+        ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs);
+        ui64 totalCpuUs = cpus * Ts2Us(dts);
+        for (int i = 0; i < cpus; i++) {
+            t.Balance(dts, {totalCpuUs, totalCpuUs, 0, 0});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({3, 5, 1, 1});
+        for (int i = 0; i < cpus; i++) {
+            t.Balance(dts, {totalCpuUs, 0, totalCpuUs, 0});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({2, 1, 6, 1});
+        for (int i = 0; i < cpus; i++) {
+            t.Balance(dts, {totalCpuUs, 0, 0, totalCpuUs});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({2, 1, 1, 6});
+        for (int i = 0; i < cpus; i++) {
+            t.Balance(dts, {0, totalCpuUs, totalCpuUs, 0});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({1, 3, 5, 1});
+        for (int i = 0; i < cpus; i++) {
+            t.Balance(dts, {0, totalCpuUs, 0, totalCpuUs});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({1, 3, 1, 5});
+        for (int i = 0; i < cpus; i++) {
+            t.Balance(dts, {0, 0, totalCpuUs, totalCpuUs});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({1, 1, 3, 5});
+    }
+
+    Y_UNIT_TEST(TwoOverloadedPriority) {
+        TTest t;
+        int cpus = 20;
+        t.SetCpuCount(cpus);
+        t.AddPool(1, 5, 20, 0); // pool=0
+        t.AddPool(1, 5, 20, 1); // pool=1
+        t.AddPool(1, 5, 20, 2); // pool=2
+        t.AddPool(1, 5, 20, 3); // pool=3
+        t.Start();
+        ui64 dts = 1.01 * Us2Ts(t.Config.UnitedWorkers.Balancer.PeriodUs);
+        ui64 mErlang = Ts2Us(dts) / 1000;
+        for (int i = 0; i < cpus; i++) {
+            t.Balance(dts, {20000 * mErlang, 2500 * mErlang, 4500 * mErlang, 9500 * mErlang});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({2, 3, 5, 10});
+        t.Balance(dts, {20000 * mErlang, 2500 * mErlang, 4500 * mErlang, 8500 * mErlang});
+        t.ApplyMovements();
+        t.AssertPoolsCurrentCpus({3, 3, 5, 9});
+        // NOTE: this operation require one move, but we do not make global analysis, so multiple steps (1->2 & 0->1) are required (can be optimized later)
+        for (int i = 0; i < 3; i++) {
+            t.Balance(dts, {20000 * mErlang, 2500 * mErlang, 5500 * mErlang, 8500 * mErlang});
+            t.ApplyMovements();
+        }
+        t.AssertPoolsCurrentCpus({2, 3, 6, 9});
+    }
+}

+ 239 - 239
library/cpp/actors/core/config.h

@@ -1,239 +1,239 @@
-#pragma once 
- 
-#include "defs.h" 
-#include <library/cpp/actors/util/cpumask.h> 
-#include <library/cpp/monlib/dynamic_counters/counters.h> 
-#include <util/datetime/base.h> 
-#include <util/generic/ptr.h> 
-#include <util/generic/string.h> 
-#include <util/generic/vector.h> 
- 
-namespace NActors { 
- 
-    struct TBalancingConfig { 
-        // Default cpu count (used during overload). Zero value disables this pool balancing 
-        // 1) Sum of `Cpus` on all pools cannot be changed without restart 
-        //    (changing cpu mode between Shared and Assigned is not implemented yet) 
-        // 2) This sum must be equal to TUnitedWorkersConfig::CpuCount, 
-        //    otherwise `CpuCount - SUM(Cpus)` cpus will be in Shared mode (i.e. actorsystem 2.0) 
-        ui32 Cpus = 0; 
- 
-        ui32 MinCpus = 0; // Lower balancing bound, should be at least 1, and not greater than `Cpus` 
-        ui32 MaxCpus = 0; // Higher balancing bound, should be not lower than `Cpus` 
-        ui8 Priority = 0; // Priority of pool to obtain cpu due to balancing (higher is better) 
-        ui64 ToleratedLatencyUs = 0; // p100-latency threshold indicating that more cpus are required by pool 
-    }; 
- 
-    struct TBalancerConfig { 
-        ui64 PeriodUs = 15000000; // Time between balancer steps 
-    }; 
- 
-    struct TBasicExecutorPoolConfig { 
-        static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TDuration::MilliSeconds(10); 
-        static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100; 
- 
-        ui32 PoolId = 0; 
-        TString PoolName; 
-        ui32 Threads = 1; 
-        ui64 SpinThreshold = 100; 
-        TCpuMask Affinity; // Executor thread affinity 
-        TDuration TimePerMailbox = DEFAULT_TIME_PER_MAILBOX; 
-        ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX; 
-        int RealtimePriority = 0; 
-        ui32 MaxActivityType = 1; 
-    }; 
- 
-    struct TIOExecutorPoolConfig { 
-        ui32 PoolId = 0; 
-        TString PoolName; 
-        ui32 Threads = 1; 
-        TCpuMask Affinity; // Executor thread affinity 
-        ui32 MaxActivityType = 1; 
-    }; 
- 
-    struct TUnitedExecutorPoolConfig { 
-        static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TDuration::MilliSeconds(10); 
-        static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100; 
- 
-        ui32 PoolId = 0; 
-        TString PoolName; 
- 
-        // Resource sharing 
-        ui32 Concurrency = 0; // Limits simultaneously running mailboxes count if set to non-zero value (do not set if Balancing.Cpus != 0) 
-        TPoolWeight Weight = 0; // Weight in fair cpu-local pool scheduler 
-        TCpuMask Allowed; // Allowed CPUs for workers to run this pool on (ignored if balancer works, i.e. actorsystem 1.5) 
- 
-        // Single mailbox execution limits 
-        TDuration TimePerMailbox = DEFAULT_TIME_PER_MAILBOX; 
-        ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX; 
- 
-        // Introspection 
-        ui32 MaxActivityType = 1; 
- 
-        // Long-term balancing 
-        TBalancingConfig Balancing; 
-    }; 
- 
-    struct TUnitedWorkersConfig { 
-        ui32 CpuCount = 0; // Total CPUs running united workers (i.e. TBasicExecutorPoolConfig::Threads analog); set to zero to disable united workers 
-        ui64 SpinThresholdUs = 100; // Limit for active spinning in case all pools became idle 
-        ui64 PoolLimitUs = 500; // Soft limit on pool execution 
-        ui64 EventLimitUs = 100; // Hard limit on last event execution exceeding pool limit 
-        ui64 LimitPrecisionUs = 100; // Maximum delay of timer on limit excess (delay needed to avoid settimer syscall on every pool switch) 
-        ui64 FastWorkerPriority = 10; // Real-time priority of workers not exceeding hard limits 
-        ui64 IdleWorkerPriority = 20; // Real-time priority of standby workers waiting for hard preemption on timers (should be greater than FastWorkerPriority) 
-        TCpuMask Allowed; // Allowed CPUs for workers to run on (every worker has affinity for exactly one cpu) 
-        bool NoRealtime = false; // For environments w/o permissions for RT-threads 
-        bool NoAffinity = false; // For environments w/o permissions for cpu affinity 
-        TBalancerConfig Balancer; 
-    }; 
- 
-    struct TCpuManagerConfig { 
-        TUnitedWorkersConfig UnitedWorkers; 
-        TVector<TBasicExecutorPoolConfig> Basic; 
-        TVector<TIOExecutorPoolConfig> IO; 
-        TVector<TUnitedExecutorPoolConfig> United; 
- 
-        ui32 GetExecutorsCount() const { 
-            return Basic.size() + IO.size() + United.size(); 
-        } 
- 
-        TString GetPoolName(ui32 poolId) const { 
-            for (const auto& p : Basic) { 
-                if (p.PoolId == poolId) { 
-                    return p.PoolName; 
-                } 
-            } 
-            for (const auto& p : IO) { 
-                if (p.PoolId == poolId) { 
-                    return p.PoolName; 
-                } 
-            } 
-            for (const auto& p : United) { 
-                if (p.PoolId == poolId) { 
-                    return p.PoolName; 
-                } 
-            } 
-            Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId); 
-        } 
- 
-        ui32 GetThreads(ui32 poolId) const { 
-            for (const auto& p : Basic) { 
-                if (p.PoolId == poolId) { 
-                    return p.Threads; 
-                } 
-            } 
-            for (const auto& p : IO) { 
-                if (p.PoolId == poolId) { 
-                    return p.Threads; 
-                } 
-            } 
-            for (const auto& p : United) { 
-                if (p.PoolId == poolId) { 
-                    return p.Concurrency ? p.Concurrency : UnitedWorkers.CpuCount; 
-                } 
-            } 
-            Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId); 
-        } 
-    }; 
- 
-    struct TSchedulerConfig { 
-        TSchedulerConfig( 
-                ui64 resolution = 1024, 
-                ui64 spinThreshold = 100, 
-                ui64 progress = 10000, 
-                bool useSchedulerActor = false) 
-            : ResolutionMicroseconds(resolution) 
-            , SpinThreshold(spinThreshold) 
-            , ProgressThreshold(progress) 
-            , UseSchedulerActor(useSchedulerActor) 
-        {} 
- 
-        ui64 ResolutionMicroseconds = 1024; 
-        ui64 SpinThreshold = 100; 
-        ui64 ProgressThreshold = 10000; 
-        bool UseSchedulerActor = false; // False is default because tests use scheduler thread 
-        ui64 RelaxedSendPaceEventsPerSecond = 200000; 
-        ui64 RelaxedSendPaceEventsPerCycle = RelaxedSendPaceEventsPerSecond * ResolutionMicroseconds / 1000000; 
-        // For resolution >= 250000 microseconds threshold is SendPace 
-        // For resolution <= 250 microseconds threshold is 20 * SendPace 
-        ui64 RelaxedSendThresholdEventsPerSecond = RelaxedSendPaceEventsPerSecond * 
-            (20 - ((20 - 1) * ClampVal(ResolutionMicroseconds, ui64(250), ui64(250000)) - 250) / (250000 - 250)); 
-        ui64 RelaxedSendThresholdEventsPerCycle = RelaxedSendThresholdEventsPerSecond * ResolutionMicroseconds / 1000000; 
- 
-        // Optional subsection for scheduler counters (usually subsystem=utils) 
-        NMonitoring::TDynamicCounterPtr MonCounters = nullptr; 
-    }; 
- 
-    struct TCpuAllocation { 
-        struct TPoolAllocation { 
-            TPoolId PoolId; 
-            TPoolWeight Weight; 
- 
-            TPoolAllocation(TPoolId poolId = 0, TPoolWeight weight = 0) 
-                : PoolId(poolId) 
-                , Weight(weight) 
-            {} 
-        }; 
- 
-        TCpuId CpuId; 
-        TVector<TPoolAllocation> AllowedPools; 
- 
-        TPoolsMask GetPoolsMask() const { 
-            TPoolsMask mask = 0; 
-            for (const auto& pa : AllowedPools) { 
-                if (pa.PoolId < MaxPools) { 
-                    mask &= (1ull << pa.PoolId); 
-                } 
-            } 
-            return mask; 
-        } 
- 
-        bool HasPool(TPoolId pool) const { 
-            for (const auto& pa : AllowedPools) { 
-                if (pa.PoolId == pool) { 
-                    return true; 
-                } 
-            } 
-            return false; 
-        } 
-    }; 
- 
-    struct TCpuAllocationConfig { 
-        TVector<TCpuAllocation> Items; 
- 
-        TCpuAllocationConfig(const TCpuMask& available, const TCpuManagerConfig& cfg) { 
-            for (const TUnitedExecutorPoolConfig& pool : cfg.United) { 
-                Y_VERIFY(pool.PoolId < MaxPools, "wrong PoolId of united executor pool: %s(%d)", 
-                    pool.PoolName.c_str(), (pool.PoolId)); 
-            } 
-            ui32 allocated[MaxPools] = {0}; 
-            for (TCpuId cpu = 0; cpu < available.Size() && Items.size() < cfg.UnitedWorkers.CpuCount; cpu++) { 
-                if (available.IsSet(cpu)) { 
-                    TCpuAllocation item; 
-                    item.CpuId = cpu; 
-                    for (const TUnitedExecutorPoolConfig& pool : cfg.United) { 
-                        if (cfg.UnitedWorkers.Allowed.IsEmpty() || cfg.UnitedWorkers.Allowed.IsSet(cpu)) { 
-                            if (pool.Allowed.IsEmpty() || pool.Allowed.IsSet(cpu)) { 
-                                item.AllowedPools.emplace_back(pool.PoolId, pool.Weight); 
-                                allocated[pool.PoolId]++; 
-                            } 
-                        } 
-                    } 
-                    if (!item.AllowedPools.empty()) { 
-                        Items.push_back(item); 
-                    } 
-                } 
-            } 
-            for (const TUnitedExecutorPoolConfig& pool : cfg.United) { 
-                Y_VERIFY(allocated[pool.PoolId] > 0, "unable to allocate cpu for united executor pool: %s(%d)", 
-                    pool.PoolName.c_str(), (pool.PoolId)); 
-            } 
-        } 
- 
-        operator bool() const { 
-            return !Items.empty(); 
-        } 
-    }; 
- 
-} 
+#pragma once
+
+#include "defs.h"
+#include <library/cpp/actors/util/cpumask.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <util/datetime/base.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+
+namespace NActors {
+
+    struct TBalancingConfig {
+        // Default cpu count (used during overload). Zero value disables this pool balancing
+        // 1) Sum of `Cpus` on all pools cannot be changed without restart
+        //    (changing cpu mode between Shared and Assigned is not implemented yet)
+        // 2) This sum must be equal to TUnitedWorkersConfig::CpuCount,
+        //    otherwise `CpuCount - SUM(Cpus)` cpus will be in Shared mode (i.e. actorsystem 2.0)
+        ui32 Cpus = 0;
+
+        ui32 MinCpus = 0; // Lower balancing bound, should be at least 1, and not greater than `Cpus`
+        ui32 MaxCpus = 0; // Higher balancing bound, should be not lower than `Cpus`
+        ui8 Priority = 0; // Priority of pool to obtain cpu due to balancing (higher is better)
+        ui64 ToleratedLatencyUs = 0; // p100-latency threshold indicating that more cpus are required by pool
+    };
+
+    struct TBalancerConfig {
+        ui64 PeriodUs = 15000000; // Time between balancer steps
+    };
+
+    struct TBasicExecutorPoolConfig {
+        static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TDuration::MilliSeconds(10);
+        static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100;
+
+        ui32 PoolId = 0;
+        TString PoolName;
+        ui32 Threads = 1;
+        ui64 SpinThreshold = 100;
+        TCpuMask Affinity; // Executor thread affinity
+        TDuration TimePerMailbox = DEFAULT_TIME_PER_MAILBOX;
+        ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX;
+        int RealtimePriority = 0;
+        ui32 MaxActivityType = 1;
+    };
+
+    struct TIOExecutorPoolConfig {
+        ui32 PoolId = 0;
+        TString PoolName;
+        ui32 Threads = 1;
+        TCpuMask Affinity; // Executor thread affinity
+        ui32 MaxActivityType = 1;
+    };
+
+    struct TUnitedExecutorPoolConfig {
+        static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TDuration::MilliSeconds(10);
+        static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = 100;
+
+        ui32 PoolId = 0;
+        TString PoolName;
+
+        // Resource sharing
+        ui32 Concurrency = 0; // Limits simultaneously running mailboxes count if set to non-zero value (do not set if Balancing.Cpus != 0)
+        TPoolWeight Weight = 0; // Weight in fair cpu-local pool scheduler
+        TCpuMask Allowed; // Allowed CPUs for workers to run this pool on (ignored if balancer works, i.e. actorsystem 1.5)
+
+        // Single mailbox execution limits
+        TDuration TimePerMailbox = DEFAULT_TIME_PER_MAILBOX;
+        ui32 EventsPerMailbox = DEFAULT_EVENTS_PER_MAILBOX;
+
+        // Introspection
+        ui32 MaxActivityType = 1;
+
+        // Long-term balancing
+        TBalancingConfig Balancing;
+    };
+
+    struct TUnitedWorkersConfig {
+        ui32 CpuCount = 0; // Total CPUs running united workers (i.e. TBasicExecutorPoolConfig::Threads analog); set to zero to disable united workers
+        ui64 SpinThresholdUs = 100; // Limit for active spinning in case all pools became idle
+        ui64 PoolLimitUs = 500; // Soft limit on pool execution
+        ui64 EventLimitUs = 100; // Hard limit on last event execution exceeding pool limit
+        ui64 LimitPrecisionUs = 100; // Maximum delay of timer on limit excess (delay needed to avoid settimer syscall on every pool switch)
+        ui64 FastWorkerPriority = 10; // Real-time priority of workers not exceeding hard limits
+        ui64 IdleWorkerPriority = 20; // Real-time priority of standby workers waiting for hard preemption on timers (should be greater than FastWorkerPriority)
+        TCpuMask Allowed; // Allowed CPUs for workers to run on (every worker has affinity for exactly one cpu)
+        bool NoRealtime = false; // For environments w/o permissions for RT-threads
+        bool NoAffinity = false; // For environments w/o permissions for cpu affinity
+        TBalancerConfig Balancer;
+    };
+
+    struct TCpuManagerConfig {
+        TUnitedWorkersConfig UnitedWorkers;
+        TVector<TBasicExecutorPoolConfig> Basic;
+        TVector<TIOExecutorPoolConfig> IO;
+        TVector<TUnitedExecutorPoolConfig> United;
+
+        ui32 GetExecutorsCount() const {
+            return Basic.size() + IO.size() + United.size();
+        }
+
+        TString GetPoolName(ui32 poolId) const {
+            for (const auto& p : Basic) {
+                if (p.PoolId == poolId) {
+                    return p.PoolName;
+                }
+            }
+            for (const auto& p : IO) {
+                if (p.PoolId == poolId) {
+                    return p.PoolName;
+                }
+            }
+            for (const auto& p : United) {
+                if (p.PoolId == poolId) {
+                    return p.PoolName;
+                }
+            }
+            Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId);
+        }
+
+        ui32 GetThreads(ui32 poolId) const {
+            for (const auto& p : Basic) {
+                if (p.PoolId == poolId) {
+                    return p.Threads;
+                }
+            }
+            for (const auto& p : IO) {
+                if (p.PoolId == poolId) {
+                    return p.Threads;
+                }
+            }
+            for (const auto& p : United) {
+                if (p.PoolId == poolId) {
+                    return p.Concurrency ? p.Concurrency : UnitedWorkers.CpuCount;
+                }
+            }
+            Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId);
+        }
+    };
+
+    struct TSchedulerConfig {
+        TSchedulerConfig(
+                ui64 resolution = 1024,
+                ui64 spinThreshold = 100,
+                ui64 progress = 10000,
+                bool useSchedulerActor = false)
+            : ResolutionMicroseconds(resolution)
+            , SpinThreshold(spinThreshold)
+            , ProgressThreshold(progress)
+            , UseSchedulerActor(useSchedulerActor)
+        {}
+
+        ui64 ResolutionMicroseconds = 1024;
+        ui64 SpinThreshold = 100;
+        ui64 ProgressThreshold = 10000;
+        bool UseSchedulerActor = false; // False is default because tests use scheduler thread
+        ui64 RelaxedSendPaceEventsPerSecond = 200000;
+        ui64 RelaxedSendPaceEventsPerCycle = RelaxedSendPaceEventsPerSecond * ResolutionMicroseconds / 1000000;
+        // For resolution >= 250000 microseconds threshold is SendPace
+        // For resolution <= 250 microseconds threshold is 20 * SendPace
+        ui64 RelaxedSendThresholdEventsPerSecond = RelaxedSendPaceEventsPerSecond *
+            (20 - ((20 - 1) * ClampVal(ResolutionMicroseconds, ui64(250), ui64(250000)) - 250) / (250000 - 250));
+        ui64 RelaxedSendThresholdEventsPerCycle = RelaxedSendThresholdEventsPerSecond * ResolutionMicroseconds / 1000000;
+
+        // Optional subsection for scheduler counters (usually subsystem=utils)
+        NMonitoring::TDynamicCounterPtr MonCounters = nullptr;
+    };
+
+    struct TCpuAllocation {
+        struct TPoolAllocation {
+            TPoolId PoolId;
+            TPoolWeight Weight;
+
+            TPoolAllocation(TPoolId poolId = 0, TPoolWeight weight = 0)
+                : PoolId(poolId)
+                , Weight(weight)
+            {}
+        };
+
+        TCpuId CpuId;
+        TVector<TPoolAllocation> AllowedPools;
+
+        TPoolsMask GetPoolsMask() const {
+            TPoolsMask mask = 0;
+            for (const auto& pa : AllowedPools) {
+                if (pa.PoolId < MaxPools) {
+                    mask &= (1ull << pa.PoolId);
+                }
+            }
+            return mask;
+        }
+
+        bool HasPool(TPoolId pool) const {
+            for (const auto& pa : AllowedPools) {
+                if (pa.PoolId == pool) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    };
+
+    struct TCpuAllocationConfig {
+        TVector<TCpuAllocation> Items;
+
+        TCpuAllocationConfig(const TCpuMask& available, const TCpuManagerConfig& cfg) {
+            for (const TUnitedExecutorPoolConfig& pool : cfg.United) {
+                Y_VERIFY(pool.PoolId < MaxPools, "wrong PoolId of united executor pool: %s(%d)",
+                    pool.PoolName.c_str(), (pool.PoolId));
+            }
+            ui32 allocated[MaxPools] = {0};
+            for (TCpuId cpu = 0; cpu < available.Size() && Items.size() < cfg.UnitedWorkers.CpuCount; cpu++) {
+                if (available.IsSet(cpu)) {
+                    TCpuAllocation item;
+                    item.CpuId = cpu;
+                    for (const TUnitedExecutorPoolConfig& pool : cfg.United) {
+                        if (cfg.UnitedWorkers.Allowed.IsEmpty() || cfg.UnitedWorkers.Allowed.IsSet(cpu)) {
+                            if (pool.Allowed.IsEmpty() || pool.Allowed.IsSet(cpu)) {
+                                item.AllowedPools.emplace_back(pool.PoolId, pool.Weight);
+                                allocated[pool.PoolId]++;
+                            }
+                        }
+                    }
+                    if (!item.AllowedPools.empty()) {
+                        Items.push_back(item);
+                    }
+                }
+            }
+            for (const TUnitedExecutorPoolConfig& pool : cfg.United) {
+                Y_VERIFY(allocated[pool.PoolId] > 0, "unable to allocate cpu for united executor pool: %s(%d)",
+                    pool.PoolName.c_str(), (pool.PoolId));
+            }
+        }
+
+        operator bool() const {
+            return !Items.empty();
+        }
+    };
+
+}

+ 108 - 108
library/cpp/actors/core/cpu_manager.cpp

@@ -1,108 +1,108 @@
-#include "cpu_manager.h" 
-#include "probes.h" 
- 
-namespace NActors { 
-    LWTRACE_USING(ACTORLIB_PROVIDER); 
- 
-    void TCpuManager::Setup() { 
-        TAffinity available; 
-        available.Current(); 
-        TCpuAllocationConfig allocation(available, Config); 
- 
-        if (allocation) { 
-            if (!Balancer) { 
-                Balancer.Reset(MakeBalancer(Config.UnitedWorkers.Balancer, Config.United, GetCycleCountFast())); 
-            } 
-            UnitedWorkers.Reset(new TUnitedWorkers(Config.UnitedWorkers, Config.United, allocation, Balancer.Get())); 
-        } 
- 
-        Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]); 
- 
-        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { 
-            Executors[excIdx].Reset(CreateExecutorPool(excIdx)); 
-        } 
-    } 
- 
-    void TCpuManager::PrepareStart(TVector<NSchedulerQueue::TReader*>& scheduleReaders, TActorSystem* actorSystem) { 
-        if (UnitedWorkers) { 
-            UnitedWorkers->Prepare(actorSystem, scheduleReaders); 
-        } 
-        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { 
-            NSchedulerQueue::TReader* readers; 
-            ui32 readersCount = 0; 
-            Executors[excIdx]->Prepare(actorSystem, &readers, &readersCount); 
-            for (ui32 i = 0; i != readersCount; ++i, ++readers) { 
-                scheduleReaders.push_back(readers); 
-            } 
-        } 
-    } 
- 
-    void TCpuManager::Start() { 
-        if (UnitedWorkers) { 
-            UnitedWorkers->Start(); 
-        } 
-        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { 
-            Executors[excIdx]->Start(); 
-        } 
-    } 
- 
-    void TCpuManager::PrepareStop() { 
-        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { 
-            Executors[excIdx]->PrepareStop(); 
-        } 
-        if (UnitedWorkers) { 
-            UnitedWorkers->PrepareStop(); 
-        } 
-    } 
- 
-    void TCpuManager::Shutdown() { 
-        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { 
-            Executors[excIdx]->Shutdown(); 
-        } 
-        if (UnitedWorkers) { 
-            UnitedWorkers->Shutdown(); 
-        } 
-        for (ui32 round = 0, done = 0; done < ExecutorPoolCount && round < 3; ++round) { 
-            done = 0; 
-            for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { 
-                if (Executors[excIdx]->Cleanup()) { 
-                    ++done; 
-                } 
-            } 
-        } 
-    } 
- 
-    void TCpuManager::Cleanup() { 
-        for (ui32 round = 0, done = 0; done < ExecutorPoolCount; ++round) { 
-            Y_VERIFY(round < 10, "actorsystem cleanup could not be completed in 10 rounds"); 
-            done = 0; 
-            for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) { 
-                if (Executors[excIdx]->Cleanup()) { 
-                    ++done; 
-                } 
-            } 
-        } 
-        Executors.Destroy(); 
-        UnitedWorkers.Destroy(); 
-    } 
- 
-    IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) { 
-        for (TBasicExecutorPoolConfig& cfg : Config.Basic) { 
-            if (cfg.PoolId == poolId) { 
-                return new TBasicExecutorPool(cfg); 
-            } 
-        } 
-        for (TIOExecutorPoolConfig& cfg : Config.IO) { 
-            if (cfg.PoolId == poolId) { 
-                return new TIOExecutorPool(cfg); 
-            } 
-        } 
-        for (TUnitedExecutorPoolConfig& cfg : Config.United) { 
-            if (cfg.PoolId == poolId) { 
-                IExecutorPool* result = new TUnitedExecutorPool(cfg, UnitedWorkers.Get()); 
-                return result; 
-            } 
-        } 
-        Y_FAIL("missing PoolId: %d", int(poolId)); 
-    } 
-} 
+#include "cpu_manager.h"
+#include "probes.h"
+
+namespace NActors {
+    LWTRACE_USING(ACTORLIB_PROVIDER);
+
+    void TCpuManager::Setup() {
+        TAffinity available;
+        available.Current();
+        TCpuAllocationConfig allocation(available, Config);
+
+        if (allocation) {
+            if (!Balancer) {
+                Balancer.Reset(MakeBalancer(Config.UnitedWorkers.Balancer, Config.United, GetCycleCountFast()));
+            }
+            UnitedWorkers.Reset(new TUnitedWorkers(Config.UnitedWorkers, Config.United, allocation, Balancer.Get()));
+        }
+
+        Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]);
+
+        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
+            Executors[excIdx].Reset(CreateExecutorPool(excIdx));
+        }
+    }
+
+    void TCpuManager::PrepareStart(TVector<NSchedulerQueue::TReader*>& scheduleReaders, TActorSystem* actorSystem) {
+        if (UnitedWorkers) {
+            UnitedWorkers->Prepare(actorSystem, scheduleReaders);
+        }
+        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
+            NSchedulerQueue::TReader* readers;
+            ui32 readersCount = 0;
+            Executors[excIdx]->Prepare(actorSystem, &readers, &readersCount);
+            for (ui32 i = 0; i != readersCount; ++i, ++readers) {
+                scheduleReaders.push_back(readers);
+            }
+        }
+    }
+
+    void TCpuManager::Start() {
+        if (UnitedWorkers) {
+            UnitedWorkers->Start();
+        }
+        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
+            Executors[excIdx]->Start();
+        }
+    }
+
+    void TCpuManager::PrepareStop() {
+        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
+            Executors[excIdx]->PrepareStop();
+        }
+        if (UnitedWorkers) {
+            UnitedWorkers->PrepareStop();
+        }
+    }
+
+    void TCpuManager::Shutdown() {
+        for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
+            Executors[excIdx]->Shutdown();
+        }
+        if (UnitedWorkers) {
+            UnitedWorkers->Shutdown();
+        }
+        for (ui32 round = 0, done = 0; done < ExecutorPoolCount && round < 3; ++round) {
+            done = 0;
+            for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
+                if (Executors[excIdx]->Cleanup()) {
+                    ++done;
+                }
+            }
+        }
+    }
+
+    void TCpuManager::Cleanup() {
+        for (ui32 round = 0, done = 0; done < ExecutorPoolCount; ++round) {
+            Y_VERIFY(round < 10, "actorsystem cleanup could not be completed in 10 rounds");
+            done = 0;
+            for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
+                if (Executors[excIdx]->Cleanup()) {
+                    ++done;
+                }
+            }
+        }
+        Executors.Destroy();
+        UnitedWorkers.Destroy();
+    }
+
+    IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) {
+        for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
+            if (cfg.PoolId == poolId) {
+                return new TBasicExecutorPool(cfg);
+            }
+        }
+        for (TIOExecutorPoolConfig& cfg : Config.IO) {
+            if (cfg.PoolId == poolId) {
+                return new TIOExecutorPool(cfg);
+            }
+        }
+        for (TUnitedExecutorPoolConfig& cfg : Config.United) {
+            if (cfg.PoolId == poolId) {
+                IExecutorPool* result = new TUnitedExecutorPool(cfg, UnitedWorkers.Get());
+                return result;
+            }
+        }
+        Y_FAIL("missing PoolId: %d", int(poolId));
+    }
+}

Some files were not shown because too many files changed in this diff