benchmark_ut.cpp 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111
#include "actorsystem.h"
#include "actor_bootstrapped.h"
#include "config.h"
#include "executor_pool_basic.h"
#include "hfunc.h"
#include "scheduler_basic.h"

#include <library/cpp/testing/unittest/registar.h>
#include <util/thread/pool.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <optional>
#include <random>
#include <span>
#include <string>
#include <thread>
#include <utility>
#include <unordered_set>
#include <unordered_map>
#include <vector>
  // Starts a measurement: stores the current steady_clock time point in a
  // local variable named `<label>Start`.
  #define BENCH_START(label) auto label##Start = std::chrono::steady_clock::now()
  // Evaluates to the steady_clock duration elapsed since BENCH_START(label).
  // The expansion is parenthesized so that it can be embedded safely in a
  // larger expression (e.g. `2 * BENCH_END(x)`).
  #define BENCH_END(label) (std::chrono::steady_clock::now() - label##Start)
  26. using namespace NActors;
  27. using namespace std::chrono_literals;
  28. Y_UNIT_TEST_SUITE(ActorSystemBenchmark) {
  // Type ids of the local events declared in this test suite.
  enum ESimpleEventType {
      EvQuickSort = 0,    // TEvQuickSort
      EvSumVector,        // TEvSumVector
      EvSumVectorResult,  // TEvSumVectorResult
      EvSumSendRequests,  // TEvSumSendRequests
      EvKvSearch,         // TEvKvSearch
      EvKvSendRequests,   // TEvKvSendRequests
  };
  37. using TContainer = std::vector<i32>;
  38. using TActorIds = std::span<TActorId>;
  // Mutex-protected set of "active" ids that lets a caller block until the
  // set becomes empty.
  template <typename TId>
  class TActiveEntityRegistry {
  public:
      // Adds `id` to the set of active ids.
      void SetActive(TId id) {
          const std::lock_guard guard(ActiveIdsMutex_);
          ActiveIds_.insert(std::move(id));
      }

      // Removes `id` from the set and wakes every waiter once the set is empty.
      void SetInactive(const TId& id) {
          const std::lock_guard guard(ActiveIdsMutex_);
          ActiveIds_.erase(id);
          if (!ActiveIds_.empty()) {
              return;
          }
          ActiveIdsCv_.notify_all();
      }

      // Waits up to `timeout` for the set to become empty.
      // Returns true if the set is empty when the wait ends, false otherwise.
      bool WaitForAllInactive(std::chrono::microseconds timeout = std::chrono::milliseconds(1)) {
          std::unique_lock lock(ActiveIdsMutex_);
          // wait_for with a predicate returns the predicate value observed
          // (under the lock) at the moment the wait finished.
          return ActiveIdsCv_.wait_for(lock, timeout, [this] {
              return ActiveIds_.empty();
          });
      }

  private:
      std::unordered_set<TId> ActiveIds_;
      std::mutex ActiveIdsMutex_;
      std::condition_variable ActiveIdsCv_;
  };
  // Quick-sort skeleton shared by the thread-pool and the actor based
  // implementations. Derived classes decide how a sub-range is described
  // (SplitParameters) and whether it can be handed over to another worker
  // (TryRunAsync); everything that is not handed over is sorted recursively
  // by the caller.
  class TQuickSortEngine {
  public:
      // One sorting job: the inclusive index range [Left, Right] of Container
      // plus an opaque pointer interpreted by the derived class.
      struct TParameters {
          TContainer &Container;
          i32 Left;
          i32 Right;
          void* CustomData = nullptr;

          TParameters() = delete;
          TParameters(TContainer& container, i32 left, i32 right, void* customData)
              : Container(container)
              , Left(left)
              , Right(right)
              , CustomData(customData)
          {}
      };

  public:
      virtual ~TQuickSortEngine() = default;

      // Sorts params.Container[params.Left .. params.Right] in ascending order.
      // At most one of the two partitions is handed over via TryRunAsync per
      // call; the other one is always processed by the calling worker.
      void Sort(TQuickSortEngine::TParameters& params) {
          // A range with fewer than two elements is already sorted.
          if (params.Left >= params.Right) {
              return;
          }

          const auto [newRight, newLeft] = Partition(params.Container, params.Left, params.Right);
          const bool hasLeftPart = params.Left < newRight;
          const bool hasRightPart = newLeft < params.Right;
          if (!hasLeftPart && !hasRightPart) {
              return;
          }

          auto [leftParams, rightParams] = SplitParameters(params, newRight, newLeft);

          bool ranAsync = false;
          if (hasRightPart) {
              ranAsync = TryRunAsync(rightParams);
              // Nobody took the right part over, so it has to be sorted here.
              if (!ranAsync) {
                  Sort(rightParams);
              }
          }
          if (hasLeftPart) {
              // If the right part went to another worker, keep this worker busy
              // with the left part; otherwise try to hand the left part over and
              // fall back to sorting it here.
              if (ranAsync || !TryRunAsync(leftParams)) {
                  Sort(leftParams);
              }
          }
      }

      // Partitions container[left .. right] around the value of its middle element.
      // returns bounds of left and right sub-arrays for the next iteration:
      // {last index of the left sub-array, first index of the right sub-array}
      std::pair<i32, i32> Partition(TContainer& container, i32 left, i32 right) {
          // Written as left + (right - left) / 2 to avoid overflow of left + right.
          const i32 pivotIndex = left + (right - left) / 2;
          const auto pivot = container[pivotIndex];
          while (left <= right) {
              while (container[left] < pivot) {
                  left++;
              }
              while (container[right] > pivot) {
                  right--;
              }

              if (left <= right) {
                  std::swap(container[left++], container[right--]);
              }
          }
          return {right, left};
      }

  protected:
      // Builds the job descriptions for [params.Left, newRight] and [newLeft, params.Right].
      virtual std::pair<TParameters, TParameters> SplitParameters(const TParameters& params, i32 newRight, i32 newLeft) = 0;
      // Tries to hand the job over to another worker; returns true if the job
      // was taken over and must not be processed by the caller.
      virtual bool TryRunAsync(const TParameters& params) = 0;
  };
  121. class TQuickSortTask : public TQuickSortEngine, public IObjectInQueue {
  122. public:
  123. using TParameters = TQuickSortEngine::TParameters;
  124. struct TThreadPoolParameters {
  125. const ui32 ThreadsLimit = 0;
  126. std::atomic<ui32> ThreadsUsed = 0;
  127. TThreadPool& ThreadPool;
  128. TThreadPoolParameters(ui32 threadsLimit, ui32 threadsUsed, TThreadPool &threadPool)
  129. : ThreadsLimit(threadsLimit)
  130. , ThreadsUsed(threadsUsed)
  131. , ThreadPool(threadPool)
  132. {}
  133. };
  134. TParameters Params;
  135. TActiveEntityRegistry<TQuickSortTask*>& ActiveThreadRegistry;
  136. public:
  137. TQuickSortTask() = delete;
  138. TQuickSortTask(TParameters params, TActiveEntityRegistry<TQuickSortTask*>& activeThreadRegistry)
  139. : Params(params)
  140. , ActiveThreadRegistry(activeThreadRegistry)
  141. {
  142. ActiveThreadRegistry.SetActive(this);
  143. }
  144. void Process(void*) override {
  145. Sort(Params);
  146. ActiveThreadRegistry.SetInactive(this);
  147. }
  148. protected:
  149. std::pair<TParameters, TParameters> SplitParameters(const TParameters& params, i32 newRight, i32 newLeft) override {
  150. return {
  151. {params.Container, params.Left, newRight, params.CustomData},
  152. {params.Container, newLeft, params.Right, params.CustomData}
  153. };
  154. }
  155. bool TryRunAsync(const TParameters& params) override {
  156. auto threadPoolParams = static_cast<TThreadPoolParameters*>(params.CustomData);
  157. if (threadPoolParams->ThreadsUsed++ >= threadPoolParams->ThreadsLimit) {
  158. threadPoolParams->ThreadsUsed--;
  159. return false;
  160. }
  161. return threadPoolParams->ThreadPool.AddAndOwn(THolder(new TQuickSortTask(params, ActiveThreadRegistry)));
  162. }
  163. };
  164. class TEvQuickSort : public TEventLocal<TEvQuickSort, EvQuickSort> {
  165. public:
  166. using TParameters = TQuickSortEngine::TParameters;
  167. struct TActorSystemParameters {
  168. TActorIds ActorIds;
  169. std::atomic<ui32> ActorIdsUsed = 0;
  170. TActorSystemParameters() = delete;
  171. TActorSystemParameters(const TActorIds& actorIds, ui32 actorIdsUsed = 0)
  172. : ActorIds(actorIds)
  173. , ActorIdsUsed(actorIdsUsed)
  174. {}
  175. };
  176. TQuickSortEngine::TParameters Params;
  177. public:
  178. TEvQuickSort() = delete;
  179. TEvQuickSort(TParameters params, TActiveEntityRegistry<TEvQuickSort*>& activeEventRegistry)
  180. : Params(params)
  181. , ActiveEventRegistry_(activeEventRegistry)
  182. {
  183. Y_VERIFY(!Params.Container.empty());
  184. Y_VERIFY(Params.Right - Params.Left + 1 <= static_cast<i32>(Params.Container.size()),
  185. "left: %d, right: %d, cont.size: %d", Params.Left, Params.Right, static_cast<i32>(Params.Container.size()));
  186. ActiveEventRegistry_.SetActive(this);
  187. }
  188. virtual ~TEvQuickSort() {
  189. ActiveEventRegistry_.SetInactive(this);
  190. }
  191. private:
  192. TActiveEntityRegistry<TEvQuickSort*>& ActiveEventRegistry_;
  193. };
  194. class TQuickSortActor : public TQuickSortEngine, public TActorBootstrapped<TQuickSortActor> {
  195. public:
  196. using TParameters = TQuickSortEngine::TParameters;
  197. private:
  198. TActiveEntityRegistry<TEvQuickSort*>& ActiveEventRegistry_;
  199. public:
  200. TQuickSortActor() = delete;
  201. TQuickSortActor(TActiveEntityRegistry<TEvQuickSort*>& activeEventRegistry)
  202. : TActorBootstrapped<TQuickSortActor>()
  203. , ActiveEventRegistry_(activeEventRegistry)
  204. {}
  205. STFUNC(StateInit) {
  206. switch (ev->GetTypeRewrite()) {
  207. hFunc(TEvQuickSort, Handle);
  208. default:
  209. Y_VERIFY(false);
  210. }
  211. }
  212. void Bootstrap() {
  213. Become(&TThis::StateInit);
  214. }
  215. protected:
  216. std::pair<TParameters, TParameters> SplitParameters(const TParameters& params, i32 newRight, i32 newLeft) override {
  217. return {
  218. {params.Container, params.Left, newRight, params.CustomData},
  219. {params.Container, newLeft, params.Right, params.CustomData}
  220. };
  221. }
  222. bool TryRunAsync(const TParameters& params) override {
  223. auto actorSystemParams = static_cast<TEvQuickSort::TActorSystemParameters*>(params.CustomData);
  224. const auto actorIdIndex = actorSystemParams->ActorIdsUsed++;
  225. if (actorIdIndex >= actorSystemParams->ActorIds.size()) {
  226. actorSystemParams->ActorIdsUsed--;
  227. return false;
  228. }
  229. auto targetActorId = actorSystemParams->ActorIds[actorIdIndex];
  230. Send(targetActorId, new TEvQuickSort(params, ActiveEventRegistry_));
  231. return true;
  232. }
  233. private:
  234. void Handle(TEvQuickSort::TPtr& ev) {
  235. auto evPtr = ev->Get();
  236. Sort(evPtr->Params);
  237. }
  238. };
  // Builds the descending sequence n, n-1, ..., 1.
  std::vector<i32> PrepareVectorToSort(ui32 n) {
      std::vector<i32> descending;
      descending.reserve(n);
      for (ui32 value = n; value > 0; --value) {
          descending.push_back(static_cast<i32>(value));
      }
      return descending;
  }
  246. std::unique_ptr<TActorSystem> PrepareActorSystem(ui32 poolThreads, TAffinity* affinity = nullptr) {
  247. auto setup = MakeHolder<TActorSystemSetup>();
  248. setup->NodeId = 1;
  249. setup->ExecutorsCount = 1;
  250. setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
  251. ui32 poolId = 0;
  252. ui64 poolSpinThreashold = 20;
  253. setup->Executors[0].Reset(new TBasicExecutorPool(
  254. poolId, poolThreads, poolSpinThreashold, "", nullptr, affinity));
  255. TSchedulerConfig schedulerConfig;
  256. schedulerConfig.ResolutionMicroseconds = 512;
  257. schedulerConfig.SpinThreshold = 100;
  258. setup->Scheduler.Reset(new TBasicSchedulerThread(schedulerConfig));
  259. return std::make_unique<TActorSystem>(setup);
  260. }
  261. std::vector<TActorId> prepareQuickSortActors(
  262. TActorSystem* actorSystem, ui32 actorsNum, TActiveEntityRegistry<TEvQuickSort*>& activeEventRegistry
  263. ) {
  264. std::vector<TActorId> actorIds;
  265. actorIds.reserve(actorsNum);
  266. for (ui32 i = 0; i < actorsNum; i++) {
  267. auto actor = new TQuickSortActor(activeEventRegistry);
  268. auto actorId = actorSystem->Register(actor);
  269. actorIds.push_back(actorId);
  270. }
  271. return actorIds;
  272. }
  273. std::pair<std::chrono::microseconds, std::vector<i32>> BenchmarkQuickSortActor(
  274. ui32 threads,
  275. ui32 iterations,
  276. ui32 vectorSize
  277. ) {
  278. auto actorSystem = PrepareActorSystem(threads);
  279. actorSystem->Start();
  280. TActiveEntityRegistry<TEvQuickSort*> activeEventRegistry;
  281. auto actorIds = prepareQuickSortActors(actorSystem.get(), threads, activeEventRegistry);
  282. std::vector<i32> actorSortResult;
  283. auto actorQsDurationTotal = 0us;
  284. for (ui32 i = 0; i < iterations; i++) {
  285. auto numbers = PrepareVectorToSort(vectorSize);
  286. TEvQuickSort::TActorSystemParameters actorSystemParams(actorIds, 1);
  287. TEvQuickSort::TParameters params(numbers, 0, numbers.size() - 1, &actorSystemParams);
  288. auto ev3 = new TEvQuickSort(params, activeEventRegistry);
  289. BENCH_START(qs);
  290. actorSystem->Send(actorIds.front(), ev3);
  291. UNIT_ASSERT_C(activeEventRegistry.WaitForAllInactive(60s), "timeout");
  292. actorQsDurationTotal += std::chrono::duration_cast<std::chrono::microseconds>(BENCH_END(qs));
  293. if (i + 1 == iterations) {
  294. actorSortResult = numbers;
  295. }
  296. }
  297. return {actorQsDurationTotal / iterations, actorSortResult};
  298. }
  299. std::pair<std::chrono::microseconds, std::vector<i32>> BenchmarkQuickSortThreadPool(
  300. ui32 threads,
  301. ui32 iterations,
  302. ui32 vectorSize
  303. ) {
  304. TThreadPool threadPool;
  305. threadPool.Start(threads);
  306. TActiveEntityRegistry<TQuickSortTask*> activeThreadRegistry;
  307. auto threaPoolSortDurationTotal = 0us;
  308. std::vector<i32> threadPoolSortResult;
  309. for (ui32 i = 0; i < iterations; i++) {
  310. auto numbers = PrepareVectorToSort(vectorSize);
  311. TQuickSortTask::TThreadPoolParameters threadPoolParams(threads, 1, threadPool);
  312. TQuickSortTask::TParameters params(numbers, 0, numbers.size() - 1, &threadPoolParams);
  313. BENCH_START(thread);
  314. Y_VERIFY(threadPool.AddAndOwn(THolder(new TQuickSortTask(params, activeThreadRegistry))));
  315. UNIT_ASSERT_C(activeThreadRegistry.WaitForAllInactive(60s), "timeout");
  316. threaPoolSortDurationTotal += std::chrono::duration_cast<std::chrono::microseconds>(BENCH_END(thread));
  317. if (i + 1 == iterations) {
  318. threadPoolSortResult = numbers;
  319. }
  320. }
  321. threadPool.Stop();
  322. return {threaPoolSortDurationTotal / iterations, threadPoolSortResult};
  323. }
  324. Y_UNIT_TEST(QuickSortActor) {
  325. const std::vector<ui32> threadss{1, 4};
  326. const std::vector<ui32> vectorSizes{100, 1'000, 1'000'000};
  327. const ui32 iterations = 3;
  328. std::cout << "sep=," << std::endl;
  329. std::cout << "size,threads,actor_time(us),thread_pool_time(us)" << std::endl;
  330. for (auto vectorSize : vectorSizes) {
  331. for (auto threads : threadss) {
  332. std::cerr << "vector size: " << vectorSize << ", threads: " << threads << std::endl;
  333. auto [actorSortDuration, actorSortResult] = BenchmarkQuickSortActor(threads, iterations, vectorSize);
  334. std::cerr << "actor sort duration: " << actorSortDuration.count() << "us" << std::endl;
  335. auto [threadPoolSortDuration, threadPoolSortResult] = BenchmarkQuickSortThreadPool(threads, iterations, vectorSize);
  336. std::cerr << "thread pool sort duration: " << threadPoolSortDuration.count() << "us" << std::endl;
  337. auto referenceVector = PrepareVectorToSort(vectorSize);
  338. std::sort(referenceVector.begin(), referenceVector.end());
  339. UNIT_ASSERT_EQUAL_C(actorSortResult, referenceVector,
  340. "vector size: " << vectorSize << "; threads: " << threads);
  341. UNIT_ASSERT_EQUAL_C(threadPoolSortResult, referenceVector,
  342. "vector size: " << vectorSize << "; threads: " << threads);
  343. std::cout << vectorSize << ","
  344. << threads << ","
  345. << actorSortDuration.count() << ","
  346. << threadPoolSortDuration.count() << std::endl;
  347. }
  348. std::cerr << "-----" << std::endl << std::endl;
  349. }
  350. }
  351. // KV-storage benchmark
  352. using TKvKey = std::string;
  353. using TKvValue = i32;
  354. using TDict = std::unordered_map<TKvKey, TKvValue>;
  // Counters of successful and unsuccessful dictionary lookups.
  struct TSearchStat {
      ui32 Found = 0;
      ui32 NotFound = 0;

      // Two stats are equal when both counters match. The operator is
      // const-qualified so that const objects can be compared as well.
      bool operator==(const TSearchStat& other) const {
          return Found == other.Found && NotFound == other.NotFound;
      }
  };
  362. class TKvSearchTask : public IObjectInQueue {
  363. private:
  364. TKvKey Key_;
  365. const TDict& Dict_;
  366. TSearchStat SearchStat_ = {};
  367. public:
  368. TKvSearchTask() = delete;
  369. TKvSearchTask(TKvKey key, const TDict& dict)
  370. : Key_(key)
  371. , Dict_(dict)
  372. {}
  373. void Process(void*) override {
  374. if (Dict_.contains(Key_)) {
  375. SearchStat_.Found++;
  376. } else {
  377. SearchStat_.NotFound++;
  378. }
  379. }
  380. };
  381. class TEvKvSearch : public TEventLocal<TEvKvSearch, EvKvSearch> {
  382. public:
  383. TKvKey Key;
  384. public:
  385. TEvKvSearch() = delete;
  386. TEvKvSearch(TKvKey key)
  387. : Key(std::move(key))
  388. {}
  389. };
  390. class TEvKvSendRequests : public TEventLocal<TEvKvSendRequests, EvKvSendRequests> {
  391. public:
  392. const std::vector<std::string>& KeysToSearch;
  393. const std::vector<TActorId> SearchActorIds;
  394. public:
  395. TEvKvSendRequests() = delete;
  396. TEvKvSendRequests(const std::vector<std::string>& keysToSearch, std::vector<TActorId>&& searchActorIds)
  397. : KeysToSearch(keysToSearch)
  398. , SearchActorIds(std::move(searchActorIds))
  399. {}
  400. };
  401. class TKvSendRequestActor : public TActorBootstrapped<TKvSendRequestActor> {
  402. public:
  403. STFUNC(StateInit) {
  404. switch (ev->GetTypeRewrite()) {
  405. hFunc(TEvKvSendRequests, Handle);
  406. default:
  407. Y_VERIFY(false);
  408. }
  409. }
  410. void Bootstrap() {
  411. Become(&TThis::StateInit);
  412. }
  413. private:
  414. void Handle(TEvKvSendRequests::TPtr& ev) {
  415. auto evPtr = ev->Get();
  416. ui32 actorIdx = 0;
  417. for (auto& key : evPtr->KeysToSearch) {
  418. auto actorId = evPtr->SearchActorIds[actorIdx];
  419. actorIdx = (actorIdx + 1) % evPtr->SearchActorIds.size();
  420. Send(actorId, new TEvKvSearch(key));
  421. }
  422. }
  423. };
  424. class TKvSearchActor : public TActorBootstrapped<TKvSearchActor> {
  425. private:
  426. const TDict& Dict_;
  427. TSearchStat SearchStat_ = {};
  428. std::atomic<ui32> CompletedEvents_ = 0;
  429. public:
  430. TKvSearchActor() = delete;
  431. TKvSearchActor(const TDict& dict)
  432. : TActorBootstrapped<TKvSearchActor>()
  433. , Dict_(dict)
  434. {}
  435. STFUNC(StateInit) {
  436. switch (ev->GetTypeRewrite()) {
  437. hFunc(TEvKvSearch, Handle);
  438. default:
  439. Y_VERIFY(false);
  440. }
  441. }
  442. void Bootstrap() {
  443. Become(&TThis::StateInit);
  444. }
  445. const TSearchStat& SearchStat() {
  446. return SearchStat_;
  447. }
  448. ui32 CompletedEvents() {
  449. return CompletedEvents_;
  450. }
  451. private:
  452. void Handle(TEvKvSearch::TPtr& ev) {
  453. auto evPtr = ev->Get();
  454. if (Dict_.contains(evPtr->Key)) {
  455. SearchStat_.Found++;
  456. } else {
  457. SearchStat_.NotFound++;
  458. }
  459. CompletedEvents_++;
  460. }
  461. };
  // Builds a dictionary with `dictSize` entries whose keys are successive
  // permutations of "abcdefghijklm" and whose values are 0, 1, 2, ...
  TDict prepareKvSearchDict(const i32 dictSize) {
      TDict result;
      std::string currentKey = "abcdefghijklm";
      i32 nextValue = 0;
      while (nextValue < dictSize) {
          result.emplace(currentKey, nextValue);
          ++nextValue;
          std::next_permutation(currentKey.begin(), currentKey.end());
      }
      return result;
  }
  471. std::vector<std::string> prepareKeysToSearch(const TDict &dict, ui32 requestsNumber) {
  472. std::vector<std::string> keys;
  473. auto keyAppearances = requestsNumber / dict.size() + 1;
  474. keys.reserve(keyAppearances * dict.size());
  475. for (auto& [key, _] : dict) {
  476. for (ui32 i = 0; i < keyAppearances; i++) {
  477. keys.push_back(key);
  478. // keep the original key value to search
  479. if (i % 4 == 0) {
  480. continue;
  481. }
  482. // make non-exising key
  483. keys.back() += "nonexistingkey";
  484. }
  485. }
  486. Y_VERIFY(keys.size() >= requestsNumber);
  487. std::random_shuffle(keys.begin(), keys.end());
  488. keys.resize(requestsNumber);
  489. return keys;
  490. }
  491. std::pair<std::vector<TKvSearchActor*>, std::vector<TActorId>> prepareKvSearchActors(
  492. TActorSystem* actorSystem, ui32 searchActorsNum, const std::vector<TDict>& dicts
  493. ) {
  494. std::vector<TKvSearchActor*> searchActors;
  495. std::vector<TActorId> searchActorIds;
  496. searchActors.reserve(searchActorsNum);
  497. searchActorIds.reserve(searchActorsNum);
  498. for (ui32 i = 0, dictIdx = 0; i < searchActorsNum; i++) {
  499. const auto& dict = dicts[dictIdx];
  500. dictIdx = (dictIdx + 1) % dicts.size();
  501. auto kvSearchActor = new TKvSearchActor(dict);
  502. auto kvSearchActorId = actorSystem->Register(kvSearchActor);
  503. searchActors.push_back(kvSearchActor);
  504. searchActorIds.push_back(kvSearchActorId);
  505. }
  506. return {searchActors, searchActorIds};
  507. }
  508. ui32 CalculateCompletedEvents(const std::vector<TKvSearchActor*>& actors) {
  509. ui32 completedEvents = 0;
  510. for (auto actor : actors) {
  511. completedEvents += actor->CompletedEvents();
  512. }
  513. return completedEvents;
  514. }
  515. TSearchStat CollectKvSearchActorStat(const std::vector<TKvSearchActor*>& actors) {
  516. TSearchStat stat;
  517. for (auto actor : actors) {
  518. stat.Found += actor->SearchStat().Found;
  519. stat.NotFound += actor->SearchStat().NotFound;
  520. }
  521. return stat;
  522. }
  523. std::pair<std::chrono::microseconds, TSearchStat> BenchmarkKvActor(
  524. ui32 threads, ui32 actors, ui32 iterations, const std::vector<TDict>& dicts, const std::vector<std::string>& keysToSearch
  525. ) {
  526. TSearchStat stat = {};
  527. auto kvSearchActorDuration = 0us;
  528. for (ui32 i = 0; i < iterations; i++) {
  529. auto actorSystem = PrepareActorSystem(threads);
  530. actorSystem->Start();
  531. auto [kvSearchActors, kvSearchActorIds] = prepareKvSearchActors(actorSystem.get(), actors, dicts);
  532. auto kvSendRequestActorId = actorSystem->Register(new TKvSendRequestActor());
  533. BENCH_START(kvSearch);
  534. actorSystem->Send(kvSendRequestActorId, new TEvKvSendRequests(keysToSearch, std::move(kvSearchActorIds)));
  535. // CondVar logic gives too much of overhead (2-10 times more than just sleep_for)
  536. while (CalculateCompletedEvents(kvSearchActors) < keysToSearch.size()) {
  537. std::this_thread::sleep_for(1us);
  538. }
  539. kvSearchActorDuration += std::chrono::duration_cast<std::chrono::microseconds>(BENCH_END(kvSearch));
  540. if (i + 1 == iterations) {
  541. stat = CollectKvSearchActorStat(kvSearchActors);
  542. }
  543. }
  544. return {kvSearchActorDuration / iterations, stat};
  545. }
  546. std::pair<std::chrono::microseconds, TSearchStat> BenchmarkKvActorExternalSender(
  547. ui32 threads, ui32 actors, ui32 iterations, const std::vector<TDict>& dicts, const std::vector<std::string>& keysToSearch
  548. ) {
  549. TSearchStat stat = {};
  550. auto kvSearchActorDuration = 0us;
  551. for (ui32 i = 0; i < iterations; i++) {
  552. auto actorSystem = PrepareActorSystem(threads);
  553. actorSystem->Start();
  554. auto [kvSearchActors, kvSearchActorIds] = prepareKvSearchActors(actorSystem.get(), actors, dicts);
  555. BENCH_START(kvSearch);
  556. ui32 actorIdToUseIndex = 0;
  557. for (auto& key : keysToSearch) {
  558. actorSystem->Send(kvSearchActorIds[actorIdToUseIndex], new TEvKvSearch(key));
  559. actorIdToUseIndex = (actorIdToUseIndex + 1) % kvSearchActorIds.size();
  560. }
  561. // CondVar logic gives too much of overhead (2-10 times more than just sleep_for)
  562. while (CalculateCompletedEvents(kvSearchActors) < keysToSearch.size()) {
  563. std::this_thread::sleep_for(1us);
  564. }
  565. kvSearchActorDuration += std::chrono::duration_cast<std::chrono::microseconds>(BENCH_END(kvSearch));
  566. if (i + 1 == iterations) {
  567. stat = CollectKvSearchActorStat(kvSearchActors);
  568. }
  569. }
  570. return {kvSearchActorDuration / iterations, stat};
  571. }
  572. std::chrono::microseconds BenchmarkKvThreadPool(
  573. ui32 threads, ui32 iterations, const TDict& dict, const std::vector<std::string>& keysToSearch
  574. ) {
  575. TThreadPool threadPool;
  576. auto kvSearchActorDuration = 0us;
  577. for (ui32 i = 0; i < iterations; i++) {
  578. threadPool.Start(threads);
  579. BENCH_START(kvSearch);
  580. for (auto& key : keysToSearch) {
  581. Y_VERIFY(threadPool.AddAndOwn(THolder(new TKvSearchTask(key, dict))));
  582. }
  583. // CondVar logic gives too much of overhead (2-10 times more than just sleep_for)
  584. while (threadPool.Size() > 0) {
  585. std::this_thread::sleep_for(1us);
  586. }
  587. threadPool.Stop();
  588. kvSearchActorDuration += std::chrono::duration_cast<std::chrono::microseconds>(BENCH_END(kvSearch));
  589. }
  590. return {kvSearchActorDuration / iterations};
  591. }
  592. std::pair<std::chrono::microseconds, TSearchStat> BenchmarkKvSingleThread(
  593. ui32 iterations, const TDict& dict, const std::vector<std::string>& keysToSearch
  594. ) {
  595. TSearchStat stat = {};
  596. auto kvSearchDuration = 0us;
  597. for (ui32 i = 0; i < iterations; i++) {
  598. TSearchStat iterationStat = {};
  599. BENCH_START(kvSearch);
  600. for (auto& key : keysToSearch) {
  601. if (dict.contains(key)) {
  602. iterationStat.Found++;
  603. } else {
  604. iterationStat.NotFound++;
  605. }
  606. }
  607. kvSearchDuration += std::chrono::duration_cast<std::chrono::microseconds>(BENCH_END(kvSearch));
  608. if (i + 1 == iterations) {
  609. stat = iterationStat;
  610. }
  611. }
  612. return {kvSearchDuration / iterations, stat};
  613. }
  614. Y_UNIT_TEST(KvActor) {
  615. const bool forCI = true;
  616. using TNumbers = std::vector<ui32>;
  617. const TNumbers threadNumbers = forCI ? TNumbers{1} : TNumbers{1, 4, 8};
  618. const TNumbers actorNumbers = forCI ? TNumbers{1, 8} : TNumbers{1, 4, 8, 16, 32, 64};
  619. const TNumbers dictSizes = forCI ? TNumbers{1'000} : TNumbers{1'000, 1'000'000};
  620. const TNumbers dictsNumbers = forCI ? TNumbers{1} : TNumbers{1, 8};
  621. const ui32 iterations = 5;
  622. std::cout << "sep=," << std::endl;
  623. std::cout << "requests_number,dicts_number,dict_size,threads,actors,actor_time(us),actor_ext_time(us),thread_pool_time(us),single_thread_time(us)" << std::endl;
  624. for (auto dictsNumber : dictsNumbers) {
  625. for (auto dictSize : dictSizes) {
  626. const auto dict = prepareKvSearchDict(dictSize);
  627. const ui32 requestsNumber = forCI ? 10'000 : 1'000'000;
  628. const auto keysToSearch = prepareKeysToSearch(dict, requestsNumber);
  629. for (auto threads : threadNumbers) {
  630. std::cerr << "requestsNumber: " << requestsNumber
  631. << ", dictSize: " << dictSize
  632. << ", threads: " << threads << std::endl;
  633. auto tpKvDuration = BenchmarkKvThreadPool(threads, iterations, dict, keysToSearch);
  634. std::cerr << "kv search threadpool duration: " << tpKvDuration.count() << "us" << std::endl;
  635. auto [singleThreadKvDuration, singleThreadKvStat] = BenchmarkKvSingleThread(iterations, dict, keysToSearch);
  636. std::cerr << "kv search single thread duration: " << singleThreadKvDuration.count() << "us" << std::endl;
  637. std::vector<TDict> dicts(dictsNumber, dict);
  638. for (auto actors : actorNumbers) {
  639. std::cerr << "----" << std::endl
  640. << "requestsNumber: " << requestsNumber
  641. << ", dictsNumber: " << dictsNumber
  642. << ", dictSize: " << dictSize
  643. << ", threads: " << threads
  644. << ", actors: " << actors << std::endl;
  645. auto [actorKvDuration, actorKvStat] = BenchmarkKvActor(threads, actors, iterations, dicts, keysToSearch);
  646. std::cerr << "kv search actor duration: " << actorKvDuration.count() << "us" << std::endl;
  647. auto [actorKvExtDuration, actorKvExtStat] =
  648. BenchmarkKvActorExternalSender(threads, actors, iterations, dicts, keysToSearch);
  649. std::cerr << "kv search actor with external message sender duration: "
  650. << actorKvExtDuration.count() << "us" << std::endl;
  651. Y_UNUSED(actorKvExtStat);
  652. UNIT_ASSERT_EQUAL_C(actorKvStat, singleThreadKvStat,
  653. "single thread found/not found: " << singleThreadKvStat.Found << "/" << singleThreadKvStat.NotFound << "; "
  654. "actor stat found/not found: " << actorKvStat.Found << "/" << actorKvStat.NotFound);
  655. std::cout << requestsNumber << ","
  656. << dictsNumber << ","
  657. << dictSize << ","
  658. << threads << ","
  659. << actors << ","
  660. << actorKvDuration.count() << ","
  661. << actorKvExtDuration.count() << ","
  662. << tpKvDuration.count() << ","
  663. << singleThreadKvDuration.count() << std::endl;
  664. }
  665. std::cerr << "----" << std::endl;
  666. }
  667. }
  668. }
  669. }
  670. // vector sum benchmark
  // Returns the sum of all odd elements of `numbers` (0 for an empty container).
  i64 CalculateOddSum(const TContainer& numbers) {
      i64 result = 0;
      for (const auto x : numbers) {
          // `x % 2` is -1 for negative odd values, so compare against zero
          // instead of 1 to treat them as odd too.
          if (x % 2 != 0) {
              result += x;
          }
      }
      return result;
  }
  // Builds the ascending sequence 1, 2, ..., vectorSize.
  TContainer prepareVectorToSum(const ui32 vectorSize) {
      TContainer ascending(vectorSize);
      ui32 lastValue = 0;
      for (auto& slot : ascending) {
          slot = ++lastValue;
      }
      return ascending;
  }
  688. class TEvSumVector : public TEventLocal<TEvSumVector, EvSumVector> {
  689. public:
  690. const TContainer Numbers;
  691. public:
  692. TEvSumVector() = delete;
  693. TEvSumVector(TContainer&& numbers)
  694. : Numbers(std::move(numbers))
  695. {}
  696. };
  697. class TEvSumVectorResult : public TEventLocal<TEvSumVectorResult, EvSumVectorResult> {
  698. public:
  699. const i64 Sum = 0;
  700. public:
  701. TEvSumVectorResult(i64 sum)
  702. : Sum(sum)
  703. {}
  704. };
  705. class TEvSumSendRequests : public TEventLocal<TEvSumSendRequests, EvSumSendRequests> {
  706. public:
  707. const ui32 VectorSize;
  708. const ui32 RequestsNumber;
  709. const TActorIds ActorIds;
  710. public:
  711. TEvSumSendRequests() = delete;
  712. TEvSumSendRequests(ui32 vectorSize, ui32 requestsNumber, TActorIds actorIds)
  713. : VectorSize(vectorSize)
  714. , RequestsNumber(requestsNumber)
  715. , ActorIds(actorIds)
  716. {}
  717. };
  718. class TSumProxyActor : public TActorBootstrapped<TSumProxyActor> {
  719. private:
  720. i64 LastSum_ = 0;
  721. ui32 NumberOfResults_ = 0;
  722. ui32 ExpectedResults_ = 0;
  723. ui32 VectorSize_ = 0;
  724. TActorIds SumVectorActorIds_ = {};
  725. ui32 LastUsedActor_ = 0;
  726. std::mutex NumberOfResultsMutex_;
  727. std::condition_variable NumberOfResultsCv_;
  728. public:
  729. STFUNC(StateInit) {
  730. switch (ev->GetTypeRewrite()) {
  731. hFunc(TEvSumSendRequests, HandleRequest);
  732. hFunc(TEvSumVectorResult, HandleResult);
  733. default:
  734. Y_VERIFY(false);
  735. }
  736. }
  737. void Bootstrap() {
  738. Become(&TThis::StateInit);
  739. }
  740. i64 LastSum() {
  741. return LastSum_;
  742. }
  743. bool WaitForResults(std::chrono::microseconds timeout = 1ms, bool nonZero = true) {
  744. std::unique_lock lock(NumberOfResultsMutex_);
  745. NumberOfResultsCv_.wait_for(lock, timeout, [this, nonZero] {
  746. return ((nonZero && NumberOfResults_ != 0) || !nonZero)
  747. && NumberOfResults_ == ExpectedResults_;
  748. });
  749. return NumberOfResults_ == ExpectedResults_;
  750. }
  751. void ShiftLastUsedActor(ui32 shift) {
  752. LastUsedActor_ += shift;
  753. }
  754. private:
  755. TActorId NextActorId() {
  756. auto actorId = SumVectorActorIds_[LastUsedActor_ % SumVectorActorIds_.size()];
  757. LastUsedActor_ = (LastUsedActor_ + 1) % SumVectorActorIds_.size();
  758. return actorId;
  759. }
  760. bool SendVectorIfNeeded() {
  761. if (NumberOfResults_ < ExpectedResults_) {
  762. Send(NextActorId(), new TEvSumVector(prepareVectorToSum(VectorSize_)));
  763. return true;
  764. }
  765. return false;
  766. }
  767. void HandleRequest(TEvSumSendRequests::TPtr& ev) {
  768. auto evPtr = ev->Get();
  769. ExpectedResults_ = evPtr->RequestsNumber;
  770. VectorSize_ = evPtr->VectorSize;
  771. SumVectorActorIds_ = evPtr->ActorIds;
  772. {
  773. std::unique_lock lock(NumberOfResultsMutex_);
  774. NumberOfResults_ = 0;
  775. SendVectorIfNeeded();
  776. }
  777. }
  778. void HandleResult(TEvSumVectorResult::TPtr& ev) {
  779. LastSum_ = ev->Get()->Sum;
  780. {
  781. std::unique_lock lock(NumberOfResultsMutex_);
  782. NumberOfResults_++;
  783. if (!SendVectorIfNeeded()) {
  784. NumberOfResultsCv_.notify_all();
  785. }
  786. }
  787. }
  788. };
  789. class TSumVectorActor : public TActorBootstrapped<TSumVectorActor> {
  790. private:
  791. TActorId ResultActorId_;
  792. public:
  793. STFUNC(StateInit) {
  794. switch (ev->GetTypeRewrite()) {
  795. hFunc(TEvSumVector, Handle);
  796. default:
  797. Y_VERIFY(false);
  798. }
  799. }
  800. void Bootstrap() {
  801. Become(&TThis::StateInit);
  802. }
  803. private:
  804. void Handle(TEvSumVector::TPtr& ev) {
  805. auto evPtr = ev->Get();
  806. auto oddSum = CalculateOddSum(evPtr->Numbers);
  807. Send(ev->Sender, new TEvSumVectorResult(oddSum));
  808. }
  809. };
  810. std::vector<TActorId> prepareSumActors(TActorSystem* actorSystem, ui32 actorsNumber) {
  811. std::vector<TActorId> actorIds;
  812. actorIds.reserve(actorsNumber);
  813. for (ui32 i = 0; i < actorsNumber; i++) {
  814. actorIds.push_back(actorSystem->Register(new TSumVectorActor()));
  815. }
  816. return actorIds;
  817. }
  818. std::pair<std::vector<TSumProxyActor*>, std::vector<TActorId>> prepareProxyActors(
  819. TActorSystem* actorSystem, ui32 actorsNumber
  820. ) {
  821. std::pair<std::vector<TSumProxyActor*>, std::vector<TActorId>> result;
  822. auto& [actors, actorIds] = result;
  823. actors.reserve(actorsNumber);
  824. actorIds.reserve(actorsNumber);
  825. for (ui32 i = 0; i < actorsNumber; i++) {
  826. actors.push_back(new TSumProxyActor());
  827. actorIds.push_back(actorSystem->Register(actors.back()));
  828. actors.back()->ShiftLastUsedActor(i);
  829. }
  830. return result;
  831. }
  832. std::chrono::microseconds calcTimeoutForSumVector(
  833. ui32 vectorSize, ui32 iterations, ui32 proxyActorsNum, ui32 sumActorsNum, ui32 threadsNum
  834. ) {
  835. auto expectedMaxTimePerMillion = 100000us;
  836. auto vectorSizeRatio = vectorSize / 1000000 + 1;
  837. return expectedMaxTimePerMillion * vectorSizeRatio * iterations * proxyActorsNum / std::min(threadsNum, sumActorsNum);
  838. }
  839. bool WaitForSumActorResult(const std::vector<TSumProxyActor*>& actors, std::chrono::microseconds timeout = 1ms) {
  840. for (auto& actor : actors) {
  841. if (!actor->WaitForResults(timeout)) {
  842. return false;
  843. }
  844. }
  845. return true;
  846. }
  847. std::pair<std::chrono::microseconds, i64> BenchmarkSumVectorActor(
  848. ui32 threads,
  849. ui32 proxyActorsNumber,
  850. ui32 sumActorsNumber,
  851. ui32 iterations,
  852. ui32 vectorSize
  853. ) {
  854. auto actorSystem = PrepareActorSystem(threads);
  855. actorSystem->Start();
  856. auto sumActorIds = prepareSumActors(actorSystem.get(), sumActorsNumber);
  857. auto [proxyActors, proxyActorIds] = prepareProxyActors(actorSystem.get(), proxyActorsNumber);
  858. auto timeout = calcTimeoutForSumVector(vectorSize, iterations, proxyActorsNumber, sumActorsNumber, threads);
  859. BENCH_START(sumVectorActor);
  860. for (auto proxyActorId : proxyActorIds) {
  861. actorSystem->Send(proxyActorId, new TEvSumSendRequests(vectorSize, iterations, sumActorIds));
  862. }
  863. UNIT_ASSERT_C(WaitForSumActorResult(proxyActors, timeout), "timeout");
  864. auto totalDuration = std::chrono::duration_cast<std::chrono::microseconds>(BENCH_END(sumVectorActor));
  865. auto checkSum = proxyActors.back()->LastSum();
  866. return {totalDuration / iterations, checkSum};
  867. }
  868. Y_UNIT_TEST(SumVector) {
  869. using TVui64 = std::vector<ui64>;
  870. const bool forCI = true;
  871. const TVui64 vectorSizes = forCI ?
  872. TVui64{1'000, 1'000'000} : TVui64{1'000, 1'000'000, 10'000'000, 100'000'000};
  873. const TVui64 threadsNumbers = forCI ? TVui64{1} : TVui64{1, 4};
  874. const TVui64 proxyActorsNumbers = forCI ? TVui64{1} : TVui64{1, 4};
  875. const TVui64 sumActorsNumbers = forCI ? TVui64{1} : TVui64{1, 8, 32};
  876. const ui32 iterations = 30;
  877. std::cout << "sep=," << std::endl;
  878. std::cout << "size,threads,proxy_actors,sum_actors,duration(us)" << std::endl;
  879. for (auto vectorSize : vectorSizes) {
  880. for (auto threads : threadsNumbers) {
  881. for (auto proxyActors : proxyActorsNumbers) {
  882. for (auto sumActors : sumActorsNumbers) {
  883. std::cerr << "vector size: " << vectorSize
  884. << ", threads: " << threads
  885. << ", proxy actors: " << proxyActors
  886. << ", sum actors: " << sumActors << std::endl;
  887. auto [duration, resultSum] = BenchmarkSumVectorActor(
  888. threads, proxyActors, sumActors, iterations, vectorSize);
  889. std::cerr << "duration: " << duration.count() << "us" << std::endl;
  890. const i64 referenceSum = vectorSize * vectorSize / 4;
  891. UNIT_ASSERT_EQUAL_C(
  892. resultSum, referenceSum,
  893. resultSum << "!=" << referenceSum << "; failed on vectorSize=" << vectorSize
  894. << ", threads=" << threads
  895. << ", proxyActors=" << proxyActors
  896. << ", sumActors=" << sumActors);
  897. std::cout << vectorSize << ","
  898. << threads << ","
  899. << proxyActors << ","
  900. << sumActors << ","
  901. << duration.count()
  902. << std::endl;
  903. }
  904. }
  905. }
  906. }
  907. }
  908. }