#include "library/cpp/threading/local_executor/local_executor.h" #include "yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h" #include #include #include #include #include #include #include #include #include #include #include #include #include namespace NKikimr { namespace NMiniKQL { using namespace NYql::NUdf; TComputationNodeFactory GetListTestFactory() { return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { if (callable.GetType()->GetName() == "TestList") { return new TExternalComputationNode(ctx.Mutables); } return GetBuiltinFactory()(callable, ctx); }; } TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable* list) { if (list) { return pb.ToFlow(TRuntimeNode(list, false)); } else { std::vector arr; arr.reserve(vecSize); for (ui64 i = 0; i < vecSize; ++i) { arr.push_back(pb.NewDataLiteral((i + 124515) % 6740234)); } TArrayRef arrRef(std::move(arr)); return pb.ToFlow(pb.AsList(arrRef)); } } template TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable* list); template<> TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable* list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); auto handler = [&](TRuntimeNode node) -> TRuntimeNode { return pb.AggrEquals( pb.Mod(node, pb.NewOptional(pb.NewDataLiteral(128))), pb.NewOptional(pb.NewDataLiteral(0))); }; return pb.Filter(flow, handler); } template<> TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable* list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); auto handler = [&](TRuntimeNode::TList node) -> TRuntimeNode { return pb.AggrEquals( pb.Mod(node.front(), pb.NewOptional(pb.NewDataLiteral(128))), pb.NewOptional(pb.NewDataLiteral(0))); }; return pb.NarrowMap( pb.WideFilter( pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } ), handler ), [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } ); } template TRuntimeNode CreateMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr); template<> TRuntimeNode CreateMap(TProgramBuilder& pb, size_t vecSize, TCallable* list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); auto handler = [&](TRuntimeNode node) -> TRuntimeNode { return pb.AggrEquals( pb.Mod(node, pb.NewOptional(pb.NewDataLiteral(128))), pb.NewOptional(pb.NewDataLiteral(0))); }; return pb.Map(flow, handler); } template<> TRuntimeNode CreateMap(TProgramBuilder& pb, size_t vecSize, TCallable* list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); auto handler = [&](TRuntimeNode::TList node) -> TRuntimeNode::TList { return {pb.AggrEquals( pb.Mod(node.front(), pb.NewOptional(pb.NewDataLiteral(128))), pb.NewOptional(pb.NewDataLiteral(0)))}; }; return pb.NarrowMap( pb.WideMap( pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } ), handler ), [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } ); } template TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr); template<> TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable* list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); auto switcherHandler = [&](TRuntimeNode, TRuntimeNode) -> TRuntimeNode { return pb.NewDataLiteral(false); }; auto updateHandler = [&](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); }; TRuntimeNode state = pb.NewDataLiteral(0); return pb.Condense(flow, state, switcherHandler, updateHandler); } template<> TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable* list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); TRuntimeNode state = pb.NewDataLiteral(0); return pb.NarrowMap( pb.WideCondense1( /* stream */ pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } ), /* init */ [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; }, /* switcher */ [&](TRuntimeNode::TList, TRuntimeNode::TList) -> TRuntimeNode { return pb.NewDataLiteral(false); }, /* handler */ [&](TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; } ), [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } ); } template TRuntimeNode CreateChopper(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr); template<> TRuntimeNode CreateChopper(TProgramBuilder& pb, size_t vecSize, TCallable* list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); return pb.Chopper(flow, /* keyExtractor */ [&](TRuntimeNode item) -> TRuntimeNode { return item; }, /* groupSwitch */ [&](TRuntimeNode key, TRuntimeNode /*item*/) -> TRuntimeNode { return pb.AggrEquals( pb.Mod(key, pb.NewOptional(pb.NewDataLiteral(128))), pb.NewOptional(pb.NewDataLiteral(0))); }, /* groupHandler */ [&](TRuntimeNode, TRuntimeNode list) -> TRuntimeNode { return list; } ); }; template<> TRuntimeNode CreateChopper(TProgramBuilder& pb, size_t vecSize, TCallable* list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); return pb.NarrowMap( pb.WideChopper( /* stream */ pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } ), /* keyExtractor */ [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; }, /* groupSwitch */ [&](TRuntimeNode::TList key, TRuntimeNode::TList /*item*/) -> TRuntimeNode { return pb.AggrEquals( pb.Mod(key.front(), pb.NewOptional(pb.NewDataLiteral(128))), pb.NewOptional(pb.NewDataLiteral(0))); }, /* groupHandler */ [&](TRuntimeNode::TList, TRuntimeNode input) { return pb.WideMap(input, [](TRuntimeNode::TList items) { return items; }); } ), [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } ); }; template TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr); template<> TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize, TCallable* list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); return pb.CombineCore( /* stream */ flow, /* keyExtractor */ [&] (TRuntimeNode /*item*/) -> TRuntimeNode { return pb.NewDataLiteral(0);}, /* init */ [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return item; }, /* update */ [&] (TRuntimeNode /* key */, TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); }, /* finish */ [&] (TRuntimeNode /* key */, TRuntimeNode item) -> TRuntimeNode { return pb.NewOptional(item); }, /* memlimit */ 64 << 20 ); }; template<> TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize, TCallable* list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); return pb.NarrowMap( pb.WideCombiner( /* stream */ pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } ), /* memlimit */ 64 << 20, /* keyExtractor */ [&] (TRuntimeNode::TList /*item*/) -> TRuntimeNode::TList { return {pb.NewDataLiteral(0)};}, /* init */ [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; }, /* update */ [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; }, /* finish */ [&] (TRuntimeNode::TList /* key */, TRuntimeNode::TList item) -> TRuntimeNode::TList { return {pb.NewOptional(item.front())}; } ), [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); } ); }; template TRuntimeNode CreateChain1Map(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr); template<> TRuntimeNode CreateChain1Map(TProgramBuilder& pb, size_t vecSize, TCallable* list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); return pb.Chain1Map( flow, /* init */ [&] (TRuntimeNode item) -> TRuntimeNode { return item; }, /* update */ [&] (TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode { return pb.Add(item, state); } ); } template<> TRuntimeNode CreateChain1Map(TProgramBuilder& pb, size_t vecSize, TCallable* list) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); return pb.NarrowMap( pb.WideChain1Map( /* stream */ pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } ), /* init */ [&] (TRuntimeNode::TList item) -> TRuntimeNode::TList { return item; }, /* update */ [&] (TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; } ), [&] (TRuntimeNode::TList item) -> TRuntimeNode { return item.front(); } ); } template TRuntimeNode CreateDiscard(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); if (Wide) { return pb.Discard( pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } ) ); } else { return pb.Discard(flow); } } template TRuntimeNode CreateSkip(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); auto count = pb.NewDataLiteral(500); if (Wide) { return pb.NarrowMap( pb.Skip( pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } ), count ), [&] (TRuntimeNode::TList item) -> TRuntimeNode { return item.front(); } ); } else { return pb.Skip(flow, count); } } template TRuntimeNode CreateNarrowFlatMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); return pb.NarrowFlatMap( pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } ), [&] (TRuntimeNode::TList item) -> TRuntimeNode { auto x = pb.NewOptional(item.front()); return Flow ? pb.ToFlow(x) : x; } ); } TRuntimeNode CreateNarrowMultiMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); return pb.NarrowMultiMap( pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } ), [&] (TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item.front(), item.front()}; } ); } template TRuntimeNode CreateSqueezeToSortedDict(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); return pb.FlatMap( pb.NarrowSqueezeToSortedDict( pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; } ), /*all*/ false, /*keySelector*/ [&](TRuntimeNode::TList item) { return item.front(); }, /*payloadSelector*/ [&](TRuntimeNode::TList ) { return WithPayload ? pb.NewDataLiteral(0) : pb.NewVoid(); } ), [&] (TRuntimeNode item) { return pb.DictKeys(item); } ); } TRuntimeNode CreateMapJoin(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) { TTimer t(TString(__func__) + ": "); auto flow = CreateFlow(pb, vecSize, list); const auto tupleType = pb.NewTupleType({ pb.NewDataType(NUdf::TDataType::Id), pb.NewDataType(NUdf::TDataType::Id) }); const auto list1 = pb.Map(flow, [&] (TRuntimeNode item) { return pb.NewTuple({pb.Mod(item, pb.NewDataLiteral(1000)), pb.NewDataLiteral(1)}); }); const auto list2 = pb.NewList(tupleType, { pb.NewTuple({pb.NewDataLiteral(1), pb.NewDataLiteral(3 * 1000)}), pb.NewTuple({pb.NewDataLiteral(2), pb.NewDataLiteral(4 * 1000)}), pb.NewTuple({pb.NewDataLiteral(3), pb.NewDataLiteral(5 * 1000)}), }); const auto dict = pb.ToSortedDict(list2, false, [&](TRuntimeNode item) { return pb.Nth(item, 0); }, [&](TRuntimeNode item) { return pb.NewTuple({pb.Nth(item, 1U)}); }); const auto resultType = pb.NewFlowType(pb.NewMultiType({ pb.NewDataType(NUdf::TDataType::Id), pb.NewDataType(NUdf::TDataType::Id), })); return pb.Map( pb.NarrowMap(pb.MapJoinCore( pb.ExpandMap(list1, [&] (TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0), pb.Nth(item, 1)}; }), dict, EJoinKind::Inner, {0U}, {1U, 0U}, {0U, 1U}, resultType ), [&](TRuntimeNode::TList items) { return pb.NewTuple(items); } ), [&](TRuntimeNode item) { return pb.Nth(item, 1); } ); } Y_UNIT_TEST_SUITE(ComputationGraphDataRace) { template void ParallelProgTest(T f, bool useLLVM, ui64 testResult, size_t vecSize = 10'000) { TTimer t("total: "); const ui32 cacheSizeInBytes = 104857600; // 100 MiB const ui32 inFlight = 7; TComputationPatternLRUCache cache({cacheSizeInBytes, cacheSizeInBytes}); auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); auto entry = std::make_shared(); TScopedAlloc& alloc = entry->Alloc; TTypeEnvironment& typeEnv = entry->Env; TProgramBuilder pb(typeEnv, *functionRegistry); const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType::Id)); const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build(); TRuntimeNode progReturn; with_lock(alloc) { progReturn = f(pb, vecSize, list); } TExploringNodeVisitor explorer; explorer.Walk(progReturn.GetNode(), typeEnv); TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(), functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, useLLVM ? "" : "OFF", EGraphPerProcess::Multi); { auto guard = entry->Env.BindAllocator(); entry->Pattern = MakeComputationPattern(explorer, progReturn, {list}, opts); } cache.EmplacePattern("a", entry); auto genData = [&]() { std::vector data; data.reserve(vecSize); for (ui64 i = 0; i < vecSize; ++i) { data.push_back((i + 124515) % 6740234); } return data; }; const auto data = genData(); std::vector> results(inFlight); NPar::LocalExecutor().RunAdditionalThreads(inFlight); NPar::LocalExecutor().ExecRange([&](int id) { for (ui32 i = 0; i < 100; ++i) { auto key = "a"; auto randomProvider = CreateDeterministicRandomProvider(1); auto timeProvider = CreateDeterministicTimeProvider(10000000); TScopedAlloc graphAlloc(__LOCATION__); auto entry = cache.Find(key); TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetListTestFactory(), functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, useLLVM ? "" : "OFF", EGraphPerProcess::Multi); auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref())); TUnboxedValue* items = nullptr; graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items)); std::transform(data.cbegin(), data.cend(), items, [](const auto s) { return ToValue(s); }); ui64 acc = 0; TUnboxedValue v = graph->GetValue(); while (v.HasValue()) { acc += v.Get(); v = graph->GetValue(); } results[id].push_back(acc); } }, 0, inFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY); for (auto threadResults : results) { for (auto res : threadResults) { UNIT_ASSERT_VALUES_EQUAL(res, testResult); } } } Y_UNIT_TEST_QUAD(Filter, Wide, UseLLVM) { ParallelProgTest(CreateFilter, UseLLVM, 10098816); } Y_UNIT_TEST_QUAD(Map, Wide, UseLLVM) { ParallelProgTest(CreateMap, UseLLVM, 78); } Y_UNIT_TEST_QUAD(Condense, Wide, UseLLVM) { ParallelProgTest(CreateCondense, UseLLVM, 1295145000); } Y_UNIT_TEST_QUAD(Chopper, Wide, UseLLVM) { ParallelProgTest(CreateChopper, UseLLVM, 1295145000); } Y_UNIT_TEST_QUAD(Combine, Wide, UseLLVM) { ParallelProgTest(CreateCombine, UseLLVM, 1295145000); } Y_UNIT_TEST_QUAD(Chain1Map, Wide, UseLLVM) { ParallelProgTest(CreateChain1Map, UseLLVM, 6393039240000); } Y_UNIT_TEST_QUAD(Discard, Wide, UseLLVM) { ParallelProgTest(CreateDiscard, UseLLVM, 0); } Y_UNIT_TEST_QUAD(Skip, Wide, UseLLVM) { ParallelProgTest(CreateSkip, UseLLVM, 1232762750); } Y_UNIT_TEST_QUAD(NarrowFlatMap, Flow, UseLLVM) { ParallelProgTest(CreateNarrowFlatMap, UseLLVM, 1295145000); } Y_UNIT_TEST_TWIN(NarrowMultiMap, UseLLVM) { ParallelProgTest(CreateNarrowMultiMap, UseLLVM, 1295145000ull * 2); } Y_UNIT_TEST_QUAD(SqueezeToSortedDict, WithPayload, UseLLVM) { ParallelProgTest(CreateSqueezeToSortedDict, UseLLVM, 125014500, 1000); } Y_UNIT_TEST_TWIN(MapJoin, UseLLVM) { ParallelProgTest(CreateMapJoin, UseLLVM, 120000, 10'000); } } Y_UNIT_TEST_SUITE(ComputationPatternCache) { Y_UNIT_TEST(Smoke) { const ui32 cacheSize = 10'000'000; const ui32 cacheItems = 10; TComputationPatternLRUCache cache({cacheSize, cacheSize}); auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); for (ui32 i = 0; i < cacheItems; ++i) { auto entry = std::make_shared(); TScopedAlloc& alloc = entry->Alloc; TTypeEnvironment& typeEnv = entry->Env; TProgramBuilder pb(typeEnv, *functionRegistry); TRuntimeNode progReturn; with_lock(alloc) { progReturn = pb.NewDataLiteral("qwerty"); } TExploringNodeVisitor explorer; explorer.Walk(progReturn.GetNode(), typeEnv); TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(), functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Multi); { auto guard = entry->Env.BindAllocator(); entry->Pattern = MakeComputationPattern(explorer, progReturn, {}, opts); } // XXX: There is no way to accurately define how the entry's // allocator obtains the memory pages: using the free ones from the // global page pool or the ones directly requested by . At the // same time, it is the total allocated bytes (not just the number // of the borrowed pages) that is a good estimate of the memory // consumed by the pattern cache entry for real life workload. // Hence, to avoid undesired cache flushes, release the free pages // of the allocator of the particular entry. alloc.ReleaseFreePages(); cache.EmplacePattern(TString((char)('a' + i)), entry); } for (ui32 i = 0; i < cacheItems; ++i) { auto key = TString((char)('a' + i)); auto randomProvider = CreateDeterministicRandomProvider(1); auto timeProvider = CreateDeterministicTimeProvider(10000000); TScopedAlloc graphAlloc(__LOCATION__); auto entry = cache.Find(key); UNIT_ASSERT(entry); TComputationPatternOpts opts(entry->Alloc.Ref(), entry->Env, GetBuiltinFactory(), functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Multi); auto graph = entry->Pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref())); auto value = graph->GetValue(); UNIT_ASSERT_EQUAL(NYql::NUdf::TStringRef("qwerty"), value.AsStringRef()); } } Y_UNIT_TEST(DoubleNotifyPatternCompiled) { class TMockComputationPattern final : public IComputationPattern { public: explicit TMockComputationPattern(size_t codeSize) : Size_(codeSize) {} void Compile(TString, IStatsRegistry*) override { Compiled_ = true; } bool IsCompiled() const override { return Compiled_; } size_t CompiledCodeSize() const override { return Size_; } void RemoveCompiledCode() override { Compiled_ = false; } THolder Clone(const TComputationOptsFull&) override { return {}; } bool GetSuitableForCache() const override { return true; } private: const size_t Size_; bool Compiled_ = false; }; const TString key = "program"; const ui32 cacheSize = 2; TComputationPatternLRUCache cache({cacheSize, cacheSize}); auto entry = std::make_shared(); entry->Pattern = MakeIntrusive(1u); cache.EmplacePattern(key, entry); for (ui32 i = 0; i < cacheSize + 1; ++i) { entry->Pattern->Compile("", nullptr); cache.NotifyPatternCompiled(key); } entry = std::make_shared(); entry->Pattern = MakeIntrusive(cacheSize + 1); entry->Pattern->Compile("", nullptr); cache.EmplacePattern(key, entry); } Y_UNIT_TEST(AddPerf) { TTimer t("all: "); TScopedAlloc alloc(__LOCATION__); TTypeEnvironment typeEnv(alloc); auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); TProgramBuilder pb(typeEnv, *functionRegistry); auto prog1 = pb.NewDataLiteral(123591592ULL); auto prog2 = pb.NewDataLiteral(323591592ULL); auto progReturn = pb.Add(prog1, prog2); TExploringNodeVisitor explorer; explorer.Walk(progReturn.GetNode(), typeEnv); NUdf::EValidateMode validateMode = NUdf::EValidateMode::Lazy; TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(), functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Multi); auto t_make_pattern = std::make_unique("make_pattern: "); auto pattern = MakeComputationPattern(explorer, progReturn, {}, opts); t_make_pattern.reset(); auto randomProvider = CreateDeterministicRandomProvider(1); auto timeProvider = CreateDeterministicTimeProvider(10000000); auto t_clone = std::make_unique("clone: "); auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider)); t_clone.reset(); const ui64 repeats = 100'000; { TTimer t("graph: "); ui64 acc = 0; for (ui64 i = 0; i < repeats; ++i) { acc += graph->GetValue().Get(); } Y_DO_NOT_OPTIMIZE_AWAY(acc); } { std::function add = [](ui64 a, ui64 b) { return a + b; }; TTimer t("lambda: "); ui64 acc = 0; for (ui64 i = 0; i < repeats; ++i) { acc += add(123591592ULL, 323591592ULL); } Y_DO_NOT_OPTIMIZE_AWAY(acc); } { std::function add = [](TUnboxedValue& a, TUnboxedValue& b) { return TUnboxedValuePod(a.Get() + b.Get()); }; Y_DO_NOT_OPTIMIZE_AWAY(add); TTimer t("lambda unboxed value: "); TUnboxedValue acc(TUnboxedValuePod(0)); TUnboxedValue v1(TUnboxedValuePod(ui64{123591592UL})); TUnboxedValue v2(TUnboxedValuePod(ui64{323591592UL})); for (ui64 i = 0; i < repeats; ++i) { auto r = add(v1, v2); acc = add(r, acc); } Y_DO_NOT_OPTIMIZE_AWAY(acc.Get()); } } Y_UNIT_TEST_TWIN(FilterPerf, Wide) { TScopedAlloc alloc(__LOCATION__); TTypeEnvironment typeEnv(alloc); auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); TProgramBuilder pb(typeEnv, *functionRegistry); const ui64 vecSize = 100'000; Cerr << "vecSize: " << vecSize << Endl; const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType::Id)); const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build(); auto progReturn = CreateFilter(pb, vecSize, list); TExploringNodeVisitor explorer; explorer.Walk(progReturn.GetNode(), typeEnv); NUdf::EValidateMode validateMode = NUdf::EValidateMode::Max; TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(), functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Multi); auto t_make_pattern = std::make_unique("make_pattern: "); auto pattern = MakeComputationPattern(explorer, progReturn, {list}, opts); t_make_pattern.reset(); auto randomProvider = CreateDeterministicRandomProvider(1); auto timeProvider = CreateDeterministicTimeProvider(10000000); auto t_clone = std::make_unique("clone: "); auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider)); t_clone.reset(); auto genData = [&]() { std::vector data; data.reserve(vecSize); for (ui64 i = 0; i < vecSize; ++i) { data.push_back((i + 124515) % 6740234); } return data; }; auto testResult = [&] (ui64 acc, ui64 count) { if (vecSize == 100'000'000) { UNIT_ASSERT_VALUES_EQUAL(acc, 2614128386688); UNIT_ASSERT_VALUES_EQUAL(count, 781263); } else if (vecSize == 10'000'000) { UNIT_ASSERT_VALUES_EQUAL(acc, 222145217664); } else if (vecSize == 100'000) { UNIT_ASSERT_VALUES_EQUAL(acc, 136480896); UNIT_ASSERT_VALUES_EQUAL(count, 782); } else { UNIT_FAIL("result is not checked"); } }; ui64 kIter = 2; { TDuration total; for (ui64 i = 0; i < kIter; ++i) { ui64 acc = 0; ui64 count = 0; auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider)); auto data = genData(); TUnboxedValue* items = nullptr; graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items)); std::transform(data.cbegin(), data.cend(), items, [](const auto s) { return ToValue(s); }); TSimpleTimer t; TUnboxedValue v = graph->GetValue(); while (v.HasValue()) { acc += v.Get(); ++count; v = graph->GetValue(); } testResult(acc, count); total += t.Get(); } Cerr << "graph: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl; } { auto data = genData(); std::function predicate = [](ui64 a) { return a % 128 == 0; }; Y_DO_NOT_OPTIMIZE_AWAY(predicate); TDuration total; for (ui64 i = 0; i < kIter; ++i) { TSimpleTimer t; ui64 acc = 0; ui64 count = 0; for (ui64 j = 0; j < data.size(); ++j) { if (predicate(data[j])) { acc += data[j]; ++count; } } total += t.Get(); testResult(acc, count); } Cerr << "std::function: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl; } { auto data = genData(); static auto predicate = [](ui64 a) { return a % 128 == 0; }; Y_DO_NOT_OPTIMIZE_AWAY(predicate); TDuration total; for (ui64 i = 0; i < kIter; ++i) { TSimpleTimer t; ui64 acc = 0; ui64 count = 0; for (ui64 j = 0; j < data.size(); ++j) { if (predicate(data[j])) { acc += data[j]; ++count; } } total += t.Get(); testResult(acc, count); } Cerr << "lambda: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl; } } } } }