mkql_computation_pattern_cache_ut.cpp 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882
  1. #include "library/cpp/threading/local_executor/local_executor.h"
  2. #include "yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h"
  3. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  4. #include <yql/essentials/minikql/computation/mkql_computation_pattern_cache.h>
  5. #include <yql/essentials/minikql/mkql_type_builder.h>
  6. #include <yql/essentials/minikql/mkql_node_serialization.h>
  7. #include <yql/essentials/utils/yql_panic.h>
  8. #include <yql/essentials/minikql/mkql_node.h>
  9. #include <yql/essentials/minikql/mkql_program_builder.h>
  10. #include <yql/essentials/minikql/computation/mkql_computation_node.h>
  11. #include <yql/essentials/minikql/computation/mkql_computation_node_impl.h>
  12. #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
  13. #include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
  14. #include <library/cpp/testing/unittest/registar.h>
  15. #include <util/datetime/cputimer.h>
  16. namespace NKikimr {
  17. namespace NMiniKQL {
  18. using namespace NYql::NUdf;
  19. TComputationNodeFactory GetListTestFactory() {
  20. return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
  21. if (callable.GetType()->GetName() == "TestList") {
  22. return new TExternalComputationNode(ctx.Mutables);
  23. }
  24. return GetBuiltinFactory()(callable, ctx);
  25. };
  26. }
  27. TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  28. if (list) {
  29. return pb.ToFlow(TRuntimeNode(list, false));
  30. } else {
  31. std::vector<const TRuntimeNode> arr;
  32. arr.reserve(vecSize);
  33. for (ui64 i = 0; i < vecSize; ++i) {
  34. arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234));
  35. }
  36. TArrayRef<const TRuntimeNode> arrRef(std::move(arr));
  37. return pb.ToFlow(pb.AsList(arrRef));
  38. }
  39. }
  40. template<bool Wide>
  41. TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable* list);
  42. template<>
  43. TRuntimeNode CreateFilter<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  44. TTimer t(TString(__func__) + ": ");
  45. auto flow = CreateFlow(pb, vecSize, list);
  46. auto handler = [&](TRuntimeNode node) -> TRuntimeNode {
  47. return pb.AggrEquals(
  48. pb.Mod(node, pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
  49. pb.NewOptional(pb.NewDataLiteral<ui64>(0)));
  50. };
  51. return pb.Filter(flow, handler);
  52. }
  53. template<>
  54. TRuntimeNode CreateFilter<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  55. TTimer t(TString(__func__) + ": ");
  56. auto flow = CreateFlow(pb, vecSize, list);
  57. auto handler = [&](TRuntimeNode::TList node) -> TRuntimeNode {
  58. return pb.AggrEquals(
  59. pb.Mod(node.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
  60. pb.NewOptional(pb.NewDataLiteral<ui64>(0)));
  61. };
  62. return pb.NarrowMap(
  63. pb.WideFilter(
  64. pb.ExpandMap(flow,
  65. [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
  66. ),
  67. handler
  68. ),
  69. [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); }
  70. );
  71. }
  72. template<bool Wide>
  73. TRuntimeNode CreateMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
  74. template<>
  75. TRuntimeNode CreateMap<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  76. TTimer t(TString(__func__) + ": ");
  77. auto flow = CreateFlow(pb, vecSize, list);
  78. auto handler = [&](TRuntimeNode node) -> TRuntimeNode {
  79. return pb.AggrEquals(
  80. pb.Mod(node, pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
  81. pb.NewOptional(pb.NewDataLiteral<ui64>(0)));
  82. };
  83. return pb.Map(flow, handler);
  84. }
  85. template<>
  86. TRuntimeNode CreateMap<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  87. TTimer t(TString(__func__) + ": ");
  88. auto flow = CreateFlow(pb, vecSize, list);
  89. auto handler = [&](TRuntimeNode::TList node) -> TRuntimeNode::TList {
  90. return {pb.AggrEquals(
  91. pb.Mod(node.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
  92. pb.NewOptional(pb.NewDataLiteral<ui64>(0)))};
  93. };
  94. return pb.NarrowMap(
  95. pb.WideMap(
  96. pb.ExpandMap(flow,
  97. [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
  98. ),
  99. handler
  100. ),
  101. [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); }
  102. );
  103. }
  104. template<bool Wide>
  105. TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
  106. template<>
  107. TRuntimeNode CreateCondense<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  108. TTimer t(TString(__func__) + ": ");
  109. auto flow = CreateFlow(pb, vecSize, list);
  110. auto switcherHandler = [&](TRuntimeNode, TRuntimeNode) -> TRuntimeNode {
  111. return pb.NewDataLiteral<bool>(false);
  112. };
  113. auto updateHandler = [&](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode {
  114. return pb.Add(item, state);
  115. };
  116. TRuntimeNode state = pb.NewDataLiteral<ui64>(0);
  117. return pb.Condense(flow, state, switcherHandler, updateHandler);
  118. }
  119. template<>
  120. TRuntimeNode CreateCondense<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  121. TTimer t(TString(__func__) + ": ");
  122. auto flow = CreateFlow(pb, vecSize, list);
  123. TRuntimeNode state = pb.NewDataLiteral<ui64>(0);
  124. return pb.NarrowMap(
  125. pb.WideCondense1(
  126. /* stream */
  127. pb.ExpandMap(flow,
  128. [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
  129. ),
  130. /* init */
  131. [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; },
  132. /* switcher */
  133. [&](TRuntimeNode::TList, TRuntimeNode::TList) -> TRuntimeNode { return pb.NewDataLiteral<bool>(false); },
  134. /* handler */
  135. [&](TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; }
  136. ),
  137. [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); }
  138. );
  139. }
  140. template<bool Wide>
  141. TRuntimeNode CreateChopper(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
  142. template<>
  143. TRuntimeNode CreateChopper<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  144. TTimer t(TString(__func__) + ": ");
  145. auto flow = CreateFlow(pb, vecSize, list);
  146. return pb.Chopper(flow,
  147. /* keyExtractor */
  148. [&](TRuntimeNode item) -> TRuntimeNode { return item; },
  149. /* groupSwitch */
  150. [&](TRuntimeNode key, TRuntimeNode /*item*/) -> TRuntimeNode {
  151. return pb.AggrEquals(
  152. pb.Mod(key, pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
  153. pb.NewOptional(pb.NewDataLiteral<ui64>(0)));
  154. },
  155. /* groupHandler */
  156. [&](TRuntimeNode, TRuntimeNode list) -> TRuntimeNode { return list; }
  157. );
  158. };
  159. template<>
  160. TRuntimeNode CreateChopper<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  161. TTimer t(TString(__func__) + ": ");
  162. auto flow = CreateFlow(pb, vecSize, list);
  163. return pb.NarrowMap(
  164. pb.WideChopper(
  165. /* stream */
  166. pb.ExpandMap(flow,
  167. [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
  168. ),
  169. /* keyExtractor */
  170. [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; },
  171. /* groupSwitch */
  172. [&](TRuntimeNode::TList key, TRuntimeNode::TList /*item*/) -> TRuntimeNode {
  173. return pb.AggrEquals(
  174. pb.Mod(key.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128))),
  175. pb.NewOptional(pb.NewDataLiteral<ui64>(0)));
  176. },
  177. /* groupHandler */
  178. [&](TRuntimeNode::TList, TRuntimeNode input) { return pb.WideMap(input, [](TRuntimeNode::TList items) { return items; }); }
  179. ),
  180. [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); }
  181. );
  182. };
  183. template<bool Wide>
  184. TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
  185. template<>
  186. TRuntimeNode CreateCombine<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  187. TTimer t(TString(__func__) + ": ");
  188. auto flow = CreateFlow(pb, vecSize, list);
  189. return pb.CombineCore(
  190. /* stream */
  191. flow,
  192. /* keyExtractor */
  193. [&] (TRuntimeNode /*item*/) -> TRuntimeNode { return pb.NewDataLiteral<ui64>(0);},
  194. /* init */
  195. [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return item; },
  196. /* update */
  197. [&] (TRuntimeNode /* key */, TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); },
  198. /* finish */
  199. [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return pb.NewOptional(item); },
  200. /* memlimit */
  201. 64 << 20
  202. );
  203. };
  204. template<>
  205. TRuntimeNode CreateCombine<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  206. TTimer t(TString(__func__) + ": ");
  207. auto flow = CreateFlow(pb, vecSize, list);
  208. return pb.NarrowMap(
  209. pb.WideCombiner(
  210. /* stream */
  211. pb.ExpandMap(flow,
  212. [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
  213. ),
  214. /* memlimit */
  215. 64 << 20,
  216. /* keyExtractor */
  217. [&] (TRuntimeNode::TList /*item*/) -> TRuntimeNode::TList { return {pb.NewDataLiteral<ui64>(0)};},
  218. /* init */
  219. [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; },
  220. /* update */
  221. [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList {
  222. return {pb.Add(item.front(), state.front())};
  223. },
  224. /* finish */
  225. [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {pb.NewOptional(item.front())}; }
  226. ),
  227. [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); }
  228. );
  229. };
  230. template<bool Wide>
  231. TRuntimeNode CreateChain1Map(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
  232. template<>
  233. TRuntimeNode CreateChain1Map<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  234. TTimer t(TString(__func__) + ": ");
  235. auto flow = CreateFlow(pb, vecSize, list);
  236. return pb.Chain1Map(
  237. flow,
  238. /* init */
  239. [&] (TRuntimeNode item) -> TRuntimeNode { return item; },
  240. /* update */
  241. [&] (TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); }
  242. );
  243. }
  244. template<>
  245. TRuntimeNode CreateChain1Map<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
  246. TTimer t(TString(__func__) + ": ");
  247. auto flow = CreateFlow(pb, vecSize, list);
  248. return pb.NarrowMap(
  249. pb.WideChain1Map(
  250. /* stream */
  251. pb.ExpandMap(flow,
  252. [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
  253. ),
  254. /* init */
  255. [&] (TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; },
  256. /* update */
  257. [&] (TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; }
  258. ),
  259. [&] (TRuntimeNode::TList item) -> TRuntimeNode { return item.front(); }
  260. );
  261. }
  262. template<bool Wide>
  263. TRuntimeNode CreateDiscard(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
  264. TTimer t(TString(__func__) + ": ");
  265. auto flow = CreateFlow(pb, vecSize, list);
  266. if (Wide) {
  267. return pb.Discard(
  268. pb.ExpandMap(flow,
  269. [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
  270. )
  271. );
  272. } else {
  273. return pb.Discard(flow);
  274. }
  275. }
  276. template<bool Wide>
  277. TRuntimeNode CreateSkip(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
  278. TTimer t(TString(__func__) + ": ");
  279. auto flow = CreateFlow(pb, vecSize, list);
  280. auto count = pb.NewDataLiteral<ui64>(500);
  281. if (Wide) {
  282. return pb.NarrowMap(
  283. pb.Skip(
  284. pb.ExpandMap(flow,
  285. [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
  286. ),
  287. count
  288. ),
  289. [&] (TRuntimeNode::TList item) -> TRuntimeNode { return item.front(); }
  290. );
  291. } else {
  292. return pb.Skip(flow, count);
  293. }
  294. }
  295. template<bool Flow>
  296. TRuntimeNode CreateNarrowFlatMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
  297. TTimer t(TString(__func__) + ": ");
  298. auto flow = CreateFlow(pb, vecSize, list);
  299. return pb.NarrowFlatMap(
  300. pb.ExpandMap(flow,
  301. [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
  302. ),
  303. [&] (TRuntimeNode::TList item) -> TRuntimeNode {
  304. auto x = pb.NewOptional(item.front());
  305. return Flow ? pb.ToFlow(x) : x;
  306. }
  307. );
  308. }
  309. TRuntimeNode CreateNarrowMultiMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
  310. TTimer t(TString(__func__) + ": ");
  311. auto flow = CreateFlow(pb, vecSize, list);
  312. return pb.NarrowMultiMap(
  313. pb.ExpandMap(flow,
  314. [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
  315. ),
  316. [&] (TRuntimeNode::TList item) -> TRuntimeNode::TList {
  317. return {item.front(), item.front()};
  318. }
  319. );
  320. }
  321. template<bool WithPayload>
  322. TRuntimeNode CreateSqueezeToSortedDict(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
  323. TTimer t(TString(__func__) + ": ");
  324. auto flow = CreateFlow(pb, vecSize, list);
  325. return pb.FlatMap(
  326. pb.NarrowSqueezeToSortedDict(
  327. pb.ExpandMap(flow,
  328. [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
  329. ),
  330. /*all*/ false,
  331. /*keySelector*/ [&](TRuntimeNode::TList item) { return item.front(); },
  332. /*payloadSelector*/ [&](TRuntimeNode::TList ) { return WithPayload ? pb.NewDataLiteral<ui64>(0) : pb.NewVoid(); }
  333. ),
  334. [&] (TRuntimeNode item) { return pb.DictKeys(item); }
  335. );
  336. }
  337. TRuntimeNode CreateMapJoin(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
  338. TTimer t(TString(__func__) + ": ");
  339. auto flow = CreateFlow(pb, vecSize, list);
  340. const auto tupleType = pb.NewTupleType({
  341. pb.NewDataType(NUdf::TDataType<ui32>::Id),
  342. pb.NewDataType(NUdf::TDataType<ui64>::Id)
  343. });
  344. const auto list1 = pb.Map(flow, [&] (TRuntimeNode item) {
  345. return pb.NewTuple({pb.Mod(item, pb.NewDataLiteral<ui64>(1000)), pb.NewDataLiteral<ui32>(1)});
  346. });
  347. const auto list2 = pb.NewList(tupleType, {
  348. pb.NewTuple({pb.NewDataLiteral<ui32>(1), pb.NewDataLiteral<ui64>(3 * 1000)}),
  349. pb.NewTuple({pb.NewDataLiteral<ui32>(2), pb.NewDataLiteral<ui64>(4 * 1000)}),
  350. pb.NewTuple({pb.NewDataLiteral<ui32>(3), pb.NewDataLiteral<ui64>(5 * 1000)}),
  351. });
  352. const auto dict = pb.ToSortedDict(list2, false,
  353. [&](TRuntimeNode item) {
  354. return pb.Nth(item, 0);
  355. },
  356. [&](TRuntimeNode item) {
  357. return pb.NewTuple({pb.Nth(item, 1U)});
  358. });
  359. const auto resultType = pb.NewFlowType(pb.NewMultiType({
  360. pb.NewDataType(NUdf::TDataType<char*>::Id),
  361. pb.NewDataType(NUdf::TDataType<char*>::Id),
  362. }));
  363. return pb.Map(
  364. pb.NarrowMap(pb.MapJoinCore(
  365. pb.ExpandMap(list1, [&] (TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0), pb.Nth(item, 1)}; }),
  366. dict,
  367. EJoinKind::Inner,
  368. {0U},
  369. {1U, 0U},
  370. {0U, 1U},
  371. resultType
  372. ),
  373. [&](TRuntimeNode::TList items) { return pb.NewTuple(items); }
  374. ),
  375. [&](TRuntimeNode item) { return pb.Nth(item, 1); }
  376. );
  377. }
  378. Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
  379. template<class T>
  380. void ParallelProgTest(T f, bool useLLVM, ui64 testResult, size_t vecSize = 10'000) {
  381. TTimer t("total: ");
  382. const ui32 cacheSizeInBytes = 104857600; // 100 MiB
  383. const ui32 inFlight = 7;
  384. TComputationPatternLRUCache cache({cacheSizeInBytes, cacheSizeInBytes});
  385. auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
  386. auto entry = std::make_shared<TPatternCacheEntry>();
  387. TScopedAlloc& alloc = entry->Alloc;
  388. TTypeEnvironment& typeEnv = entry->Env;
  389. TProgramBuilder pb(typeEnv, *functionRegistry);
  390. const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
  391. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  392. TRuntimeNode progReturn;
  393. with_lock(alloc) {
  394. progReturn = f(pb, vecSize, list);
  395. }
  396. TExploringNodeVisitor explorer;
  397. explorer.Walk(progReturn.GetNode(), typeEnv);
  398. TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(), functionRegistry.Get(),
  399. NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, useLLVM ? "" : "OFF", EGraphPerProcess::Multi);
  400. {
  401. auto guard = entry->Env.BindAllocator();
  402. entry->Pattern = MakeComputationPattern(explorer, progReturn, {list}, opts);
  403. }
  404. cache.EmplacePattern("a", entry);
  405. auto genData = [&]() {
  406. std::vector<ui64> data;
  407. data.reserve(vecSize);
  408. for (ui64 i = 0; i < vecSize; ++i) {
  409. data.push_back((i + 124515) % 6740234);
  410. }
  411. return data;
  412. };
  413. const auto data = genData();
  414. std::vector<std::vector<ui64>> results(inFlight);
  415. NPar::LocalExecutor().RunAdditionalThreads(inFlight);
  416. NPar::LocalExecutor().ExecRange([&](int id) {
  417. for (ui32 i = 0; i < 100; ++i) {
  418. auto key = "a";
  419. auto randomProvider = CreateDeterministicRandomProvider(1);
  420. auto timeProvider = CreateDeterministicTimeProvider(10000000);
  421. TScopedAlloc graphAlloc(__LOCATION__);
  422. auto entry = cache.Find(key);
  423. TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetListTestFactory(),
  424. functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
  425. useLLVM ? "" : "OFF", EGraphPerProcess::Multi);
  426. auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
  427. TUnboxedValue* items = nullptr;
  428. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items));
  429. std::transform(data.cbegin(), data.cend(), items,
  430. [](const auto s) {
  431. return ToValue<ui64>(s);
  432. });
  433. ui64 acc = 0;
  434. TUnboxedValue v = graph->GetValue();
  435. while (v.HasValue()) {
  436. acc += v.Get<ui64>();
  437. v = graph->GetValue();
  438. }
  439. results[id].push_back(acc);
  440. }
  441. }, 0, inFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
  442. for (auto threadResults : results) {
  443. for (auto res : threadResults) {
  444. UNIT_ASSERT_VALUES_EQUAL(res, testResult);
  445. }
  446. }
  447. }
  448. Y_UNIT_TEST_QUAD(Filter, Wide, UseLLVM) {
  449. ParallelProgTest(CreateFilter<Wide>, UseLLVM, 10098816);
  450. }
  451. Y_UNIT_TEST_QUAD(Map, Wide, UseLLVM) {
  452. ParallelProgTest(CreateMap<Wide>, UseLLVM, 78);
  453. }
  454. Y_UNIT_TEST_QUAD(Condense, Wide, UseLLVM) {
  455. ParallelProgTest(CreateCondense<Wide>, UseLLVM, 1295145000);
  456. }
  457. Y_UNIT_TEST_QUAD(Chopper, Wide, UseLLVM) {
  458. ParallelProgTest(CreateChopper<Wide>, UseLLVM, 1295145000);
  459. }
  460. Y_UNIT_TEST_QUAD(Combine, Wide, UseLLVM) {
  461. ParallelProgTest(CreateCombine<Wide>, UseLLVM, 1295145000);
  462. }
  463. Y_UNIT_TEST_QUAD(Chain1Map, Wide, UseLLVM) {
  464. ParallelProgTest(CreateChain1Map<Wide>, UseLLVM, 6393039240000);
  465. }
  466. Y_UNIT_TEST_QUAD(Discard, Wide, UseLLVM) {
  467. ParallelProgTest(CreateDiscard<Wide>, UseLLVM, 0);
  468. }
  469. Y_UNIT_TEST_QUAD(Skip, Wide, UseLLVM) {
  470. ParallelProgTest(CreateSkip<Wide>, UseLLVM, 1232762750);
  471. }
  472. Y_UNIT_TEST_QUAD(NarrowFlatMap, Flow, UseLLVM) {
  473. ParallelProgTest(CreateNarrowFlatMap<Flow>, UseLLVM, 1295145000);
  474. }
  475. Y_UNIT_TEST_TWIN(NarrowMultiMap, UseLLVM) {
  476. ParallelProgTest(CreateNarrowMultiMap, UseLLVM, 1295145000ull * 2);
  477. }
  478. Y_UNIT_TEST_QUAD(SqueezeToSortedDict, WithPayload, UseLLVM) {
  479. ParallelProgTest(CreateSqueezeToSortedDict<WithPayload>, UseLLVM, 125014500, 1000);
  480. }
  481. Y_UNIT_TEST_TWIN(MapJoin, UseLLVM) {
  482. ParallelProgTest(CreateMapJoin, UseLLVM, 120000, 10'000);
  483. }
  484. }
  485. Y_UNIT_TEST_SUITE(ComputationPatternCache) {
  486. Y_UNIT_TEST(Smoke) {
  487. const ui32 cacheSize = 10'000'000;
  488. const ui32 cacheItems = 10;
  489. TComputationPatternLRUCache cache({cacheSize, cacheSize});
  490. auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
  491. for (ui32 i = 0; i < cacheItems; ++i) {
  492. auto entry = std::make_shared<TPatternCacheEntry>();
  493. TScopedAlloc& alloc = entry->Alloc;
  494. TTypeEnvironment& typeEnv = entry->Env;
  495. TProgramBuilder pb(typeEnv, *functionRegistry);
  496. TRuntimeNode progReturn;
  497. with_lock(alloc) {
  498. progReturn = pb.NewDataLiteral<NYql::NUdf::EDataSlot::String>("qwerty");
  499. }
  500. TExploringNodeVisitor explorer;
  501. explorer.Walk(progReturn.GetNode(), typeEnv);
  502. TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(),
  503. functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
  504. "OFF", EGraphPerProcess::Multi);
  505. {
  506. auto guard = entry->Env.BindAllocator();
  507. entry->Pattern = MakeComputationPattern(explorer, progReturn, {}, opts);
  508. }
  509. // XXX: There is no way to accurately define how the entry's
  510. // allocator obtains the memory pages: using the free ones from the
  511. // global page pool or the ones directly requested by <mmap>. At the
  512. // same time, it is the total allocated bytes (not just the number
  513. // of the borrowed pages) that is a good estimate of the memory
  514. // consumed by the pattern cache entry for real life workload.
  515. // Hence, to avoid undesired cache flushes, release the free pages
  516. // of the allocator of the particular entry.
  517. alloc.ReleaseFreePages();
  518. cache.EmplacePattern(TString((char)('a' + i)), entry);
  519. }
  520. for (ui32 i = 0; i < cacheItems; ++i) {
  521. auto key = TString((char)('a' + i));
  522. auto randomProvider = CreateDeterministicRandomProvider(1);
  523. auto timeProvider = CreateDeterministicTimeProvider(10000000);
  524. TScopedAlloc graphAlloc(__LOCATION__);
  525. auto entry = cache.Find(key);
  526. UNIT_ASSERT(entry);
  527. TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetBuiltinFactory(),
  528. functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
  529. "OFF", EGraphPerProcess::Multi);
  530. auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
  531. auto value = graph->GetValue();
  532. UNIT_ASSERT_EQUAL(NYql::NUdf::TStringRef("qwerty"), value.AsStringRef());
  533. }
  534. }
  535. Y_UNIT_TEST(DoubleNotifyPatternCompiled) {
  536. class TMockComputationPattern final : public IComputationPattern {
  537. public:
  538. explicit TMockComputationPattern(size_t codeSize) : Size_(codeSize) {}
  539. void Compile(TString, IStatsRegistry*) override { Compiled_ = true; }
  540. bool IsCompiled() const override { return Compiled_; }
  541. size_t CompiledCodeSize() const override { return Size_; }
  542. void RemoveCompiledCode() override { Compiled_ = false; }
  543. THolder<IComputationGraph> Clone(const TComputationOptsFull&) override { return {}; }
  544. bool GetSuitableForCache() const override { return true; }
  545. private:
  546. const size_t Size_;
  547. bool Compiled_ = false;
  548. };
  549. const TString key = "program";
  550. const ui32 cacheSize = 2;
  551. TComputationPatternLRUCache cache({cacheSize, cacheSize});
  552. auto entry = std::make_shared<TPatternCacheEntry>();
  553. entry->Pattern = MakeIntrusive<TMockComputationPattern>(1u);
  554. cache.EmplacePattern(key, entry);
  555. for (ui32 i = 0; i < cacheSize + 1; ++i) {
  556. entry->Pattern->Compile("", nullptr);
  557. cache.NotifyPatternCompiled(key);
  558. }
  559. entry = std::make_shared<TPatternCacheEntry>();
  560. entry->Pattern = MakeIntrusive<TMockComputationPattern>(cacheSize + 1);
  561. entry->Pattern->Compile("", nullptr);
  562. cache.EmplacePattern(key, entry);
  563. }
  564. Y_UNIT_TEST(AddPerf) {
  565. TTimer t("all: ");
  566. TScopedAlloc alloc(__LOCATION__);
  567. TTypeEnvironment typeEnv(alloc);
  568. auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
  569. TProgramBuilder pb(typeEnv, *functionRegistry);
  570. auto prog1 = pb.NewDataLiteral<ui64>(123591592ULL);
  571. auto prog2 = pb.NewDataLiteral<ui64>(323591592ULL);
  572. auto progReturn = pb.Add(prog1, prog2);
  573. TExploringNodeVisitor explorer;
  574. explorer.Walk(progReturn.GetNode(), typeEnv);
  575. NUdf::EValidateMode validateMode = NUdf::EValidateMode::Lazy;
  576. TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(),
  577. functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception,
  578. "OFF", EGraphPerProcess::Multi);
  579. auto t_make_pattern = std::make_unique<TTimer>("make_pattern: ");
  580. auto pattern = MakeComputationPattern(explorer, progReturn, {}, opts);
  581. t_make_pattern.reset();
  582. auto randomProvider = CreateDeterministicRandomProvider(1);
  583. auto timeProvider = CreateDeterministicTimeProvider(10000000);
  584. auto t_clone = std::make_unique<TTimer>("clone: ");
  585. auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
  586. t_clone.reset();
  587. const ui64 repeats = 100'000;
  588. {
  589. TTimer t("graph: ");
  590. ui64 acc = 0;
  591. for (ui64 i = 0; i < repeats; ++i) {
  592. acc += graph->GetValue().Get<ui64>();
  593. }
  594. Y_DO_NOT_OPTIMIZE_AWAY(acc);
  595. }
  596. {
  597. std::function<ui64(ui64, ui64)> add = [](ui64 a, ui64 b) {
  598. return a + b;
  599. };
  600. TTimer t("lambda: ");
  601. ui64 acc = 0;
  602. for (ui64 i = 0; i < repeats; ++i) {
  603. acc += add(123591592ULL, 323591592ULL);
  604. }
  605. Y_DO_NOT_OPTIMIZE_AWAY(acc);
  606. }
  607. {
  608. std::function<TUnboxedValue(TUnboxedValue&, TUnboxedValue&)> add =
  609. [](TUnboxedValue& a, TUnboxedValue& b) {
  610. return TUnboxedValuePod(a.Get<ui64>() + b.Get<ui64>());
  611. };
  612. Y_DO_NOT_OPTIMIZE_AWAY(add);
  613. TTimer t("lambda unboxed value: ");
  614. TUnboxedValue acc(TUnboxedValuePod(0));
  615. TUnboxedValue v1(TUnboxedValuePod(ui64{123591592UL}));
  616. TUnboxedValue v2(TUnboxedValuePod(ui64{323591592UL}));
  617. for (ui64 i = 0; i < repeats; ++i) {
  618. auto r = add(v1, v2);
  619. acc = add(r, acc);
  620. }
  621. Y_DO_NOT_OPTIMIZE_AWAY(acc.Get<ui64>());
  622. }
  623. }
  624. Y_UNIT_TEST_TWIN(FilterPerf, Wide) {
  625. TScopedAlloc alloc(__LOCATION__);
  626. TTypeEnvironment typeEnv(alloc);
  627. auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
  628. TProgramBuilder pb(typeEnv, *functionRegistry);
  629. const ui64 vecSize = 100'000;
  630. Cerr << "vecSize: " << vecSize << Endl;
  631. const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
  632. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  633. auto progReturn = CreateFilter<Wide>(pb, vecSize, list);
  634. TExploringNodeVisitor explorer;
  635. explorer.Walk(progReturn.GetNode(), typeEnv);
  636. NUdf::EValidateMode validateMode = NUdf::EValidateMode::Max;
  637. TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(),
  638. functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception,
  639. "OFF", EGraphPerProcess::Multi);
  640. auto t_make_pattern = std::make_unique<TTimer>("make_pattern: ");
  641. auto pattern = MakeComputationPattern(explorer, progReturn, {list}, opts);
  642. t_make_pattern.reset();
  643. auto randomProvider = CreateDeterministicRandomProvider(1);
  644. auto timeProvider = CreateDeterministicTimeProvider(10000000);
  645. auto t_clone = std::make_unique<TTimer>("clone: ");
  646. auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
  647. t_clone.reset();
  648. auto genData = [&]() {
  649. std::vector<ui64> data;
  650. data.reserve(vecSize);
  651. for (ui64 i = 0; i < vecSize; ++i) {
  652. data.push_back((i + 124515) % 6740234);
  653. }
  654. return data;
  655. };
  656. auto testResult = [&] (ui64 acc, ui64 count) {
  657. if (vecSize == 100'000'000) {
  658. UNIT_ASSERT_VALUES_EQUAL(acc, 2614128386688);
  659. UNIT_ASSERT_VALUES_EQUAL(count, 781263);
  660. } else if (vecSize == 10'000'000) {
  661. UNIT_ASSERT_VALUES_EQUAL(acc, 222145217664);
  662. } else if (vecSize == 100'000) {
  663. UNIT_ASSERT_VALUES_EQUAL(acc, 136480896);
  664. UNIT_ASSERT_VALUES_EQUAL(count, 782);
  665. } else {
  666. UNIT_FAIL("result is not checked");
  667. }
  668. };
  669. ui64 kIter = 2;
  670. {
  671. TDuration total;
  672. for (ui64 i = 0; i < kIter; ++i) {
  673. ui64 acc = 0;
  674. ui64 count = 0;
  675. auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
  676. auto data = genData();
  677. TUnboxedValue* items = nullptr;
  678. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items));
  679. std::transform(data.cbegin(), data.cend(), items,
  680. [](const auto s) {
  681. return ToValue<ui64>(s);
  682. });
  683. TSimpleTimer t;
  684. TUnboxedValue v = graph->GetValue();
  685. while (v.HasValue()) {
  686. acc += v.Get<ui64>();
  687. ++count;
  688. v = graph->GetValue();
  689. }
  690. testResult(acc, count);
  691. total += t.Get();
  692. }
  693. Cerr << "graph: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
  694. }
  695. {
  696. auto data = genData();
  697. std::function<bool(ui64)> predicate = [](ui64 a) {
  698. return a % 128 == 0;
  699. };
  700. Y_DO_NOT_OPTIMIZE_AWAY(predicate);
  701. TDuration total;
  702. for (ui64 i = 0; i < kIter; ++i) {
  703. TSimpleTimer t;
  704. ui64 acc = 0;
  705. ui64 count = 0;
  706. for (ui64 j = 0; j < data.size(); ++j) {
  707. if (predicate(data[j])) {
  708. acc += data[j];
  709. ++count;
  710. }
  711. }
  712. total += t.Get();
  713. testResult(acc, count);
  714. }
  715. Cerr << "std::function: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
  716. }
  717. {
  718. auto data = genData();
  719. static auto predicate = [](ui64 a) {
  720. return a % 128 == 0;
  721. };
  722. Y_DO_NOT_OPTIMIZE_AWAY(predicate);
  723. TDuration total;
  724. for (ui64 i = 0; i < kIter; ++i) {
  725. TSimpleTimer t;
  726. ui64 acc = 0;
  727. ui64 count = 0;
  728. for (ui64 j = 0; j < data.size(); ++j) {
  729. if (predicate(data[j])) {
  730. acc += data[j];
  731. ++count;
  732. }
  733. }
  734. total += t.Get();
  735. testResult(acc, count);
  736. }
  737. Cerr << "lambda: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
  738. }
  739. }
  740. }
  741. }
  742. }