123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487 |
- #include "mkql_computation_node_ut.h"
- #include <yql/essentials/minikql/mkql_node_cast.h>
- #include <yql/essentials/minikql/mkql_string_util.h>
- namespace NKikimr {
- namespace NMiniKQL {
- namespace {
- template<bool UseLLVM>
- TRuntimeNode MakeStream(TSetup<UseLLVM>& setup, ui64 count = 9U) {
- TProgramBuilder& pb = *setup.PgmBuilder;
- TCallableBuilder callableBuilder(*setup.Env, "TestStream",
- pb.NewStreamType(
- pb.NewDataType(NUdf::EDataSlot::Uint64)
- )
- );
- callableBuilder.Add(pb.NewDataLiteral(count));
- return TRuntimeNode(callableBuilder.Build(), false);
- }
- template<bool UseLLVM>
- TRuntimeNode MakeFlow(TSetup<UseLLVM>& setup, ui64 count = 9U) {
- TProgramBuilder& pb = *setup.PgmBuilder;
- return pb.ToFlow(MakeStream<UseLLVM>(setup, count));
- }
- template<bool UseLLVM>
- TRuntimeNode GroupWithBomb(TSetup<UseLLVM>& setup, TRuntimeNode stream) {
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto keyExtractor = [&](TRuntimeNode item) { return item; };
- const auto groupSwitch = [&](TRuntimeNode, TRuntimeNode) { return pb.NewDataLiteral<bool>(false); };
- return pb.Chopper(stream, keyExtractor, groupSwitch, [&](TRuntimeNode, TRuntimeNode group) {
- const auto bomb = pb.NewDataLiteral<NUdf::EDataSlot::String>("BOMB");
- return pb.Ensure(pb.Map(group, [&] (TRuntimeNode) { return bomb; }), pb.NewDataLiteral<bool>(false), bomb, "", 0, 0);
- });
- }
- template<bool UseLLVM>
- TRuntimeNode Group(TSetup<UseLLVM>& setup, TRuntimeNode stream, const std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)>& groupSwitch) {
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto keyExtractor = [&](TRuntimeNode item) { return item; };
- return pb.Chopper(stream, keyExtractor, groupSwitch, [&](TRuntimeNode key, TRuntimeNode grpItem) {
- return pb.Condense(grpItem, pb.NewDataLiteral<NUdf::EDataSlot::String>("*"),
- [&] (TRuntimeNode, TRuntimeNode) { return pb.NewDataLiteral<bool>(false); },
- [&] (TRuntimeNode item, TRuntimeNode state) {
- auto res = pb.Concat(pb.ToString(key), pb.ToString(item));
- res = pb.Concat(state, res);
- return pb.Concat(res, pb.NewDataLiteral<NUdf::EDataSlot::String>("*"));
- });
- });
- }
- template<bool UseLLVM>
- TRuntimeNode GroupGetKeysFirst(TSetup<UseLLVM>& setup, TRuntimeNode stream, const std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)>& groupSwitch) {
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto keyExtractor = [&](TRuntimeNode item) { return item; };
- const bool isFlow = stream.GetStaticType()->IsFlow();
- return pb.Chopper(stream, keyExtractor, groupSwitch, [&](TRuntimeNode key, TRuntimeNode grpItem) {
- auto list = pb.ToList(pb.AggrConcat(
- pb.NewOptional(pb.Concat(pb.ToString(key), pb.NewDataLiteral<NUdf::EDataSlot::String>(":"))),
- pb.ToOptional(pb.Collect(pb.Condense1(grpItem,
- [&] (TRuntimeNode item) { return pb.ToString(item); },
- [&] (TRuntimeNode, TRuntimeNode) { return pb.NewDataLiteral<bool>(false); },
- [&] (TRuntimeNode item, TRuntimeNode state) {
- return pb.Concat(state, pb.ToString(item));
- }
- )))
- ));
- return isFlow ? pb.ToFlow(list) : pb.Iterator(list, {});
- });
- }
- template<bool UseLLVM>
- TRuntimeNode GroupKeys(TSetup<UseLLVM>& setup, TRuntimeNode stream, const std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)>& groupSwitch) {
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto keyExtractor = [&](TRuntimeNode item) { return item; };
- return pb.Chopper(stream, keyExtractor, groupSwitch,
- [&](TRuntimeNode key, TRuntimeNode group) {
- return pb.Map(pb.Take(group, pb.NewDataLiteral<ui64>(1ULL)), [&](TRuntimeNode) { return pb.ToString(key); });
- }
- );
- }
- template<bool UseLLVM>
- TRuntimeNode StreamToString(TSetup<UseLLVM>& setup, TRuntimeNode stream) {
- TProgramBuilder& pb = *setup.PgmBuilder;
- stream = pb.Condense(stream, pb.NewDataLiteral<NUdf::EDataSlot::String>("|"),
- [&] (TRuntimeNode, TRuntimeNode) { return pb.NewDataLiteral(false); },
- [&] (TRuntimeNode item, TRuntimeNode state) {
- return pb.Concat(pb.Concat(state, item), pb.NewDataLiteral<NUdf::EDataSlot::String>("|"));
- }
- );
- if (stream.GetStaticType()->IsFlow()) {
- stream = pb.FromFlow(stream);
- }
- return stream;
- }
- } // unnamed
- Y_UNIT_TEST_SUITE(TMiniKQLChopperStreamTest) {
- Y_UNIT_TEST_LLVM(TestEmpty) {
- TSetup<LLVM> setup;
- const auto stream = GroupWithBomb(setup, MakeStream(setup, 0U));
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|");
- }
- Y_UNIT_TEST_LLVM(TestGrouping) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeStream(setup);
- stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
- Y_UNUSED(key);
- return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
- });
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*|*00*01*|*00*|*00*|*00*01*02*03*|");
- }
- Y_UNIT_TEST_LLVM(TestGroupingGetKeysFirst) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeStream(setup);
- stream = GroupGetKeysFirst(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
- Y_UNUSED(key);
- return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
- });
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0:0|0:01|0:0|0:0|0:0123|");
- }
- Y_UNIT_TEST_LLVM(TestGroupingKeyNotEquals) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeStream(setup);
- stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
- return pb.NotEquals(item, key);
- });
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*00*|*11*|*00*00*00*|*11*|*22*|*33*|");
- }
- Y_UNIT_TEST_LLVM(TestGroupingWithEmptyInput) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeStream(setup, 0);
- stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
- Y_UNUSED(key);
- return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
- });
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|");
- }
- Y_UNIT_TEST_LLVM(TestSingleGroup) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeStream(setup);
- stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
- Y_UNUSED(key);
- Y_UNUSED(item);
- return pb.NewDataLiteral<bool>(false);
- });
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*00*01*00*00*00*01*02*03*|");
- }
- Y_UNIT_TEST_LLVM(TestGroupingWithYield) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeStream(setup);
- TSwitchInput switchInput;
- switchInput.Indicies.push_back(0);
- switchInput.InputType = stream.GetStaticType();
- stream = pb.Switch(stream,
- MakeArrayRef(&switchInput, 1),
- [&](ui32 /*index*/, TRuntimeNode item1) {
- return Group(setup, item1, [&](TRuntimeNode key, TRuntimeNode item2) {
- Y_UNUSED(key);
- return pb.Equals(item2, pb.NewDataLiteral<ui64>(0));
- });
- },
- 1,
- pb.NewStreamType(pb.NewDataType(NUdf::EDataSlot::String))
- );
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*|*00*01*|*00*|*00*|*00*01*02*03*|");
- }
- Y_UNIT_TEST_LLVM(TestGroupingWithCutSubStreams) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeStream(setup);
- stream = GroupKeys(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
- Y_UNUSED(key);
- return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
- });
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|");
- }
- Y_UNIT_TEST_LLVM(TestGroupingWithYieldAndCutSubStreams) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeStream(setup);
- TSwitchInput switchInput;
- switchInput.Indicies.push_back(0);
- switchInput.InputType = stream.GetStaticType();
- stream = pb.Switch(stream,
- MakeArrayRef(&switchInput, 1),
- [&](ui32 /*index*/, TRuntimeNode item1) {
- return GroupKeys(setup, item1, [&](TRuntimeNode key, TRuntimeNode item2) {
- Y_UNUSED(key);
- return pb.Equals(item2, pb.NewDataLiteral<ui64>(0));
- });
- },
- 1,
- pb.NewStreamType(pb.NewDataType(NUdf::EDataSlot::String))
- );
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|");
- }
- }
- #if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 9u
- Y_UNIT_TEST_SUITE(TMiniKQLChopperFlowTest) {
- Y_UNIT_TEST_LLVM(TestEmpty) {
- TSetup<LLVM> setup;
- const auto stream = GroupWithBomb(setup, MakeFlow(setup, 0U));
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|");
- }
- Y_UNIT_TEST_LLVM(TestGrouping) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeFlow(setup);
- stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
- Y_UNUSED(key);
- return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
- });
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*|*00*01*|*00*|*00*|*00*01*02*03*|");
- }
- Y_UNIT_TEST_LLVM(TestGroupingGetKeysFirst) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeFlow(setup);
- stream = GroupGetKeysFirst(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
- Y_UNUSED(key);
- return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
- });
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0:0|0:01|0:0|0:0|0:0123|");
- }
- Y_UNIT_TEST_LLVM(TestGroupingKeyNotEquals) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeFlow(setup);
- stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
- return pb.NotEquals(item, key);
- });
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*00*|*11*|*00*00*00*|*11*|*22*|*33*|");
- }
- Y_UNIT_TEST_LLVM(TestGroupingWithEmptyInput) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeFlow(setup, 0);
- stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
- Y_UNUSED(key);
- return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
- });
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|");
- }
- Y_UNIT_TEST_LLVM(TestSingleGroup) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeFlow(setup);
- stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
- Y_UNUSED(key);
- Y_UNUSED(item);
- return pb.NewDataLiteral<bool>(false);
- });
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*00*01*00*00*00*01*02*03*|");
- }
- Y_UNIT_TEST_LLVM(TestGroupingWithYield) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeFlow(setup);
- TSwitchInput switchInput;
- switchInput.Indicies.push_back(0);
- switchInput.InputType = stream.GetStaticType();
- stream = pb.Switch(stream,
- MakeArrayRef(&switchInput, 1),
- [&](ui32 /*index*/, TRuntimeNode item1) {
- return Group(setup, item1, [&](TRuntimeNode key, TRuntimeNode item2) {
- Y_UNUSED(key);
- return pb.Equals(item2, pb.NewDataLiteral<ui64>(0));
- });
- },
- 1,
- pb.NewFlowType(pb.NewDataType(NUdf::EDataSlot::String))
- );
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*|*00*01*|*00*|*00*|*00*01*02*03*|");
- }
- Y_UNIT_TEST_LLVM(TestGroupingWithCutSubStreams) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeFlow(setup);
- stream = GroupKeys(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
- Y_UNUSED(key);
- return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
- });
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|");
- }
- Y_UNIT_TEST_LLVM(TestGroupingWithYieldAndCutSubStreams) {
- TSetup<LLVM> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- auto stream = MakeFlow(setup);
- TSwitchInput switchInput;
- switchInput.Indicies.push_back(0);
- switchInput.InputType = stream.GetStaticType();
- stream = pb.Switch(stream,
- MakeArrayRef(&switchInput, 1),
- [&](ui32 /*index*/, TRuntimeNode item1) {
- return GroupKeys(setup, item1, [&](TRuntimeNode key, TRuntimeNode item2) {
- Y_UNUSED(key);
- return pb.Equals(item2, pb.NewDataLiteral<ui64>(0));
- });
- },
- 1,
- pb.NewFlowType(pb.NewDataType(NUdf::EDataSlot::String))
- );
- const auto pgm = StreamToString(setup, stream);
- const auto graph = setup.BuildGraph(pgm);
- const auto streamVal = graph->GetValue();
- NUdf::TUnboxedValue result;
- UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
- UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|");
- }
- }
- #endif
- } // NMiniKQL
- } // NKikimr
|