12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882 |
- #include "library/cpp/threading/local_executor/local_executor.h"
- #include "yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h"
- #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
- #include <yql/essentials/minikql/computation/mkql_computation_pattern_cache.h>
- #include <yql/essentials/minikql/mkql_type_builder.h>
- #include <yql/essentials/minikql/mkql_node_serialization.h>
- #include <yql/essentials/utils/yql_panic.h>
- #include <yql/essentials/minikql/mkql_node.h>
- #include <yql/essentials/minikql/mkql_program_builder.h>
- #include <yql/essentials/minikql/computation/mkql_computation_node.h>
- #include <yql/essentials/minikql/computation/mkql_computation_node_impl.h>
- #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
- #include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
- #include <library/cpp/testing/unittest/registar.h>
- #include <util/datetime/cputimer.h>
- namespace NKikimr {
- namespace NMiniKQL {
- using namespace NYql::NUdf;
- // Node factory used by the tests in this file: a callable whose type name is
- // "TestList" is backed by an external computation node, every other callable
- // is forwarded to the builtin factory.
- TComputationNodeFactory GetListTestFactory() {
-     return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
-         const bool isTestList = callable.GetType()->GetName() == "TestList";
-         if (!isTestList) {
-             return GetBuiltinFactory()(callable, ctx);
-         }
-         return new TExternalComputationNode(ctx.Mutables);
-     };
- }
- // Returns the input flow consumed by the test programs.
- //   pb      - program builder used to create the nodes
- //   vecSize - number of literal items to generate when `list` is null
- //   list    - optional callable; when non-null the flow simply wraps it
- // When `list` is null, a literal list of ui64 values (i + 124515) % 6740234
- // for i in [0, vecSize) is built and converted to a flow.
- TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     if (list) {
-         return pb.ToFlow(TRuntimeNode(list, false));
-     }
-     // The element type is deliberately non-const: a std::vector of const
-     // elements depends on a non-standard std::allocator<const T> extension.
-     std::vector<TRuntimeNode> arr;
-     arr.reserve(vecSize);
-     for (ui64 i = 0; i < vecSize; ++i) {
-         arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234));
-     }
-     // The array ref is a non-owning view; `arr` stays alive until AsList() returns.
-     const TArrayRef<const TRuntimeNode> arrRef(arr);
-     return pb.ToFlow(pb.AsList(arrRef));
- }
- // Builds a flow that keeps only the items for which `item % 128` equals 0.
- // Wide == false applies Filter to the flow directly; the Wide == true
- // specialization performs the same check through WideFilter.
- // `list` defaults to nullptr, matching the other Create* builders in this file.
- template<bool Wide>
- TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
- template<>
- TRuntimeNode CreateFilter<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     auto flow = CreateFlow(pb, vecSize, list);
-     auto handler = [&](TRuntimeNode node) -> TRuntimeNode {
-         const auto remainder = pb.Mod(node, pb.NewOptional(pb.NewDataLiteral<ui64>(128)));
-         const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
-         return pb.AggrEquals(remainder, zero);
-     };
-     return pb.Filter(flow, handler);
- }
- // Wide specialization: each item is expanded into a one-column wide row,
- // rows whose first column modulo 128 equals 0 are kept by WideFilter, and the
- // result is narrowed back to its first column.
- template<>
- TRuntimeNode CreateFilter<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto isMultipleOf128 = [&](TRuntimeNode::TList columns) -> TRuntimeNode {
-         const auto remainder = pb.Mod(columns.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128)));
-         const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
-         return pb.AggrEquals(remainder, zero);
-     };
-     const auto wideSource = pb.ExpandMap(source,
-         [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     const auto filtered = pb.WideFilter(wideSource, isMultipleOf128);
-     return pb.NarrowMap(filtered,
-         [&](TRuntimeNode::TList columns) -> TRuntimeNode { return columns.front(); });
- }
- // Builds a flow that replaces every item with the result of comparing
- // `item % 128` against 0. Wide == false uses Map directly; the Wide == true
- // specialization performs the same computation through WideMap.
- template<bool Wide>
- TRuntimeNode CreateMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
- template<>
- TRuntimeNode CreateMap<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto toFlag = [&](TRuntimeNode item) -> TRuntimeNode {
-         const auto remainder = pb.Mod(item, pb.NewOptional(pb.NewDataLiteral<ui64>(128)));
-         const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
-         return pb.AggrEquals(remainder, zero);
-     };
-     return pb.Map(source, toFlag);
- }
- // Wide specialization: items are expanded into one-column wide rows, WideMap
- // replaces the column with the `column % 128 == 0` comparison result, and
- // NarrowMap returns the first column.
- template<>
- TRuntimeNode CreateMap<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto toFlag = [&](TRuntimeNode::TList columns) -> TRuntimeNode::TList {
-         const auto remainder = pb.Mod(columns.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128)));
-         const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
-         return {pb.AggrEquals(remainder, zero)};
-     };
-     const auto wideSource = pb.ExpandMap(source,
-         [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     const auto mapped = pb.WideMap(wideSource, toFlag);
-     return pb.NarrowMap(mapped,
-         [&](TRuntimeNode::TList columns) -> TRuntimeNode { return columns.front(); });
- }
- // Builds a Condense over the flow: the state starts as ui64 0, the switcher
- // lambda is the constant false, and the update lambda adds the item to the
- // state. The Wide == true specialization uses WideCondense1 instead.
- template<bool Wide>
- TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
- template<>
- TRuntimeNode CreateCondense<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const TRuntimeNode initialState = pb.NewDataLiteral<ui64>(0);
-     return pb.Condense(
-         source,
-         initialState,
-         /* switcher */
-         [&](TRuntimeNode, TRuntimeNode) -> TRuntimeNode { return pb.NewDataLiteral<bool>(false); },
-         /* update */
-         [&](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); }
-     );
- }
- // Wide specialization of CreateCondense: items are expanded into one-column
- // wide rows and condensed with WideCondense1, then narrowed to the first
- // column. The init lambda passes its row through as the state, the switcher
- // is the constant false, and the handler adds the item to the state.
- template<>
- TRuntimeNode CreateCondense<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     auto flow = CreateFlow(pb, vecSize, list);
-     // No separate initial-state literal is built here: the state comes from the
-     // init lambda, so the previously declared local `state` was never used.
-     return pb.NarrowMap(
-         pb.WideCondense1(
-             /* stream */
-             pb.ExpandMap(flow,
-                 [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
-             ),
-             /* init */
-             [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; },
-             /* switcher */
-             [&](TRuntimeNode::TList, TRuntimeNode::TList) -> TRuntimeNode { return pb.NewDataLiteral<bool>(false); },
-             /* handler */
-             [&](TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; }
-         ),
-         [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); }
-     );
- }
- // Builds a Chopper over the flow: the key is the item itself, the group
- // switch compares `key % 128` against 0, and the group handler returns the
- // group's contents unchanged. The Wide == true specialization uses WideChopper.
- template<bool Wide>
- TRuntimeNode CreateChopper(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
- template<>
- TRuntimeNode CreateChopper<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto keyExtractor = [&](TRuntimeNode item) -> TRuntimeNode { return item; };
-     const auto groupSwitch = [&](TRuntimeNode key, TRuntimeNode /*item*/) -> TRuntimeNode {
-         const auto remainder = pb.Mod(key, pb.NewOptional(pb.NewDataLiteral<ui64>(128)));
-         const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
-         return pb.AggrEquals(remainder, zero);
-     };
-     const auto groupHandler = [&](TRuntimeNode, TRuntimeNode list) -> TRuntimeNode { return list; };
-     return pb.Chopper(source, keyExtractor, groupSwitch, groupHandler);
- }
- // Wide specialization of CreateChopper: items are expanded into one-column
- // wide rows, chopped with WideChopper (key = the row, switch = `key % 128`
- // compared against 0, handler = identity WideMap over the group), and the
- // result is narrowed to its first column.
- template<>
- TRuntimeNode CreateChopper<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto wideSource = pb.ExpandMap(source,
-         [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     const auto chopped = pb.WideChopper(
-         wideSource,
-         /* keyExtractor */
-         [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; },
-         /* groupSwitch */
-         [&](TRuntimeNode::TList key, TRuntimeNode::TList /*item*/) -> TRuntimeNode {
-             const auto remainder = pb.Mod(key.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128)));
-             const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
-             return pb.AggrEquals(remainder, zero);
-         },
-         /* groupHandler */
-         [&](TRuntimeNode::TList, TRuntimeNode input) { return pb.WideMap(input, [](TRuntimeNode::TList items) { return items; }); }
-     );
-     return pb.NarrowMap(chopped,
-         [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); });
- }
- // Builds a CombineCore over the flow with a single constant key (ui64 0):
- // init takes the item as the state, update adds the item to the state, and
- // finish wraps the state into an optional. The memory limit passed is
- // 64 << 20. The Wide == true specialization uses WideCombiner.
- template<bool Wide>
- TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
- template<>
- TRuntimeNode CreateCombine<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto keyExtractor = [&] (TRuntimeNode /*item*/) -> TRuntimeNode { return pb.NewDataLiteral<ui64>(0); };
-     const auto init = [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return item; };
-     const auto update = [&] (TRuntimeNode /* key */, TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode {
-         return pb.Add(item, state);
-     };
-     const auto finish = [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode {
-         return pb.NewOptional(item);
-     };
-     return pb.CombineCore(source, keyExtractor, init, update, finish, /* memlimit */ 64 << 20);
- }
- // Wide specialization of CreateCombine: items are expanded into one-column
- // wide rows and combined with WideCombiner (memory limit 64 << 20) under a
- // single constant key; the state is the running sum of the first column and
- // finish wraps it into an optional. The result is narrowed to its first column.
- template<>
- TRuntimeNode CreateCombine<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto wideSource = pb.ExpandMap(source,
-         [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     const auto combined = pb.WideCombiner(
-         wideSource,
-         /* memlimit */
-         64 << 20,
-         /* keyExtractor */
-         [&] (TRuntimeNode::TList /*item*/) -> TRuntimeNode::TList { return {pb.NewDataLiteral<ui64>(0)}; },
-         /* init */
-         [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; },
-         /* update */
-         [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList {
-             return {pb.Add(item.front(), state.front())};
-         },
-         /* finish */
-         [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList {
-             return {pb.NewOptional(item.front())};
-         }
-     );
-     return pb.NarrowMap(combined,
-         [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); });
- }
- // Builds a Chain1Map over the flow: the init lambda returns the item itself
- // and the update lambda returns item + state. The Wide == true specialization
- // uses WideChain1Map.
- template<bool Wide>
- TRuntimeNode CreateChain1Map(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
- template<>
- TRuntimeNode CreateChain1Map<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto init = [&] (TRuntimeNode item) -> TRuntimeNode { return item; };
-     const auto update = [&] (TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode {
-         return pb.Add(item, state);
-     };
-     return pb.Chain1Map(source, init, update);
- }
- // Wide specialization of CreateChain1Map: items are expanded into one-column
- // wide rows, chained with WideChain1Map (init = the row, update = first
- // column of item + first column of state) and narrowed to the first column.
- template<>
- TRuntimeNode CreateChain1Map<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto wideSource = pb.ExpandMap(source,
-         [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     const auto chained = pb.WideChain1Map(
-         wideSource,
-         /* init */
-         [&] (TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; },
-         /* update */
-         [&] (TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList {
-             return {pb.Add(item.front(), state.front())};
-         }
-     );
-     return pb.NarrowMap(chained,
-         [&] (TRuntimeNode::TList item) -> TRuntimeNode { return item.front(); });
- }
- // Builds a Discard over the flow. With Wide == true the flow is first
- // expanded into one-column wide rows; otherwise it is discarded as is.
- template<bool Wide>
- TRuntimeNode CreateDiscard(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     if (!Wide) {
-         return pb.Discard(source);
-     }
-     const auto wideSource = pb.ExpandMap(source,
-         [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     return pb.Discard(wideSource);
- }
- // Builds a Skip with a count literal of 500 over the flow. With Wide == true
- // the Skip is applied to the one-column wide expansion of the flow and the
- // result is narrowed back to the first column.
- template<bool Wide>
- TRuntimeNode CreateSkip(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto skipCount = pb.NewDataLiteral<ui64>(500);
-     if (!Wide) {
-         return pb.Skip(source, skipCount);
-     }
-     const auto wideSource = pb.ExpandMap(source,
-         [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     const auto skipped = pb.Skip(wideSource, skipCount);
-     return pb.NarrowMap(skipped,
-         [&] (TRuntimeNode::TList item) -> TRuntimeNode { return item.front(); });
- }
- // Builds a NarrowFlatMap over the one-column wide expansion of the flow. The
- // handler wraps the first column into an optional; with Flow == true that
- // optional is additionally converted with ToFlow before being returned.
- template<bool Flow>
- TRuntimeNode CreateNarrowFlatMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto wideSource = pb.ExpandMap(source,
-         [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     return pb.NarrowFlatMap(wideSource,
-         [&] (TRuntimeNode::TList item) -> TRuntimeNode {
-             auto wrapped = pb.NewOptional(item.front());
-             if (Flow) {
-                 return pb.ToFlow(wrapped);
-             }
-             return wrapped;
-         });
- }
- // Builds a NarrowMultiMap over the one-column wide expansion of the flow; the
- // handler returns the first column twice for every row.
- TRuntimeNode CreateNarrowMultiMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto wideSource = pb.ExpandMap(source,
-         [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     const auto duplicate = [&] (TRuntimeNode::TList item) -> TRuntimeNode::TList {
-         return {item.front(), item.front()};
-     };
-     return pb.NarrowMultiMap(wideSource, duplicate);
- }
- // Squeezes the one-column wide expansion of the flow into a sorted dict
- // (all = false) keyed by the first column, then flat-maps the result to its
- // DictKeys. The payload is a ui64 0 literal when WithPayload is true and
- // Void otherwise.
- template<bool WithPayload>
- TRuntimeNode CreateSqueezeToSortedDict(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
-     TTimer t(TString(__func__) + ": ");
-     const auto source = CreateFlow(pb, vecSize, list);
-     const auto wideSource = pb.ExpandMap(source,
-         [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     const auto squeezed = pb.NarrowSqueezeToSortedDict(
-         wideSource,
-         /*all*/ false,
-         /*keySelector*/ [&](TRuntimeNode::TList item) { return item.front(); },
-         /*payloadSelector*/ [&](TRuntimeNode::TList ) { return WithPayload ? pb.NewDataLiteral<ui64>(0) : pb.NewVoid(); }
-     );
-     return pb.FlatMap(squeezed,
-         [&] (TRuntimeNode item) { return pb.DictKeys(item); });
- }
- // Builds an inner MapJoinCore between the flow and a small literal dict.
- // Left side: every item is mapped to the tuple (item % 1000, ui32 1) and
- // expanded into two wide columns. Right side: a sorted dict built from three
- // literal tuples, keyed by element 0 with a one-element tuple of element 1 as
- // payload. The joined rows are re-packed into tuples and element 1 of each
- // tuple is returned.
- TRuntimeNode CreateMapJoin(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
-     TTimer t(TString(__func__) + ": ");
-     auto flow = CreateFlow(pb, vecSize, list);
-     const auto tupleType = pb.NewTupleType({
-         pb.NewDataType(NUdf::TDataType<ui32>::Id),
-         pb.NewDataType(NUdf::TDataType<ui64>::Id)
-     });
-     const auto list1 = pb.Map(flow, [&] (TRuntimeNode item) {
-         return pb.NewTuple({pb.Mod(item, pb.NewDataLiteral<ui64>(1000)), pb.NewDataLiteral<ui32>(1)});
-     });
-     const auto list2 = pb.NewList(tupleType, {
-         pb.NewTuple({pb.NewDataLiteral<ui32>(1), pb.NewDataLiteral<ui64>(3 * 1000)}),
-         pb.NewTuple({pb.NewDataLiteral<ui32>(2), pb.NewDataLiteral<ui64>(4 * 1000)}),
-         pb.NewTuple({pb.NewDataLiteral<ui32>(3), pb.NewDataLiteral<ui64>(5 * 1000)}),
-     });
-     const auto dict = pb.ToSortedDict(list2, false,
-         [&](TRuntimeNode item) {
-             return pb.Nth(item, 0);
-         },
-         [&](TRuntimeNode item) {
-             return pb.NewTuple({pb.Nth(item, 1U)});
-         });
-     const auto resultType = pb.NewFlowType(pb.NewMultiType({
-         pb.NewDataType(NUdf::TDataType<char*>::Id),
-         pb.NewDataType(NUdf::TDataType<char*>::Id),
-     }));
-     // Left input as wide rows: column 0 and column 1 of every list1 tuple.
-     const auto wideLeft = pb.ExpandMap(list1,
-         [&] (TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0), pb.Nth(item, 1)}; });
-     const auto joined = pb.MapJoinCore(
-         wideLeft,
-         dict,
-         EJoinKind::Inner,
-         {0U},
-         {1U, 0U},
-         {0U, 1U},
-         resultType
-     );
-     const auto asTuples = pb.NarrowMap(joined,
-         [&](TRuntimeNode::TList items) { return pb.NewTuple(items); });
-     return pb.Map(asTuples,
-         [&](TRuntimeNode item) { return pb.Nth(item, 1); });
- }
- Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
- // Compiles the program produced by `f` once, stores the pattern in an LRU
- // pattern cache under the key "a", then runs graphs cloned from the cached
- // pattern on `inFlight` executor tasks (100 runs each). Every run sums the
- // ui64 values yielded by the graph; each sum must equal `testResult`.
- //   f          - program builder, invoked as f(pb, vecSize, list)
- //   useLLVM    - passes "" as the LLVM option string when true, "OFF" otherwise
- //   testResult - expected sum for every single run
- //   vecSize    - number of items fed to the graph through entry point 0
- template<class T>
- void ParallelProgTest(T f, bool useLLVM, ui64 testResult, size_t vecSize = 10'000) {
-     TTimer t("total: ");
-     const ui32 cacheSizeInBytes = 104857600; // 100 MiB
-     const ui32 inFlight = 7;
-     TComputationPatternLRUCache cache({cacheSizeInBytes, cacheSizeInBytes});
-     auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
-     auto entry = std::make_shared<TPatternCacheEntry>();
-     TScopedAlloc& alloc = entry->Alloc;
-     TTypeEnvironment& typeEnv = entry->Env;
-     TProgramBuilder pb(typeEnv, *functionRegistry);
-     const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
-     const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-     TRuntimeNode progReturn;
-     with_lock(alloc) {
-         progReturn = f(pb, vecSize, list);
-     }
-     TExploringNodeVisitor explorer;
-     explorer.Walk(progReturn.GetNode(), typeEnv);
-     TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(), functionRegistry.Get(),
-         NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, useLLVM ? "" : "OFF", EGraphPerProcess::Multi);
-     {
-         auto guard = entry->Env.BindAllocator();
-         entry->Pattern = MakeComputationPattern(explorer, progReturn, {list}, opts);
-     }
-     cache.EmplacePattern("a", entry);
-     // Input values; the same sequence CreateFlow generates for its literal list.
-     auto genData = [&]() {
-         std::vector<ui64> data;
-         data.reserve(vecSize);
-         for (ui64 i = 0; i < vecSize; ++i) {
-             data.push_back((i + 124515) % 6740234);
-         }
-         return data;
-     };
-     const auto data = genData();
-     // One result vector per task id, so tasks never write to the same vector.
-     std::vector<std::vector<ui64>> results(inFlight);
-     NPar::LocalExecutor().RunAdditionalThreads(inFlight);
-     NPar::LocalExecutor().ExecRange([&](int id) {
-         for (ui32 i = 0; i < 100; ++i) {
-             auto key = "a";
-             auto randomProvider = CreateDeterministicRandomProvider(1);
-             auto timeProvider = CreateDeterministicTimeProvider(10000000);
-             TScopedAlloc graphAlloc(__LOCATION__);
-             auto entry = cache.Find(key);
-             TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetListTestFactory(),
-                 functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
-                 useLLVM ? "" : "OFF", EGraphPerProcess::Multi);
-             auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
-             // Feed the input list through the graph's entry point 0.
-             TUnboxedValue* items = nullptr;
-             graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items));
-             std::transform(data.cbegin(), data.cend(), items,
-                 [](const auto s) {
-                     return ToValue<ui64>(s);
-                 });
-             // Drain the graph and accumulate everything it yields.
-             ui64 acc = 0;
-             TUnboxedValue v = graph->GetValue();
-             while (v.HasValue()) {
-                 acc += v.Get<ui64>();
-                 v = graph->GetValue();
-             }
-             results[id].push_back(acc);
-         }
-     }, 0, inFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
-     // Iterate by const reference: copying each per-task vector is unnecessary.
-     for (const auto& threadResults : results) {
-         for (const ui64 res : threadResults) {
-             UNIT_ASSERT_VALUES_EQUAL(res, testResult);
-         }
-     }
- }
- // Each case hands one Create* builder to ParallelProgTest together with the
- // sum every run of the resulting graph is expected to produce.
- Y_UNIT_TEST_QUAD(Filter, Wide, UseLLVM) {
-     constexpr ui64 expectedSum = 10098816;
-     ParallelProgTest(CreateFilter<Wide>, UseLLVM, expectedSum);
- }
- Y_UNIT_TEST_QUAD(Map, Wide, UseLLVM) {
-     constexpr ui64 expectedSum = 78;
-     ParallelProgTest(CreateMap<Wide>, UseLLVM, expectedSum);
- }
- Y_UNIT_TEST_QUAD(Condense, Wide, UseLLVM) {
-     constexpr ui64 expectedSum = 1295145000;
-     ParallelProgTest(CreateCondense<Wide>, UseLLVM, expectedSum);
- }
- Y_UNIT_TEST_QUAD(Chopper, Wide, UseLLVM) {
-     constexpr ui64 expectedSum = 1295145000;
-     ParallelProgTest(CreateChopper<Wide>, UseLLVM, expectedSum);
- }
- Y_UNIT_TEST_QUAD(Combine, Wide, UseLLVM) {
-     constexpr ui64 expectedSum = 1295145000;
-     ParallelProgTest(CreateCombine<Wide>, UseLLVM, expectedSum);
- }
- Y_UNIT_TEST_QUAD(Chain1Map, Wide, UseLLVM) {
-     constexpr ui64 expectedSum = 6393039240000;
-     ParallelProgTest(CreateChain1Map<Wide>, UseLLVM, expectedSum);
- }
- Y_UNIT_TEST_QUAD(Discard, Wide, UseLLVM) {
-     constexpr ui64 expectedSum = 0;
-     ParallelProgTest(CreateDiscard<Wide>, UseLLVM, expectedSum);
- }
- Y_UNIT_TEST_QUAD(Skip, Wide, UseLLVM) {
-     constexpr ui64 expectedSum = 1232762750;
-     ParallelProgTest(CreateSkip<Wide>, UseLLVM, expectedSum);
- }
- Y_UNIT_TEST_QUAD(NarrowFlatMap, Flow, UseLLVM) {
-     constexpr ui64 expectedSum = 1295145000;
-     ParallelProgTest(CreateNarrowFlatMap<Flow>, UseLLVM, expectedSum);
- }
- Y_UNIT_TEST_TWIN(NarrowMultiMap, UseLLVM) {
-     // Every item is emitted twice, hence twice the plain sum.
-     constexpr ui64 expectedSum = 1295145000ull * 2;
-     ParallelProgTest(CreateNarrowMultiMap, UseLLVM, expectedSum);
- }
- Y_UNIT_TEST_QUAD(SqueezeToSortedDict, WithPayload, UseLLVM) {
-     constexpr ui64 expectedSum = 125014500;
-     constexpr size_t inputSize = 1000;
-     ParallelProgTest(CreateSqueezeToSortedDict<WithPayload>, UseLLVM, expectedSum, inputSize);
- }
- Y_UNIT_TEST_TWIN(MapJoin, UseLLVM) {
-     constexpr ui64 expectedSum = 120000;
-     constexpr size_t inputSize = 10'000;
-     ParallelProgTest(CreateMapJoin, UseLLVM, expectedSum, inputSize);
- }
- }
- Y_UNIT_TEST_SUITE(ComputationPatternCache) {
- // Fills the pattern cache with `cacheItems` patterns that each evaluate to
- // the string literal "qwerty", then looks every one up again, clones a graph
- // from it and checks the value the graph produces.
- Y_UNIT_TEST(Smoke) {
-     const ui32 cacheSize = 10'000'000;
-     const ui32 cacheItems = 10;
-     // Cache keys are the one-character strings starting at "a".
-     const auto keyFor = [](ui32 index) { return TString(static_cast<char>('a' + index)); };
-     TComputationPatternLRUCache cache({cacheSize, cacheSize});
-     auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
-     for (ui32 index = 0; index < cacheItems; ++index) {
-         auto entry = std::make_shared<TPatternCacheEntry>();
-         TScopedAlloc& alloc = entry->Alloc;
-         TTypeEnvironment& typeEnv = entry->Env;
-         TProgramBuilder pb(typeEnv, *functionRegistry);
-         TRuntimeNode progReturn;
-         with_lock(alloc) {
-             progReturn = pb.NewDataLiteral<NYql::NUdf::EDataSlot::String>("qwerty");
-         }
-         TExploringNodeVisitor explorer;
-         explorer.Walk(progReturn.GetNode(), typeEnv);
-         TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(),
-             functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
-             "OFF", EGraphPerProcess::Multi);
-         {
-             auto guard = entry->Env.BindAllocator();
-             entry->Pattern = MakeComputationPattern(explorer, progReturn, {}, opts);
-         }
-         // XXX: It cannot be determined precisely where the entry's allocator
-         // gets its memory pages from: free pages of the global page pool or
-         // pages requested directly via <mmap>. Meanwhile the total number of
-         // allocated bytes (not merely the number of borrowed pages) is the
-         // good estimate of what a pattern cache entry consumes under a real
-         // workload. So, to avoid unwanted cache flushes, the free pages of
-         // this particular entry's allocator are released here.
-         alloc.ReleaseFreePages();
-         cache.EmplacePattern(keyFor(index), entry);
-     }
-     for (ui32 index = 0; index < cacheItems; ++index) {
-         auto key = keyFor(index);
-         auto randomProvider = CreateDeterministicRandomProvider(1);
-         auto timeProvider = CreateDeterministicTimeProvider(10000000);
-         TScopedAlloc graphAlloc(__LOCATION__);
-         auto cached = cache.Find(key);
-         UNIT_ASSERT(cached);
-         TComputationPatternOpts opts(cached->Alloc.Ref(), cached->Env, GetBuiltinFactory(),
-             functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
-             "OFF", EGraphPerProcess::Multi);
-         auto graph = cached->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
-         auto value = graph->GetValue();
-         UNIT_ASSERT_EQUAL(NYql::NUdf::TStringRef("qwerty"), value.AsStringRef());
-     }
- }
- // Notifies the cache about the same compiled pattern more often than the
- // cache size allows (cacheSize + 1 times for a pattern reporting a compiled
- // size of 1), then re-inserts under the same key an already compiled pattern
- // whose reported size is cacheSize + 1. The test contains no explicit
- // assertions; it only exercises these calls.
- Y_UNIT_TEST(DoubleNotifyPatternCompiled) {
-     // Minimal pattern stub: tracks only a "compiled" flag and reports a fixed
-     // compiled-code size; Clone() returns an empty holder.
-     class TMockComputationPattern final : public IComputationPattern {
-     public:
-         explicit TMockComputationPattern(size_t codeSize)
-             : Size_(codeSize)
-         {}
-         void Compile(TString, IStatsRegistry*) override {
-             Compiled_ = true;
-         }
-         bool IsCompiled() const override {
-             return Compiled_;
-         }
-         size_t CompiledCodeSize() const override {
-             return Size_;
-         }
-         void RemoveCompiledCode() override {
-             Compiled_ = false;
-         }
-         THolder<IComputationGraph> Clone(const TComputationOptsFull&) override {
-             return {};
-         }
-         bool GetSuitableForCache() const override {
-             return true;
-         }
-     private:
-         const size_t Size_;
-         bool Compiled_ = false;
-     };
-     const TString key = "program";
-     const ui32 cacheSize = 2;
-     TComputationPatternLRUCache cache({cacheSize, cacheSize});
-     auto entry = std::make_shared<TPatternCacheEntry>();
-     entry->Pattern = MakeIntrusive<TMockComputationPattern>(1u);
-     cache.EmplacePattern(key, entry);
-     for (ui32 attempt = 0; attempt <= cacheSize; ++attempt) {
-         entry->Pattern->Compile("", nullptr);
-         cache.NotifyPatternCompiled(key);
-     }
-     // Replace the entry with an already compiled pattern that reports a size
-     // of cacheSize + 1.
-     entry = std::make_shared<TPatternCacheEntry>();
-     entry->Pattern = MakeIntrusive<TMockComputationPattern>(cacheSize + 1);
-     entry->Pattern->Compile("", nullptr);
-     cache.EmplacePattern(key, entry);
- }
- // Timing comparison for a single ui64 addition: a computation graph
- // evaluating Add of two literals, a std::function over plain ui64, and a
- // std::function over TUnboxedValue. Each variant runs `repeats` times under
- // its own TTimer; the test contains no assertions.
- Y_UNIT_TEST(AddPerf) {
-     TTimer t("all: ");
-     TScopedAlloc alloc(__LOCATION__);
-     TTypeEnvironment typeEnv(alloc);
-     auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
-     TProgramBuilder pb(typeEnv, *functionRegistry);
-     auto lhs = pb.NewDataLiteral<ui64>(123591592ULL);
-     auto rhs = pb.NewDataLiteral<ui64>(323591592ULL);
-     auto progReturn = pb.Add(lhs, rhs);
-     TExploringNodeVisitor explorer;
-     explorer.Walk(progReturn.GetNode(), typeEnv);
-     NUdf::EValidateMode validateMode = NUdf::EValidateMode::Lazy;
-     TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(),
-         functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception,
-         "OFF", EGraphPerProcess::Multi);
-     // The timers are heap-allocated so that they can be destroyed (reset)
-     // right after the step they measure.
-     auto makePatternTimer = std::make_unique<TTimer>("make_pattern: ");
-     auto pattern = MakeComputationPattern(explorer, progReturn, {}, opts);
-     makePatternTimer.reset();
-     auto randomProvider = CreateDeterministicRandomProvider(1);
-     auto timeProvider = CreateDeterministicTimeProvider(10000000);
-     auto cloneTimer = std::make_unique<TTimer>("clone: ");
-     auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
-     cloneTimer.reset();
-     const ui64 repeats = 100'000;
-     {
-         // Variant 1: evaluate the computation graph.
-         TTimer graphTimer("graph: ");
-         ui64 sum = 0;
-         for (ui64 iteration = 0; iteration < repeats; ++iteration) {
-             sum += graph->GetValue().Get<ui64>();
-         }
-         Y_DO_NOT_OPTIMIZE_AWAY(sum);
-     }
-     {
-         // Variant 2: std::function over plain integers.
-         std::function<ui64(ui64, ui64)> add = [](ui64 a, ui64 b) {
-             return a + b;
-         };
-         TTimer lambdaTimer("lambda: ");
-         ui64 sum = 0;
-         for (ui64 iteration = 0; iteration < repeats; ++iteration) {
-             sum += add(123591592ULL, 323591592ULL);
-         }
-         Y_DO_NOT_OPTIMIZE_AWAY(sum);
-     }
-     {
-         // Variant 3: std::function over unboxed values.
-         std::function<TUnboxedValue(TUnboxedValue&, TUnboxedValue&)> add =
-             [](TUnboxedValue& a, TUnboxedValue& b) {
-                 return TUnboxedValuePod(a.Get<ui64>() + b.Get<ui64>());
-             };
-         Y_DO_NOT_OPTIMIZE_AWAY(add);
-         TTimer unboxedTimer("lambda unboxed value: ");
-         TUnboxedValue sum(TUnboxedValuePod(0));
-         TUnboxedValue first(TUnboxedValuePod(ui64{123591592UL}));
-         TUnboxedValue second(TUnboxedValuePod(ui64{323591592UL}));
-         for (ui64 iteration = 0; iteration < repeats; ++iteration) {
-             auto partial = add(first, second);
-             sum = add(partial, sum);
-         }
-         Y_DO_NOT_OPTIMIZE_AWAY(sum.Get<ui64>());
-     }
- }
- // Timing comparison for "keep the values divisible by 128 and sum them":
- // the CreateFilter<Wide> computation graph, a std::function predicate and a
- // plain lambda predicate. Every variant is run kIter times over the same
- // generated data, and its sum and count are checked by testResult().
- Y_UNIT_TEST_TWIN(FilterPerf, Wide) {
-     TScopedAlloc alloc(__LOCATION__);
-     TTypeEnvironment typeEnv(alloc);
-     auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
-     TProgramBuilder pb(typeEnv, *functionRegistry);
-     const ui64 vecSize = 100'000;
-     Cerr << "vecSize: " << vecSize << Endl;
-     const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
-     const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-     auto progReturn = CreateFilter<Wide>(pb, vecSize, list);
-     TExploringNodeVisitor explorer;
-     explorer.Walk(progReturn.GetNode(), typeEnv);
-     NUdf::EValidateMode validateMode = NUdf::EValidateMode::Max;
-     TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(),
-         functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception,
-         "OFF", EGraphPerProcess::Multi);
-     auto t_make_pattern = std::make_unique<TTimer>("make_pattern: ");
-     auto pattern = MakeComputationPattern(explorer, progReturn, {list}, opts);
-     t_make_pattern.reset();
-     auto randomProvider = CreateDeterministicRandomProvider(1);
-     auto timeProvider = CreateDeterministicTimeProvider(10000000);
-     // This clone exists only to be timed; the measured loop below clones its
-     // own graph on every iteration.
-     auto t_clone = std::make_unique<TTimer>("clone: ");
-     auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
-     t_clone.reset();
-     auto genData = [&]() {
-         std::vector<ui64> data;
-         data.reserve(vecSize);
-         for (ui64 i = 0; i < vecSize; ++i) {
-             data.push_back((i + 124515) % 6740234);
-         }
-         return data;
-     };
-     // Expected sum/count for the supported values of vecSize; any other size
-     // fails the test explicitly instead of passing unchecked.
-     auto testResult = [&] (ui64 acc, ui64 count) {
-         if (vecSize == 100'000'000) {
-             UNIT_ASSERT_VALUES_EQUAL(acc, 2614128386688);
-             UNIT_ASSERT_VALUES_EQUAL(count, 781263);
-         } else if (vecSize == 10'000'000) {
-             UNIT_ASSERT_VALUES_EQUAL(acc, 222145217664);
-         } else if (vecSize == 100'000) {
-             UNIT_ASSERT_VALUES_EQUAL(acc, 136480896);
-             UNIT_ASSERT_VALUES_EQUAL(count, 782);
-         } else {
-             UNIT_FAIL("result is not checked");
-         }
-     };
-     const ui64 kIter = 2;
-     {
-         TDuration total;
-         for (ui64 i = 0; i < kIter; ++i) {
-             ui64 acc = 0;
-             ui64 count = 0;
-             auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
-             auto data = genData();
-             TUnboxedValue* items = nullptr;
-             graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items));
-             std::transform(data.cbegin(), data.cend(), items,
-                 [](const auto s) {
-                     return ToValue<ui64>(s);
-                 });
-             TSimpleTimer t;
-             TUnboxedValue v = graph->GetValue();
-             while (v.HasValue()) {
-                 acc += v.Get<ui64>();
-                 ++count;
-                 v = graph->GetValue();
-             }
-             // Stop the clock before verifying, exactly as the two baseline
-             // loops below do, so the assertions are not part of the measurement.
-             total += t.Get();
-             testResult(acc, count);
-         }
-         Cerr << "graph: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
-     }
-     {
-         auto data = genData();
-         std::function<bool(ui64)> predicate = [](ui64 a) {
-             return a % 128 == 0;
-         };
-         Y_DO_NOT_OPTIMIZE_AWAY(predicate);
-         TDuration total;
-         for (ui64 i = 0; i < kIter; ++i) {
-             TSimpleTimer t;
-             ui64 acc = 0;
-             ui64 count = 0;
-             for (ui64 j = 0; j < data.size(); ++j) {
-                 if (predicate(data[j])) {
-                     acc += data[j];
-                     ++count;
-                 }
-             }
-             total += t.Get();
-             testResult(acc, count);
-         }
-         Cerr << "std::function: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
-     }
-     {
-         auto data = genData();
-         static auto predicate = [](ui64 a) {
-             return a % 128 == 0;
-         };
-         Y_DO_NOT_OPTIMIZE_AWAY(predicate);
-         TDuration total;
-         for (ui64 i = 0; i < kIter; ++i) {
-             TSimpleTimer t;
-             ui64 acc = 0;
-             ui64 count = 0;
-             for (ui64 j = 0; j < data.size(); ++j) {
-                 if (predicate(data[j])) {
-                     acc += data[j];
-                     ++count;
-                 }
-             }
-             total += t.Get();
-             testResult(acc, count);
-         }
-         Cerr << "lambda: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
-     }
- }
- }
- }
- }
|