cpu_manager.cpp 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. #include "cpu_manager.h"
  2. #include "probes.h"
  3. #include "executor_pool_basic.h"
  4. #include "executor_pool_io.h"
  5. #include "executor_pool_united.h"
  6. namespace NActors {
  7. LWTRACE_USING(ACTORLIB_PROVIDER);
  8. TCpuManager::TCpuManager(THolder<TActorSystemSetup>& setup)
  9. : ExecutorPoolCount(setup->GetExecutorsCount())
  10. , Balancer(setup->Balancer)
  11. , Config(setup->CpuManager)
  12. {
  13. if (setup->Executors) { // Explicit mode w/o united pools
  14. Executors.Reset(setup->Executors.Release());
  15. for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
  16. IExecutorPool* pool = Executors[excIdx].Get();
  17. Y_ABORT_UNLESS(dynamic_cast<TUnitedExecutorPool*>(pool) == nullptr,
  18. "united executor pool is prohibited in explicit mode of NActors::TCpuManager");
  19. }
  20. } else {
  21. Setup();
  22. }
  23. }
  24. void TCpuManager::Setup() {
  25. TAffinity available;
  26. available.Current();
  27. TCpuAllocationConfig allocation(available, Config);
  28. if (allocation) {
  29. if (!Balancer) {
  30. Balancer.Reset(MakeBalancer(Config.UnitedWorkers.Balancer, Config.United, GetCycleCountFast()));
  31. }
  32. UnitedWorkers.Reset(new TUnitedWorkers(Config.UnitedWorkers, Config.United, allocation, Balancer.Get()));
  33. }
  34. ui64 ts = GetCycleCountFast();
  35. Harmonizer.Reset(MakeHarmonizer(ts));
  36. Executors.Reset(new TAutoPtr<IExecutorPool>[ExecutorPoolCount]);
  37. for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
  38. Executors[excIdx].Reset(CreateExecutorPool(excIdx));
  39. if (excIdx < Config.PingInfoByPool.size()) {
  40. Harmonizer->AddPool(Executors[excIdx].Get(), &Config.PingInfoByPool[excIdx]);
  41. } else {
  42. Harmonizer->AddPool(Executors[excIdx].Get());
  43. }
  44. }
  45. }
  46. void TCpuManager::PrepareStart(TVector<NSchedulerQueue::TReader*>& scheduleReaders, TActorSystem* actorSystem) {
  47. if (UnitedWorkers) {
  48. UnitedWorkers->Prepare(actorSystem, scheduleReaders);
  49. }
  50. for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
  51. NSchedulerQueue::TReader* readers;
  52. ui32 readersCount = 0;
  53. Executors[excIdx]->Prepare(actorSystem, &readers, &readersCount);
  54. for (ui32 i = 0; i != readersCount; ++i, ++readers) {
  55. scheduleReaders.push_back(readers);
  56. }
  57. }
  58. }
  59. void TCpuManager::Start() {
  60. if (UnitedWorkers) {
  61. UnitedWorkers->Start();
  62. }
  63. for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
  64. Executors[excIdx]->Start();
  65. }
  66. }
  67. void TCpuManager::PrepareStop() {
  68. for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
  69. Executors[excIdx]->PrepareStop();
  70. }
  71. if (UnitedWorkers) {
  72. UnitedWorkers->PrepareStop();
  73. }
  74. }
  75. void TCpuManager::Shutdown() {
  76. for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
  77. Executors[excIdx]->Shutdown();
  78. }
  79. if (UnitedWorkers) {
  80. UnitedWorkers->Shutdown();
  81. }
  82. for (ui32 round = 0, done = 0; done < ExecutorPoolCount && round < 3; ++round) {
  83. done = 0;
  84. for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
  85. if (Executors[excIdx]->Cleanup()) {
  86. ++done;
  87. }
  88. }
  89. }
  90. }
  91. void TCpuManager::Cleanup() {
  92. for (ui32 round = 0, done = 0; done < ExecutorPoolCount; ++round) {
  93. Y_ABORT_UNLESS(round < 10, "actorsystem cleanup could not be completed in 10 rounds");
  94. done = 0;
  95. for (ui32 excIdx = 0; excIdx != ExecutorPoolCount; ++excIdx) {
  96. if (Executors[excIdx]->Cleanup()) {
  97. ++done;
  98. }
  99. }
  100. }
  101. Executors.Destroy();
  102. UnitedWorkers.Destroy();
  103. }
  104. IExecutorPool* TCpuManager::CreateExecutorPool(ui32 poolId) {
  105. for (TBasicExecutorPoolConfig& cfg : Config.Basic) {
  106. if (cfg.PoolId == poolId) {
  107. return new TBasicExecutorPool(cfg, Harmonizer.Get());
  108. }
  109. }
  110. for (TIOExecutorPoolConfig& cfg : Config.IO) {
  111. if (cfg.PoolId == poolId) {
  112. return new TIOExecutorPool(cfg);
  113. }
  114. }
  115. for (TUnitedExecutorPoolConfig& cfg : Config.United) {
  116. if (cfg.PoolId == poolId) {
  117. IExecutorPool* result = new TUnitedExecutorPool(cfg, UnitedWorkers.Get());
  118. return result;
  119. }
  120. }
  121. Y_ABORT("missing PoolId: %d", int(poolId));
  122. }
  123. TVector<IExecutorPool*> TCpuManager::GetBasicExecutorPools() const {
  124. TVector<IExecutorPool*> pools;
  125. for (ui32 idx = 0; idx < ExecutorPoolCount; ++idx) {
  126. if (auto basicPool = dynamic_cast<TBasicExecutorPool*>(Executors[idx].Get()); basicPool != nullptr) {
  127. pools.push_back(basicPool);
  128. }
  129. }
  130. return pools;
  131. }
  132. }