harmonizer.cpp 23 KB


  1. #include "harmonizer.h"
  2. #include "probes.h"
  3. #include "actorsystem.h"
  4. #include "executor_pool_basic.h"
  5. #include "executor_pool_basic_feature_flags.h"
  6. #include <library/cpp/actors/util/cpu_load_log.h>
  7. #include <library/cpp/actors/util/datetime.h>
  8. #include <library/cpp/actors/util/intrinsics.h>
  9. #include <util/system/spinlock.h>
  10. #include <algorithm>
  11. namespace NActors {
  12. LWTRACE_USING(ACTORLIB_PROVIDER);
  13. constexpr bool CheckBinaryPower(ui64 value) {
  14. return !(value & (value - 1));
  15. }
  16. template <ui8 HistoryBufferSize = 8>
  17. struct TValueHistory {
  18. static_assert(CheckBinaryPower(HistoryBufferSize));
  19. double History[HistoryBufferSize] = {0.0};
  20. ui64 HistoryIdx = 0;
  21. ui64 LastTs = Max<ui64>();
  22. double LastUs = 0.0;
  23. double AccumulatedUs = 0.0;
  24. ui64 AccumulatedTs = 0;
  25. template <bool WithTail=false>
  26. double Accumulate(auto op, auto comb, ui8 seconds) {
  27. double acc = AccumulatedUs;
  28. size_t idx = HistoryIdx;
  29. ui8 leftSeconds = seconds;
  30. if constexpr (!WithTail) {
  31. idx--;
  32. leftSeconds--;
  33. if (idx >= HistoryBufferSize) {
  34. idx = HistoryBufferSize - 1;
  35. }
  36. acc = History[idx];
  37. }
  38. do {
  39. idx--;
  40. leftSeconds--;
  41. if (idx >= HistoryBufferSize) {
  42. idx = HistoryBufferSize - 1;
  43. }
  44. if constexpr (WithTail) {
  45. acc = op(acc, History[idx]);
  46. } else if (leftSeconds) {
  47. acc = op(acc, History[idx]);
  48. } else {
  49. ui64 tsInSecond = Us2Ts(1'000'000.0);
  50. acc = op(acc, History[idx] * (tsInSecond - AccumulatedTs) / tsInSecond);
  51. }
  52. } while (leftSeconds);
  53. double duration = 1'000'000.0 * seconds;
  54. if constexpr (WithTail) {
  55. duration += Ts2Us(AccumulatedTs);
  56. }
  57. return comb(acc, duration);
  58. }
  59. template <bool WithTail=false>
  60. double GetAvgPartForLastSeconds(ui8 seconds) {
  61. auto sum = [](double acc, double value) {
  62. return acc + value;
  63. };
  64. auto avg = [](double sum, double duration) {
  65. return sum / duration;
  66. };
  67. return Accumulate<WithTail>(sum, avg, seconds);
  68. }
  69. double GetAvgPart() {
  70. return GetAvgPartForLastSeconds<true>(HistoryBufferSize);
  71. }
  72. double GetMaxForLastSeconds(ui8 seconds) {
  73. auto max = [](const double& acc, const double& value) {
  74. return Max(acc, value);
  75. };
  76. auto fst = [](const double& value, const double&) { return value; };
  77. return Accumulate<false>(max, fst, seconds);
  78. }
  79. double GetMax() {
  80. return GetMaxForLastSeconds(HistoryBufferSize);
  81. }
  82. i64 GetMaxInt() {
  83. return static_cast<i64>(GetMax());
  84. }
  85. double GetMinForLastSeconds(ui8 seconds) {
  86. auto min = [](const double& acc, const double& value) {
  87. return Min(acc, value);
  88. };
  89. auto fst = [](const double& value, const double&) { return value; };
  90. return Accumulate<false>(min, fst, seconds);
  91. }
  92. double GetMin() {
  93. return GetMinForLastSeconds(HistoryBufferSize);
  94. }
  95. i64 GetMinInt() {
  96. return static_cast<i64>(GetMin());
  97. }
  98. void Register(ui64 ts, double valueUs) {
  99. if (ts < LastTs) {
  100. LastTs = ts;
  101. LastUs = valueUs;
  102. AccumulatedUs = 0.0;
  103. AccumulatedTs = 0;
  104. return;
  105. }
  106. ui64 lastTs = std::exchange(LastTs, ts);
  107. ui64 dTs = ts - lastTs;
  108. double lastUs = std::exchange(LastUs, valueUs);
  109. double dUs = valueUs - lastUs;
  110. LWPROBE(RegisterValue, ts, lastTs, dTs, Us2Ts(8'000'000.0), valueUs, lastUs, dUs);
  111. if (dTs > Us2Ts(8'000'000.0)) {
  112. dUs = dUs * 1'000'000.0 / Ts2Us(dTs);
  113. for (size_t idx = 0; idx < HistoryBufferSize; ++idx) {
  114. History[idx] = dUs;
  115. }
  116. AccumulatedUs = 0.0;
  117. AccumulatedTs = 0;
  118. return;
  119. }
  120. while (dTs > 0) {
  121. if (AccumulatedTs + dTs < Us2Ts(1'000'000.0)) {
  122. AccumulatedTs += dTs;
  123. AccumulatedUs += dUs;
  124. break;
  125. } else {
  126. ui64 addTs = Us2Ts(1'000'000.0) - AccumulatedTs;
  127. double addUs = dUs * addTs / dTs;
  128. dTs -= addTs;
  129. dUs -= addUs;
  130. History[HistoryIdx] = AccumulatedUs + addUs;
  131. HistoryIdx = (HistoryIdx + 1) % HistoryBufferSize;
  132. AccumulatedUs = 0.0;
  133. AccumulatedTs = 0;
  134. }
  135. }
  136. }
  137. };
  138. struct TThreadInfo {
  139. TValueHistory<8> Consumed;
  140. TValueHistory<8> Booked;
  141. };
  142. struct TPoolInfo {
  143. std::vector<TThreadInfo> ThreadInfo;
  144. IExecutorPool* Pool = nullptr;
  145. TBasicExecutorPool* BasicPool = nullptr;
  146. i16 DefaultThreadCount = 0;
  147. i16 MinThreadCount = 0;
  148. i16 MaxThreadCount = 0;
  149. i16 Priority = 0;
  150. NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounter;
  151. NMonitoring::TDynamicCounters::TCounterPtr AvgPingCounterWithSmallWindow;
  152. ui32 MaxAvgPingUs = 0;
  153. ui64 LastUpdateTs = 0;
  154. ui64 NotEnoughCpuExecutions = 0;
  155. ui64 NewNotEnoughCpuExecutions = 0;
  156. ui16 LocalQueueSize = NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE;
  157. TAtomic LastFlags = 0; // 0 - isNeedy; 1 - isStarved; 2 - isHoggish
  158. TAtomic IncreasingThreadsByNeedyState = 0;
  159. TAtomic IncreasingThreadsByExchange = 0;
  160. TAtomic DecreasingThreadsByStarvedState = 0;
  161. TAtomic DecreasingThreadsByHoggishState = 0;
  162. TAtomic DecreasingThreadsByExchange = 0;
  163. TAtomic PotentialMaxThreadCount = 0;
  164. TValueHistory<16> Consumed;
  165. TValueHistory<16> Booked;
  166. TAtomic MaxConsumedCpu = 0;
  167. TAtomic MinConsumedCpu = 0;
  168. TAtomic MaxBookedCpu = 0;
  169. TAtomic MinBookedCpu = 0;
  170. double GetBooked(i16 threadIdx);
  171. double GetlastSecondPoolBooked(i16 threadIdx);
  172. double GetConsumed(i16 threadIdx);
  173. double GetlastSecondPoolConsumed(i16 threadIdx);
  174. TCpuConsumption PullStats(ui64 ts);
  175. i16 GetThreadCount();
  176. void SetThreadCount(i16 threadCount);
  177. bool IsAvgPingGood();
  178. };
  179. double TPoolInfo::GetBooked(i16 threadIdx) {
  180. if ((size_t)threadIdx < ThreadInfo.size()) {
  181. return ThreadInfo[threadIdx].Booked.GetAvgPart();
  182. }
  183. return 0.0;
  184. }
  185. double TPoolInfo::GetlastSecondPoolBooked(i16 threadIdx) {
  186. if ((size_t)threadIdx < ThreadInfo.size()) {
  187. return ThreadInfo[threadIdx].Booked.GetAvgPartForLastSeconds(1);
  188. }
  189. return 0.0;
  190. }
  191. double TPoolInfo::GetConsumed(i16 threadIdx) {
  192. if ((size_t)threadIdx < ThreadInfo.size()) {
  193. return ThreadInfo[threadIdx].Consumed.GetAvgPart();
  194. }
  195. return 0.0;
  196. }
  197. double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) {
  198. if ((size_t)threadIdx < ThreadInfo.size()) {
  199. return ThreadInfo[threadIdx].Consumed.GetAvgPartForLastSeconds(1);
  200. }
  201. return 0.0;
  202. }
  203. #define UNROLL_HISTORY(history) (history)[0], (history)[1], (history)[2], (history)[3], (history)[4], (history)[5], (history)[6], (history)[7]
  204. TCpuConsumption TPoolInfo::PullStats(ui64 ts) {
  205. TCpuConsumption acc;
  206. for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) {
  207. TThreadInfo &threadInfo = ThreadInfo[threadIdx];
  208. TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx);
  209. acc.Add(cpuConsumption);
  210. threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs);
  211. LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History));
  212. threadInfo.Booked.Register(ts, cpuConsumption.BookedUs);
  213. LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History));
  214. }
  215. Consumed.Register(ts, acc.ConsumedUs);
  216. RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt());
  217. RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt());
  218. Booked.Register(ts, acc.BookedUs);
  219. RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt());
  220. RelaxedStore(&MinBookedCpu, Booked.GetMinInt());
  221. NewNotEnoughCpuExecutions = acc.NotEnoughCpuExecutions - NotEnoughCpuExecutions;
  222. NotEnoughCpuExecutions = acc.NotEnoughCpuExecutions;
  223. return acc;
  224. }
  225. #undef UNROLL_HISTORY
  226. i16 TPoolInfo::GetThreadCount() {
  227. return Pool->GetThreadCount();
  228. }
  229. void TPoolInfo::SetThreadCount(i16 threadCount) {
  230. Pool->SetThreadCount(threadCount);
  231. }
  232. bool TPoolInfo::IsAvgPingGood() {
  233. bool res = true;
  234. if (AvgPingCounter) {
  235. res &= *AvgPingCounter > MaxAvgPingUs;
  236. }
  237. if (AvgPingCounterWithSmallWindow) {
  238. res &= *AvgPingCounterWithSmallWindow > MaxAvgPingUs;
  239. }
  240. return res;
  241. }
  242. class THarmonizer: public IHarmonizer {
  243. private:
  244. std::atomic<bool> IsDisabled = false;
  245. TSpinLock Lock;
  246. std::atomic<ui64> NextHarmonizeTs = 0;
  247. std::vector<TPoolInfo> Pools;
  248. std::vector<ui16> PriorityOrder;
  249. TValueHistory<16> Consumed;
  250. TValueHistory<16> Booked;
  251. TAtomic MaxConsumedCpu = 0;
  252. TAtomic MinConsumedCpu = 0;
  253. TAtomic MaxBookedCpu = 0;
  254. TAtomic MinBookedCpu = 0;
  255. void PullStats(ui64 ts);
  256. void HarmonizeImpl(ui64 ts);
  257. void CalculatePriorityOrder();
  258. public:
  259. THarmonizer(ui64 ts);
  260. virtual ~THarmonizer();
  261. double Rescale(double value) const;
  262. void Harmonize(ui64 ts) override;
  263. void DeclareEmergency(ui64 ts) override;
  264. void AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) override;
  265. void Enable(bool enable) override;
  266. TPoolHarmonizerStats GetPoolStats(i16 poolId) const override;
  267. THarmonizerStats GetStats() const override;
  268. };
  269. THarmonizer::THarmonizer(ui64 ts) {
  270. NextHarmonizeTs = ts;
  271. }
  272. THarmonizer::~THarmonizer() {
  273. }
  274. double THarmonizer::Rescale(double value) const {
  275. return Max(0.0, Min(1.0, value * (1.0/0.9)));
  276. }
  277. void THarmonizer::PullStats(ui64 ts) {
  278. TCpuConsumption acc;
  279. for (TPoolInfo &pool : Pools) {
  280. TCpuConsumption consumption = pool.PullStats(ts);
  281. acc.Add(consumption);
  282. }
  283. Consumed.Register(ts, acc.ConsumedUs);
  284. RelaxedStore(&MaxConsumedCpu, Consumed.GetMaxInt());
  285. RelaxedStore(&MinConsumedCpu, Consumed.GetMinInt());
  286. Booked.Register(ts, acc.BookedUs);
  287. RelaxedStore(&MaxBookedCpu, Booked.GetMaxInt());
  288. RelaxedStore(&MinBookedCpu, Booked.GetMinInt());
  289. }
  290. Y_FORCE_INLINE bool IsStarved(double consumed, double booked) {
  291. return Max(consumed, booked) > 0.1 && consumed < booked * 0.7;
  292. }
  293. Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) {
  294. return booked < currentThreadCount - 1;
  295. }
  296. void THarmonizer::HarmonizeImpl(ui64 ts) {
  297. bool isStarvedPresent = false;
  298. double booked = 0.0;
  299. double consumed = 0.0;
  300. double lastSecondBooked = 0.0;
  301. i64 beingStopped = 0;
  302. i64 total = 0;
  303. TStackVec<size_t, 8> needyPools;
  304. TStackVec<size_t, 8> hoggishPools;
  305. TStackVec<bool, 8> isNeedyByPool;
  306. size_t sumOfAdditionalThreads = 0;
  307. for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
  308. TPoolInfo& pool = Pools[poolIdx];
  309. total += pool.DefaultThreadCount;
  310. ui32 currentThreadCount = pool.GetThreadCount();
  311. sumOfAdditionalThreads += currentThreadCount - pool.DefaultThreadCount;
  312. double poolBooked = 0.0;
  313. double poolConsumed = 0.0;
  314. double lastSecondPoolBooked = 0.0;
  315. double lastSecondPoolConsumed = 0.0;
  316. beingStopped += pool.Pool->GetBlockingThreadCount();
  317. for (i16 threadIdx = 0; threadIdx < pool.MaxThreadCount; ++threadIdx) {
  318. poolBooked += Rescale(pool.GetBooked(threadIdx));
  319. lastSecondPoolBooked += Rescale(pool.GetlastSecondPoolBooked(threadIdx));
  320. poolConsumed += Rescale(pool.GetConsumed(threadIdx));
  321. lastSecondPoolConsumed += Rescale(pool.GetlastSecondPoolConsumed(threadIdx));
  322. }
  323. bool isStarved = IsStarved(poolConsumed, poolBooked) || IsStarved(lastSecondPoolConsumed, lastSecondPoolBooked);
  324. if (isStarved) {
  325. isStarvedPresent = true;
  326. }
  327. bool isNeedy = (pool.IsAvgPingGood() || pool.NewNotEnoughCpuExecutions) && poolBooked >= currentThreadCount;
  328. if (pool.AvgPingCounter) {
  329. if (pool.LastUpdateTs + Us2Ts(3'000'000ull) > ts) {
  330. isNeedy = false;
  331. } else {
  332. pool.LastUpdateTs = ts;
  333. }
  334. }
  335. isNeedyByPool.push_back(isNeedy);
  336. if (isNeedy) {
  337. needyPools.push_back(poolIdx);
  338. }
  339. bool isHoggish = IsHoggish(poolBooked, currentThreadCount)
  340. || IsHoggish(lastSecondPoolBooked, currentThreadCount);
  341. if (isHoggish) {
  342. hoggishPools.push_back(poolIdx);
  343. }
  344. booked += poolBooked;
  345. consumed += poolConsumed;
  346. AtomicSet(pool.LastFlags, (i64)isNeedy | ((i64)isStarved << 1) | ((i64)isHoggish << 2));
  347. LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish);
  348. }
  349. double budget = total - Max(booked, lastSecondBooked);
  350. i16 budgetInt = static_cast<i16>(Max(budget, 0.0));
  351. if (budget < -0.1) {
  352. isStarvedPresent = true;
  353. }
  354. for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
  355. TPoolInfo& pool = Pools[poolIdx];
  356. AtomicSet(pool.PotentialMaxThreadCount, Min(pool.MaxThreadCount, budgetInt));
  357. }
  358. double overbooked = consumed - booked;
  359. if (overbooked < 0) {
  360. isStarvedPresent = false;
  361. }
  362. if (needyPools.size()) {
  363. Sort(needyPools.begin(), needyPools.end(), [&] (i16 lhs, i16 rhs) {
  364. if (Pools[lhs].Priority != Pools[rhs].Priority) {
  365. return Pools[lhs].Priority > Pools[rhs].Priority;
  366. }
  367. return Pools[lhs].Pool->PoolId < Pools[rhs].Pool->PoolId;
  368. });
  369. }
  370. if (isStarvedPresent) {
  371. // last_starved_at_consumed_value = сумма по всем пулам consumed;
  372. // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total,
  373. // использовать вместо total
  374. if (beingStopped && beingStopped >= overbooked) {
  375. // do nothing
  376. } else {
  377. for (ui16 poolIdx : PriorityOrder) {
  378. TPoolInfo &pool = Pools[poolIdx];
  379. i64 threadCount = pool.GetThreadCount();
  380. while (threadCount > pool.DefaultThreadCount) {
  381. pool.SetThreadCount(--threadCount);
  382. AtomicIncrement(pool.DecreasingThreadsByStarvedState);
  383. overbooked--;
  384. sumOfAdditionalThreads--;
  385. LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by starving", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
  386. if (overbooked < 1) {
  387. break;
  388. }
  389. }
  390. if (overbooked < 1) {
  391. break;
  392. }
  393. }
  394. }
  395. } else {
  396. for (size_t needyPoolIdx : needyPools) {
  397. TPoolInfo &pool = Pools[needyPoolIdx];
  398. i64 threadCount = pool.GetThreadCount();
  399. if (budget >= 1.0) {
  400. if (threadCount + 1 <= pool.MaxThreadCount) {
  401. AtomicIncrement(pool.IncreasingThreadsByNeedyState);
  402. isNeedyByPool[needyPoolIdx] = false;
  403. sumOfAdditionalThreads++;
  404. pool.SetThreadCount(threadCount + 1);
  405. budget -= 1.0;
  406. LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by needs", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount);
  407. }
  408. }
  409. if constexpr (NFeatures::IsLocalQueues()) {
  410. bool needToExpandLocalQueue = budget < 1.0 || threadCount >= pool.MaxThreadCount;
  411. needToExpandLocalQueue &= (bool)pool.BasicPool;
  412. needToExpandLocalQueue &= (pool.MaxThreadCount > 1);
  413. needToExpandLocalQueue &= (pool.LocalQueueSize < NFeatures::TLocalQueuesFeatureFlags::MAX_LOCAL_QUEUE_SIZE);
  414. if (needToExpandLocalQueue) {
  415. pool.BasicPool->SetLocalQueueSize(++pool.LocalQueueSize);
  416. }
  417. }
  418. }
  419. }
  420. if (budget < 1.0) {
  421. size_t takingAwayThreads = 0;
  422. for (size_t needyPoolIdx : needyPools) {
  423. TPoolInfo &pool = Pools[needyPoolIdx];
  424. i64 threadCount = pool.GetThreadCount();
  425. sumOfAdditionalThreads -= threadCount - pool.DefaultThreadCount;
  426. if (sumOfAdditionalThreads < takingAwayThreads + 1) {
  427. break;
  428. }
  429. if (!isNeedyByPool[needyPoolIdx]) {
  430. continue;
  431. }
  432. AtomicIncrement(pool.IncreasingThreadsByExchange);
  433. isNeedyByPool[needyPoolIdx] = false;
  434. takingAwayThreads++;
  435. pool.SetThreadCount(threadCount + 1);
  436. LWPROBE(HarmonizeOperation, needyPoolIdx, pool.Pool->GetName(), "increase by exchanging", threadCount + 1, pool.DefaultThreadCount, pool.MaxThreadCount);
  437. }
  438. for (ui16 poolIdx : PriorityOrder) {
  439. if (takingAwayThreads <= 0) {
  440. break;
  441. }
  442. TPoolInfo &pool = Pools[poolIdx];
  443. size_t threadCount = pool.GetThreadCount();
  444. size_t additionalThreadsCount = Max<size_t>(0L, threadCount - pool.DefaultThreadCount);
  445. size_t currentTakingAwayThreads = Min(additionalThreadsCount, takingAwayThreads);
  446. if (!currentTakingAwayThreads) {
  447. continue;
  448. }
  449. takingAwayThreads -= currentTakingAwayThreads;
  450. pool.SetThreadCount(threadCount - currentTakingAwayThreads);
  451. AtomicAdd(pool.DecreasingThreadsByExchange, takingAwayThreads);
  452. LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease by exchanging", threadCount - currentTakingAwayThreads, pool.DefaultThreadCount, pool.MaxThreadCount);
  453. }
  454. }
  455. for (size_t hoggishPoolIdx : hoggishPools) {
  456. TPoolInfo &pool = Pools[hoggishPoolIdx];
  457. i64 threadCount = pool.GetThreadCount();
  458. if (pool.BasicPool && pool.LocalQueueSize > NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE) {
  459. pool.LocalQueueSize = std::min<ui16>(NFeatures::TLocalQueuesFeatureFlags::MIN_LOCAL_QUEUE_SIZE, pool.LocalQueueSize / 2);
  460. pool.BasicPool->SetLocalQueueSize(pool.LocalQueueSize);
  461. }
  462. if (threadCount > pool.MinThreadCount) {
  463. AtomicIncrement(pool.DecreasingThreadsByHoggishState);
  464. LWPROBE(HarmonizeOperation, hoggishPoolIdx, pool.Pool->GetName(), "decrease by hoggish", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
  465. pool.SetThreadCount(threadCount - 1);
  466. }
  467. }
  468. }
  469. void THarmonizer::CalculatePriorityOrder() {
  470. PriorityOrder.resize(Pools.size());
  471. Iota(PriorityOrder.begin(), PriorityOrder.end(), 0);
  472. Sort(PriorityOrder.begin(), PriorityOrder.end(), [&] (i16 lhs, i16 rhs) {
  473. if (Pools[lhs].Priority != Pools[rhs].Priority) {
  474. return Pools[lhs].Priority < Pools[rhs].Priority;
  475. }
  476. return Pools[lhs].Pool->PoolId > Pools[rhs].Pool->PoolId;
  477. });
  478. }
  479. void THarmonizer::Harmonize(ui64 ts) {
  480. if (IsDisabled || NextHarmonizeTs > ts || !Lock.TryAcquire()) {
  481. LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, false);
  482. return;
  483. }
  484. // Check again under the lock
  485. if (IsDisabled) {
  486. LWPROBE(TryToHarmonizeFailed, ts, NextHarmonizeTs, IsDisabled, true);
  487. Lock.Release();
  488. return;
  489. }
  490. // Will never reach this line disabled
  491. ui64 previousNextHarmonizeTs = NextHarmonizeTs.exchange(ts + Us2Ts(1'000'000ull));
  492. LWPROBE(TryToHarmonizeSuccess, ts, NextHarmonizeTs, previousNextHarmonizeTs);
  493. if (PriorityOrder.empty()) {
  494. CalculatePriorityOrder();
  495. }
  496. PullStats(ts);
  497. HarmonizeImpl(ts);
  498. Lock.Release();
  499. }
  500. void THarmonizer::DeclareEmergency(ui64 ts) {
  501. NextHarmonizeTs = ts;
  502. }
  503. void THarmonizer::AddPool(IExecutorPool* pool, TSelfPingInfo *pingInfo) {
  504. TGuard<TSpinLock> guard(Lock);
  505. TPoolInfo poolInfo;
  506. poolInfo.Pool = pool;
  507. poolInfo.BasicPool = dynamic_cast<TBasicExecutorPool*>(pool);
  508. poolInfo.DefaultThreadCount = pool->GetDefaultThreadCount();
  509. poolInfo.MinThreadCount = pool->GetMinThreadCount();
  510. poolInfo.MaxThreadCount = pool->GetMaxThreadCount();
  511. poolInfo.ThreadInfo.resize(poolInfo.MaxThreadCount);
  512. poolInfo.Priority = pool->GetPriority();
  513. pool->SetThreadCount(poolInfo.DefaultThreadCount);
  514. if (pingInfo) {
  515. poolInfo.AvgPingCounter = pingInfo->AvgPingCounter;
  516. poolInfo.AvgPingCounterWithSmallWindow = pingInfo->AvgPingCounterWithSmallWindow;
  517. poolInfo.MaxAvgPingUs = pingInfo->MaxAvgPingUs;
  518. }
  519. Pools.push_back(poolInfo);
  520. PriorityOrder.clear();
  521. }
  522. void THarmonizer::Enable(bool enable) {
  523. TGuard<TSpinLock> guard(Lock);
  524. IsDisabled = enable;
  525. }
  526. IHarmonizer* MakeHarmonizer(ui64 ts) {
  527. return new THarmonizer(ts);
  528. }
  529. TPoolHarmonizerStats THarmonizer::GetPoolStats(i16 poolId) const {
  530. const TPoolInfo &pool = Pools[poolId];
  531. ui64 flags = RelaxedLoad(&pool.LastFlags);
  532. return TPoolHarmonizerStats{
  533. .IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)),
  534. .IncreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByExchange)),
  535. .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)),
  536. .DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)),
  537. .DecreasingThreadsByExchange = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByExchange)),
  538. .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxConsumedCpu)),
  539. .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&pool.MinConsumedCpu)),
  540. .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&pool.MaxBookedCpu)),
  541. .MinBookedCpu = static_cast<i64>(RelaxedLoad(&pool.MinBookedCpu)),
  542. .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)),
  543. .IsNeedy = static_cast<bool>(flags & 1),
  544. .IsStarved = static_cast<bool>(flags & 2),
  545. .IsHoggish = static_cast<bool>(flags & 4),
  546. };
  547. }
  548. THarmonizerStats THarmonizer::GetStats() const {
  549. return THarmonizerStats{
  550. .MaxConsumedCpu = static_cast<i64>(RelaxedLoad(&MaxConsumedCpu)),
  551. .MinConsumedCpu = static_cast<i64>(RelaxedLoad(&MinConsumedCpu)),
  552. .MaxBookedCpu = static_cast<i64>(RelaxedLoad(&MaxBookedCpu)),
  553. .MinBookedCpu = static_cast<i64>(RelaxedLoad(&MinBookedCpu)),
  554. };
  555. }
  556. }