balancer.cpp

#include "balancer.h"

#include "probes.h"

#include <library/cpp/actors/util/cpu_load_log.h>
#include <library/cpp/actors/util/datetime.h>
#include <library/cpp/actors/util/intrinsics.h>

#include <util/system/spinlock.h>

#include <algorithm>

namespace NActors {
    LWTRACE_USING(ACTORLIB_PROVIDER);

    // Describes the balancing-related state of a pool; the most notable field is `Importance`,
    // which decides whether the pool deserves an extra cpu
    struct TLevel {
        // Balancer will try to give more cpu to overloaded pools
        enum ELoadClass {
            Underloaded = 0,
            Moderate = 1,
            Overloaded = 2,
        };

        double ScaleFactor;
        ELoadClass LoadClass;
        ui64 Importance; // a pool with lower importance is allowed to pass cpu to a pool with higher importance, but the opposite is forbidden

        TLevel() {}

        TLevel(const TBalancingConfig& cfg, TPoolId poolId, ui64 currentCpus, double cpuIdle, ui64 addLatencyUs, ui64 worstLatencyUs) {
            ScaleFactor = double(currentCpus) / cfg.Cpus;
            if ((worstLatencyUs + addLatencyUs) < 2000 && cpuIdle > 1.0) { // underload criterion, based on estimated latency w/o 1 cpu
                LoadClass = Underloaded;
            } else if (worstLatencyUs > 2000 || cpuIdle < 0.2) { // overload criterion, based on latency
                LoadClass = Overloaded;
            } else {
                LoadClass = Moderate;
            }
            Importance = MakeImportance(LoadClass, cfg.Priority, ScaleFactor, cpuIdle, poolId);
        }
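
        // Illustrative classification (values assumed, not from the original source):
        //   worstLatencyUs = 500,  addLatencyUs = 300, cpuIdle = 1.5 -> 800 < 2000 && 1.5 > 1.0 -> Underloaded
        //   worstLatencyUs = 2500,                     cpuIdle = 0.5 -> 2500 > 2000             -> Overloaded
        //   worstLatencyUs = 1000, addLatencyUs = 0,   cpuIdle = 0.5 -> neither criterion holds -> Moderate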

    private:
        // Importance is a simple ui64 value (from highest to lowest bits):
        //  2 bits: LoadClass
        //  8 bits: Priority
        // 10 bits: -ScaleFactor (for max-min fairness with weights equal to TBalancingConfig::Cpus)
        // 10 bits: -CpuIdle
        //  6 bits: PoolId
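        //
        // Illustrative comparison (values assumed): an Overloaded pool with Priority 5 beats a
        // Moderate pool with Priority 7, because LoadClass occupies the most significant bits:
        //   (2 << 34) | (5 << 26) | ...  >  (1 << 34) | (7 << 26) | ...
        // since everything below the LoadClass field sums to less than (1 << 34).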
        static ui64 MakeImportance(ELoadClass load, ui8 priority, double scaleFactor, double cpuIdle, TPoolId poolId) {
            ui64 idle = std::clamp<i64>(1024 - cpuIdle * 512, 0, 1023);
            ui64 scale = std::clamp<i64>(1024 - scaleFactor * 32, 0, 1023);

            Y_ABORT_UNLESS(ui64(load) < (1ull << 2ull));
            Y_ABORT_UNLESS(ui64(priority) < (1ull << 8ull));
            Y_ABORT_UNLESS(ui64(scale) < (1ull << 10ull));
            Y_ABORT_UNLESS(ui64(idle) < (1ull << 10ull));
            Y_ABORT_UNLESS(ui64(poolId) < (1ull << 6ull));

            static_assert(ui64(MaxPools) <= (1ull << 6ull));

            ui64 importance =
                (ui64(load) << ui64(6 + 10 + 10 + 8)) |
                (ui64(priority) << ui64(6 + 10 + 10)) |
                (ui64(scale) << ui64(6 + 10)) |
                (ui64(idle) << ui64(6)) |
                ui64(poolId);
            return importance;
        }
    };

    // Main balancer implementation
    class TBalancer: public IBalancer {
    private:
        struct TCpu;
        struct TPool;

        bool Disabled = true;
        TSpinLock Lock;
        ui64 NextBalanceTs;
        TVector<TCpu> Cpus; // Indexed by CpuId, can have gaps
        TVector<TPool> Pools; // Indexed by PoolId, can have gaps
        TBalancerConfig Config;

    public:
        ui64 GetPeriodUs() override;

        // Setup
        TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts);
        bool AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* cpu) override;
        ~TBalancer();

        // Balancing
        bool TryLock(ui64 ts) override;
        void SetPoolStats(TPoolId pool, const TBalancerStats& stats) override;
        void Balance() override;
        void Unlock() override;

    private:
        void MoveCpu(TPool& from, TPool& to);
    };
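
    // Intended call pattern, a sketch (the caller thread and GatherStats() below are assumed,
    // not part of this file):
    //
    //   if (balancer->TryLock(ts)) {
    //       for (TPoolId pool = 0; pool < MaxPools; pool++) {
    //           balancer->SetPoolStats(pool, GatherStats(pool));
    //       }
    //       balancer->Balance();
    //       balancer->Unlock();
    //   }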

    struct TBalancer::TPool {
        TBalancingConfig Config;
        TPoolId PoolId;
        TString PoolName;

        // Input data for balancing
        TBalancerStats Prev;
        TBalancerStats Next;

        // Derived stats
        double CpuLoad;
        double CpuIdle;

        // Classification
        // NOTE: We want to avoid passing cpu back and forth, so we must consider not only the current level,
        // NOTE: but also the expected levels after movements
        TLevel CurLevel; // Level with current amount of cpu
        TLevel AddLevel; // Level after accepting one cpu
        TLevel SubLevel; // Level after donating one cpu

        // Balancing state
        ui64 CurrentCpus = 0; // Total number of cpus assigned to this pool (zero means the pool is not balanced)
        ui64 PrevCpus = 0; // Cpus in the last period

        explicit TPool(const TBalancingConfig& cfg = {})
            : Config(cfg)
        {}

        void Configure(const TBalancingConfig& cfg, const TString& poolName) {
            Config = cfg;
            // Enforce constraints
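            // (illustrative values: cfg.Cpus = 4, MinCpus = 0, MaxCpus = 2 come out as MinCpus = 1, MaxCpus = 4)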
            if (Config.Cpus > 0) {
                Config.MinCpus = std::clamp<ui32>(Config.MinCpus, 1, Config.Cpus);
                Config.MaxCpus = Max<ui32>(Config.MaxCpus, Config.Cpus);
            } else {
                Y_ABORT_UNLESS(Config.Cpus == 0,
                    "Unexpected negative Config.Cpus# %" PRIi64,
                    (i64)Config.Cpus);
                Config.MinCpus = 0;
                Config.MaxCpus = 0;
            }
            PoolName = poolName;
        }
    };

    struct TBalancer::TCpu {
        TCpuState* State = nullptr; // Cpu state, nullptr means cpu is not used (gap)
        TCpuAllocation Alloc;
        TPoolId Current;
        TPoolId Assigned;
    };

    TBalancer::TBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts)
        : NextBalanceTs(ts)
        , Config(config)
    {
        for (TPoolId pool = 0; pool < MaxPools; pool++) {
            Pools.emplace_back();
            Pools.back().PoolId = pool;
        }
        for (const TUnitedExecutorPoolConfig& united : unitedPools) {
            Pools[united.PoolId].Configure(united.Balancing, united.PoolName);
        }
    }

    TBalancer::~TBalancer() {
    }

    bool TBalancer::AddCpu(const TCpuAllocation& cpuAlloc, TCpuState* state) {
        // Setup
        TCpuId cpuId = cpuAlloc.CpuId;
        if (Cpus.size() <= cpuId) {
            Cpus.resize(cpuId + 1);
        }
        TCpu& cpu = Cpus[cpuId];
        cpu.State = state;
        cpu.Alloc = cpuAlloc;

        // Fill every pool with cpus up to TBalancingConfig::Cpus
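        // (illustrative: with pools configured for Cpus = {2, 1}, successive AddCpu calls land in
        // pool 0, pool 0, pool 1, and the fourth call finds no free slot and returns false)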
        TPoolId pool = 0;
        for (TPool& p : Pools) {
            if (p.CurrentCpus < p.Config.Cpus) {
                p.CurrentCpus++;
                break;
            }
            pool++;
        }
        if (pool != MaxPools) { // cpu under balancer control
            state->SwitchPool(pool);
            state->AssignPool(pool);
            Disabled = false;
            return true;
        }
        return false; // non-balanced cpu
    }
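
    // Non-blocking gate before each balancing attempt: succeeds at most once per Config.PeriodUs,
    // only after at least one cpu is under balancer control, and only if the spinlock is free,
    // so at most one thread balances at a time.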
    bool TBalancer::TryLock(ui64 ts) {
        if (!Disabled && NextBalanceTs < ts && Lock.TryAcquire()) {
            NextBalanceTs = ts + Us2Ts(Config.PeriodUs);
            return true;
        }
        return false;
    }

    void TBalancer::SetPoolStats(TPoolId pool, const TBalancerStats& stats) {
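        // Shift the stats window: the previously supplied snapshot becomes Prev and the fresh one
        // becomes Next, so Balance() always works with deltas over the last period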
        Y_ABORT_UNLESS(pool < MaxPools);
        TPool& p = Pools[pool];
        p.Prev = p.Next;
        p.Next = stats;
    }

    void TBalancer::Balance() {
        // Update every cpu state
        for (TCpu& cpu : Cpus) {
            if (cpu.State) {
                cpu.State->Load(cpu.Assigned, cpu.Current);
                if (cpu.Current < MaxPools && cpu.Current != cpu.Assigned) {
                    return; // previous movement has not been applied yet, wait
                }
            }
        }

        // Process stats, classify and compute pool importance
        TStackVec<TPool*, MaxPools> order;
        for (TPool& pool : Pools) {
            if (pool.Config.Cpus == 0) {
                continue; // skip gaps (non-existent or non-united pools)
            }
            if (pool.Prev.Ts == 0 || pool.Prev.Ts >= pool.Next.Ts) {
                return; // invalid stats
            }

            // Compute derived stats
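            // (illustrative: a CpuUs delta of 1500000 over a 1-second window gives CpuLoad = 1.5,
            // i.e. on average 1.5 cpus were busy executing this pool)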
            pool.CpuLoad = (pool.Next.CpuUs - pool.Prev.CpuUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts);
            if (pool.Prev.IdleUs == ui64(-1) || pool.Next.IdleUs == ui64(-1)) {
                pool.CpuIdle = pool.CurrentCpus - pool.CpuLoad; // for tests
            } else {
                pool.CpuIdle = (pool.Next.IdleUs - pool.Prev.IdleUs) / Ts2Us(pool.Next.Ts - pool.Prev.Ts);
            }

            // Compute levels
            pool.CurLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus, pool.CpuIdle,
                pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs);
            pool.AddLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus + 1, pool.CpuIdle,
                0, pool.Next.WorstActivationTimeUs); // we expect the taken cpu to become utilized
            pool.SubLevel = TLevel(pool.Config, pool.PoolId, pool.CurrentCpus - 1, pool.CpuIdle - 1,
                pool.Next.ExpectedLatencyIncreaseUs, pool.Next.WorstActivationTimeUs);

            // Prepare for balancing
            pool.PrevCpus = pool.CurrentCpus;
            order.push_back(&pool);
        }

        // Sort pools by importance
        std::sort(order.begin(), order.end(), [] (TPool* l, TPool* r) { return l->CurLevel.Importance < r->CurLevel.Importance; });
        for (TPool* pool : order) {
            LWPROBE(PoolStats, pool->PoolId, pool->PoolName, pool->CurrentCpus, pool->CurLevel.LoadClass, pool->Config.Priority, pool->CurLevel.ScaleFactor, pool->CpuIdle, pool->CpuLoad, pool->CurLevel.Importance, pool->AddLevel.Importance, pool->SubLevel.Importance);
        }

        // Move cpus from lower importance to higher importance pools
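        // (illustrative: iterating `order` from the back visits the most important pool first; a cpu
        // moves only if the donor's post-donation importance still does not exceed the recipient's
        // post-acceptance importance, which keeps a cpu from oscillating between two pools)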
        for (auto toIter = order.rbegin(); toIter != order.rend(); ++toIter) {
            TPool& to = **toIter;
            if (to.CurLevel.LoadClass == TLevel::Overloaded && // if pool is overloaded
                to.CurrentCpus < to.Config.MaxCpus) // and constraints would not be violated
            {
                for (auto fromIter = order.begin(); (*fromIter)->CurLevel.Importance < to.CurLevel.Importance; ++fromIter) {
                    TPool& from = **fromIter;
                    if (from.CurrentCpus == from.PrevCpus && // if the donor has not been balanced yet
                        from.CurrentCpus > from.Config.MinCpus && // and constraints would not be violated
                        from.SubLevel.Importance <= to.AddLevel.Importance) // and the relative importance of the two pools would not flip after the movement
                    {
                        MoveCpu(from, to);
                        from.CurrentCpus--;
                        to.CurrentCpus++;
                        break;
                    }
                }
            }
        }
    }

    void TBalancer::MoveCpu(TBalancer::TPool& from, TBalancer::TPool& to) {
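        // Scan cpus from the highest CpuId down and reassign the first one currently assigned
        // to `from`; if `from` owns no cpu, that is a logic error and Y_ABORT() fires below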
        for (auto ci = Cpus.rbegin(), ce = Cpus.rend(); ci != ce; ++ci) {
            TCpu& cpu = *ci;
            if (!cpu.State) {
                continue;
            }
            if (cpu.Assigned == from.PoolId) {
                cpu.State->AssignPool(to.PoolId);
                cpu.Assigned = to.PoolId;
                LWPROBE(MoveCpu, from.PoolId, to.PoolId, from.PoolName, to.PoolName, cpu.Alloc.CpuId);
                return;
            }
        }
        Y_ABORT();
    }

    void TBalancer::Unlock() {
        Lock.Release();
    }

    ui64 TBalancer::GetPeriodUs() {
        return Config.PeriodUs;
    }

    IBalancer* MakeBalancer(const TBalancerConfig& config, const TVector<TUnitedExecutorPoolConfig>& unitedPools, ui64 ts) {
        return new TBalancer(config, unitedPools, ts);
    }
}