12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535 |
- #include "mkql_computation_node_ut.h"
- #include <yql/essentials/minikql/mkql_runtime_version.h>
- #include <yql/essentials/minikql/mkql_node_cast.h>
- #include <yql/essentials/minikql/mkql_string_util.h>
- #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
- #include <cstring>
- #include <random>
- #include <ctime>
- #include <algorithm>
- namespace NKikimr {
- namespace NMiniKQL {
- namespace {
- ui64 g_Yield = std::numeric_limits<ui64>::max();
- ui64 g_TestStreamData[] = {0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2};
- ui64 g_TestYieldStreamData[] = {0, 1, 2, g_Yield, 0, g_Yield, 1, 2, 0, 1, 2, 0, g_Yield, 1, 2};
- template <bool WithYields>
- class TTestStreamWrapper: public TMutableComputationNode<TTestStreamWrapper<WithYields>> {
- typedef TMutableComputationNode<TTestStreamWrapper<WithYields>> TBaseComputation;
- public:
- class TStreamValue : public TComputationValue<TStreamValue> {
- public:
- using TBase = TComputationValue<TStreamValue>;
- TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& compCtx, const TTestStreamWrapper* parent)
- : TBase(memInfo)
- , CompCtx(compCtx)
- , Parent(parent)
- {
- }
- private:
- NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
- constexpr auto size = WithYields ? Y_ARRAY_SIZE(g_TestYieldStreamData) : Y_ARRAY_SIZE(g_TestStreamData);
- if (Index == size) {
- return NUdf::EFetchStatus::Finish;
- }
- const auto val = WithYields ? g_TestYieldStreamData[Index] : g_TestStreamData[Index];
- if (g_Yield == val) {
- ++Index;
- return NUdf::EFetchStatus::Yield;
- }
- NUdf::TUnboxedValue* items = nullptr;
- result = CompCtx.HolderFactory.CreateDirectArrayHolder(2, items);
- items[0] = NUdf::TUnboxedValuePod(val);
- if (((Index + 1) % Parent->PeakStep) == 0) {
- auto str = MakeStringNotFilled(64ul << 20);
- const auto& buf = str.AsStringRef();
- memset(buf.Data(), ' ', buf.Size());
- items[1] = std::move(str);
- } else {
- items[1] = NUdf::TUnboxedValuePod::Zero();
- }
- ++Index;
- return NUdf::EFetchStatus::Ok;
- }
- private:
- TComputationContext& CompCtx;
- const TTestStreamWrapper* const Parent;
- ui64 Index = 0;
- };
- TTestStreamWrapper(TComputationMutables& mutables, ui64 peakStep)
- : TBaseComputation(mutables)
- , PeakStep(peakStep)
- {
- }
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- return ctx.HolderFactory.Create<TStreamValue>(ctx, this);
- }
- private:
- void RegisterDependencies() const final {
- }
- private:
- const ui64 PeakStep;
- };
- template <bool WithYields>
- IComputationNode* WrapTestStream(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args");
- const ui64 peakStep = AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().Get<ui64>();
- return new TTestStreamWrapper<WithYields>(ctx.Mutables, peakStep);
- }
- TComputationNodeFactory GetNodeFactory() {
- return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
- if (callable.GetType()->GetName() == "TestList") {
- return new TExternalComputationNode(ctx.Mutables);
- }
- if (callable.GetType()->GetName() == "TestStream") {
- return WrapTestStream<false>(callable, ctx);
- }
- if (callable.GetType()->GetName() == "TestYieldStream") {
- return WrapTestStream<true>(callable, ctx);
- }
- return GetBuiltinFactory()(callable, ctx);
- };
- }
- template <bool LLVM, bool WithYields = false>
- TRuntimeNode MakeStream(TSetup<LLVM>& setup, ui64 peakStep) {
- TProgramBuilder& pb = *setup.PgmBuilder;
- TCallableBuilder callableBuilder(*setup.Env, WithYields ? "TestYieldStream" : "TestStream",
- pb.NewStreamType(
- pb.NewStructType({
- {TStringBuf("a"), pb.NewDataType(NUdf::EDataSlot::Uint64)},
- {TStringBuf("b"), pb.NewDataType(NUdf::EDataSlot::String)}
- })
- )
- );
- callableBuilder.Add(pb.NewDataLiteral(peakStep));
- return TRuntimeNode(callableBuilder.Build(), false);
- }
- template <bool OverFlow>
- TRuntimeNode Combine(TProgramBuilder& pb, TRuntimeNode stream, std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)> finishLambda) {
- const auto keyExtractor = [&](TRuntimeNode item) {
- return pb.Member(item, "a");
- };
- const auto init = [&](TRuntimeNode /*key*/, TRuntimeNode item) {
- return item;
- };
- const auto update = [&](TRuntimeNode /*key*/, TRuntimeNode item, TRuntimeNode state) {
- const auto a = pb.Add(pb.Member(item, "a"), pb.Member(state, "a"));
- const auto b = pb.Concat(pb.Member(item, "b"), pb.Member(state, "b"));
- return pb.NewStruct({
- {TStringBuf("a"), a},
- {TStringBuf("b"), b},
- });
- };
- return OverFlow ?
- pb.FromFlow(pb.CombineCore(pb.ToFlow(stream), keyExtractor, init, update, finishLambda, 64ul << 20)):
- pb.CombineCore(stream, keyExtractor, init, update, finishLambda, 64ul << 20);
- }
- TRuntimeNode Reduce(TProgramBuilder& pb, TRuntimeNode stream) {
- return pb.Condense(stream, pb.NewDataLiteral<ui64>(0),
- [&] (TRuntimeNode, TRuntimeNode) { return pb.NewDataLiteral<bool>(false); },
- [&] (TRuntimeNode item, TRuntimeNode state) { return pb.Add(state, item); }
- );
- }
- TRuntimeNode StreamToString(TProgramBuilder& pb, TRuntimeNode stream) {
- const auto sorted = pb.Sort(stream, pb.NewDataLiteral(true),
- [&](TRuntimeNode item) {
- return item;
- });
- return pb.Condense(sorted, pb.NewDataLiteral<NUdf::EDataSlot::String>("|"),
- [&] (TRuntimeNode, TRuntimeNode) { return pb.NewDataLiteral<bool>(false); },
- [&] (TRuntimeNode item, TRuntimeNode state) {
- return pb.Concat(pb.Concat(state, pb.ToString(item)), pb.NewDataLiteral<NUdf::EDataSlot::String>("|"));
- }
- );
- }
- } // unnamed
- Y_UNIT_TEST_SUITE(TMiniKQLCombineStreamTest) {
- Y_UNIT_TEST_LLVM(TestFullCombineWithOptOut) {
- TSetup<LLVM> setup(GetNodeFactory());
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
- return pb.NewOptional(pb.Member(state, "a"));
- };
- const auto stream = MakeStream(setup, Max<ui64>());
- const auto pgm = StreamToString(pb, Combine<false>(pb, stream, finish));
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|4|8|");
- }
- Y_UNIT_TEST_LLVM(TestFullCombineWithListOut) {
- TSetup<LLVM> setup(GetNodeFactory());
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
- const auto item = pb.Member(state, "a");
- const auto itemType = item.GetStaticType();
- auto list = pb.NewEmptyList(itemType);
- list = pb.Append(list, item);
- list = pb.Append(list, item);
- return list;
- };
- const auto stream = MakeStream(setup, Max<ui64>());
- const auto pgm = StreamToString(pb, Combine<false>(pb, stream, finish));
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|4|4|8|8|");
- }
- Y_UNIT_TEST_LLVM(TestFullCombineWithStreamOut) {
- TSetup<LLVM> setup(GetNodeFactory());
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
- const auto item = pb.Member(state, "a");
- const auto itemType = item.GetStaticType();
- auto list = pb.NewEmptyList(itemType);
- list = pb.Append(list, item);
- list = pb.Append(list, item);
- return pb.Iterator(list, MakeArrayRef(&state, 1));
- };
- const auto stream = MakeStream(setup, Max<ui64>());
- const auto pgm = StreamToString(pb, Combine<false>(pb, stream, finish));
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|4|4|8|8|");
- }
- Y_UNIT_TEST_LLVM(TestFullCombineWithOptOutAndYields) {
- TSetup<LLVM> setup(GetNodeFactory());
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
- return pb.NewOptional(pb.Member(state, "a"));
- };
- const auto stream = MakeStream<LLVM, true>(setup, Max<ui64>());
- const auto pgm = StreamToString(pb, Combine<false>(pb, stream, finish));
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|1|1|2|2|2|4|");
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
- }
- Y_UNIT_TEST_LLVM(TestFullCombineWithListAndYields) {
- TSetup<LLVM> setup(GetNodeFactory());
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
- const auto item = pb.Member(state, "a");
- const auto itemType = item.GetStaticType();
- auto list = pb.NewEmptyList(itemType);
- list = pb.Append(list, item);
- list = pb.Append(list, item);
- return list;
- };
- const auto stream = MakeStream<LLVM, true>(setup, Max<ui64>());
- const auto pgm = StreamToString(pb, Combine<false>(pb, stream, finish));
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|0|1|1|1|1|2|2|2|2|2|2|4|4|");
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
- }
- Y_UNIT_TEST_LLVM(TestFullCombineWithStreamAndYields) {
- TSetup<LLVM> setup(GetNodeFactory());
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
- const auto item = pb.Member(state, "a");
- const auto itemType = item.GetStaticType();
- auto list = pb.NewEmptyList(itemType);
- list = pb.Append(list, item);
- list = pb.Append(list, item);
- return pb.Iterator(list, MakeArrayRef(&state, 1));
- };
- const auto stream = MakeStream<LLVM, true>(setup, Max<ui64>());
- const auto pgm = StreamToString(pb, Combine<false>(pb, stream, finish));
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Yield);
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|0|1|1|1|1|2|2|2|2|2|2|4|4|");
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Finish);
- }
- Y_UNIT_TEST_LLVM(TestPartialFlush) {
- TSetup<LLVM> setup(GetNodeFactory());
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
- return pb.NewOptional(pb.Member(state, "a"));
- };
- const auto stream = MakeStream(setup, 6ul);
- const auto combine = Combine<false>(pb, stream, finish);
- {
- const auto pgm = Reduce(pb, combine);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(result.Get<ui64>(), 12ul);
- }
- {
- const auto pgm = StreamToString(pb, combine);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|2|2|4|4|");
- }
- }
- Y_UNIT_TEST_LLVM(TestCombineInSingleProc) {
- TSetup<LLVM> setup(GetNodeFactory());
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
- return pb.NewOptional(pb.Member(state, "a"));
- };
- const auto stream = MakeStream(setup, 6ul);
- const auto pgm = Reduce(pb, Combine<false>(pb, stream, finish));
- const auto graph = setup.BuildGraph(pgm, EGraphPerProcess::Single);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(result.Get<ui64>(), 12ul);
- }
- Y_UNIT_TEST_LLVM(TestCombineSwithYield) {
- TSetup<LLVM> setup(GetNodeFactory());
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto finish = [&](TRuntimeNode /*key*/, TRuntimeNode state) {
- return pb.NewOptional(pb.Member(state, "a"));
- };
- auto stream = MakeStream(setup, Max<ui64>());
- TSwitchInput switchInput;
- switchInput.Indicies.push_back(0);
- switchInput.InputType = stream.GetStaticType();
- stream = pb.Switch(stream,
- MakeArrayRef(&switchInput, 1),
- [&](ui32 /*index*/, TRuntimeNode item) { return Combine<false>(pb, item, finish); },
- 1,
- pb.NewStreamType(pb.NewDataType(NUdf::EDataSlot::Uint64))
- );
- const auto pgm = StreamToString(pb, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|1|1|1|1|2|2|2|2|");
- }
- }
- Y_UNIT_TEST_SUITE(TMiniKQLCombineStreamPerfTest) {
- Y_UNIT_TEST_LLVM(TestSumDoubleBooleanKeys) {
- TSetup<LLVM> setup(GetNodeFactory());
- double positive = 0.0, negative = 0.0;
- const auto t = TInstant::Now();
- for (const auto& sample : I8Samples) {
- (sample.second > 0.0 ? positive : negative) += sample.second;
- }
- const auto cppTime = TInstant::Now() - t;
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<double>::Id));
- const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
- const auto pgmReturn = pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
- [&](TRuntimeNode item) { return pb.AggrGreater(item, pb.NewDataLiteral(0.0)); },
- [&](TRuntimeNode, TRuntimeNode item) { return item; },
- [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.AggrAdd(state, item); },
- [&](TRuntimeNode, TRuntimeNode state) { return pb.NewOptional(state); },
- 0ULL
- );
- const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
- NUdf::TUnboxedValue* items = nullptr;
- graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
- std::transform(I8Samples.cbegin(), I8Samples.cend(), items, [](const std::pair<i8, double> s){ return ToValue<double>(s.second); });
- NUdf::TUnboxedValue first, second;
- const auto t1 = TInstant::Now();
- const auto& value = graph->GetValue();
- UNIT_ASSERT_EQUAL(value.Fetch(first), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_EQUAL(value.Fetch(second), NUdf::EFetchStatus::Ok);
- const auto t2 = TInstant::Now();
- if (first.template Get<double>() > 0.0) {
- UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), positive);
- UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), negative);
- } else {
- UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), negative);
- UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), positive);
- }
- Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
- }
- Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleBooleanKeys) {
- TSetup<LLVM> setup(GetNodeFactory());
- double pSum = 0.0, nSum = 0.0, pMax = 0.0, nMax = -1000.0, pMin = 1000.0, nMin = 0.0;
- const auto t = TInstant::Now();
- for (const auto& sample : I8Samples) {
- if (sample.second > 0.0) {
- pSum += sample.second;
- pMax = std::max(pMax, sample.second);
- pMin = std::min(pMin, sample.second);
- } else {
- nSum += sample.second;
- nMax = std::max(nMax, sample.second);
- nMin = std::min(nMin, sample.second);
- }
- }
- const auto cppTime = TInstant::Now() - t;
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<double>::Id));
- const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
- const auto pgmReturn = pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
- [&](TRuntimeNode item) { return pb.AggrGreater(item, pb.NewDataLiteral(0.0)); },
- [&](TRuntimeNode, TRuntimeNode item) { return pb.NewTuple({item, item, item}); },
- [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), item), pb.AggrMin(pb.Nth(state, 1U), item), pb.AggrMax(pb.Nth(state, 2U), item) }); },
- [&](TRuntimeNode, TRuntimeNode state) { return pb.NewOptional(state); },
- 0ULL
- );
- const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
- NUdf::TUnboxedValue* items = nullptr;
- graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
- std::transform(I8Samples.cbegin(), I8Samples.cend(), items, [](const std::pair<i8, double> s){ return ToValue<double>(s.second); });
- NUdf::TUnboxedValue first, second;
- const auto t1 = TInstant::Now();
- const auto& value = graph->GetValue();
- UNIT_ASSERT_EQUAL(value.Fetch(first), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_EQUAL(value.Fetch(second), NUdf::EFetchStatus::Ok);
- const auto t2 = TInstant::Now();
- if (first.GetElement(0).template Get<double>() > 0.0) {
- UNIT_ASSERT_VALUES_EQUAL(first.GetElement(0).template Get<double>(), pSum);
- UNIT_ASSERT_VALUES_EQUAL(first.GetElement(1).template Get<double>(), pMin);
- UNIT_ASSERT_VALUES_EQUAL(first.GetElement(2).template Get<double>(), pMax);
- UNIT_ASSERT_VALUES_EQUAL(second.GetElement(0).template Get<double>(), nSum);
- UNIT_ASSERT_VALUES_EQUAL(second.GetElement(1).template Get<double>(), nMin);
- UNIT_ASSERT_VALUES_EQUAL(second.GetElement(2).template Get<double>(), nMax);
- } else {
- UNIT_ASSERT_VALUES_EQUAL(first.GetElement(0).template Get<double>(), nSum);
- UNIT_ASSERT_VALUES_EQUAL(first.GetElement(1).template Get<double>(), nMin);
- UNIT_ASSERT_VALUES_EQUAL(first.GetElement(2).template Get<double>(), nMax);
- UNIT_ASSERT_VALUES_EQUAL(second.GetElement(0).template Get<double>(), pSum);
- UNIT_ASSERT_VALUES_EQUAL(second.GetElement(1).template Get<double>(), pMin);
- UNIT_ASSERT_VALUES_EQUAL(second.GetElement(2).template Get<double>(), pMax);
- }
- Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
- }
- Y_UNIT_TEST_LLVM(TestSumDoubleSmallKey) {
- TSetup<LLVM> setup(GetNodeFactory());
- std::unordered_map<i8, double> expects(201);
- const auto t = TInstant::Now();
- for (const auto& sample : I8Samples) {
- expects.emplace(sample.first, 0.0).first->second += sample.second;
- }
- const auto cppTime = TInstant::Now() - t;
- std::vector<std::pair<i8, double>> one, two;
- one.reserve(expects.size());
- two.reserve(expects.size());
- one.insert(one.cend(), expects.cbegin(), expects.cend());
- std::sort(one.begin(), one.end(), [](const std::pair<i8, double> l, const std::pair<i8, double> r){ return l.first < r.first; });
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<i8>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
- const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
- const auto pgmReturn = pb.Collect(pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
- [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
- [&](TRuntimeNode, TRuntimeNode item) { return pb.Nth(item, 1U); },
- [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.AggrAdd(state, pb.Nth(item, 1U)); },
- [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, state})); },
- 0ULL
- ));
- const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
- NUdf::TUnboxedValue* items = nullptr;
- graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
- for (const auto& sample : I8Samples) {
- NUdf::TUnboxedValue* pair = nullptr;
- *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
- pair[0] = NUdf::TUnboxedValuePod(sample.first);
- pair[1] = NUdf::TUnboxedValuePod(sample.second);
- }
- const auto t1 = TInstant::Now();
- const auto& value = graph->GetValue();
- const auto t2 = TInstant::Now();
- UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
- const auto ptr = value.GetElements();
- for (size_t i = 0ULL; i < expects.size(); ++i) {
- two.emplace_back(ptr[i].GetElement(0).template Get<i8>(), ptr[i].GetElement(1).template Get<double>());
- }
- std::sort(two.begin(), two.end(), [](const std::pair<i8, double> l, const std::pair<i8, double> r){ return l.first < r.first; });
- UNIT_ASSERT_VALUES_EQUAL(one, two);
- Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
- }
- Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleSmallKey) {
- TSetup<LLVM> setup(GetNodeFactory());
- std::unordered_map<i8, std::array<double, 3U>> expects(201);
- const auto t = TInstant::Now();
- for (const auto& sample : I8Samples) {
- auto& item = expects.emplace(sample.first, std::array<double, 3U>{0.0, std::numeric_limits<double>::max(), std::numeric_limits<double>::min()}).first->second;
- std::get<0U>(item) += sample.second;
- std::get<1U>(item) = std::min(std::get<1U>(item), sample.second);
- std::get<2U>(item) = std::max(std::get<2U>(item), sample.second);
- }
- const auto cppTime = TInstant::Now() - t;
- std::vector<std::pair<i8, std::array<double, 3U>>> one, two;
- one.reserve(expects.size());
- two.reserve(expects.size());
- one.insert(one.cend(), expects.cbegin(), expects.cend());
- std::sort(one.begin(), one.end(), [](const std::pair<i8, std::array<double, 3U>> l, const std::pair<i8, std::array<double, 3U>> r){ return l.first < r.first; });
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<i8>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
- const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
- const auto pgmReturn = pb.Collect(pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
- [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
- [&](TRuntimeNode, TRuntimeNode item) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({v, v, v}); },
- [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), v), pb.AggrMin(pb.Nth(state, 1U), v), pb.AggrMax(pb.Nth(state, 2U), v)}); },
- [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Nth(state, 2U)})); },
- 0ULL
- ));
- const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
- NUdf::TUnboxedValue* items = nullptr;
- graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
- for (const auto& sample : I8Samples) {
- NUdf::TUnboxedValue* pair = nullptr;
- *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
- pair[0] = NUdf::TUnboxedValuePod(sample.first);
- pair[1] = NUdf::TUnboxedValuePod(sample.second);
- }
- const auto t1 = TInstant::Now();
- const auto& value = graph->GetValue();
- const auto t2 = TInstant::Now();
- UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
- const auto ptr = value.GetElements();
- for (size_t i = 0ULL; i < expects.size(); ++i) {
- two.emplace_back(ptr[i].GetElement(0).template Get<i8>(), std::array<double, 3U>{ptr[i].GetElement(1).template Get<double>(), ptr[i].GetElement(2).template Get<double>(), ptr[i].GetElement(3).template Get<double>()});
- }
- std::sort(two.begin(), two.end(), [](const std::pair<i8, std::array<double, 3U>> l, const std::pair<i8, std::array<double, 3U>> r){ return l.first < r.first; });
- UNIT_ASSERT_VALUES_EQUAL(one, two);
- Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
- }
- Y_UNIT_TEST_LLVM(TestSumDoubleStringKey) {
- TSetup<LLVM> setup(GetNodeFactory());
- std::vector<std::pair<std::string, double>> stringI8Samples(I8Samples.size());
- std::transform(I8Samples.cbegin(), I8Samples.cend(), stringI8Samples.begin(), [](std::pair<i8, double> src){ return std::make_pair(ToString(src.first), src.second); });
- std::unordered_map<std::string, double> expects(201);
- const auto t = TInstant::Now();
- for (const auto& sample : stringI8Samples) {
- expects.emplace(sample.first, 0.0).first->second += sample.second;
- }
- const auto cppTime = TInstant::Now() - t;
- std::vector<std::pair<std::string_view, double>> one, two;
- one.reserve(expects.size());
- two.reserve(expects.size());
- one.insert(one.cend(), expects.cbegin(), expects.cend());
- std::sort(one.begin(), one.end(), [](const std::pair<std::string_view, double> l, const std::pair<std::string_view, double> r){ return l.first < r.first; });
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<const char*>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
- const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
- const auto pgmReturn = pb.Collect(pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
- [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
- [&](TRuntimeNode, TRuntimeNode item) { return pb.Nth(item, 1U); },
- [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.AggrAdd(state, pb.Nth(item, 1U)); },
- [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, state})); },
- 0ULL
- ));
- const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
- NUdf::TUnboxedValue* items = nullptr;
- graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(stringI8Samples.size(), items));
- for (const auto& sample : stringI8Samples) {
- NUdf::TUnboxedValue* pair = nullptr;
- *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
- pair[0] = NUdf::TUnboxedValuePod::Embedded(sample.first);
- pair[1] = NUdf::TUnboxedValuePod(sample.second);
- }
- const auto t1 = TInstant::Now();
- const auto& value = graph->GetValue();
- const auto t2 = TInstant::Now();
- UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
- const auto ptr = value.GetElements();
- for (size_t i = 0ULL; i < expects.size(); ++i) {
- two.emplace_back(ptr[i].GetElements()->AsStringRef(), ptr[i].GetElement(1).template Get<double>());
- }
- std::sort(two.begin(), two.end(), [](const std::pair<std::string_view, double> l, const std::pair<std::string_view, double> r){ return l.first < r.first; });
- UNIT_ASSERT_VALUES_EQUAL(one, two);
- Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
- }
- // Perf test (stream input): groups (string key, double) rows with CombineCore, computing sum/min/max per key,
- // and compares the output with the same aggregation done through std::unordered_map.
- Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleStringKey) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     using TStats = std::array<double, 3U>; // {sum, min, max}
-     using TRow = std::pair<std::string_view, TStats>;
-     const auto byKey = [](const TRow& lhs, const TRow& rhs){ return lhs.first < rhs.first; };
-     // Input rows: the keys of I8Samples rendered as text.
-     std::vector<std::pair<std::string, double>> stringI8Samples(I8Samples.size());
-     std::transform(I8Samples.cbegin(), I8Samples.cend(), stringI8Samples.begin(),
-         [](std::pair<i8, double> src){ return std::make_pair(ToString(src.first), src.second); });
-     // Reference aggregation in plain C++; only this loop is timed.
-     // NOTE(review): the +1E7 / -1E7 seeds presumably bound the sample range - confirm against I8Samples.
-     std::unordered_map<std::string, TStats> expects(201);
-     const auto cppStart = TInstant::Now();
-     for (const auto& [key, number] : stringI8Samples) {
-         auto& stats = expects.emplace(key, TStats{0.0, +1E7, -1E7}).first->second;
-         stats[0] += number;
-         stats[1] = std::min(stats[1], number);
-         stats[2] = std::max(stats[2], number);
-     }
-     const auto cppTime = TInstant::Now() - cppStart;
-     // 'one' holds the expected rows, 'two' the rows produced by the graph; both are sorted by key before comparison.
-     std::vector<TRow> one, two;
-     one.reserve(expects.size());
-     two.reserve(expects.size());
-     one.insert(one.cend(), expects.cbegin(), expects.cend());
-     std::sort(one.begin(), one.end(), byKey);
-     // Program: Collect(CombineCore(Iterator(list))) with a {sum, min, max} tuple as the state.
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<const char*>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
-     const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-     const auto keyOf = [&pb](TRuntimeNode item) { return pb.Nth(item, 0U); };
-     const auto initState = [&pb](TRuntimeNode, TRuntimeNode item) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({v, v, v}); };
-     const auto updateState = [&pb](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), v), pb.AggrMin(pb.Nth(state, 1U), v), pb.AggrMax(pb.Nth(state, 2U), v)}); };
-     const auto finishState = [&pb](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Nth(state, 2U)})); };
-     const auto pgmReturn = pb.Collect(pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}), keyOf, initState, updateState, finishState, 0ULL));
-     const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
-     // Feed the samples through entry point 0 as an array of (key, value) pairs.
-     NUdf::TUnboxedValue* items = nullptr;
-     graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(stringI8Samples.size(), items));
-     for (const auto& [key, number] : stringI8Samples) {
-         NUdf::TUnboxedValue* fields = nullptr;
-         *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, fields);
-         fields[0] = NUdf::TUnboxedValuePod::Embedded(key);
-         fields[1] = NUdf::TUnboxedValuePod(number);
-     }
-     // Timed section: only graph->GetValue() lies between the two timestamps.
-     const auto runStart = TInstant::Now();
-     const auto& result = graph->GetValue();
-     const auto runEnd = TInstant::Now();
-     UNIT_ASSERT_VALUES_EQUAL(result.GetListLength(), expects.size());
-     const auto rows = result.GetElements();
-     const auto rowsEnd = rows + expects.size();
-     for (auto row = rows; row != rowsEnd; ++row) {
-         two.emplace_back(row->GetElements()->AsStringRef(), TStats{row->GetElement(1).template Get<double>(), row->GetElement(2).template Get<double>(), row->GetElement(3).template Get<double>()});
-     }
-     std::sort(two.begin(), two.end(), byKey);
-     UNIT_ASSERT_VALUES_EQUAL(one, two);
-     Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
- }
- // Perf test (stream input): aggregates doubles per composite (ui32, string) key with CombineCore, computing
- // sum/min/max, and compares the output with the same aggregation done through std::unordered_map.
- Y_UNIT_TEST_LLVM(TestMinMaxSumTupleKey) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     using TKey = std::pair<ui32, std::string>;
-     using TStats = std::array<double, 3U>; // {sum, min, max}
-     using TRow = std::pair<TKey, TStats>;
-     const auto byKey = [](const TRow& lhs, const TRow& rhs){ return lhs.first < rhs.first; };
-     // Split every sample key into ((key / 10) % 100 as a number, key % 10 as text).
-     // The lambda parameter uses ui16, matching the flow variant of this test.
-     std::vector<std::pair<TKey, double>> pairI8Samples(Ui16Samples.size());
-     std::transform(Ui16Samples.cbegin(), Ui16Samples.cend(), pairI8Samples.begin(), [](std::pair<ui16, double> src){ return std::make_pair(std::make_pair(ui32(src.first / 10U % 100U), ToString(src.first % 10U)), src.second); });
-     // The hasher must accept exactly the map's key type. With a different pair type every hash call
-     // would build a converted temporary key (string copy, narrowed number) inside the timed loop.
-     struct TPairHash { size_t operator()(const TKey& p) const { return CombineHashes(std::hash<ui32>()(p.first), std::hash<std::string_view>()(p.second)); } };
-     // Reference aggregation in plain C++; only this loop is timed.
-     // NOTE(review): the +1E7 / -1E7 seeds presumably bound the sample range - confirm against Ui16Samples.
-     std::unordered_map<TKey, TStats, TPairHash> expects;
-     const auto t = TInstant::Now();
-     for (const auto& sample : pairI8Samples) {
-         auto& item = expects.emplace(sample.first, TStats{0.0, +1E7, -1E7}).first->second;
-         std::get<0U>(item) += sample.second;
-         std::get<1U>(item) = std::min(std::get<1U>(item), sample.second);
-         std::get<2U>(item) = std::max(std::get<2U>(item), sample.second);
-     }
-     const auto cppTime = TInstant::Now() - t;
-     // 'one' holds the expected rows, 'two' the rows produced by the graph; both are sorted by key before comparison.
-     std::vector<TRow> one, two;
-     one.reserve(expects.size());
-     two.reserve(expects.size());
-     one.insert(one.cend(), expects.cbegin(), expects.cend());
-     std::sort(one.begin(), one.end(), byKey);
-     // Program: Collect(CombineCore(Iterator(list))) keyed by the nested tuple, with a {sum, min, max} state.
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     const auto listType = pb.NewListType(pb.NewTupleType({pb.NewTupleType({pb.NewDataType(NUdf::TDataType<ui32>::Id), pb.NewDataType(NUdf::TDataType<const char*>::Id)}), pb.NewDataType(NUdf::TDataType<double>::Id)}));
-     const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-     const auto pgmReturn = pb.Collect(pb.CombineCore(pb.Iterator(TRuntimeNode(list, false), {}),
-         [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
-         [&](TRuntimeNode, TRuntimeNode item) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({v, v, v}); },
-         [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), v), pb.AggrMin(pb.Nth(state, 1U), v), pb.AggrMax(pb.Nth(state, 2U), v)}); },
-         [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Nth(state, 2U)})); },
-         0ULL
-     ));
-     const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
-     // Feed the samples through entry point 0 as an array of ((number, text), value) rows.
-     NUdf::TUnboxedValue* items = nullptr;
-     graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(pairI8Samples.size(), items));
-     for (const auto& sample : pairI8Samples) {
-         NUdf::TUnboxedValue* pair = nullptr;
-         *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
-         pair[1] = NUdf::TUnboxedValuePod(sample.second);
-         NUdf::TUnboxedValue* keys = nullptr;
-         pair[0] = graph->GetHolderFactory().CreateDirectArrayHolder(2U, keys);
-         keys[0] = NUdf::TUnboxedValuePod(sample.first.first);
-         keys[1] = NUdf::TUnboxedValuePod::Embedded(sample.first.second);
-     }
-     // Timed section: only graph->GetValue() lies between the two timestamps.
-     const auto t1 = TInstant::Now();
-     const auto& value = graph->GetValue();
-     const auto t2 = TInstant::Now();
-     UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
-     const auto ptr = value.GetElements();
-     for (size_t i = 0ULL; i < expects.size(); ++i) {
-         const auto elements = ptr[i].GetElements();
-         two.emplace_back(std::make_pair(elements[0].GetElement(0).template Get<ui32>(), (elements[0].GetElements()[1]).AsStringRef()), TStats{elements[1].template Get<double>(), elements[2].template Get<double>(), elements[3].template Get<double>()});
-     }
-     std::sort(two.begin(), two.end(), byKey);
-     UNIT_ASSERT_VALUES_EQUAL(one, two);
-     Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
- }
- }
- #if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 3u
- Y_UNIT_TEST_SUITE(TMiniKQLCombineFlowTest) {
- // Combine whose finish handler returns Optional(state.a); the stringified output must be "|0|4|8|".
- Y_UNIT_TEST_LLVM(TestFullCombineWithOptOut) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     // Finish handler: ignores the key and emits member "a" of the state as an optional.
-     const auto emitOptional = [&pb](TRuntimeNode /*key*/, TRuntimeNode state) {
-         const auto member = pb.Member(state, "a");
-         return pb.NewOptional(member);
-     };
-     const auto input = MakeStream(setup, Max<ui64>());
-     const auto combined = Combine<true>(pb, input, emitOptional);
-     const auto program = StreamToString(pb, combined);
-     const auto graph = setup.BuildGraph(program);
-     const auto output = graph->GetValue();
-     NUdf::TUnboxedValue text;
-     UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Ok);
-     UNIT_ASSERT_VALUES_EQUAL(TStringBuf(text.AsStringRef()), "|0|4|8|");
- }
- // Combine whose finish handler returns a two-element list [state.a, state.a];
- // the stringified output must be "|0|0|4|4|8|8|".
- Y_UNIT_TEST_LLVM(TestFullCombineWithListOut) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     // Finish handler: ignores the key and emits member "a" of the state twice, as a list.
-     const auto emitTwice = [&pb](TRuntimeNode /*key*/, TRuntimeNode state) {
-         const auto member = pb.Member(state, "a");
-         const auto empty = pb.NewEmptyList(member.GetStaticType());
-         const auto single = pb.Append(empty, member);
-         return pb.Append(single, member);
-     };
-     const auto input = MakeStream(setup, Max<ui64>());
-     const auto combined = Combine<true>(pb, input, emitTwice);
-     const auto program = StreamToString(pb, combined);
-     const auto graph = setup.BuildGraph(program);
-     const auto output = graph->GetValue();
-     NUdf::TUnboxedValue text;
-     UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Ok);
-     UNIT_ASSERT_VALUES_EQUAL(TStringBuf(text.AsStringRef()), "|0|0|4|4|8|8|");
- }
- // Combine whose finish handler returns an iterator over [state.a, state.a];
- // the stringified output must be "|0|0|4|4|8|8|".
- Y_UNIT_TEST_LLVM(TestFullCombineWithStreamOut) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     // Finish handler: builds the two-element list and wraps it with Iterator, passing the state node as the second argument.
-     const auto emitTwiceAsStream = [&pb](TRuntimeNode /*key*/, TRuntimeNode state) {
-         const auto member = pb.Member(state, "a");
-         const auto empty = pb.NewEmptyList(member.GetStaticType());
-         const auto single = pb.Append(empty, member);
-         const auto twice = pb.Append(single, member);
-         return pb.Iterator(twice, MakeArrayRef(&state, 1));
-     };
-     const auto input = MakeStream(setup, Max<ui64>());
-     const auto combined = Combine<true>(pb, input, emitTwiceAsStream);
-     const auto program = StreamToString(pb, combined);
-     const auto graph = setup.BuildGraph(program);
-     const auto output = graph->GetValue();
-     NUdf::TUnboxedValue text;
-     UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Ok);
-     UNIT_ASSERT_VALUES_EQUAL(TStringBuf(text.AsStringRef()), "|0|0|4|4|8|8|");
- }
- // Same as the optional-output test but over MakeStream<LLVM, true>: three fetches must report Yield,
- // the fourth Ok with "|0|0|0|1|1|2|2|2|4|", and the fifth Finish.
- Y_UNIT_TEST_LLVM(TestFullCombineWithOptOutAndYields) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     // Finish handler: ignores the key and emits member "a" of the state as an optional.
-     const auto emitOptional = [&pb](TRuntimeNode /*key*/, TRuntimeNode state) {
-         const auto member = pb.Member(state, "a");
-         return pb.NewOptional(member);
-     };
-     const auto input = MakeStream<LLVM, true>(setup, Max<ui64>());
-     const auto combined = Combine<true>(pb, input, emitOptional);
-     const auto program = StreamToString(pb, combined);
-     const auto graph = setup.BuildGraph(program);
-     const auto output = graph->GetValue();
-     NUdf::TUnboxedValue text;
-     for (int attempt = 0; attempt < 3; ++attempt) {
-         UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Yield);
-     }
-     UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Ok);
-     UNIT_ASSERT_VALUES_EQUAL(TStringBuf(text.AsStringRef()), "|0|0|0|1|1|2|2|2|4|");
-     UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Finish);
- }
- // List-output finish handler over MakeStream<LLVM, true>: three fetches must report Yield, the fourth Ok
- // with "|0|0|0|0|0|0|1|1|1|1|2|2|2|2|2|2|4|4|", and the fifth Finish.
- Y_UNIT_TEST_LLVM(TestFullCombineWithListAndYields) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     // Finish handler: ignores the key and emits member "a" of the state twice, as a list.
-     const auto emitTwice = [&pb](TRuntimeNode /*key*/, TRuntimeNode state) {
-         const auto member = pb.Member(state, "a");
-         const auto empty = pb.NewEmptyList(member.GetStaticType());
-         const auto single = pb.Append(empty, member);
-         return pb.Append(single, member);
-     };
-     const auto input = MakeStream<LLVM, true>(setup, Max<ui64>());
-     const auto combined = Combine<true>(pb, input, emitTwice);
-     const auto program = StreamToString(pb, combined);
-     const auto graph = setup.BuildGraph(program);
-     const auto output = graph->GetValue();
-     NUdf::TUnboxedValue text;
-     for (int attempt = 0; attempt < 3; ++attempt) {
-         UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Yield);
-     }
-     UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Ok);
-     UNIT_ASSERT_VALUES_EQUAL(TStringBuf(text.AsStringRef()), "|0|0|0|0|0|0|1|1|1|1|2|2|2|2|2|2|4|4|");
-     UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Finish);
- }
- // Stream-output finish handler over MakeStream<LLVM, true>: three fetches must report Yield, the fourth Ok
- // with "|0|0|0|0|0|0|1|1|1|1|2|2|2|2|2|2|4|4|", and the fifth Finish.
- Y_UNIT_TEST_LLVM(TestFullCombineWithStreamAndYields) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     // Finish handler: builds the two-element list and wraps it with Iterator, passing the state node as the second argument.
-     const auto emitTwiceAsStream = [&pb](TRuntimeNode /*key*/, TRuntimeNode state) {
-         const auto member = pb.Member(state, "a");
-         const auto empty = pb.NewEmptyList(member.GetStaticType());
-         const auto single = pb.Append(empty, member);
-         const auto twice = pb.Append(single, member);
-         return pb.Iterator(twice, MakeArrayRef(&state, 1));
-     };
-     const auto input = MakeStream<LLVM, true>(setup, Max<ui64>());
-     const auto combined = Combine<true>(pb, input, emitTwiceAsStream);
-     const auto program = StreamToString(pb, combined);
-     const auto graph = setup.BuildGraph(program);
-     const auto output = graph->GetValue();
-     NUdf::TUnboxedValue text;
-     for (int attempt = 0; attempt < 3; ++attempt) {
-         UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Yield);
-     }
-     UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Ok);
-     UNIT_ASSERT_VALUES_EQUAL(TStringBuf(text.AsStringRef()), "|0|0|0|0|0|0|1|1|1|1|2|2|2|2|2|2|4|4|");
-     UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Finish);
- }
- // Builds one combine node over MakeStream(setup, 6ul) and evaluates it twice:
- // reduced (expects 12) and stringified (expects "|0|0|2|2|4|4|").
- Y_UNIT_TEST_LLVM(TestPartialFlush) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     // Finish handler: ignores the key and emits member "a" of the state as an optional.
-     const auto emitOptional = [&pb](TRuntimeNode /*key*/, TRuntimeNode state) {
-         const auto member = pb.Member(state, "a");
-         return pb.NewOptional(member);
-     };
-     const auto input = MakeStream(setup, 6ul);
-     const auto combined = Combine<true>(pb, input, emitOptional);
-     { // First graph: Reduce over the combine output.
-         const auto program = Reduce(pb, combined);
-         const auto graph = setup.BuildGraph(program);
-         const auto output = graph->GetValue();
-         NUdf::TUnboxedValue total;
-         UNIT_ASSERT_EQUAL(output.Fetch(total), NUdf::EFetchStatus::Ok);
-         UNIT_ASSERT_VALUES_EQUAL(total.Get<ui64>(), 12ul);
-     }
-     { // Second graph: the same combine node rendered through StreamToString.
-         const auto program = StreamToString(pb, combined);
-         const auto graph = setup.BuildGraph(program);
-         const auto output = graph->GetValue();
-         NUdf::TUnboxedValue text;
-         UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Ok);
-         UNIT_ASSERT_VALUES_EQUAL(TStringBuf(text.AsStringRef()), "|0|0|2|2|4|4|");
-     }
- }
- // Reduce over the combine output, with the graph built in EGraphPerProcess::Single mode; expects 12.
- Y_UNIT_TEST_LLVM(TestCombineInSingleProc) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     // Finish handler: ignores the key and emits member "a" of the state as an optional.
-     const auto emitOptional = [&pb](TRuntimeNode /*key*/, TRuntimeNode state) {
-         const auto member = pb.Member(state, "a");
-         return pb.NewOptional(member);
-     };
-     const auto input = MakeStream(setup, 6ul);
-     const auto combined = Combine<true>(pb, input, emitOptional);
-     const auto program = Reduce(pb, combined);
-     const auto graph = setup.BuildGraph(program, EGraphPerProcess::Single);
-     const auto output = graph->GetValue();
-     NUdf::TUnboxedValue total;
-     UNIT_ASSERT_EQUAL(output.Fetch(total), NUdf::EFetchStatus::Ok);
-     UNIT_ASSERT_VALUES_EQUAL(total.Get<ui64>(), 12ul);
- }
- // Runs the combine inside a single-input Switch node; the stringified output must be
- // "|0|0|0|0|1|1|1|1|2|2|2|2|".
- Y_UNIT_TEST_LLVM(TestCombineSwithYield) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     // Finish handler: ignores the key and emits member "a" of the state as an optional.
-     const auto emitOptional = [&pb](TRuntimeNode /*key*/, TRuntimeNode state) {
-         const auto member = pb.Member(state, "a");
-         return pb.NewOptional(member);
-     };
-     const auto input = MakeStream(setup, Max<ui64>());
-     // The only switch input: index 0, typed like the source stream.
-     TSwitchInput switchInput;
-     switchInput.Indicies.push_back(0);
-     switchInput.InputType = input.GetStaticType();
-     // Switch handler: applies the combine to whatever node it is given.
-     const auto combineBranch = [&](ui32 /*index*/, TRuntimeNode item) { return Combine<true>(pb, item, emitOptional); };
-     const auto switched = pb.Switch(input,
-         MakeArrayRef(&switchInput, 1),
-         combineBranch,
-         1,
-         pb.NewStreamType(pb.NewDataType(NUdf::EDataSlot::Uint64))
-     );
-     const auto program = StreamToString(pb, switched);
-     const auto graph = setup.BuildGraph(program);
-     const auto output = graph->GetValue();
-     NUdf::TUnboxedValue text;
-     UNIT_ASSERT_EQUAL(output.Fetch(text), NUdf::EFetchStatus::Ok);
-     UNIT_ASSERT_VALUES_EQUAL(TStringBuf(text.AsStringRef()), "|0|0|0|0|1|1|1|1|2|2|2|2|");
- }
- }
- Y_UNIT_TEST_SUITE(TMiniKQLCombineFlowPerfTest) {
- // Perf test (flow input): sums the doubles of I8Samples in two groups keyed by "value > 0"
- // and compares both sums with a plain C++ loop.
- Y_UNIT_TEST_LLVM(TestSumDoubleBooleanKeys) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     // Reference sums in plain C++; only this loop is timed.
-     double positive = 0.0, negative = 0.0;
-     const auto cppStart = TInstant::Now();
-     for (const auto& sample : I8Samples) {
-         if (sample.second > 0.0) {
-             positive += sample.second;
-         } else {
-             negative += sample.second;
-         }
-     }
-     const auto cppTime = TInstant::Now() - cppStart;
-     // Program: FromFlow(CombineCore(ToFlow(list))) keyed by the boolean "item > 0", with the running sum as state.
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<double>::Id));
-     const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-     const auto isPositive = [&pb](TRuntimeNode item) { return pb.AggrGreater(item, pb.NewDataLiteral(0.0)); };
-     const auto initState = [](TRuntimeNode, TRuntimeNode item) { return item; };
-     const auto addItem = [&pb](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.AggrAdd(state, item); };
-     const auto wrapState = [&pb](TRuntimeNode, TRuntimeNode state) { return pb.NewOptional(state); };
-     const auto pgmReturn = pb.FromFlow(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)), isPositive, initState, addItem, wrapState, 0ULL));
-     const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
-     // Feed the sample values through entry point 0 as an array of doubles.
-     NUdf::TUnboxedValue* items = nullptr;
-     graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
-     for (const auto& sample : I8Samples) {
-         *items++ = ToValue<double>(sample.second);
-     }
-     // Timed section: obtaining the stream and fetching both group results.
-     NUdf::TUnboxedValue first, second;
-     const auto runStart = TInstant::Now();
-     const auto& output = graph->GetValue();
-     UNIT_ASSERT_EQUAL(output.Fetch(first), NUdf::EFetchStatus::Ok);
-     UNIT_ASSERT_EQUAL(output.Fetch(second), NUdf::EFetchStatus::Ok);
-     const auto runEnd = TInstant::Now();
-     // The groups may arrive in either order; the sign of the first sum tells which one came first.
-     if (first.template Get<double>() > 0.0) {
-         UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), positive);
-         UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), negative);
-     } else {
-         UNIT_ASSERT_VALUES_EQUAL(first.template Get<double>(), negative);
-         UNIT_ASSERT_VALUES_EQUAL(second.template Get<double>(), positive);
-     }
-     Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
- }
- // Perf test (flow input): computes sum/min/max of the doubles of I8Samples in two groups keyed by
- // "value > 0" and compares all six numbers with a plain C++ loop.
- Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleBooleanKeys) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     // Reference statistics in plain C++; only this loop is timed.
-     // NOTE(review): the -1000.0 / 1000.0 seeds presumably bound the sample range - confirm against I8Samples.
-     double pSum = 0.0, nSum = 0.0, pMax = 0.0, nMax = -1000.0, pMin = 1000.0, nMin = 0.0;
-     const auto cppStart = TInstant::Now();
-     for (const auto& sample : I8Samples) {
-         const double number = sample.second;
-         if (number > 0.0) {
-             pSum += number;
-             pMax = std::max(pMax, number);
-             pMin = std::min(pMin, number);
-         } else {
-             nSum += number;
-             nMax = std::max(nMax, number);
-             nMin = std::min(nMin, number);
-         }
-     }
-     const auto cppTime = TInstant::Now() - cppStart;
-     // Program: FromFlow(CombineCore(ToFlow(list))) keyed by the boolean "item > 0", with a {sum, min, max} tuple as state.
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<double>::Id));
-     const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-     const auto isPositive = [&pb](TRuntimeNode item) { return pb.AggrGreater(item, pb.NewDataLiteral(0.0)); };
-     const auto initState = [&pb](TRuntimeNode, TRuntimeNode item) { return pb.NewTuple({item, item, item}); };
-     const auto updateState = [&pb](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), item), pb.AggrMin(pb.Nth(state, 1U), item), pb.AggrMax(pb.Nth(state, 2U), item) }); };
-     const auto wrapState = [&pb](TRuntimeNode, TRuntimeNode state) { return pb.NewOptional(state); };
-     const auto pgmReturn = pb.FromFlow(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)), isPositive, initState, updateState, wrapState, 0ULL));
-     const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
-     // Feed the sample values through entry point 0 as an array of doubles.
-     NUdf::TUnboxedValue* items = nullptr;
-     graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
-     for (const auto& sample : I8Samples) {
-         *items++ = ToValue<double>(sample.second);
-     }
-     // Timed section: obtaining the stream and fetching both group results.
-     NUdf::TUnboxedValue first, second;
-     const auto runStart = TInstant::Now();
-     const auto& output = graph->GetValue();
-     UNIT_ASSERT_EQUAL(output.Fetch(first), NUdf::EFetchStatus::Ok);
-     UNIT_ASSERT_EQUAL(output.Fetch(second), NUdf::EFetchStatus::Ok);
-     const auto runEnd = TInstant::Now();
-     // The groups may arrive in either order; the sign of the first sum tells which one came first.
-     if (first.GetElement(0).template Get<double>() > 0.0) {
-         UNIT_ASSERT_VALUES_EQUAL(first.GetElement(0).template Get<double>(), pSum);
-         UNIT_ASSERT_VALUES_EQUAL(first.GetElement(1).template Get<double>(), pMin);
-         UNIT_ASSERT_VALUES_EQUAL(first.GetElement(2).template Get<double>(), pMax);
-         UNIT_ASSERT_VALUES_EQUAL(second.GetElement(0).template Get<double>(), nSum);
-         UNIT_ASSERT_VALUES_EQUAL(second.GetElement(1).template Get<double>(), nMin);
-         UNIT_ASSERT_VALUES_EQUAL(second.GetElement(2).template Get<double>(), nMax);
-     } else {
-         UNIT_ASSERT_VALUES_EQUAL(first.GetElement(0).template Get<double>(), nSum);
-         UNIT_ASSERT_VALUES_EQUAL(first.GetElement(1).template Get<double>(), nMin);
-         UNIT_ASSERT_VALUES_EQUAL(first.GetElement(2).template Get<double>(), nMax);
-         UNIT_ASSERT_VALUES_EQUAL(second.GetElement(0).template Get<double>(), pSum);
-         UNIT_ASSERT_VALUES_EQUAL(second.GetElement(1).template Get<double>(), pMin);
-         UNIT_ASSERT_VALUES_EQUAL(second.GetElement(2).template Get<double>(), pMax);
-     }
-     Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
- }
- // Perf test (flow input): sums doubles per i8 key with CombineCore and compares the output
- // with the same aggregation done through std::unordered_map.
- Y_UNIT_TEST_LLVM(TestSumDoubleSmallKey) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     using TRow = std::pair<i8, double>;
-     const auto byKey = [](const TRow& lhs, const TRow& rhs){ return lhs.first < rhs.first; };
-     // Reference aggregation in plain C++; only this loop is timed.
-     std::unordered_map<i8, double> expects(201);
-     const auto cppStart = TInstant::Now();
-     for (const auto& sample : I8Samples) {
-         const auto slot = expects.emplace(sample.first, 0.0).first;
-         slot->second += sample.second;
-     }
-     const auto cppTime = TInstant::Now() - cppStart;
-     // 'one' holds the expected rows, 'two' the rows produced by the graph; both are sorted by key before comparison.
-     std::vector<TRow> one, two;
-     one.reserve(expects.size());
-     two.reserve(expects.size());
-     one.insert(one.cend(), expects.cbegin(), expects.cend());
-     std::sort(one.begin(), one.end(), byKey);
-     // Program: Collect(CombineCore(ToFlow(list))) keyed by tuple element 0, with the running sum as state.
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<i8>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
-     const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-     const auto keyOf = [&pb](TRuntimeNode item) { return pb.Nth(item, 0U); };
-     const auto initState = [&pb](TRuntimeNode, TRuntimeNode item) { return pb.Nth(item, 1U); };
-     const auto addItem = [&pb](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.AggrAdd(state, pb.Nth(item, 1U)); };
-     const auto finishState = [&pb](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, state})); };
-     const auto pgmReturn = pb.Collect(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)), keyOf, initState, addItem, finishState, 0ULL));
-     const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
-     // Feed the samples through entry point 0 as an array of (key, value) pairs.
-     NUdf::TUnboxedValue* items = nullptr;
-     graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
-     for (const auto& sample : I8Samples) {
-         NUdf::TUnboxedValue* fields = nullptr;
-         *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, fields);
-         fields[0] = NUdf::TUnboxedValuePod(sample.first);
-         fields[1] = NUdf::TUnboxedValuePod(sample.second);
-     }
-     // Timed section: only graph->GetValue() lies between the two timestamps.
-     const auto runStart = TInstant::Now();
-     const auto& result = graph->GetValue();
-     const auto runEnd = TInstant::Now();
-     UNIT_ASSERT_VALUES_EQUAL(result.GetListLength(), expects.size());
-     const auto rows = result.GetElements();
-     const auto rowsEnd = rows + expects.size();
-     for (auto row = rows; row != rowsEnd; ++row) {
-         two.emplace_back(row->GetElement(0).template Get<i8>(), row->GetElement(1).template Get<double>());
-     }
-     std::sort(two.begin(), two.end(), byKey);
-     UNIT_ASSERT_VALUES_EQUAL(one, two);
-     Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
- }
- // Perf test (flow input): computes sum/min/max of doubles per i8 key with CombineCore and compares
- // the output with the same aggregation done through std::unordered_map.
- Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleSmallKey) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     using TStats = std::array<double, 3U>; // {sum, min, max}
-     using TRow = std::pair<i8, TStats>;
-     const auto byKey = [](const TRow& lhs, const TRow& rhs){ return lhs.first < rhs.first; };
-     // Reference aggregation in plain C++; only this loop is timed.
-     std::unordered_map<i8, TStats> expects(201);
-     const auto t = TInstant::Now();
-     for (const auto& sample : I8Samples) {
-         // The running maximum is seeded with lowest(): numeric_limits<double>::min() is the smallest
-         // POSITIVE normal value and would give a wrong maximum for a key whose samples are all negative.
-         auto& item = expects.emplace(sample.first, TStats{0.0, std::numeric_limits<double>::max(), std::numeric_limits<double>::lowest()}).first->second;
-         std::get<0U>(item) += sample.second;
-         std::get<1U>(item) = std::min(std::get<1U>(item), sample.second);
-         std::get<2U>(item) = std::max(std::get<2U>(item), sample.second);
-     }
-     const auto cppTime = TInstant::Now() - t;
-     // 'one' holds the expected rows, 'two' the rows produced by the graph; both are sorted by key before comparison.
-     std::vector<TRow> one, two;
-     one.reserve(expects.size());
-     two.reserve(expects.size());
-     one.insert(one.cend(), expects.cbegin(), expects.cend());
-     std::sort(one.begin(), one.end(), byKey);
-     // Program: Collect(CombineCore(ToFlow(list))) keyed by tuple element 0, with a {sum, min, max} tuple as state.
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<i8>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
-     const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-     const auto pgmReturn = pb.Collect(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)),
-         [&](TRuntimeNode item) { return pb.Nth(item, 0U); },
-         [&](TRuntimeNode, TRuntimeNode item) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({v, v, v}); },
-         [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), v), pb.AggrMin(pb.Nth(state, 1U), v), pb.AggrMax(pb.Nth(state, 2U), v)}); },
-         [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Nth(state, 2U)})); },
-         0ULL
-     ));
-     const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
-     // Feed the samples through entry point 0 as an array of (key, value) pairs.
-     NUdf::TUnboxedValue* items = nullptr;
-     graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(I8Samples.size(), items));
-     for (const auto& sample : I8Samples) {
-         NUdf::TUnboxedValue* pair = nullptr;
-         *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);
-         pair[0] = NUdf::TUnboxedValuePod(sample.first);
-         pair[1] = NUdf::TUnboxedValuePod(sample.second);
-     }
-     // Timed section: only graph->GetValue() lies between the two timestamps.
-     const auto t1 = TInstant::Now();
-     const auto& value = graph->GetValue();
-     const auto t2 = TInstant::Now();
-     UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());
-     const auto ptr = value.GetElements();
-     for (size_t i = 0ULL; i < expects.size(); ++i) {
-         two.emplace_back(ptr[i].GetElement(0).template Get<i8>(), TStats{ptr[i].GetElement(1).template Get<double>(), ptr[i].GetElement(2).template Get<double>(), ptr[i].GetElement(3).template Get<double>()});
-     }
-     std::sort(two.begin(), two.end(), byKey);
-     UNIT_ASSERT_VALUES_EQUAL(one, two);
-     Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
- }
- // Perf test (flow input): sums doubles per string key with CombineCore and compares the output
- // with the same aggregation done through std::unordered_map.
- Y_UNIT_TEST_LLVM(TestSumDoubleStringKey) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     using TRow = std::pair<std::string_view, double>;
-     const auto byKey = [](const TRow& lhs, const TRow& rhs){ return lhs.first < rhs.first; };
-     // Input rows: the keys of I8Samples rendered as text.
-     std::vector<std::pair<std::string, double>> stringI8Samples(I8Samples.size());
-     std::transform(I8Samples.cbegin(), I8Samples.cend(), stringI8Samples.begin(),
-         [](std::pair<i8, double> src){ return std::make_pair(ToString(src.first), src.second); });
-     // Reference aggregation in plain C++; only this loop is timed.
-     std::unordered_map<std::string, double> expects(201);
-     const auto cppStart = TInstant::Now();
-     for (const auto& [key, number] : stringI8Samples) {
-         const auto slot = expects.emplace(key, 0.0).first;
-         slot->second += number;
-     }
-     const auto cppTime = TInstant::Now() - cppStart;
-     // 'one' holds the expected rows, 'two' the rows produced by the graph; both are sorted by key before comparison.
-     std::vector<TRow> one, two;
-     one.reserve(expects.size());
-     two.reserve(expects.size());
-     one.insert(one.cend(), expects.cbegin(), expects.cend());
-     std::sort(one.begin(), one.end(), byKey);
-     // Program: Collect(CombineCore(ToFlow(list))) keyed by tuple element 0, with the running sum as state.
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<const char*>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
-     const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-     const auto keyOf = [&pb](TRuntimeNode item) { return pb.Nth(item, 0U); };
-     const auto initState = [&pb](TRuntimeNode, TRuntimeNode item) { return pb.Nth(item, 1U); };
-     const auto addItem = [&pb](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { return pb.AggrAdd(state, pb.Nth(item, 1U)); };
-     const auto finishState = [&pb](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, state})); };
-     const auto pgmReturn = pb.Collect(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)), keyOf, initState, addItem, finishState, 0ULL));
-     const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
-     // Feed the samples through entry point 0 as an array of (key, value) pairs.
-     NUdf::TUnboxedValue* items = nullptr;
-     graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(stringI8Samples.size(), items));
-     for (const auto& [key, number] : stringI8Samples) {
-         NUdf::TUnboxedValue* fields = nullptr;
-         *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, fields);
-         fields[0] = NUdf::TUnboxedValuePod::Embedded(key);
-         fields[1] = NUdf::TUnboxedValuePod(number);
-     }
-     // Timed section: only graph->GetValue() lies between the two timestamps.
-     const auto runStart = TInstant::Now();
-     const auto& result = graph->GetValue();
-     const auto runEnd = TInstant::Now();
-     UNIT_ASSERT_VALUES_EQUAL(result.GetListLength(), expects.size());
-     const auto rows = result.GetElements();
-     const auto rowsEnd = rows + expects.size();
-     for (auto row = rows; row != rowsEnd; ++row) {
-         two.emplace_back(row->GetElements()->AsStringRef(), row->GetElement(1).template Get<double>());
-     }
-     std::sort(two.begin(), two.end(), byKey);
-     UNIT_ASSERT_VALUES_EQUAL(one, two);
-     Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
- }
- // Perf test (flow input): groups (string key, double) rows with CombineCore, computing sum/min/max per key,
- // and compares the output with the same aggregation done through std::unordered_map.
- Y_UNIT_TEST_LLVM(TestMinMaxSumDoubleStringKey) {
-     TSetup<LLVM> setup(GetNodeFactory());
-     using TStats = std::array<double, 3U>; // {sum, min, max}
-     using TRow = std::pair<std::string_view, TStats>;
-     const auto byKey = [](const TRow& lhs, const TRow& rhs){ return lhs.first < rhs.first; };
-     // Input rows: the keys of I8Samples rendered as text.
-     std::vector<std::pair<std::string, double>> stringI8Samples(I8Samples.size());
-     std::transform(I8Samples.cbegin(), I8Samples.cend(), stringI8Samples.begin(),
-         [](std::pair<i8, double> src){ return std::make_pair(ToString(src.first), src.second); });
-     // Reference aggregation in plain C++; only this loop is timed.
-     // NOTE(review): the +1E7 / -1E7 seeds presumably bound the sample range - confirm against I8Samples.
-     std::unordered_map<std::string, TStats> expects(201);
-     const auto cppStart = TInstant::Now();
-     for (const auto& [key, number] : stringI8Samples) {
-         auto& stats = expects.emplace(key, TStats{0.0, +1E7, -1E7}).first->second;
-         stats[0] += number;
-         stats[1] = std::min(stats[1], number);
-         stats[2] = std::max(stats[2], number);
-     }
-     const auto cppTime = TInstant::Now() - cppStart;
-     // 'one' holds the expected rows, 'two' the rows produced by the graph; both are sorted by key before comparison.
-     std::vector<TRow> one, two;
-     one.reserve(expects.size());
-     two.reserve(expects.size());
-     one.insert(one.cend(), expects.cbegin(), expects.cend());
-     std::sort(one.begin(), one.end(), byKey);
-     // Program: Collect(CombineCore(ToFlow(list))) keyed by tuple element 0, with a {sum, min, max} tuple as state.
-     TProgramBuilder& pb = *setup.PgmBuilder;
-     const auto listType = pb.NewListType(pb.NewTupleType({pb.NewDataType(NUdf::TDataType<const char*>::Id), pb.NewDataType(NUdf::TDataType<double>::Id)}));
-     const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-     const auto keyOf = [&pb](TRuntimeNode item) { return pb.Nth(item, 0U); };
-     const auto initState = [&pb](TRuntimeNode, TRuntimeNode item) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({v, v, v}); };
-     const auto updateState = [&pb](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), v), pb.AggrMin(pb.Nth(state, 1U), v), pb.AggrMax(pb.Nth(state, 2U), v)}); };
-     const auto finishState = [&pb](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Nth(state, 2U)})); };
-     const auto pgmReturn = pb.Collect(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)), keyOf, initState, updateState, finishState, 0ULL));
-     const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
-     // Feed the samples through entry point 0 as an array of (key, value) pairs.
-     NUdf::TUnboxedValue* items = nullptr;
-     graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(stringI8Samples.size(), items));
-     for (const auto& [key, number] : stringI8Samples) {
-         NUdf::TUnboxedValue* fields = nullptr;
-         *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, fields);
-         fields[0] = NUdf::TUnboxedValuePod::Embedded(key);
-         fields[1] = NUdf::TUnboxedValuePod(number);
-     }
-     // Timed section: only graph->GetValue() lies between the two timestamps.
-     const auto runStart = TInstant::Now();
-     const auto& result = graph->GetValue();
-     const auto runEnd = TInstant::Now();
-     UNIT_ASSERT_VALUES_EQUAL(result.GetListLength(), expects.size());
-     const auto rows = result.GetElements();
-     const auto rowsEnd = rows + expects.size();
-     for (auto row = rows; row != rowsEnd; ++row) {
-         two.emplace_back(row->GetElements()->AsStringRef(), TStats{row->GetElement(1).template Get<double>(), row->GetElement(2).template Get<double>(), row->GetElement(3).template Get<double>()});
-     }
-     std::sort(two.begin(), two.end(), byKey);
-     UNIT_ASSERT_VALUES_EQUAL(one, two);
-     Cerr << "Runtime is " << runEnd - runStart << " vs C++ " << cppTime << Endl;
- }
- Y_UNIT_TEST_LLVM(TestMinMaxSumTupleKey) {  // Per-key sum/min/max of doubles grouped by a composite (ui32, string) key: the CombineCore graph output must equal a plain C++ reference.
- TSetup<LLVM> setup(GetNodeFactory());
- std::vector<std::pair<std::pair<ui32, std::string>, double>> pairI8Samples(Ui16Samples.size());  // NOTE(review): despite the name, this is derived from Ui16Samples, not I8Samples.
- std::transform(Ui16Samples.cbegin(), Ui16Samples.cend(), pairI8Samples.begin(), [](std::pair<ui16, double> src){ return std::make_pair(std::make_pair(ui32(src.first / 10U % 100U), ToString(src.first % 10U)), src.second); });  // Key = (src.first / 10 % 100 as ui32, text of src.first % 10).
- struct TPairHash { size_t operator()(const std::pair<ui32, std::string>& p) const { return CombineHashes(std::hash<ui32>()(p.first), std::hash<std::string_view>()(p.second)); } };  // Hash for the composite key: combines the hashes of both parts.
- std::unordered_map<std::pair<ui32, std::string>, std::array<double, 3U>, TPairHash> expects;  // Reference aggregate per key: {sum, min, max}.
- const auto t = TInstant::Now();  // Start of the reference (plain C++) timing window.
- for (const auto& sample : pairI8Samples) {
- auto& item = expects.emplace(sample.first, std::array<double, 3U>{0.0, +1E7, -1E7}).first->second;  // Existing key: emplace leaves the entry as is. New key: sum 0, min/max sentinels. NOTE(review): assumes every value lies within (-1E7, +1E7) - confirm against Ui16Samples.
- std::get<0U>(item) += sample.second;
- std::get<1U>(item) = std::min(std::get<1U>(item), sample.second);
- std::get<2U>(item) = std::max(std::get<2U>(item), sample.second);
- }
- const auto cppTime = TInstant::Now() - t;  // Duration of the reference aggregation; only printed, never asserted.
- std::vector<std::pair<std::pair<ui32, std::string>, std::array<double, 3U>>> one, two;  // one = expected rows, two = rows read back from the graph; both own their key strings.
- one.reserve(expects.size());
- two.reserve(expects.size());
- one.insert(one.cend(), expects.cbegin(), expects.cend());
- std::sort(one.begin(), one.end(), [](const std::pair<std::pair<ui32, std::string_view>, std::array<double, 3U>> l, const std::pair<std::pair<ui32, std::string_view>, std::array<double, 3U>> r){ return l.first < r.first; });  // Order by key. NOTE(review): parameters are by-value pairs keyed by string_view while the elements hold std::string, so each comparison converts its arguments.
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto listType = pb.NewListType(pb.NewTupleType({pb.NewTupleType({pb.NewDataType(NUdf::TDataType<ui32>::Id), pb.NewDataType(NUdf::TDataType<const char*>::Id)}), pb.NewDataType(NUdf::TDataType<double>::Id)}));  // Input type: list of 2-tuples ((ui32, TDataType<const char*>) key tuple, double value).
- const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();  // Placeholder callable for the input list; handed to BuildGraph below as the entry point.
- const auto pgmReturn = pb.Collect(pb.CombineCore(pb.ToFlow(TRuntimeNode(list, false)),  // The four lambdas that follow are, in order: key extractor, state init, state update, finish.
- [&](TRuntimeNode item) { return pb.Nth(item, 0U); },  // Key = tuple element 0 (the nested key tuple).
- [&](TRuntimeNode, TRuntimeNode item) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({v, v, v}); },  // Initial state (sum, min, max) = (v, v, v), v being the item's value.
- [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) { const auto v = pb.Nth(item, 1U); return pb.NewTuple({pb.AggrAdd(pb.Nth(state, 0U), v), pb.AggrMin(pb.Nth(state, 1U), v), pb.AggrMax(pb.Nth(state, 2U), v)}); },  // Next state = (sum + v, min(min, v), max(max, v)).
- [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({key, pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Nth(state, 2U)})); },  // Output row = (key tuple, sum, min, max) wrapped in an optional.
- 0ULL  // NOTE(review): presumably CombineCore's memory limit argument - confirm against TProgramBuilder::CombineCore.
- ));
- const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
- NUdf::TUnboxedValue* items = nullptr;  // Receives the slot pointer of the array holder created on the next line.
- graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(pairI8Samples.size(), items));  // Entry point 0 gets an array with one slot per sample.
- for (const auto& sample : pairI8Samples) {
- NUdf::TUnboxedValue* pair = nullptr;
- *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(2U, pair);  // Each input row is a 2-element array: [key tuple, value].
- pair[1] = NUdf::TUnboxedValuePod(sample.second);
- NUdf::TUnboxedValue* keys = nullptr;
- pair[0] = graph->GetHolderFactory().CreateDirectArrayHolder(2U, keys);  // The key itself is a nested 2-element array: [ui32, string].
- keys[0] = NUdf::TUnboxedValuePod(sample.first.first);
- keys[1] = NUdf::TUnboxedValuePod::Embedded(sample.first.second);  // NOTE(review): Embedded presumably stores the string inline and so needs short keys - confirm.
- }
- const auto t1 = TInstant::Now();  // t1..t2 brackets only the GetValue() call.
- const auto& value = graph->GetValue();
- const auto t2 = TInstant::Now();
- UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());  // The graph must return exactly one row per distinct key.
- const auto ptr = value.GetElements();
- for (size_t i = 0ULL; i < expects.size(); ++i) {
- const auto elements = ptr[i].GetElements();
- two.emplace_back(std::make_pair(elements[0].GetElement(0).template Get<ui32>(), (elements[0].GetElements()[1]).AsStringRef()), std::array<double, 3U>{elements[1].template Get<double>(), elements[2].template Get<double>(), elements[3].template Get<double>()});  // Row layout: element 0 = key tuple (ui32, string), elements 1..3 = sum, min, max.
- }
- std::sort(two.begin(), two.end(), [](const std::pair<std::pair<ui32, std::string_view>, std::array<double, 3U>> l, const std::pair<std::pair<ui32, std::string_view>, std::array<double, 3U>> r){ return l.first < r.first; });  // Same key ordering as applied to 'one'.
- UNIT_ASSERT_VALUES_EQUAL(one, two);  // Keys and aggregate triples must match the reference.
- Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;  // Informational timing: graph vs. reference.
- }
- const auto border = 9124596000000000ULL;  // Inclusive upper bound on field 0 of a TpchSamples row; TestTpch aggregates only rows with field 0 <= border.
- Y_UNIT_TEST_LLVM(TestTpch) {  // Filter + group-by aggregation over TpchSamples: rows with field 0 <= border are grouped by (field 1, field 2); the graph's count/sums/average must equal a plain C++ reference.
- TSetup<LLVM> setup(GetNodeFactory());
- struct TPairHash { size_t operator()(const std::pair<std::string_view, std::string_view>& p) const { return CombineHashes(std::hash<std::string_view>()(p.first), std::hash<std::string_view>()(p.second)); } };  // Hash for the two-string key: combines the hashes of both parts.
- std::unordered_map<std::pair<std::string_view, std::string_view>, std::pair<ui64, std::array<double, 5U>>, TPairHash> expects;  // Reference aggregate per key: (row count, five double accumulators); key views refer to strings in TpchSamples.
- const auto t = TInstant::Now();  // Start of the reference (plain C++) timing window.
- for (auto& sample : TpchSamples) {
- if (std::get<0U>(sample) <= border) {  // Same predicate as the Filter stage of the graph below.
- const auto& ins = expects.emplace(std::pair<std::string_view, std::string_view>{std::get<1U>(sample), std::get<2U>(sample)}, std::pair<ui64, std::array<double, 5U>>{0ULL, {0., 0., 0., 0., 0.}});  // Existing key: emplace leaves the entry as is. New key: all-zero aggregate.
- auto& item = ins.first->second;
- ++item.first;  // Row count for the key.
- std::get<0U>(item.second) += std::get<3U>(sample);  // Slot 0: sum of field 3.
- std::get<1U>(item.second) += std::get<5U>(sample);  // Slot 1: sum of field 5 (turned into a mean after the loop).
- std::get<2U>(item.second) += std::get<6U>(sample);  // Slot 2: sum of field 6.
- const auto v = std::get<3U>(sample) * (1. - std::get<5U>(sample));
- std::get<3U>(item.second) += v;  // Slot 3: sum of field3 * (1 - field5).
- std::get<4U>(item.second) += v * (1. + std::get<4U>(sample));  // Slot 4: sum of field3 * (1 - field5) * (1 + field4).
- }
- }
- for (auto& item : expects) {
- std::get<1U>(item.second.second) /= item.second.first;  // Convert slot 1 from a sum into a per-key mean.
- }
- const auto cppTime = TInstant::Now() - t;  // Duration of the reference aggregation; only printed, never asserted.
- std::vector<std::pair<std::pair<std::string, std::string>, std::pair<ui64, std::array<double, 5U>>>> one, two;  // one = expected rows, two = rows read back from the graph; both own their key strings.
- one.reserve(expects.size());
- two.reserve(expects.size());
- one.insert(one.cend(), expects.cbegin(), expects.cend());
- std::sort(one.begin(), one.end(), [](const std::pair<std::pair<std::string_view, std::string_view>, std::pair<ui64, std::array<double, 5U>>> l, const std::pair<std::pair<std::string_view, std::string_view>, std::pair<ui64, std::array<double, 5U>>> r){ return l.first < r.first; });  // Order by key. NOTE(review): parameters are by-value pairs keyed by string_view while the elements hold std::string, so each comparison converts its arguments.
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto listType = pb.NewListType(pb.NewTupleType({  // Input row type: 7-tuple (ui64, two TDataType<const char*> fields, four doubles).
- pb.NewDataType(NUdf::TDataType<ui64>::Id),
- pb.NewDataType(NUdf::TDataType<const char*>::Id),
- pb.NewDataType(NUdf::TDataType<const char*>::Id),
- pb.NewDataType(NUdf::TDataType<double>::Id),
- pb.NewDataType(NUdf::TDataType<double>::Id),
- pb.NewDataType(NUdf::TDataType<double>::Id),
- pb.NewDataType(NUdf::TDataType<double>::Id)
- }));
- const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();  // Placeholder callable for the input list; handed to BuildGraph below as the entry point.
- const auto pgmReturn = pb.Collect(pb.CombineCore(  // Pipeline: Filter -> Map -> CombineCore (key extractor, state init, state update, finish) -> Collect.
- pb.Map(pb.Filter(pb.ToFlow(TRuntimeNode(list, false)),
- [&](TRuntimeNode item) { return pb.AggrLessOrEqual(pb.Nth(item, 0U), pb.NewDataLiteral<ui64>(border)); }  // Keep rows with field 0 <= border.
- ),
- [&](TRuntimeNode item) { return pb.NewTuple({pb.Nth(item, 1U), pb.Nth(item, 2U),pb.Nth(item, 3U),pb.Nth(item, 4U),pb.Nth(item, 5U),pb.Nth(item, 6U)}); } ),  // Drop field 0: from here on, item index k is original field k + 1.
- [&](TRuntimeNode item) { return pb.NewTuple({pb.Nth(item, 0U), pb.Nth(item, 1U)}); },  // Key = (original field 1, original field 2).
- [&](TRuntimeNode, TRuntimeNode item) {  // State init: builds the state from the item alone.
- const auto price = pb.Nth(item, 2U);  // Original field 3.
- const auto disco = pb.Nth(item, 4U);  // Original field 5.
- const auto v = pb.Mul(price, pb.Sub(pb.NewDataLiteral<double>(1.), disco));  // price * (1 - disco).
- return pb.NewTuple({pb.NewDataLiteral<ui64>(1ULL), price, disco, pb.Nth(item, 5U), v, pb.Mul(v, pb.Add(pb.NewDataLiteral<double>(1.), pb.Nth(item, 3U))) });  // State = (count 1, price, disco, original field 6, v, v * (1 + original field 4)).
- },
- [&](TRuntimeNode, TRuntimeNode item, TRuntimeNode state) {  // State update: folds one more item into the state.
- const auto price = pb.Nth(item, 2U);
- const auto disco = pb.Nth(item, 4U);
- const auto v = pb.Mul(price, pb.Sub(pb.NewDataLiteral<double>(1.), disco));
- return pb.NewTuple({pb.Increment(pb.Nth(state, 0U)), pb.AggrAdd(pb.Nth(state, 1U), price), pb.AggrAdd(pb.Nth(state, 2U), disco), pb.AggrAdd(pb.Nth(state, 3U), pb.Nth(item, 5U)), pb.AggrAdd(pb.Nth(state, 4U), v), pb.AggrAdd(pb.Nth(state, 5U), pb.Mul(v, pb.Add(pb.NewDataLiteral<double>(1.), pb.Nth(item, 3U)))) });  // Count is incremented; each of the five sums gains the same term used in state init.
- },
- [&](TRuntimeNode key, TRuntimeNode state) { return pb.NewOptional(pb.NewTuple({pb.Nth(key, 0U), pb.Nth(key, 1U), pb.Nth(state, 0U), pb.Nth(state, 1U), pb.Div(pb.Nth(state, 2U), pb.Nth(state, 0U)), pb.Nth(state, 3U), pb.Nth(state, 4U), pb.Nth(state, 5U)})); },  // Output row = (key 0, key 1, count, sum price, disco sum / count, and the remaining three sums), wrapped in an optional.
- 0ULL  // NOTE(review): presumably CombineCore's memory limit argument - confirm against TProgramBuilder::CombineCore.
- ));
- const auto graph = setup.BuildGraph(pgmReturn, EGraphPerProcess::Multi, {list});
- NUdf::TUnboxedValue* items = nullptr;  // Receives the slot pointer of the array holder created on the next line.
- graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(TpchSamples.size(), items));  // Entry point 0 gets an array with one slot per sample (unfiltered; the graph does the filtering).
- for (const auto& sample : TpchSamples) {
- NUdf::TUnboxedValue* elements = nullptr;
- *items++ = graph->GetHolderFactory().CreateDirectArrayHolder(7U, elements);  // Each input row is a 7-element array mirroring the sample tuple.
- elements[0] = NUdf::TUnboxedValuePod(std::get<0U>(sample));
- elements[1] = NUdf::TUnboxedValuePod::Embedded(std::get<1U>(sample));  // NOTE(review): Embedded presumably stores the string inline and so needs short strings - confirm.
- elements[2] = NUdf::TUnboxedValuePod::Embedded(std::get<2U>(sample));
- elements[3] = NUdf::TUnboxedValuePod(std::get<3U>(sample));
- elements[4] = NUdf::TUnboxedValuePod(std::get<4U>(sample));
- elements[5] = NUdf::TUnboxedValuePod(std::get<5U>(sample));
- elements[6] = NUdf::TUnboxedValuePod(std::get<6U>(sample));
- }
- const auto t1 = TInstant::Now();  // t1..t2 brackets only the GetValue() call.
- const auto& value = graph->GetValue();
- const auto t2 = TInstant::Now();
- UNIT_ASSERT_VALUES_EQUAL(value.GetListLength(), expects.size());  // The graph must return exactly one row per distinct key.
- const auto ptr = value.GetElements();
- for (size_t i = 0ULL; i < expects.size(); ++i) {
- const auto elements = ptr[i].GetElements();
- two.emplace_back(std::make_pair(elements[0].AsStringRef(), elements[1].AsStringRef()), std::pair<ui64, std::array<double, 5U>>{elements[2].template Get<ui64>(), {elements[3].template Get<double>(), elements[4].template Get<double>(), elements[5].template Get<double>(), elements[6].template Get<double>(), elements[7].template Get<double>()}});  // Row layout: elements 0..1 = key strings, element 2 = count, elements 3..7 = the five double aggregates.
- }
- std::sort(two.begin(), two.end(), [](const std::pair<std::pair<std::string_view, std::string_view>, std::pair<ui64, std::array<double, 5U>>> l, const std::pair<std::pair<std::string_view, std::string_view>, std::pair<ui64, std::array<double, 5U>>> r){ return l.first < r.first; });  // Same key ordering as applied to 'one'.
- UNIT_ASSERT_VALUES_EQUAL(one, two);  // Keys, counts and double aggregates must match the reference.
- Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;  // Informational timing: graph vs. reference.
- }
- }
- #endif
- } // NMiniKQL
- } // NKikimr
|