mkql_combine_ut.cpp 78 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535
  1. #include "mkql_computation_node_ut.h"
  2. #include <yql/essentials/minikql/mkql_runtime_version.h>
  3. #include <yql/essentials/minikql/mkql_node_cast.h>
  4. #include <yql/essentials/minikql/mkql_string_util.h>
  5. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  6. #include <cstring>
  7. #include <random>
  8. #include <ctime>
  9. #include <algorithm>
  10. namespace NKikimr {
  11. namespace NMiniKQL {
  12. namespace {
  13. ui64 g_Yield = std::numeric_limits<ui64>::max();
  14. ui64 g_TestStreamData[] = {0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2};
  15. ui64 g_TestYieldStreamData[] = {0, 1, 2, g_Yield, 0, g_Yield, 1, 2, 0, 1, 2, 0, g_Yield, 1, 2};
  16. template <bool WithYields>
  17. class TTestStreamWrapper: public TMutableComputationNode<TTestStreamWrapper<WithYields>> {
  18. typedef TMutableComputationNode<TTestStreamWrapper<WithYields>> TBaseComputation;
  19. public:
  20. class TStreamValue : public TComputationValue<TStreamValue> {
  21. public:
  22. using TBase = TComputationValue<TStreamValue>;
  23. TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, const TTestStreamWrapper* parent)
  24. : TBase(memInfo)
  25. , CompCtx(compCtx)
  26. , Parent(parent)
  27. {
  28. }
  29. private:
  30. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  31. constexpr auto size = WithYields ? Y_ARRAY_SIZE(g_TestYieldStreamData) : Y_ARRAY_SIZE(g_TestStreamData);
  32. if (Index == size) {
  33. return NUdf::EFetchStatus::Finish;
  34. }
  35. const auto val = WithYields ? g_TestYieldStreamData[Index] : g_TestStreamData[Index];
  36. if (g_Yield == val) {
  37. ++Index;
  38. return NUdf::EFetchStatus::Yield;
  39. }
  40. NUdf::TUnboxedValue* items = nullptr;
  41. result = CompCtx.HolderFactory.CreateDirectArrayHolder(2, items);
  42. items[0] = NUdf::TUnboxedValuePod(val);
  43. if (((Index + 1) % Parent->PeakStep) == 0) {
  44. auto str = MakeStringNotFilled(64ul << 20);
  45. const auto& buf = str.AsStringRef();
  46. memset(buf.Data(), ' ', buf.Size());
  47. items[1] = std::move(str);
  48. } else {
  49. items[1] = NUdf::TUnboxedValuePod::Zero();
  50. }
  51. ++Index;
  52. return NUdf::EFetchStatus::Ok;
  53. }
  54. private:
  55. TComputationContext& CompCtx;
  56. const TTestStreamWrapper* const Parent;
  57. ui64 Index = 0;
  58. };
  59. TTestStreamWrapper(TComputationMutables& mutables, ui64 peakStep)
  60. : TBaseComputation(mutables)
  61. , PeakStep(peakStep)
  62. {
  63. }
  64. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  65. return ctx.HolderFactory.Create<TStreamValue>(ctx, this);
  66. }
  67. private:
  68. void RegisterDependencies() const final {
  69. }
  70. private:
  71. const ui64 PeakStep;
  72. };
  73. template <bool WithYields>
  74. IComputationNode* WrapTestStream(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  75. MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args");
  76. const ui64 peakStep = AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().Get<ui64>();
  77. return new TTestStreamWrapper<WithYields>(ctx.Mutables, peakStep);
  78. }
  79. TComputationNodeFactory GetNodeFactory() {
  80. return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
  81. if (callable.GetType()->GetName() == "TestList") {
  82. return new TExternalComputationNode(ctx.Mutables);
  83. }
  84. if (callable.GetType()->GetName() == "TestStream") {
  85. return WrapTestStream<false>(callable, ctx);
  86. }
  87. if (callable.GetType()->GetName() == "TestYieldStream") {
  88. return WrapTestStream<true>(callable, ctx);
  89. }
  90. return GetBuiltinFactory()(callable, ctx);
  91. };
  92. }
  93. template <bool LLVM, bool WithYields = false>
  94. TRuntimeNode MakeStream(TSetup<LLVM>& setup, ui64 peakStep) {
  95. TProgramBuilder& pb = *setup.PgmBuilder;
  96. TCallableBuilder callableBuilder(*setup.Env, WithYields ? "TestYieldStream" : "TestStream",
  97. pb.NewStreamType(
  98. pb.NewStructType({
  99. {TStringBuf("a"), pb.NewDataType(NUdf::EDataSlot::Uint64)},
  100. {TStringBuf("b"), pb.NewDataType(NUdf::EDataSlot::String)}
  101. })
  102. )
  103. );
  104. callableBuilder.Add(pb.NewDataLiteral(peakStep));
  105. return TRuntimeNode(callableBuilder.Build(), false);
  106. }
  107. template <bool OverFlow>
  108. TRuntimeNode Combine(TProgramBuilder& pb, TRuntimeNode stream, std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)> finishLambda) {
  109. const auto keyExtractor = [&](TRuntimeNode item) {
  110. return pb.Member(item, "a");
  111. };
  112. const auto init = [&](TRuntimeNode /*key*/, TRuntimeNode item) {
  113. return item;
  114. };
  115. const auto update = [&](TRuntimeNode /*key*/, TRuntimeNode item, TRuntimeNode state) {
  116. const auto a = pb.Add(pb.Member(item, "a"), pb.Member(state, "a"));
  117. const auto b = pb.Concat(pb.Member(item, "b"), pb.Member(state, "b"));
  118. return pb.NewStruct({
  119. {TStringBuf("a"), a},
  120. {TStringBuf("b"), b},
  121. });
  122. };
  123. return OverFlow ?
  124. pb.FromFlow(pb.CombineCore(pb.ToFlow(stream), keyExtractor, init, update, finishLambda, 64ul << 20)):
  125. pb.CombineCore(stream, keyExtractor, init, update, finishLambda, 64ul << 20);
  126. }
  127. TRuntimeNode Reduce(TProgramBuilder& pb, TRuntimeNode stream) {
  128. return pb.Condense(stream, pb.NewDataLiteral<ui64>(0),
  129. [&] (TRuntimeNode, TRuntimeNode) { return pb.NewDataLiteral<bool>(false); },
  130. [&] (TRuntimeNode item, TRuntimeNode state) { return pb.Add(state, item); }
  131. );
  132. }
  133. TRuntimeNode StreamToString(TProgramBuilder& pb, TRuntimeNode stream) {
  134. const auto sorted = pb.Sort(stream, pb.NewDataLiteral(true),
  135. [&](TRuntimeNode item) {
  136. return item;
  137. });
  138. return pb.Condense(sorted, pb.NewDataLiteral<NUdf::EDataSlot::String>("|"),
  139. [&] (TRuntimeNode, TRuntimeNode) { return pb.NewDataLiteral<bool>(false); },
  140. [&] (TRuntimeNode item, TRuntimeNode state) {
  141. return pb.Concat(pb.Concat(state, pb.ToString(item)), pb.NewDataLiteral<NUdf::EDataSlot::String>("|"));
  142. }
  143. );
  144. }
  145. } // unnamed
  146. Y_UNIT_TEST_SUITE(TMiniKQLCombineStreamTest) {
  147. Y_UNIT_TEST_LLVM(TestFullCombineWithOptOut) {
  148. TSetup<LLVM> setup(GetNodeFactory());
  149. TProgramBuilder& pb = *setup.PgmBuilder;
  150. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  151. return pb.NewOptional(pb.Member(state, "a"));
  152. };
  153. const auto stream = MakeStream(setup, Max<ui64>());
  154. const auto pgm = StreamToString(pb, Combine<false>(pb, stream, finish));
  155. const auto graph = setup.BuildGraph(pgm);
  156. const auto streamVal = graph->GetValue();
  157. NUdf::TUnboxedValue result;
  158. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  159. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|4|8|");
  160. }
  161. Y_UNIT_TEST_LLVM(TestFullCombineWithListOut) {
  162. TSetup<LLVM> setup(GetNodeFactory());
  163. TProgramBuilder& pb = *setup.PgmBuilder;
  164. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  165. const auto item = pb.Member(state, "a");
  166. const auto itemType = item.GetStaticType();
  167. auto list = pb.NewEmptyList(itemType);
  168. list = pb.Append(list, item);
  169. list = pb.Append(list, item);
  170. return list;
  171. };
  172. const auto stream = MakeStream(setup, Max<ui64>());
  173. const auto pgm = StreamToString(pb, Combine<false>(pb, stream, finish));
  174. const auto graph = setup.BuildGraph(pgm);
  175. const auto streamVal = graph->GetValue();
  176. NUdf::TUnboxedValue result;
  177. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  178. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|4|4|8|8|");
  179. }
  180. Y_UNIT_TEST_LLVM(TestFullCombineWithStreamOut) {
  181. TSetup<LLVM> setup(GetNodeFactory());
  182. TProgramBuilder& pb = *setup.PgmBuilder;
  183. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  184. const auto item = pb.Member(state, "a");
  185. const auto itemType = item.GetStaticType();
  186. auto list = pb.NewEmptyList(itemType);
  187. list = pb.Append(list, item);
  188. list = pb.Append(list, item);
  189. return pb.Iterator(list, MakeArrayRef(&state, 1));
  190. };
  191. const auto stream = MakeStream(setup, Max<ui64>());
  192. const auto pgm = StreamToString(pb, Combine<false>(pb, stream, finish));
  193. const auto graph = setup.BuildGraph(pgm);
  194. const auto streamVal = graph->GetValue();
  195. NUdf::TUnboxedValue result;
  196. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  197. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|4|4|8|8|");
  198. }
  199. Y_UNIT_TEST_LLVM(TestFullCombineWithOptOutAndYields) {
  200. TSetup<LLVM> setup(GetNodeFactory());
  201. TProgramBuilder& pb = *setup.PgmBuilder;
  202. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  203. return pb.NewOptional(pb.Member(state, "a"));
  204. };
  205. const auto stream = MakeStream<LLVM, true>(setup, Max<ui64>());
  206. const auto pgm = StreamToString(pb, Combine<false>(pb, stream, finish));
  207. const auto graph = setup.BuildGraph(pgm);
  208. const auto streamVal = graph->GetValue();
  209. NUdf::TUnboxedValue result;
  210. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  211. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  212. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  213. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  214. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|1|1|2|2|2|4|");
  215. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
  216. }
  217. Y_UNIT_TEST_LLVM(TestFullCombineWithListAndYields) {
  218. TSetup<LLVM> setup(GetNodeFactory());
  219. TProgramBuilder& pb = *setup.PgmBuilder;
  220. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  221. const auto item = pb.Member(state, "a");
  222. const auto itemType = item.GetStaticType();
  223. auto list = pb.NewEmptyList(itemType);
  224. list = pb.Append(list, item);
  225. list = pb.Append(list, item);
  226. return list;
  227. };
  228. const auto stream = MakeStream<LLVM, true>(setup, Max<ui64>());
  229. const auto pgm = StreamToString(pb, Combine<false>(pb, stream, finish));
  230. const auto graph = setup.BuildGraph(pgm);
  231. const auto streamVal = graph->GetValue();
  232. NUdf::TUnboxedValue result;
  233. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  234. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  235. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  236. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  237. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|0|1|1|1|1|2|2|2|2|2|2|4|4|");
  238. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
  239. }
  240. Y_UNIT_TEST_LLVM(TestFullCombineWithStreamAndYields) {
  241. TSetup<LLVM> setup(GetNodeFactory());
  242. TProgramBuilder& pb = *setup.PgmBuilder;
  243. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  244. const auto item = pb.Member(state, "a");
  245. const auto itemType = item.GetStaticType();
  246. auto list = pb.NewEmptyList(itemType);
  247. list = pb.Append(list, item);
  248. list = pb.Append(list, item);
  249. return pb.Iterator(list, MakeArrayRef(&state, 1));
  250. };
  251. const auto stream = MakeStream<LLVM, true>(setup, Max<ui64>());
  252. const auto pgm = StreamToString(pb, Combine<false>(pb, stream, finish));
  253. const auto graph = setup.BuildGraph(pgm);
  254. const auto streamVal = graph->GetValue();
  255. NUdf::TUnboxedValue result;
  256. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  257. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  258. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  259. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  260. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|0|1|1|1|1|2|2|2|2|2|2|4|4|");
  261. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
  262. }
  263. Y_UNIT_TEST_LLVM(TestPartialFlush) {
  264. TSetup<LLVM> setup(GetNodeFactory());
  265. TProgramBuilder& pb = *setup.PgmBuilder;
  266. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  267. return pb.NewOptional(pb.Member(state, "a"));
  268. };
  269. const auto stream = MakeStream(setup, 6ul);
  270. const auto combine = Combine<false>(pb, stream, finish);
  271. {
  272. const auto pgm = Reduce(pb, combine);
  273. const auto graph = setup.BuildGraph(pgm);
  274. const auto streamVal = graph->GetValue();
  275. NUdf::TUnboxedValue result;
  276. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  277. UNIT_ASSERT_VALUES_EQUAL(result.Get<ui64>(), 12ul);
  278. }
  279. {
  280. const auto pgm = StreamToString(pb, combine);
  281. const auto graph = setup.BuildGraph(pgm);
  282. const auto streamVal = graph->GetValue();
  283. NUdf::TUnboxedValue result;
  284. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  285. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|2|2|4|4|");
  286. }
  287. }
  288. Y_UNIT_TEST_LLVM(TestCombineInSingleProc) {
  289. TSetup<LLVM> setup(GetNodeFactory());
  290. TProgramBuilder& pb = *setup.PgmBuilder;
  291. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  292. return pb.NewOptional(pb.Member(state, "a"));
  293. };
  294. const auto stream = MakeStream(setup, 6ul);
  295. const auto pgm = Reduce(pb, Combine<false>(pb, stream, finish));
  296. const auto graph = setup.BuildGraph(pgm, EGraphPerProcess::Single);
  297. const auto streamVal = graph->GetValue();
  298. NUdf::TUnboxedValue result;
  299. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  300. UNIT_ASSERT_VALUES_EQUAL(result.Get<ui64>(), 12ul);
  301. }
  302. Y_UNIT_TEST_LLVM(TestCombineSwithYield) {
  303. TSetup<LLVM> setup(GetNodeFactory());
  304. TProgramBuilder& pb = *setup.PgmBuilder;
  305. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  306. return pb.NewOptional(pb.Member(state, "a"));
  307. };
  308. auto stream = MakeStream(setup, Max<ui64>());
  309. TSwitchInput switchInput;
  310. switchInput.Indicies.push_back(0);
  311. switchInput.InputType = stream.GetStaticType();
  312. stream = pb.Switch(stream,
  313. MakeArrayRef(&switchInput, 1),
  314. [&](ui32 /*index*/, TRuntimeNode item) { return Combine<false>(pb, item, finish); },
  315. 1,
  316. pb.NewStreamType(pb.NewDataType(NUdf::EDataSlot::Uint64))
  317. );
  318. const auto pgm = StreamToString(pb, stream);
  319. const auto graph = setup.BuildGraph(pgm);
  320. const auto streamVal = graph->GetValue();
  321. NUdf::TUnboxedValue result;
  322. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  323. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|1|1|1|1|2|2|2|2|");
  324. }
  325. }
  326. Y_UNIT_TEST_SUITE(TMiniKQLCombineStreamPerfTest) {
  327. Y_UNIT_TEST_LLVM(TestSumDoubleBooleanKeys) {
  328. TSetup<LLVM> setup(GetNodeFactory());
  329. double positive = 0.0, negative = 0.0;
  330. const auto t = TInstant::Now();
  331. for (const auto& sample : I8Samples) {
  332. (sample.second > 0.0 ? positive : negative) += sample.second;
  333. }
  334. const auto cppTime = TInstant::Now() - t;
  335. TProgramBuilder& pb = *setup.PgmBuilder;
  336. const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<double>::Id));
  337. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  338. const auto pgmReturn = pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
  339. [&](TRuntimeNode item) { return pb.AggrGreater(item, pb.NewDataLiteral(0.0)); },
  340. [&](TRuntimeNode, TRuntimeNode item) { return item; },
  341. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.AggrAdd(state, item); },
  342. [&](TRuntimeNode, TRuntimeNode state) { return pb.NewOptional(state); },
  343. 0ULL
  344. );
  345. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  346. NUdf::TUnboxedValue* items = nullptr;
  347. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
  348. std::transform(I8Samples.cbegin(), I8Samples.cend(), items, [](const std::pair<i8, double> s){ return ToValue<double>(s.second); });
  349. NUdf::TUnboxedValue first, second;
  350. const auto t1 = TInstant::Now();
  351. const auto& value = graph->GetValue();
  352. UNIT_ASSERT_EQUAL(value.Fetch(first), NUdf::EFetchStatus::Ok);
  353. UNIT_ASSERT_EQUAL(value.Fetch(second), NUdf::EFetchStatus::Ok);
  354. const auto t2 = TInstant::Now();
  355. if (first.template Get<double>() > 0.0) {
  356. UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), positive);
  357. UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), negative);
  358. } else {
  359. UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), negative);
  360. UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), positive);
  361. }
  362. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  363. }
  364. Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleBooleanKeys) {
  365. TSetup<LLVM> setup(GetNodeFactory());
  366. double pSum = 0.0, nSum = 0.0, pMax = 0.0, nMax = -1000.0, pMin = 1000.0, nMin = 0.0;
  367. const auto t = TInstant::Now();
  368. for (const auto& sample : I8Samples) {
  369. if (sample.second > 0.0) {
  370. pSum += sample.second;
  371. pMax = std::max(pMax, sample.second);
  372. pMin = std::min(pMin, sample.second);
  373. } else {
  374. nSum += sample.second;
  375. nMax = std::max(nMax, sample.second);
  376. nMin = std::min(nMin, sample.second);
  377. }
  378. }
  379. const auto cppTime = TInstant::Now() - t;
  380. TProgramBuilder& pb = *setup.PgmBuilder;
  381. const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<double>::Id));
  382. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  383. const auto pgmReturn = pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
  384. [&](TRuntimeNode item) { return pb.AggrGreater(item, pb.NewDataLiteral(0.0)); },
  385. [&](TRuntimeNode, TRuntimeNode item) { return pb.NewTuple({item, item, item}); },
  386. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), item), pb.AggrMin(pb.Nth(state, 1U), item), pb.AggrMax(pb.Nth(state, 2U), item) }); },
  387. [&](TRuntimeNode, TRuntimeNode state) { return pb.NewOptional(state); },
  388. 0ULL
  389. );
  390. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  391. NUdf::TUnboxedValue* items = nullptr;
  392. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
  393. std::transform(I8Samples.cbegin(), I8Samples.cend(), items, [](const std::pair<i8, double> s){ return ToValue<double>(s.second); });
  394. NUdf::TUnboxedValue first, second;
  395. const auto t1 = TInstant::Now();
  396. const auto& value = graph->GetValue();
  397. UNIT_ASSERT_EQUAL(value.Fetch(first), NUdf::EFetchStatus::Ok);
  398. UNIT_ASSERT_EQUAL(value.Fetch(second), NUdf::EFetchStatus::Ok);
  399. const auto t2 = TInstant::Now();
  400. if (first.GetElement(0).template Get<double>() > 0.0) {
  401. UNIT_ASSERT_VALUES_EQUAL(first.GetElement(0).template Get<double>(), pSum);
  402. UNIT_ASSERT_VALUES_EQUAL(first.GetElement(1).template Get<double>(), pMin);
  403. UNIT_ASSERT_VALUES_EQUAL(first.GetElement(2).template Get<double>(), pMax);
  404. UNIT_ASSERT_VALUES_EQUAL(second.GetElement(0).template Get<double>(), nSum);
  405. UNIT_ASSERT_VALUES_EQUAL(second.GetElement(1).template Get<double>(), nMin);
  406. UNIT_ASSERT_VALUES_EQUAL(second.GetElement(2).template Get<double>(), nMax);
  407. } else {
  408. UNIT_ASSERT_VALUES_EQUAL(first.GetElement(0).template Get<double>(), nSum);
  409. UNIT_ASSERT_VALUES_EQUAL(first.GetElement(1).template Get<double>(), nMin);
  410. UNIT_ASSERT_VALUES_EQUAL(first.GetElement(2).template Get<double>(), nMax);
  411. UNIT_ASSERT_VALUES_EQUAL(second.GetElement(0).template Get<double>(), pSum);
  412. UNIT_ASSERT_VALUES_EQUAL(second.GetElement(1).template Get<double>(), pMin);
  413. UNIT_ASSERT_VALUES_EQUAL(second.GetElement(2).template Get<double>(), pMax);
  414. }
  415. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  416. }
  417. Y_UNIT_TEST_LLVM(TestSumDoubleSmallKey) {
  418. TSetup<LLVM> setup(GetNodeFactory());
  419. std::unordered_map<i8, double> expects(201);
  420. const auto t = TInstant::Now();
  421. for (const auto& sample : I8Samples) {
  422. expects.emplace(sample.first, 0.0).first->second += sample.second;
  423. }
  424. const auto cppTime = TInstant::Now() - t;
  425. std::vector<std::pair<i8, double>> one, two;
  426. one.reserve(expects.size());
  427. two.reserve(expects.size());
  428. one.insert(one.cend(), expects.cbegin(), expects.cend());
  429. std::sort(one.begin(), one.end(), [](const std::pair<i8, double> l, const std::pair<i8, double> r){ return l.first < r.first; });
  430. TProgramBuilder& pb = *setup.PgmBuilder;
  431. const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<i8>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
  432. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  433. const auto pgmReturn = pb.Collect(pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
  434. [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
  435. [&](TRuntimeNode, TRuntimeNode item) { return pb.Nth(item, 1U); },
  436. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.AggrAdd(state, pb.Nth(item, 1U)); },
  437. [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, state})); },
  438. 0ULL
  439. ));
  440. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  441. NUdf::TUnboxedValue* items = nullptr;
  442. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
  443. for (const auto& sample : I8Samples) {
  444. NUdf::TUnboxedValue* pair = nullptr;
  445. *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
  446. pair[0] = NUdf::TUnboxedValuePod(sample.first);
  447. pair[1] = NUdf::TUnboxedValuePod(sample.second);
  448. }
  449. const auto t1 = TInstant::Now();
  450. const auto& value = graph->GetValue();
  451. const auto t2 = TInstant::Now();
  452. UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
  453. const auto ptr = value.GetElements();
  454. for (size_t i = 0ULL; i < expects.size(); ++i) {
  455. two.emplace_back(ptr[i].GetElement(0).template Get<i8>(), ptr[i].GetElement(1).template Get<double>());
  456. }
  457. std::sort(two.begin(), two.end(), [](const std::pair<i8, double> l, const std::pair<i8, double> r){ return l.first < r.first; });
  458. UNIT_ASSERT_VALUES_EQUAL(one, two);
  459. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  460. }
  461. Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleSmallKey) {
  462. TSetup<LLVM> setup(GetNodeFactory());
  463. std::unordered_map<i8, std::array<double, 3U>> expects(201);
  464. const auto t = TInstant::Now();
  465. for (const auto& sample : I8Samples) {
  466. auto& item = expects.emplace(sample.first, std::array<double, 3U>{0.0, std::numeric_limits<double>::max(), std::numeric_limits<double>::min()}).first->second;
  467. std::get<0U>(item) += sample.second;
  468. std::get<1U>(item) = std::min(std::get<1U>(item), sample.second);
  469. std::get<2U>(item) = std::max(std::get<2U>(item), sample.second);
  470. }
  471. const auto cppTime = TInstant::Now() - t;
  472. std::vector<std::pair<i8, std::array<double, 3U>>> one, two;
  473. one.reserve(expects.size());
  474. two.reserve(expects.size());
  475. one.insert(one.cend(), expects.cbegin(), expects.cend());
  476. std::sort(one.begin(), one.end(), [](const std::pair<i8, std::array<double, 3U>> l, const std::pair<i8, std::array<double, 3U>> r){ return l.first < r.first; });
  477. TProgramBuilder& pb = *setup.PgmBuilder;
  478. const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<i8>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
  479. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  480. const auto pgmReturn = pb.Collect(pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
  481. [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
  482. [&](TRuntimeNode, TRuntimeNode item) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({v, v, v}); },
  483. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), v), pb.AggrMin(pb.Nth(state, 1U), v), pb.AggrMax(pb.Nth(state, 2U), v)}); },
  484. [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Nth(state, 2U)})); },
  485. 0ULL
  486. ));
  487. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  488. NUdf::TUnboxedValue* items = nullptr;
  489. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
  490. for (const auto& sample : I8Samples) {
  491. NUdf::TUnboxedValue* pair = nullptr;
  492. *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
  493. pair[0] = NUdf::TUnboxedValuePod(sample.first);
  494. pair[1] = NUdf::TUnboxedValuePod(sample.second);
  495. }
  496. const auto t1 = TInstant::Now();
  497. const auto& value = graph->GetValue();
  498. const auto t2 = TInstant::Now();
  499. UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
  500. const auto ptr = value.GetElements();
  501. for (size_t i = 0ULL; i < expects.size(); ++i) {
  502. two.emplace_back(ptr[i].GetElement(0).template Get<i8>(), std::array<double, 3U>{ptr[i].GetElement(1).template Get<double>(), ptr[i].GetElement(2).template Get<double>(), ptr[i].GetElement(3).template Get<double>()});
  503. }
  504. std::sort(two.begin(), two.end(), [](const std::pair<i8, std::array<double, 3U>> l, const std::pair<i8, std::array<double, 3U>> r){ return l.first < r.first; });
  505. UNIT_ASSERT_VALUES_EQUAL(one, two);
  506. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  507. }
  508. Y_UNIT_TEST_LLVM(TestSumDoubleStringKey) {
  509. TSetup<LLVM> setup(GetNodeFactory());
  510. std::vector<std::pair<std::string, double>> stringI8Samples(I8Samples.size());
  511. std::transform(I8Samples.cbegin(), I8Samples.cend(), stringI8Samples.begin(), [](std::pair<i8, double> src){ return std::make_pair(ToString(src.first), src.second); });
  512. std::unordered_map<std::string, double> expects(201);
  513. const auto t = TInstant::Now();
  514. for (const auto& sample : stringI8Samples) {
  515. expects.emplace(sample.first, 0.0).first->second += sample.second;
  516. }
  517. const auto cppTime = TInstant::Now() - t;
  518. std::vector<std::pair<std::string_view, double>> one, two;
  519. one.reserve(expects.size());
  520. two.reserve(expects.size());
  521. one.insert(one.cend(), expects.cbegin(), expects.cend());
  522. std::sort(one.begin(), one.end(), [](const std::pair<std::string_view, double> l, const std::pair<std::string_view, double> r){ return l.first < r.first; });
  523. TProgramBuilder& pb = *setup.PgmBuilder;
  524. const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<const char*>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
  525. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  526. const auto pgmReturn = pb.Collect(pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
  527. [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
  528. [&](TRuntimeNode, TRuntimeNode item) { return pb.Nth(item, 1U); },
  529. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.AggrAdd(state, pb.Nth(item, 1U)); },
  530. [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, state})); },
  531. 0ULL
  532. ));
  533. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  534. NUdf::TUnboxedValue* items = nullptr;
  535. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(stringI8Samples.size(), items));
  536. for (const auto& sample : stringI8Samples) {
  537. NUdf::TUnboxedValue* pair = nullptr;
  538. *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
  539. pair[0] = NUdf::TUnboxedValuePod::Embedded(sample.first);
  540. pair[1] = NUdf::TUnboxedValuePod(sample.second);
  541. }
  542. const auto t1 = TInstant::Now();
  543. const auto& value = graph->GetValue();
  544. const auto t2 = TInstant::Now();
  545. UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
  546. const auto ptr = value.GetElements();
  547. for (size_t i = 0ULL; i < expects.size(); ++i) {
  548. two.emplace_back(ptr[i].GetElements()->AsStringRef(), ptr[i].GetElement(1).template Get<double>());
  549. }
  550. std::sort(two.begin(), two.end(), [](const std::pair<std::string_view, double> l, const std::pair<std::string_view, double> r){ return l.first < r.first; });
  551. UNIT_ASSERT_VALUES_EQUAL(one, two);
  552. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  553. }
  554. Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleStringKey) {
  555. TSetup<LLVM> setup(GetNodeFactory());
  556. std::vector<std::pair<std::string, double>> stringI8Samples(I8Samples.size());
  557. std::transform(I8Samples.cbegin(), I8Samples.cend(), stringI8Samples.begin(), [](std::pair<i8, double> src){ return std::make_pair(ToString(src.first), src.second); });
  558. std::unordered_map<std::string, std::array<double, 3U>> expects(201);
  559. const auto t = TInstant::Now();
  560. for (const auto& sample : stringI8Samples) {
  561. auto& item = expects.emplace(sample.first, std::array<double, 3U>{0.0, +1E7, -1E7}).first->second;
  562. std::get<0U>(item) += sample.second;
  563. std::get<1U>(item) = std::min(std::get<1U>(item), sample.second);
  564. std::get<2U>(item) = std::max(std::get<2U>(item), sample.second);
  565. }
  566. const auto cppTime = TInstant::Now() - t;
  567. std::vector<std::pair<std::string_view, std::array<double, 3U>>> one, two;
  568. one.reserve(expects.size());
  569. two.reserve(expects.size());
  570. one.insert(one.cend(), expects.cbegin(), expects.cend());
  571. std::sort(one.begin(), one.end(), [](const std::pair<std::string_view, std::array<double, 3U>> l, const std::pair<std::string_view, std::array<double, 3U>> r){ return l.first < r.first; });
  572. TProgramBuilder& pb = *setup.PgmBuilder;
  573. const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<const char*>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
  574. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  575. const auto pgmReturn = pb.Collect(pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
  576. [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
  577. [&](TRuntimeNode, TRuntimeNode item) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({v, v, v}); },
  578. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), v), pb.AggrMin(pb.Nth(state, 1U), v), pb.AggrMax(pb.Nth(state, 2U), v)}); },
  579. [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Nth(state, 2U)})); },
  580. 0ULL
  581. ));
  582. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  583. NUdf::TUnboxedValue* items = nullptr;
  584. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(stringI8Samples.size(), items));
  585. for (const auto& sample : stringI8Samples) {
  586. NUdf::TUnboxedValue* pair = nullptr;
  587. *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
  588. pair[0] = NUdf::TUnboxedValuePod::Embedded(sample.first);
  589. pair[1] = NUdf::TUnboxedValuePod(sample.second);
  590. }
  591. const auto t1 = TInstant::Now();
  592. const auto& value = graph->GetValue();
  593. const auto t2 = TInstant::Now();
  594. UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
  595. const auto ptr = value.GetElements();
  596. for (size_t i = 0ULL; i < expects.size(); ++i) {
  597. two.emplace_back(ptr[i].GetElements()->AsStringRef(), std::array<double, 3U>{ptr[i].GetElement(1).template Get<double>(), ptr[i].GetElement(2).template Get<double>(), ptr[i].GetElement(3).template Get<double>()});
  598. }
  599. std::sort(two.begin(), two.end(), [](const std::pair<std::string_view, std::array<double, 3U>> l, const std::pair<std::string_view, std::array<double, 3U>> r){ return l.first < r.first; });
  600. UNIT_ASSERT_VALUES_EQUAL(one, two);
  601. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  602. }
  603. Y_UNIT_TEST_LLVM(TestMinMaxSumTupleKey) {
  604. TSetup<LLVM> setup(GetNodeFactory());
  605. std::vector<std::pair<std::pair<ui32, std::string>, double>> pairI8Samples(Ui16Samples.size());
  606. std::transform(Ui16Samples.cbegin(), Ui16Samples.cend(), pairI8Samples.begin(), [](std::pair<ui32, double> src){ return std::make_pair(std::make_pair(ui32(src.first / 10U % 100U), ToString(src.first % 10U)), src.second); });
  607. struct TPairHash { size_t operator()(const std::pair<ui16, std::string>& p) const { return CombineHashes(std::hash<ui32>()(p.first), std::hash<std::string_view>()(p.second)); } };
  608. std::unordered_map<std::pair<ui32, std::string>, std::array<double, 3U>, TPairHash> expects;
  609. const auto t = TInstant::Now();
  610. for (const auto& sample : pairI8Samples) {
  611. auto& item = expects.emplace(sample.first, std::array<double, 3U>{0.0, +1E7, -1E7}).first->second;
  612. std::get<0U>(item) += sample.second;
  613. std::get<1U>(item) = std::min(std::get<1U>(item), sample.second);
  614. std::get<2U>(item) = std::max(std::get<2U>(item), sample.second);
  615. }
  616. const auto cppTime = TInstant::Now() - t;
  617. std::vector<std::pair<std::pair<ui32, std::string>, std::array<double, 3U>>> one, two;
  618. one.reserve(expects.size());
  619. two.reserve(expects.size());
  620. one.insert(one.cend(), expects.cbegin(), expects.cend());
  621. std::sort(one.begin(), one.end(), [](const std::pair<std::pair<ui32, std::string_view>, std::array<double, 3U>> l, const std::pair<std::pair<ui32, std::string_view>, std::array<double, 3U>> r){ return l.first < r.first; });
  622. TProgramBuilder& pb = *setup.PgmBuilder;
  623. const auto listType = pb.NewListType(pb.NewTupleType({pb.NewTupleType({pb.NewDataType(NUdf::TDataType<ui32>::Id), pb.NewDataType(NUdf::TDataType<const char*>::Id)}), pb.NewDataType(NUdf::TDataType<double>::Id)}));
  624. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  625. const auto pgmReturn = pb.Collect(pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
  626. [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
  627. [&](TRuntimeNode, TRuntimeNode item) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({v, v, v}); },
  628. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), v), pb.AggrMin(pb.Nth(state, 1U), v), pb.AggrMax(pb.Nth(state, 2U), v)}); },
  629. [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Nth(state, 2U)})); },
  630. 0ULL
  631. ));
  632. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  633. NUdf::TUnboxedValue* items = nullptr;
  634. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(pairI8Samples.size(), items));
  635. for (const auto& sample : pairI8Samples) {
  636. NUdf::TUnboxedValue* pair = nullptr;
  637. *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
  638. pair[1] = NUdf::TUnboxedValuePod(sample.second);
  639. NUdf::TUnboxedValue* keys = nullptr;
  640. pair[0] = graph->GetHolderFactory().CreateDirectArrayHolder(2U, keys);
  641. keys[0] = NUdf::TUnboxedValuePod(sample.first.first);
  642. keys[1] = NUdf::TUnboxedValuePod::Embedded(sample.first.second);
  643. }
  644. const auto t1 = TInstant::Now();
  645. const auto& value = graph->GetValue();
  646. const auto t2 = TInstant::Now();
  647. UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
  648. const auto ptr = value.GetElements();
  649. for (size_t i = 0ULL; i < expects.size(); ++i) {
  650. const auto elements = ptr[i].GetElements();
  651. two.emplace_back(std::make_pair(elements[0].GetElement(0).template Get<ui32>(), (elements[0].GetElements()[1]).AsStringRef()), std::array<double, 3U>{elements[1].template Get<double>(), elements[2].template Get<double>(), elements[3].template Get<double>()});
  652. }
  653. std::sort(two.begin(), two.end(), [](const std::pair<std::pair<ui32, std::string_view>, std::array<double, 3U>> l, const std::pair<std::pair<ui32, std::string_view>, std::array<double, 3U>> r){ return l.first < r.first; });
  654. UNIT_ASSERT_VALUES_EQUAL(one, two);
  655. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  656. }
  657. }
  658. #if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 3u
  659. Y_UNIT_TEST_SUITE(TMiniKQLCombineFlowTest) {
  660. Y_UNIT_TEST_LLVM(TestFullCombineWithOptOut) {
  661. TSetup<LLVM> setup(GetNodeFactory());
  662. TProgramBuilder& pb = *setup.PgmBuilder;
  663. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  664. return pb.NewOptional(pb.Member(state, "a"));
  665. };
  666. const auto stream = MakeStream(setup, Max<ui64>());
  667. const auto pgm = StreamToString(pb, Combine<true>(pb, stream, finish));
  668. const auto graph = setup.BuildGraph(pgm);
  669. const auto streamVal = graph->GetValue();
  670. NUdf::TUnboxedValue result;
  671. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  672. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|4|8|");
  673. }
  674. Y_UNIT_TEST_LLVM(TestFullCombineWithListOut) {
  675. TSetup<LLVM> setup(GetNodeFactory());
  676. TProgramBuilder& pb = *setup.PgmBuilder;
  677. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  678. const auto item = pb.Member(state, "a");
  679. const auto itemType = item.GetStaticType();
  680. auto list = pb.NewEmptyList(itemType);
  681. list = pb.Append(list, item);
  682. list = pb.Append(list, item);
  683. return list;
  684. };
  685. const auto stream = MakeStream(setup, Max<ui64>());
  686. const auto pgm = StreamToString(pb, Combine<true>(pb, stream, finish));
  687. const auto graph = setup.BuildGraph(pgm);
  688. const auto streamVal = graph->GetValue();
  689. NUdf::TUnboxedValue result;
  690. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  691. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|4|4|8|8|");
  692. }
  693. Y_UNIT_TEST_LLVM(TestFullCombineWithStreamOut) {
  694. TSetup<LLVM> setup(GetNodeFactory());
  695. TProgramBuilder& pb = *setup.PgmBuilder;
  696. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  697. const auto item = pb.Member(state, "a");
  698. const auto itemType = item.GetStaticType();
  699. auto list = pb.NewEmptyList(itemType);
  700. list = pb.Append(list, item);
  701. list = pb.Append(list, item);
  702. return pb.Iterator(list, MakeArrayRef(&state, 1));
  703. };
  704. const auto stream = MakeStream(setup, Max<ui64>());
  705. const auto pgm = StreamToString(pb, Combine<true>(pb, stream, finish));
  706. const auto graph = setup.BuildGraph(pgm);
  707. const auto streamVal = graph->GetValue();
  708. NUdf::TUnboxedValue result;
  709. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  710. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|4|4|8|8|");
  711. }
  712. Y_UNIT_TEST_LLVM(TestFullCombineWithOptOutAndYields) {
  713. TSetup<LLVM> setup(GetNodeFactory());
  714. TProgramBuilder& pb = *setup.PgmBuilder;
  715. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  716. return pb.NewOptional(pb.Member(state, "a"));
  717. };
  718. const auto stream = MakeStream<LLVM, true>(setup, Max<ui64>());
  719. const auto pgm = StreamToString(pb, Combine<true>(pb, stream, finish));
  720. const auto graph = setup.BuildGraph(pgm);
  721. const auto streamVal = graph->GetValue();
  722. NUdf::TUnboxedValue result;
  723. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  724. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  725. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  726. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  727. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|1|1|2|2|2|4|");
  728. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
  729. }
  730. Y_UNIT_TEST_LLVM(TestFullCombineWithListAndYields) {
  731. TSetup<LLVM> setup(GetNodeFactory());
  732. TProgramBuilder& pb = *setup.PgmBuilder;
  733. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  734. const auto item = pb.Member(state, "a");
  735. const auto itemType = item.GetStaticType();
  736. auto list = pb.NewEmptyList(itemType);
  737. list = pb.Append(list, item);
  738. list = pb.Append(list, item);
  739. return list;
  740. };
  741. const auto stream = MakeStream<LLVM, true>(setup, Max<ui64>());
  742. const auto pgm = StreamToString(pb, Combine<true>(pb, stream, finish));
  743. const auto graph = setup.BuildGraph(pgm);
  744. const auto streamVal = graph->GetValue();
  745. NUdf::TUnboxedValue result;
  746. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  747. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  748. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  749. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  750. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|0|1|1|1|1|2|2|2|2|2|2|4|4|");
  751. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
  752. }
  753. Y_UNIT_TEST_LLVM(TestFullCombineWithStreamAndYields) {
  754. TSetup<LLVM> setup(GetNodeFactory());
  755. TProgramBuilder& pb = *setup.PgmBuilder;
  756. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  757. const auto item = pb.Member(state, "a");
  758. const auto itemType = item.GetStaticType();
  759. auto list = pb.NewEmptyList(itemType);
  760. list = pb.Append(list, item);
  761. list = pb.Append(list, item);
  762. return pb.Iterator(list, MakeArrayRef(&state, 1));
  763. };
  764. const auto stream = MakeStream<LLVM, true>(setup, Max<ui64>());
  765. const auto pgm = StreamToString(pb, Combine<true>(pb, stream, finish));
  766. const auto graph = setup.BuildGraph(pgm);
  767. const auto streamVal = graph->GetValue();
  768. NUdf::TUnboxedValue result;
  769. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  770. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  771. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
  772. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  773. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|0|1|1|1|1|2|2|2|2|2|2|4|4|");
  774. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
  775. }
  776. Y_UNIT_TEST_LLVM(TestPartialFlush) {
  777. TSetup<LLVM> setup(GetNodeFactory());
  778. TProgramBuilder& pb = *setup.PgmBuilder;
  779. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  780. return pb.NewOptional(pb.Member(state, "a"));
  781. };
  782. const auto stream = MakeStream(setup, 6ul);
  783. const auto combine = Combine<true>(pb, stream, finish);
  784. {
  785. const auto pgm = Reduce(pb, combine);
  786. const auto graph = setup.BuildGraph(pgm);
  787. const auto streamVal = graph->GetValue();
  788. NUdf::TUnboxedValue result;
  789. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  790. UNIT_ASSERT_VALUES_EQUAL(result.Get<ui64>(), 12ul);
  791. }
  792. {
  793. const auto pgm = StreamToString(pb, combine);
  794. const auto graph = setup.BuildGraph(pgm);
  795. const auto streamVal = graph->GetValue();
  796. NUdf::TUnboxedValue result;
  797. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  798. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|2|2|4|4|");
  799. }
  800. }
  801. Y_UNIT_TEST_LLVM(TestCombineInSingleProc) {
  802. TSetup<LLVM> setup(GetNodeFactory());
  803. TProgramBuilder& pb = *setup.PgmBuilder;
  804. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  805. return pb.NewOptional(pb.Member(state, "a"));
  806. };
  807. const auto stream = MakeStream(setup, 6ul);
  808. const auto pgm = Reduce(pb, Combine<true>(pb, stream, finish));
  809. const auto graph = setup.BuildGraph(pgm, EGraphPerProcess::Single);
  810. const auto streamVal = graph->GetValue();
  811. NUdf::TUnboxedValue result;
  812. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  813. UNIT_ASSERT_VALUES_EQUAL(result.Get<ui64>(), 12ul);
  814. }
  815. Y_UNIT_TEST_LLVM(TestCombineSwithYield) {
  816. TSetup<LLVM> setup(GetNodeFactory());
  817. TProgramBuilder& pb = *setup.PgmBuilder;
  818. const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
  819. return pb.NewOptional(pb.Member(state, "a"));
  820. };
  821. auto stream = MakeStream(setup, Max<ui64>());
  822. TSwitchInput switchInput;
  823. switchInput.Indicies.push_back(0);
  824. switchInput.InputType = stream.GetStaticType();
  825. stream = pb.Switch(stream,
  826. MakeArrayRef(&switchInput, 1),
  827. [&](ui32 /*index*/, TRuntimeNode item) { return Combine<true>(pb, item, finish); },
  828. 1,
  829. pb.NewStreamType(pb.NewDataType(NUdf::EDataSlot::Uint64))
  830. );
  831. const auto pgm = StreamToString(pb, stream);
  832. const auto graph = setup.BuildGraph(pgm);
  833. const auto streamVal = graph->GetValue();
  834. NUdf::TUnboxedValue result;
  835. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  836. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|1|1|1|1|2|2|2|2|");
  837. }
  838. }
  839. Y_UNIT_TEST_SUITE(TMiniKQLCombineFlowPerfTest) {
  840. Y_UNIT_TEST_LLVM(TestSumDoubleBooleanKeys) {
  841. TSetup<LLVM> setup(GetNodeFactory());
  842. double positive = 0.0, negative = 0.0;
  843. const auto t = TInstant::Now();
  844. for (const auto& sample : I8Samples) {
  845. (sample.second > 0.0 ? positive : negative) += sample.second;
  846. }
  847. const auto cppTime = TInstant::Now() - t;
  848. TProgramBuilder& pb = *setup.PgmBuilder;
  849. const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<double>::Id));
  850. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  851. const auto pgmReturn = pb.FromFlow(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)),
  852. [&](TRuntimeNode item) { return pb.AggrGreater(item, pb.NewDataLiteral(0.0)); },
  853. [&](TRuntimeNode, TRuntimeNode item) { return item; },
  854. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.AggrAdd(state, item); },
  855. [&](TRuntimeNode, TRuntimeNode state) { return pb.NewOptional(state); },
  856. 0ULL
  857. ));
  858. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  859. NUdf::TUnboxedValue* items = nullptr;
  860. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
  861. std::transform(I8Samples.cbegin(), I8Samples.cend(), items, [](const std::pair<i8, double> s){ return ToValue<double>(s.second); });
  862. NUdf::TUnboxedValue first, second;
  863. const auto t1 = TInstant::Now();
  864. const auto& value = graph->GetValue();
  865. UNIT_ASSERT_EQUAL(value.Fetch(first), NUdf::EFetchStatus::Ok);
  866. UNIT_ASSERT_EQUAL(value.Fetch(second), NUdf::EFetchStatus::Ok);
  867. const auto t2 = TInstant::Now();
  868. if (first.template Get<double>() > 0.0) {
  869. UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), positive);
  870. UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), negative);
  871. } else {
  872. UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), negative);
  873. UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), positive);
  874. }
  875. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  876. }
  877. Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleBooleanKeys) {
  878. TSetup<LLVM> setup(GetNodeFactory());
  879. double pSum = 0.0, nSum = 0.0, pMax = 0.0, nMax = -1000.0, pMin = 1000.0, nMin = 0.0;
  880. const auto t = TInstant::Now();
  881. for (const auto& sample : I8Samples) {
  882. if (sample.second > 0.0) {
  883. pSum += sample.second;
  884. pMax = std::max(pMax, sample.second);
  885. pMin = std::min(pMin, sample.second);
  886. } else {
  887. nSum += sample.second;
  888. nMax = std::max(nMax, sample.second);
  889. nMin = std::min(nMin, sample.second);
  890. }
  891. }
  892. const auto cppTime = TInstant::Now() - t;
  893. TProgramBuilder& pb = *setup.PgmBuilder;
  894. const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<double>::Id));
  895. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  896. const auto pgmReturn = pb.FromFlow(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)),
  897. [&](TRuntimeNode item) { return pb.AggrGreater(item, pb.NewDataLiteral(0.0)); },
  898. [&](TRuntimeNode, TRuntimeNode item) { return pb.NewTuple({item, item, item}); },
  899. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), item), pb.AggrMin(pb.Nth(state, 1U), item), pb.AggrMax(pb.Nth(state, 2U), item) }); },
  900. [&](TRuntimeNode, TRuntimeNode state) { return pb.NewOptional(state); },
  901. 0ULL
  902. ));
  903. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  904. NUdf::TUnboxedValue* items = nullptr;
  905. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
  906. std::transform(I8Samples.cbegin(), I8Samples.cend(), items, [](const std::pair<i8, double> s){ return ToValue<double>(s.second); });
  907. NUdf::TUnboxedValue first, second;
  908. const auto t1 = TInstant::Now();
  909. const auto& value = graph->GetValue();
  910. UNIT_ASSERT_EQUAL(value.Fetch(first), NUdf::EFetchStatus::Ok);
  911. UNIT_ASSERT_EQUAL(value.Fetch(second), NUdf::EFetchStatus::Ok);
  912. const auto t2 = TInstant::Now();
  913. if (first.GetElement(0).template Get<double>() > 0.0) {
  914. UNIT_ASSERT_VALUES_EQUAL(first.GetElement(0).template Get<double>(), pSum);
  915. UNIT_ASSERT_VALUES_EQUAL(first.GetElement(1).template Get<double>(), pMin);
  916. UNIT_ASSERT_VALUES_EQUAL(first.GetElement(2).template Get<double>(), pMax);
  917. UNIT_ASSERT_VALUES_EQUAL(second.GetElement(0).template Get<double>(), nSum);
  918. UNIT_ASSERT_VALUES_EQUAL(second.GetElement(1).template Get<double>(), nMin);
  919. UNIT_ASSERT_VALUES_EQUAL(second.GetElement(2).template Get<double>(), nMax);
  920. } else {
  921. UNIT_ASSERT_VALUES_EQUAL(first.GetElement(0).template Get<double>(), nSum);
  922. UNIT_ASSERT_VALUES_EQUAL(first.GetElement(1).template Get<double>(), nMin);
  923. UNIT_ASSERT_VALUES_EQUAL(first.GetElement(2).template Get<double>(), nMax);
  924. UNIT_ASSERT_VALUES_EQUAL(second.GetElement(0).template Get<double>(), pSum);
  925. UNIT_ASSERT_VALUES_EQUAL(second.GetElement(1).template Get<double>(), pMin);
  926. UNIT_ASSERT_VALUES_EQUAL(second.GetElement(2).template Get<double>(), pMax);
  927. }
  928. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  929. }
  930. Y_UNIT_TEST_LLVM(TestSumDoubleSmallKey) {
  931. TSetup<LLVM> setup(GetNodeFactory());
  932. std::unordered_map<i8, double> expects(201);
  933. const auto t = TInstant::Now();
  934. for (const auto& sample : I8Samples) {
  935. expects.emplace(sample.first, 0.0).first->second += sample.second;
  936. }
  937. const auto cppTime = TInstant::Now() - t;
  938. std::vector<std::pair<i8, double>> one, two;
  939. one.reserve(expects.size());
  940. two.reserve(expects.size());
  941. one.insert(one.cend(), expects.cbegin(), expects.cend());
  942. std::sort(one.begin(), one.end(), [](const std::pair<i8, double> l, const std::pair<i8, double> r){ return l.first < r.first; });
  943. TProgramBuilder& pb = *setup.PgmBuilder;
  944. const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<i8>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
  945. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  946. const auto pgmReturn = pb.Collect(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)),
  947. [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
  948. [&](TRuntimeNode, TRuntimeNode item) { return pb.Nth(item, 1U); },
  949. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.AggrAdd(state, pb.Nth(item, 1U)); },
  950. [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, state})); },
  951. 0ULL
  952. ));
  953. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  954. NUdf::TUnboxedValue* items = nullptr;
  955. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
  956. for (const auto& sample : I8Samples) {
  957. NUdf::TUnboxedValue* pair = nullptr;
  958. *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
  959. pair[0] = NUdf::TUnboxedValuePod(sample.first);
  960. pair[1] = NUdf::TUnboxedValuePod(sample.second);
  961. }
  962. const auto t1 = TInstant::Now();
  963. const auto& value = graph->GetValue();
  964. const auto t2 = TInstant::Now();
  965. UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
  966. const auto ptr = value.GetElements();
  967. for (size_t i = 0ULL; i < expects.size(); ++i) {
  968. two.emplace_back(ptr[i].GetElement(0).template Get<i8>(), ptr[i].GetElement(1).template Get<double>());
  969. }
  970. std::sort(two.begin(), two.end(), [](const std::pair<i8, double> l, const std::pair<i8, double> r){ return l.first < r.first; });
  971. UNIT_ASSERT_VALUES_EQUAL(one, two);
  972. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  973. }
  974. Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleSmallKey) {
  975. TSetup<LLVM> setup(GetNodeFactory());
  976. std::unordered_map<i8, std::array<double, 3U>> expects(201);
  977. const auto t = TInstant::Now();
  978. for (const auto& sample : I8Samples) {
  979. auto& item = expects.emplace(sample.first, std::array<double, 3U>{0.0, std::numeric_limits<double>::max(), std::numeric_limits<double>::min()}).first->second;
  980. std::get<0U>(item) += sample.second;
  981. std::get<1U>(item) = std::min(std::get<1U>(item), sample.second);
  982. std::get<2U>(item) = std::max(std::get<2U>(item), sample.second);
  983. }
  984. const auto cppTime = TInstant::Now() - t;
  985. std::vector<std::pair<i8, std::array<double, 3U>>> one, two;
  986. one.reserve(expects.size());
  987. two.reserve(expects.size());
  988. one.insert(one.cend(), expects.cbegin(), expects.cend());
  989. std::sort(one.begin(), one.end(), [](const std::pair<i8, std::array<double, 3U>> l, const std::pair<i8, std::array<double, 3U>> r){ return l.first < r.first; });
  990. TProgramBuilder& pb = *setup.PgmBuilder;
  991. const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<i8>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
  992. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  993. const auto pgmReturn = pb.Collect(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)),
  994. [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
  995. [&](TRuntimeNode, TRuntimeNode item) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({v, v, v}); },
  996. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), v), pb.AggrMin(pb.Nth(state, 1U), v), pb.AggrMax(pb.Nth(state, 2U), v)}); },
  997. [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Nth(state, 2U)})); },
  998. 0ULL
  999. ));
  1000. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  1001. NUdf::TUnboxedValue* items = nullptr;
  1002. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
  1003. for (const auto& sample : I8Samples) {
  1004. NUdf::TUnboxedValue* pair = nullptr;
  1005. *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
  1006. pair[0] = NUdf::TUnboxedValuePod(sample.first);
  1007. pair[1] = NUdf::TUnboxedValuePod(sample.second);
  1008. }
  1009. const auto t1 = TInstant::Now();
  1010. const auto& value = graph->GetValue();
  1011. const auto t2 = TInstant::Now();
  1012. UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
  1013. const auto ptr = value.GetElements();
  1014. for (size_t i = 0ULL; i < expects.size(); ++i) {
  1015. two.emplace_back(ptr[i].GetElement(0).template Get<i8>(), std::array<double, 3U>{ptr[i].GetElement(1).template Get<double>(), ptr[i].GetElement(2).template Get<double>(), ptr[i].GetElement(3).template Get<double>()});
  1016. }
  1017. std::sort(two.begin(), two.end(), [](const std::pair<i8, std::array<double, 3U>> l, const std::pair<i8, std::array<double, 3U>> r){ return l.first < r.first; });
  1018. UNIT_ASSERT_VALUES_EQUAL(one, two);
  1019. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  1020. }
  1021. Y_UNIT_TEST_LLVM(TestSumDoubleStringKey) {
  1022. TSetup<LLVM> setup(GetNodeFactory());
  1023. std::vector<std::pair<std::string, double>> stringI8Samples(I8Samples.size());
  1024. std::transform(I8Samples.cbegin(), I8Samples.cend(), stringI8Samples.begin(), [](std::pair<i8, double> src){ return std::make_pair(ToString(src.first), src.second); });
  1025. std::unordered_map<std::string, double> expects(201);
  1026. const auto t = TInstant::Now();
  1027. for (const auto& sample : stringI8Samples) {
  1028. expects.emplace(sample.first, 0.0).first->second += sample.second;
  1029. }
  1030. const auto cppTime = TInstant::Now() - t;
  1031. std::vector<std::pair<std::string_view, double>> one, two;
  1032. one.reserve(expects.size());
  1033. two.reserve(expects.size());
  1034. one.insert(one.cend(), expects.cbegin(), expects.cend());
  1035. std::sort(one.begin(), one.end(), [](const std::pair<std::string_view, double> l, const std::pair<std::string_view, double> r){ return l.first < r.first; });
  1036. TProgramBuilder& pb = *setup.PgmBuilder;
  1037. const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<const char*>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
  1038. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  1039. const auto pgmReturn = pb.Collect(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)),
  1040. [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
  1041. [&](TRuntimeNode, TRuntimeNode item) { return pb.Nth(item, 1U); },
  1042. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.AggrAdd(state, pb.Nth(item, 1U)); },
  1043. [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, state})); },
  1044. 0ULL
  1045. ));
  1046. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  1047. NUdf::TUnboxedValue* items = nullptr;
  1048. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(stringI8Samples.size(), items));
  1049. for (const auto& sample : stringI8Samples) {
  1050. NUdf::TUnboxedValue* pair = nullptr;
  1051. *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
  1052. pair[0] = NUdf::TUnboxedValuePod::Embedded(sample.first);
  1053. pair[1] = NUdf::TUnboxedValuePod(sample.second);
  1054. }
  1055. const auto t1 = TInstant::Now();
  1056. const auto& value = graph->GetValue();
  1057. const auto t2 = TInstant::Now();
  1058. UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
  1059. const auto ptr = value.GetElements();
  1060. for (size_t i = 0ULL; i < expects.size(); ++i) {
  1061. two.emplace_back(ptr[i].GetElements()->AsStringRef(), ptr[i].GetElement(1).template Get<double>());
  1062. }
  1063. std::sort(two.begin(), two.end(), [](const std::pair<std::string_view, double> l, const std::pair<std::string_view, double> r){ return l.first < r.first; });
  1064. UNIT_ASSERT_VALUES_EQUAL(one, two);
  1065. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  1066. }
  1067. Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleStringKey) {
  1068. TSetup<LLVM> setup(GetNodeFactory());
  1069. std::vector<std::pair<std::string, double>> stringI8Samples(I8Samples.size());
  1070. std::transform(I8Samples.cbegin(), I8Samples.cend(), stringI8Samples.begin(), [](std::pair<i8, double> src){ return std::make_pair(ToString(src.first), src.second); });
  1071. std::unordered_map<std::string, std::array<double, 3U>> expects(201);
  1072. const auto t = TInstant::Now();
  1073. for (const auto& sample : stringI8Samples) {
  1074. auto& item = expects.emplace(sample.first, std::array<double, 3U>{0.0, +1E7, -1E7}).first->second;
  1075. std::get<0U>(item) += sample.second;
  1076. std::get<1U>(item) = std::min(std::get<1U>(item), sample.second);
  1077. std::get<2U>(item) = std::max(std::get<2U>(item), sample.second);
  1078. }
  1079. const auto cppTime = TInstant::Now() - t;
  1080. std::vector<std::pair<std::string_view, std::array<double, 3U>>> one, two;
  1081. one.reserve(expects.size());
  1082. two.reserve(expects.size());
  1083. one.insert(one.cend(), expects.cbegin(), expects.cend());
  1084. std::sort(one.begin(), one.end(), [](const std::pair<std::string_view, std::array<double, 3U>> l, const std::pair<std::string_view, std::array<double, 3U>> r){ return l.first < r.first; });
  1085. TProgramBuilder& pb = *setup.PgmBuilder;
  1086. const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<const char*>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
  1087. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  1088. const auto pgmReturn = pb.Collect(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)),
  1089. [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
  1090. [&](TRuntimeNode, TRuntimeNode item) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({v, v, v}); },
  1091. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), v), pb.AggrMin(pb.Nth(state, 1U), v), pb.AggrMax(pb.Nth(state, 2U), v)}); },
  1092. [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Nth(state, 2U)})); },
  1093. 0ULL
  1094. ));
  1095. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  1096. NUdf::TUnboxedValue* items = nullptr;
  1097. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(stringI8Samples.size(), items));
  1098. for (const auto& sample : stringI8Samples) {
  1099. NUdf::TUnboxedValue* pair = nullptr;
  1100. *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
  1101. pair[0] = NUdf::TUnboxedValuePod::Embedded(sample.first);
  1102. pair[1] = NUdf::TUnboxedValuePod(sample.second);
  1103. }
  1104. const auto t1 = TInstant::Now();
  1105. const auto& value = graph->GetValue();
  1106. const auto t2 = TInstant::Now();
  1107. UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
  1108. const auto ptr = value.GetElements();
  1109. for (size_t i = 0ULL; i < expects.size(); ++i) {
  1110. two.emplace_back(ptr[i].GetElements()->AsStringRef(), std::array<double, 3U>{ptr[i].GetElement(1).template Get<double>(), ptr[i].GetElement(2).template Get<double>(), ptr[i].GetElement(3).template Get<double>()});
  1111. }
  1112. std::sort(two.begin(), two.end(), [](const std::pair<std::string_view, std::array<double, 3U>> l, const std::pair<std::string_view, std::array<double, 3U>> r){ return l.first < r.first; });
  1113. UNIT_ASSERT_VALUES_EQUAL(one, two);
  1114. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  1115. }
  1116. Y_UNIT_TEST_LLVM(TestMinMaxSumTupleKey) {
  1117. TSetup<LLVM> setup(GetNodeFactory());
  1118. std::vector<std::pair<std::pair<ui32, std::string>, double>> pairI8Samples(Ui16Samples.size());
  1119. std::transform(Ui16Samples.cbegin(), Ui16Samples.cend(), pairI8Samples.begin(), [](std::pair<ui16, double> src){ return std::make_pair(std::make_pair(ui32(src.first / 10U % 100U), ToString(src.first % 10U)), src.second); });
  1120. struct TPairHash { size_t operator()(const std::pair<ui32, std::string>& p) const { return CombineHashes(std::hash<ui32>()(p.first), std::hash<std::string_view>()(p.second)); } };
  1121. std::unordered_map<std::pair<ui32, std::string>, std::array<double, 3U>, TPairHash> expects;
  1122. const auto t = TInstant::Now();
  1123. for (const auto& sample : pairI8Samples) {
  1124. auto& item = expects.emplace(sample.first, std::array<double, 3U>{0.0, +1E7, -1E7}).first->second;
  1125. std::get<0U>(item) += sample.second;
  1126. std::get<1U>(item) = std::min(std::get<1U>(item), sample.second);
  1127. std::get<2U>(item) = std::max(std::get<2U>(item), sample.second);
  1128. }
  1129. const auto cppTime = TInstant::Now() - t;
  1130. std::vector<std::pair<std::pair<ui32, std::string>, std::array<double, 3U>>> one, two;
  1131. one.reserve(expects.size());
  1132. two.reserve(expects.size());
  1133. one.insert(one.cend(), expects.cbegin(), expects.cend());
  1134. std::sort(one.begin(), one.end(), [](const std::pair<std::pair<ui32, std::string_view>, std::array<double, 3U>> l, const std::pair<std::pair<ui32, std::string_view>, std::array<double, 3U>> r){ return l.first < r.first; });
  1135. TProgramBuilder& pb = *setup.PgmBuilder;
  1136. const auto listType = pb.NewListType(pb.NewTupleType({pb.NewTupleType({pb.NewDataType(NUdf::TDataType<ui32>::Id), pb.NewDataType(NUdf::TDataType<const char*>::Id)}), pb.NewDataType(NUdf::TDataType<double>::Id)}));
  1137. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  1138. const auto pgmReturn = pb.Collect(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)),
  1139. [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
  1140. [&](TRuntimeNode, TRuntimeNode item) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({v, v, v}); },
  1141. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), v), pb.AggrMin(pb.Nth(state, 1U), v), pb.AggrMax(pb.Nth(state, 2U), v)}); },
  1142. [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Nth(state, 2U)})); },
  1143. 0ULL
  1144. ));
  1145. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  1146. NUdf::TUnboxedValue* items = nullptr;
  1147. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(pairI8Samples.size(), items));
  1148. for (const auto& sample : pairI8Samples) {
  1149. NUdf::TUnboxedValue* pair = nullptr;
  1150. *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
  1151. pair[1] = NUdf::TUnboxedValuePod(sample.second);
  1152. NUdf::TUnboxedValue* keys = nullptr;
  1153. pair[0] = graph->GetHolderFactory().CreateDirectArrayHolder(2U, keys);
  1154. keys[0] = NUdf::TUnboxedValuePod(sample.first.first);
  1155. keys[1] = NUdf::TUnboxedValuePod::Embedded(sample.first.second);
  1156. }
  1157. const auto t1 = TInstant::Now();
  1158. const auto& value = graph->GetValue();
  1159. const auto t2 = TInstant::Now();
  1160. UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
  1161. const auto ptr = value.GetElements();
  1162. for (size_t i = 0ULL; i < expects.size(); ++i) {
  1163. const auto elements = ptr[i].GetElements();
  1164. two.emplace_back(std::make_pair(elements[0].GetElement(0).template Get<ui32>(), (elements[0].GetElements()[1]).AsStringRef()), std::array<double, 3U>{elements[1].template Get<double>(), elements[2].template Get<double>(), elements[3].template Get<double>()});
  1165. }
  1166. std::sort(two.begin(), two.end(), [](const std::pair<std::pair<ui32, std::string_view>, std::array<double, 3U>> l, const std::pair<std::pair<ui32, std::string_view>, std::array<double, 3U>> r){ return l.first < r.first; });
  1167. UNIT_ASSERT_VALUES_EQUAL(one, two);
  1168. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  1169. }
  // Inclusive upper bound on the first field of a TpchSamples row; TestTpch drops rows above it.
  constexpr auto border = 9'124'596'000'000'000ULL;
  1171. Y_UNIT_TEST_LLVM(TestTpch) {
  1172. TSetup<LLVM> setup(GetNodeFactory());
  1173. struct TPairHash { size_t operator()(const std::pair<std::string_view, std::string_view>& p) const { return CombineHashes(std::hash<std::string_view>()(p.first), std::hash<std::string_view>()(p.second)); } };
  1174. std::unordered_map<std::pair<std::string_view, std::string_view>, std::pair<ui64, std::array<double, 5U>>, TPairHash> expects;
  1175. const auto t = TInstant::Now();
  1176. for (auto& sample : TpchSamples) {
  1177. if (std::get<0U>(sample) <= border) {
  1178. const auto& ins = expects.emplace(std::pair<std::string_view, std::string_view>{std::get<1U>(sample), std::get<2U>(sample)}, std::pair<ui64, std::array<double, 5U>>{0ULL, {0., 0., 0., 0., 0.}});
  1179. auto& item = ins.first->second;
  1180. ++item.first;
  1181. std::get<0U>(item.second) += std::get<3U>(sample);
  1182. std::get<1U>(item.second) += std::get<5U>(sample);
  1183. std::get<2U>(item.second) += std::get<6U>(sample);
  1184. const auto v = std::get<3U>(sample) * (1. - std::get<5U>(sample));
  1185. std::get<3U>(item.second) += v;
  1186. std::get<4U>(item.second) += v * (1. + std::get<4U>(sample));
  1187. }
  1188. }
  1189. for (auto& item : expects) {
  1190. std::get<1U>(item.second.second) /= item.second.first;
  1191. }
  1192. const auto cppTime = TInstant::Now() - t;
  1193. std::vector<std::pair<std::pair<std::string, std::string>, std::pair<ui64, std::array<double, 5U>>>> one, two;
  1194. one.reserve(expects.size());
  1195. two.reserve(expects.size());
  1196. one.insert(one.cend(), expects.cbegin(), expects.cend());
  1197. std::sort(one.begin(), one.end(), [](const std::pair<std::pair<std::string_view, std::string_view>, std::pair<ui64, std::array<double, 5U>>> l, const std::pair<std::pair<std::string_view, std::string_view>, std::pair<ui64, std::array<double, 5U>>> r){ return l.first < r.first; });
  1198. TProgramBuilder& pb = *setup.PgmBuilder;
  1199. const auto listType = pb.NewListType(pb.NewTupleType({
  1200. pb.NewDataType(NUdf::TDataType<ui64>::Id),
  1201. pb.NewDataType(NUdf::TDataType<const char*>::Id),
  1202. pb.NewDataType(NUdf::TDataType<const char*>::Id),
  1203. pb.NewDataType(NUdf::TDataType<double>::Id),
  1204. pb.NewDataType(NUdf::TDataType<double>::Id),
  1205. pb.NewDataType(NUdf::TDataType<double>::Id),
  1206. pb.NewDataType(NUdf::TDataType<double>::Id)
  1207. }));
  1208. const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
  1209. const auto pgmReturn = pb.Collect(pb.CombineCore(
  1210. pb.Map(pb.Filter(pb.ToFlow(TRuntimeNode(list, false)),
  1211. [&](TRuntimeNode item) { return pb.AggrLessOrEqual(pb.Nth(item, 0U), pb.NewDataLiteral<ui64>(border)); }
  1212. ),
  1213. [&](TRuntimeNode item) { return pb.NewTuple({pb.Nth(item, 1U), pb.Nth(item, 2U),pb.Nth(item, 3U),pb.Nth(item, 4U),pb.Nth(item, 5U),pb.Nth(item, 6U)}); } ),
  1214. [&](TRuntimeNode item) { return pb.NewTuple({pb.Nth(item, 0U), pb.Nth(item, 1U)}); },
  1215. [&](TRuntimeNode, TRuntimeNode item) {
  1216. const auto price = pb.Nth(item, 2U);
  1217. const auto disco = pb.Nth(item, 4U);
  1218. const auto v = pb.Mul(price, pb.Sub(pb.NewDataLiteral<double>(1.), disco));
  1219. return pb.NewTuple({pb.NewDataLiteral<ui64>(1ULL), price, disco, pb.Nth(item, 5U), v, pb.Mul(v, pb.Add(pb.NewDataLiteral<double>(1.), pb.Nth(item, 3U))) });
  1220. },
  1221. [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) {
  1222. const auto price = pb.Nth(item, 2U);
  1223. const auto disco = pb.Nth(item, 4U);
  1224. const auto v = pb.Mul(price, pb.Sub(pb.NewDataLiteral<double>(1.), disco));
  1225. return pb.NewTuple({pb.Increment(pb.Nth(state, 0U)), pb.AggrAdd(pb.Nth(state, 1U), price), pb.AggrAdd(pb.Nth(state, 2U), disco), pb.AggrAdd(pb.Nth(state, 3U), pb.Nth(item, 5U)), pb.AggrAdd(pb.Nth(state, 4U), v), pb.AggrAdd(pb.Nth(state, 5U), pb.Mul(v, pb.Add(pb.NewDataLiteral<double>(1.), pb.Nth(item, 3U)))) });
  1226. },
  1227. [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({pb.Nth(key, 0U), pb.Nth(key, 1U), pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Div(pb.Nth(state, 2U), pb.Nth(state, 0U)), pb.Nth(state, 3U), pb.Nth(state, 4U), pb.Nth(state, 5U)})); },
  1228. 0ULL
  1229. ));
  1230. const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
  1231. NUdf::TUnboxedValue* items = nullptr;
  1232. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(TpchSamples.size(), items));
  1233. for (const auto& sample : TpchSamples) {
  1234. NUdf::TUnboxedValue* elements = nullptr;
  1235. *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(7U, elements);
  1236. elements[0] = NUdf::TUnboxedValuePod(std::get<0U>(sample));
  1237. elements[1] = NUdf::TUnboxedValuePod::Embedded(std::get<1U>(sample));
  1238. elements[2] = NUdf::TUnboxedValuePod::Embedded(std::get<2U>(sample));
  1239. elements[3] = NUdf::TUnboxedValuePod(std::get<3U>(sample));
  1240. elements[4] = NUdf::TUnboxedValuePod(std::get<4U>(sample));
  1241. elements[5] = NUdf::TUnboxedValuePod(std::get<5U>(sample));
  1242. elements[6] = NUdf::TUnboxedValuePod(std::get<6U>(sample));
  1243. }
  1244. const auto t1 = TInstant::Now();
  1245. const auto& value = graph->GetValue();
  1246. const auto t2 = TInstant::Now();
  1247. UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
  1248. const auto ptr = value.GetElements();
  1249. for (size_t i = 0ULL; i < expects.size(); ++i) {
  1250. const auto elements = ptr[i].GetElements();
  1251. two.emplace_back(std::make_pair(elements[0].AsStringRef(), elements[1].AsStringRef()), std::pair<ui64, std::array<double, 5U>>{elements[2].template Get<ui64>(), {elements[3].template Get<double>(), elements[4].template Get<double>(), elements[5].template Get<double>(), elements[6].template Get<double>(), elements[7].template Get<double>()}});
  1252. }
  1253. std::sort(two.begin(), two.end(), [](const std::pair<std::pair<std::string_view, std::string_view>, std::pair<ui64, std::array<double, 5U>>> l, const std::pair<std::pair<std::string_view, std::string_view>, std::pair<ui64, std::array<double, 5U>>> r){ return l.first < r.first; });
  1254. UNIT_ASSERT_VALUES_EQUAL(one, two);
  1255. Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
  1256. }
  1257. }
  1258. #endif
  1259. } // NMiniKQL
  1260. } // NKikimr