|
- #include "library/cpp/threading/local_executor/local_executor.h"
- #include "yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h"
- #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
- #include <yql/essentials/minikql/computation/mkql_computation_pattern_cache.h>
- #include <yql/essentials/minikql/mkql_type_builder.h>
- #include <yql/essentials/minikql/mkql_node_serialization.h>
- #include <yql/essentials/utils/yql_panic.h>
- #include <yql/essentials/minikql/mkql_node.h>
- #include <yql/essentials/minikql/mkql_program_builder.h>
- #include <yql/essentials/minikql/computation/mkql_computation_node.h>
- #include <yql/essentials/minikql/computation/mkql_computation_node_impl.h>
- #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
- #include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
- #include <library/cpp/testing/unittest/registar.h>
- #include <util/datetime/cputimer.h>
- namespace NKikimr {
- namespace NMiniKQL {
- using namespace NYql::NUdf;
- // Returns the computation node factory used by the tests in this file.
- // A callable whose type name is "TestList" is turned into an external
- // computation node; any other callable is handed to the builtin factory.
- TComputationNodeFactory GetListTestFactory() {
-     return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
-         const bool isTestList = callable.GetType()->GetName() == "TestList";
-         if (!isTestList) {
-             return GetBuiltinFactory()(callable, ctx);
-         }
-         return new TExternalComputationNode(ctx.Mutables);
-     };
- }
- // Produces the input flow shared by the program builders below.
- //   pb      - program builder used to create the nodes.
- //   vecSize - number of literals to generate; only used when `list` is null.
- //   list    - optional callable; when non-null it is wrapped into a flow as-is.
- // Without `list`, the flow is built over a literal list holding the ui64 values
- // (i + 124515) % 6740234 for i in [0, vecSize).
- TRuntimeNode CreateFlow(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     if (list) {
-         return pb.ToFlow(TRuntimeNode(list, false));
-     }
-     // The element type is intentionally non-const: std::vector<const T> is not a
-     // valid standard container (std::allocator requires a cv-unqualified type).
-     std::vector<TRuntimeNode> arr;
-     arr.reserve(vecSize);
-     for (ui64 i = 0; i < vecSize; ++i) {
-         arr.push_back(pb.NewDataLiteral<ui64>((i + 124515) % 6740234));
-     }
-     // Non-owning read-only view; `arr` outlives the AsList() call that reads it.
-     const TArrayRef<const TRuntimeNode> arrRef(arr.data(), arr.size());
-     return pb.ToFlow(pb.AsList(arrRef));
- }
- // Builds a program that filters the input flow with the predicate
- // "item % 128 == 0". The <false> specialization uses the plain Filter callable,
- // the <true> specialization goes through the wide flow callables.
- template <bool Wide>
- TRuntimeNode CreateFilter(TProgramBuilder& pb, size_t vecSize, TCallable* list);
- // Narrow variant: Filter(flow, predicate).
- template <>
- TRuntimeNode CreateFilter<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     return pb.Filter(input, [&pb](TRuntimeNode item) -> TRuntimeNode {
-         const auto remainder = pb.Mod(item, pb.NewOptional(pb.NewDataLiteral<ui64>(128)));
-         const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
-         return pb.AggrEquals(remainder, zero);
-     });
- }
- // Wide variant of CreateFilter: every item is expanded into a one-column wide
- // flow, filtered by WideFilter on "first column % 128 == 0" and narrowed back
- // to that single column.
- template <>
- TRuntimeNode CreateFilter<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto toWide = [](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
-     const auto predicate = [&pb](TRuntimeNode::TList columns) -> TRuntimeNode {
-         const auto remainder = pb.Mod(columns.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128)));
-         const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
-         return pb.AggrEquals(remainder, zero);
-     };
-     const auto toNarrow = [](TRuntimeNode::TList columns) -> TRuntimeNode { return columns.front(); };
-     const auto wideInput = pb.ExpandMap(input, toWide);
-     const auto filtered = pb.WideFilter(wideInput, predicate);
-     return pb.NarrowMap(filtered, toNarrow);
- }
- // Builds a program that maps every flow item to the comparison
- // "item % 128 == 0". <false> uses the plain Map callable, <true> the wide one.
- // `list` defaults to nullptr, in which case CreateFlow generates literal input.
- template <bool Wide>
- TRuntimeNode CreateMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
- // Narrow variant: Map(flow, comparison).
- template <>
- TRuntimeNode CreateMap<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto isMultipleOf128 = [&pb](TRuntimeNode item) -> TRuntimeNode {
-         const auto remainder = pb.Mod(item, pb.NewOptional(pb.NewDataLiteral<ui64>(128)));
-         const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
-         return pb.AggrEquals(remainder, zero);
-     };
-     return pb.Map(input, isMultipleOf128);
- }
- // Wide variant of CreateMap: expands each item into a one-column wide flow,
- // replaces the column with "column % 128 == 0" via WideMap and narrows the
- // result back to that single column.
- template <>
- TRuntimeNode CreateMap<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto toWide = [](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
-     const auto isMultipleOf128 = [&pb](TRuntimeNode::TList columns) -> TRuntimeNode::TList {
-         const auto remainder = pb.Mod(columns.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128)));
-         const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
-         return {pb.AggrEquals(remainder, zero)};
-     };
-     const auto toNarrow = [](TRuntimeNode::TList columns) -> TRuntimeNode { return columns.front(); };
-     const auto wideInput = pb.ExpandMap(input, toWide);
-     const auto mapped = pb.WideMap(wideInput, isMultipleOf128);
-     return pb.NarrowMap(mapped, toNarrow);
- }
- // Builds a Condense program over the input flow: the state starts at ui64 0,
- // the switcher always yields false and the update adds the item to the state.
- // <false> uses the plain Condense callable, <true> the wide one.
- template <bool Wide>
- TRuntimeNode CreateCondense(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
- // Narrow variant.
- template <>
- TRuntimeNode CreateCondense<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const TRuntimeNode initialSum = pb.NewDataLiteral<ui64>(0);
-     const auto neverSwitch = [&pb](TRuntimeNode, TRuntimeNode) -> TRuntimeNode {
-         return pb.NewDataLiteral<bool>(false);
-     };
-     const auto accumulate = [&pb](TRuntimeNode item, TRuntimeNode sum) -> TRuntimeNode {
-         return pb.Add(item, sum);
-     };
-     return pb.Condense(input, initialSum, neverSwitch, accumulate);
- }
- // Wide variant of CreateCondense: expands each item into a one-column wide
- // flow and runs WideCondense1 over it. The state is initialised from the first
- // row, the switcher always yields false and the handler adds the row's column
- // to the state. The result is narrowed back to the single state column.
- //   pb      - program builder used to create the nodes.
- //   vecSize - forwarded to CreateFlow (used only when `list` is null).
- //   list    - optional input callable, forwarded to CreateFlow.
- template<>
- TRuntimeNode CreateCondense<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer t(TString(__func__) + ": ");
-     auto flow = CreateFlow(pb, vecSize, list);
-     // No separate initial-state literal is needed here: WideCondense1 takes its
-     // initial state from the `init` lambda.
-     return pb.NarrowMap(
-         pb.WideCondense1(
-             /* stream */
-             pb.ExpandMap(flow,
-                 [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; }
-             ),
-             /* init */
-             [&](TRuntimeNode::TList item) -> TRuntimeNode::TList { return {item}; },
-             /* switcher */
-             [&](TRuntimeNode::TList, TRuntimeNode::TList) -> TRuntimeNode { return pb.NewDataLiteral<bool>(false); },
-             /* handler */
-             [&](TRuntimeNode::TList item, TRuntimeNode::TList state) -> TRuntimeNode::TList { return {pb.Add(item.front(), state.front())}; }
-         ),
-         [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); }
-     );
- }
- // Builds a Chopper program over the input flow: the item itself is the key,
- // the group switch is "key % 128 == 0" and every group is emitted unchanged.
- // <false> uses the plain Chopper callable, <true> the wide one.
- template <bool Wide>
- TRuntimeNode CreateChopper(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
- // Narrow variant.
- template <>
- TRuntimeNode CreateChopper<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto keyOf = [](TRuntimeNode item) -> TRuntimeNode { return item; };
-     const auto groupSwitch = [&pb](TRuntimeNode key, TRuntimeNode /*item*/) -> TRuntimeNode {
-         const auto remainder = pb.Mod(key, pb.NewOptional(pb.NewDataLiteral<ui64>(128)));
-         const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
-         return pb.AggrEquals(remainder, zero);
-     };
-     const auto passGroup = [](TRuntimeNode /*key*/, TRuntimeNode group) -> TRuntimeNode { return group; };
-     return pb.Chopper(input, keyOf, groupSwitch, passGroup);
- }
- // Wide variant of CreateChopper: expands each item into a one-column wide
- // flow and runs WideChopper with the row itself as the key, the group switch
- // "first key column % 128 == 0" and an identity WideMap over each group. The
- // result is narrowed back to the single column.
- template <>
- TRuntimeNode CreateChopper<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto toWide = [](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
-     const auto keyOf = [](TRuntimeNode::TList row) -> TRuntimeNode::TList { return row; };
-     const auto groupSwitch = [&pb](TRuntimeNode::TList key, TRuntimeNode::TList /*item*/) -> TRuntimeNode {
-         const auto remainder = pb.Mod(key.front(), pb.NewOptional(pb.NewDataLiteral<ui64>(128)));
-         const auto zero = pb.NewOptional(pb.NewDataLiteral<ui64>(0));
-         return pb.AggrEquals(remainder, zero);
-     };
-     const auto passGroup = [&pb](TRuntimeNode::TList /*key*/, TRuntimeNode group) {
-         return pb.WideMap(group, [](TRuntimeNode::TList columns) { return columns; });
-     };
-     const auto toNarrow = [](TRuntimeNode::TList columns) -> TRuntimeNode { return columns.front(); };
-     const auto wideInput = pb.ExpandMap(input, toWide);
-     const auto chopped = pb.WideChopper(wideInput, keyOf, groupSwitch, passGroup);
-     return pb.NarrowMap(chopped, toNarrow);
- }
- // Builds a combiner program over the input flow: every item gets the constant
- // key ui64 0, the state starts as the first item, updates add the item to the
- // state and finish wraps the state into an optional. The memory limit passed
- // to the combiner is 64 << 20.
- // <false> uses CombineCore, <true> uses WideCombiner.
- template <bool Wide>
- TRuntimeNode CreateCombine(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
- // Narrow variant.
- template <>
- TRuntimeNode CreateCombine<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto constantKey = [&pb](TRuntimeNode /*item*/) -> TRuntimeNode {
-         return pb.NewDataLiteral<ui64>(0);
-     };
-     const auto initState = [](TRuntimeNode /*key*/, TRuntimeNode item) -> TRuntimeNode {
-         return item;
-     };
-     const auto addToState = [&pb](TRuntimeNode /*key*/, TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode {
-         return pb.Add(item, state);
-     };
-     const auto finishState = [&pb](TRuntimeNode /*key*/, TRuntimeNode state) -> TRuntimeNode {
-         return pb.NewOptional(state);
-     };
-     return pb.CombineCore(input, constantKey, initState, addToState, finishState, /* memlimit */ 64 << 20);
- }
- // Wide variant of CreateCombine: expands each item into a one-column wide
- // flow and runs WideCombiner with a constant ui64 0 key, a state initialised
- // from the row, an update that adds the row's column to the state and a finish
- // step that wraps the state into an optional. The memory limit is 64 << 20.
- // The result is narrowed back to the single output column.
- template <>
- TRuntimeNode CreateCombine<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto toWide = [](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
-     const auto constantKey = [&pb](TRuntimeNode::TList /*item*/) -> TRuntimeNode::TList {
-         return {pb.NewDataLiteral<ui64>(0)};
-     };
-     const auto initState = [](TRuntimeNode::TList /*key*/, TRuntimeNode::TList row) -> TRuntimeNode::TList {
-         return {row};
-     };
-     const auto addToState = [&pb](TRuntimeNode::TList /*key*/, TRuntimeNode::TList row, TRuntimeNode::TList state) -> TRuntimeNode::TList {
-         return {pb.Add(row.front(), state.front())};
-     };
-     const auto finishState = [&pb](TRuntimeNode::TList /*key*/, TRuntimeNode::TList state) -> TRuntimeNode::TList {
-         return {pb.NewOptional(state.front())};
-     };
-     const auto toNarrow = [](TRuntimeNode::TList columns) -> TRuntimeNode { return columns.front(); };
-     const auto wideInput = pb.ExpandMap(input, toWide);
-     const auto combined = pb.WideCombiner(wideInput, /* memlimit */ 64 << 20,
-         constantKey, initState, addToState, finishState);
-     return pb.NarrowMap(combined, toNarrow);
- }
- // Builds a Chain1Map program over the input flow: the state is initialised
- // with the first item and every update adds the current item to the state.
- // <false> uses the plain Chain1Map callable, <true> the wide one.
- template <bool Wide>
- TRuntimeNode CreateChain1Map(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr);
- // Narrow variant.
- template <>
- TRuntimeNode CreateChain1Map<false>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto initState = [](TRuntimeNode item) -> TRuntimeNode { return item; };
-     const auto addToState = [&pb](TRuntimeNode item, TRuntimeNode state) -> TRuntimeNode {
-         return pb.Add(item, state);
-     };
-     return pb.Chain1Map(input, initState, addToState);
- }
- // Wide variant of CreateChain1Map: expands each item into a one-column wide
- // flow, runs WideChain1Map (state initialised from the row, update adds the
- // row's column to the state) and narrows the result back to a single column.
- template <>
- TRuntimeNode CreateChain1Map<true>(TProgramBuilder& pb, size_t vecSize, TCallable* list) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto toWide = [](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; };
-     const auto initState = [](TRuntimeNode::TList row) -> TRuntimeNode::TList { return row; };
-     const auto addToState = [&pb](TRuntimeNode::TList row, TRuntimeNode::TList state) -> TRuntimeNode::TList {
-         return {pb.Add(row.front(), state.front())};
-     };
-     const auto toNarrow = [](TRuntimeNode::TList columns) -> TRuntimeNode { return columns.front(); };
-     const auto wideInput = pb.ExpandMap(input, toWide);
-     const auto chained = pb.WideChain1Map(wideInput, initState, addToState);
-     return pb.NarrowMap(chained, toNarrow);
- }
- // Builds a program that applies Discard to the input flow. With Wide == true
- // the flow is first expanded into a one-column wide flow.
- template <bool Wide>
- TRuntimeNode CreateDiscard(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     if (!Wide) {
-         return pb.Discard(input);
-     }
-     const auto wideInput = pb.ExpandMap(input,
-         [](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     return pb.Discard(wideInput);
- }
- // Builds a program that applies Skip with a count of 500 to the input flow.
- // With Wide == true the flow is expanded into a one-column wide flow before
- // Skip and narrowed back to that column afterwards.
- template <bool Wide>
- TRuntimeNode CreateSkip(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto skipCount = pb.NewDataLiteral<ui64>(500);
-     if (!Wide) {
-         return pb.Skip(input, skipCount);
-     }
-     const auto wideInput = pb.ExpandMap(input,
-         [](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     const auto skipped = pb.Skip(wideInput, skipCount);
-     return pb.NarrowMap(skipped,
-         [](TRuntimeNode::TList columns) -> TRuntimeNode { return columns.front(); });
- }
- // Builds a program that expands the input into a one-column wide flow and
- // collapses it with NarrowFlatMap. The handler wraps the column into an
- // optional; with Flow == true that optional is additionally turned into a flow.
- template <bool Flow>
- TRuntimeNode CreateNarrowFlatMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto wideInput = pb.ExpandMap(input,
-         [](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     const auto wrapColumn = [&pb](TRuntimeNode::TList columns) -> TRuntimeNode {
-         const auto wrapped = pb.NewOptional(columns.front());
-         if (Flow) {
-             return pb.ToFlow(wrapped);
-         }
-         return wrapped;
-     };
-     return pb.NarrowFlatMap(wideInput, wrapColumn);
- }
- // Builds a program that expands the input into a one-column wide flow and
- // collapses it with NarrowMultiMap, whose handler returns the column twice.
- TRuntimeNode CreateNarrowMultiMap(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto wideInput = pb.ExpandMap(input,
-         [](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     const auto duplicate = [](TRuntimeNode::TList columns) -> TRuntimeNode::TList {
-         return {columns.front(), columns.front()};
-     };
-     return pb.NarrowMultiMap(wideInput, duplicate);
- }
- // Builds a program that expands the input into a one-column wide flow,
- // squeezes it into a sorted dict keyed by that column (`all` is false) and
- // flat-maps the result to the dict's keys. The payload is the literal ui64 0
- // when WithPayload is true and Void otherwise.
- template <bool WithPayload>
- TRuntimeNode CreateSqueezeToSortedDict(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto wideInput = pb.ExpandMap(input,
-         [](TRuntimeNode item) -> TRuntimeNode::TList { return {item}; });
-     const auto keySelector = [](TRuntimeNode::TList columns) { return columns.front(); };
-     const auto payloadSelector = [&pb](TRuntimeNode::TList) {
-         return WithPayload ? pb.NewDataLiteral<ui64>(0) : pb.NewVoid();
-     };
-     const auto squeezed = pb.NarrowSqueezeToSortedDict(wideInput, /*all*/ false, keySelector, payloadSelector);
-     return pb.FlatMap(squeezed, [&pb](TRuntimeNode dict) { return pb.DictKeys(dict); });
- }
- // Builds a map-join program. The left side maps every input item to the tuple
- // (item % 1000, ui32 1); the right side is a sorted dict built from three
- // literal (ui32, ui64) tuples, keyed by the first element with a one-element
- // tuple of the second element as payload. The two sides are joined with an
- // inner MapJoinCore, the wide result is packed into tuples and the final Map
- // returns element 1 of each tuple.
- TRuntimeNode CreateMapJoin(TProgramBuilder& pb, size_t vecSize, TCallable* list = nullptr) {
-     TTimer timer(TString(__func__) + ": ");
-     const auto input = CreateFlow(pb, vecSize, list);
-     const auto rightItemType = pb.NewTupleType({
-         pb.NewDataType(NUdf::TDataType<ui32>::Id),
-         pb.NewDataType(NUdf::TDataType<ui64>::Id)
-     });
-     // Left side of the join.
-     const auto left = pb.Map(input, [&](TRuntimeNode item) {
-         return pb.NewTuple({pb.Mod(item, pb.NewDataLiteral<ui64>(1000)), pb.NewDataLiteral<ui32>(1)});
-     });
-     // Right side of the join: literal rows turned into a sorted dict.
-     const auto rightRows = pb.NewList(rightItemType, {
-         pb.NewTuple({pb.NewDataLiteral<ui32>(1), pb.NewDataLiteral<ui64>(3 * 1000)}),
-         pb.NewTuple({pb.NewDataLiteral<ui32>(2), pb.NewDataLiteral<ui64>(4 * 1000)}),
-         pb.NewTuple({pb.NewDataLiteral<ui32>(3), pb.NewDataLiteral<ui64>(5 * 1000)}),
-     });
-     const auto rightDict = pb.ToSortedDict(rightRows, false,
-         [&](TRuntimeNode row) { return pb.Nth(row, 0); },
-         [&](TRuntimeNode row) { return pb.NewTuple({pb.Nth(row, 1U)}); });
-     const auto joinedType = pb.NewFlowType(pb.NewMultiType({
-         pb.NewDataType(NUdf::TDataType<char*>::Id),
-         pb.NewDataType(NUdf::TDataType<char*>::Id),
-     }));
-     const auto wideLeft = pb.ExpandMap(left, [&](TRuntimeNode row) -> TRuntimeNode::TList {
-         return {pb.Nth(row, 0), pb.Nth(row, 1)};
-     });
-     const auto joined = pb.MapJoinCore(
-         wideLeft,
-         rightDict,
-         EJoinKind::Inner,
-         {0U},
-         {1U, 0U},
-         {0U, 1U},
-         joinedType
-     );
-     const auto packed = pb.NarrowMap(joined,
-         [&](TRuntimeNode::TList columns) { return pb.NewTuple(columns); });
-     return pb.Map(packed, [&](TRuntimeNode row) { return pb.Nth(row, 1); });
- }
- // Runs each test program concurrently from several worker threads that all
- // clone their graph from a single cached computation pattern.
- Y_UNIT_TEST_SUITE(ComputationGraphDataRace) {
-     // Builds the program returned by `f` once, stores its pattern in a pattern
-     // cache under the key "a", then lets `inFlight` workers each clone and run
-     // the graph 100 times. Every run must sum to `testResult`.
-     //   f          - program factory called as f(pb, vecSize, list); returns the program root.
-     //   useLLVM    - selects the LLVM option string: "" when true, "OFF" otherwise.
-     //   testResult - expected sum of the ui64 values produced by one graph run.
-     //   vecSize    - number of input items fed to the graph on every run.
-     template<class T>
-     void ParallelProgTest(T f, bool useLLVM, ui64 testResult, size_t vecSize = 10'000) {
-         TTimer t("total: ");
-         const ui32 cacheSizeInBytes = 104857600; // 100 MiB
-         const ui32 inFlight = 7;
-         TComputationPatternLRUCache cache({cacheSizeInBytes, cacheSizeInBytes});
-         auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
-         auto entry = std::make_shared<TPatternCacheEntry>();
-         TScopedAlloc& alloc = entry->Alloc;
-         TTypeEnvironment& typeEnv = entry->Env;
-         TProgramBuilder pb(typeEnv, *functionRegistry);
-         const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
-         const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-         TRuntimeNode progReturn;
-         with_lock(alloc) {
-             progReturn = f(pb, vecSize, list);
-         }
-         TExploringNodeVisitor explorer;
-         explorer.Walk(progReturn.GetNode(), typeEnv);
-         TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(), functionRegistry.Get(),
-             NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception, useLLVM ? "" : "OFF", EGraphPerProcess::Multi);
-         {
-             auto guard = entry->Env.BindAllocator();
-             entry->Pattern = MakeComputationPattern(explorer, progReturn, {list}, opts);
-         }
-         cache.EmplacePattern("a", entry);
-         // Same value sequence that CreateFlow generates for literal input.
-         auto genData = [&]() {
-             std::vector<ui64> data;
-             data.reserve(vecSize);
-             for (ui64 i = 0; i < vecSize; ++i) {
-                 data.push_back((i + 124515) % 6740234);
-             }
-             return data;
-         };
-         const auto data = genData();
-         // One result vector per worker; a worker only touches results[id].
-         std::vector<std::vector<ui64>> results(inFlight);
-         NPar::LocalExecutor().RunAdditionalThreads(inFlight);
-         NPar::LocalExecutor().ExecRange([&](int id) {
-             for (ui32 i = 0; i < 100; ++i) {
-                 auto key = "a";
-                 auto randomProvider = CreateDeterministicRandomProvider(1);
-                 auto timeProvider = CreateDeterministicTimeProvider(10000000);
-                 TScopedAlloc graphAlloc(__LOCATION__);
-                 // Distinct names so the outer `entry` and `opts` are not shadowed.
-                 auto cachedEntry = cache.Find(key);
-                 TComputationPatternOpts cloneOpts(cachedEntry->Alloc.Ref(), cachedEntry->Env, GetListTestFactory(),
-                     functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
-                     useLLVM ? "" : "OFF", EGraphPerProcess::Multi);
-                 auto graph = cachedEntry->Pattern->Clone(cloneOpts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
-                 // Feed the input through entry point 0 (the "TestList" callable).
-                 TUnboxedValue* items = nullptr;
-                 graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items));
-                 std::transform(data.cbegin(), data.cend(), items,
-                     [](const auto s) {
-                         return ToValue<ui64>(s);
-                     });
-                 // Drain the graph, summing every produced value.
-                 ui64 acc = 0;
-                 TUnboxedValue v = graph->GetValue();
-                 while (v.HasValue()) {
-                     acc += v.Get<ui64>();
-                     v = graph->GetValue();
-                 }
-                 results[id].push_back(acc);
-             }
-         }, 0, inFlight, NPar::TLocalExecutor::WAIT_COMPLETE | NPar::TLocalExecutor::MED_PRIORITY);
-         // Iterate by reference: copying each per-thread vector is unnecessary.
-         for (const auto& threadResults : results) {
-             for (const ui64 res : threadResults) {
-                 UNIT_ASSERT_VALUES_EQUAL(res, testResult);
-             }
-         }
-     }
-     Y_UNIT_TEST_QUAD(Filter, Wide, UseLLVM) {
-         ParallelProgTest(CreateFilter<Wide>, UseLLVM, 10098816);
-     }
-     Y_UNIT_TEST_QUAD(Map, Wide, UseLLVM) {
-         ParallelProgTest(CreateMap<Wide>, UseLLVM, 78);
-     }
-     Y_UNIT_TEST_QUAD(Condense, Wide, UseLLVM) {
-         ParallelProgTest(CreateCondense<Wide>, UseLLVM, 1295145000);
-     }
-     Y_UNIT_TEST_QUAD(Chopper, Wide, UseLLVM) {
-         ParallelProgTest(CreateChopper<Wide>, UseLLVM, 1295145000);
-     }
-     Y_UNIT_TEST_QUAD(Combine, Wide, UseLLVM) {
-         ParallelProgTest(CreateCombine<Wide>, UseLLVM, 1295145000);
-     }
-     Y_UNIT_TEST_QUAD(Chain1Map, Wide, UseLLVM) {
-         ParallelProgTest(CreateChain1Map<Wide>, UseLLVM, 6393039240000);
-     }
-     Y_UNIT_TEST_QUAD(Discard, Wide, UseLLVM) {
-         ParallelProgTest(CreateDiscard<Wide>, UseLLVM, 0);
-     }
-     Y_UNIT_TEST_QUAD(Skip, Wide, UseLLVM) {
-         ParallelProgTest(CreateSkip<Wide>, UseLLVM, 1232762750);
-     }
-     Y_UNIT_TEST_QUAD(NarrowFlatMap, Flow, UseLLVM) {
-         ParallelProgTest(CreateNarrowFlatMap<Flow>, UseLLVM, 1295145000);
-     }
-     Y_UNIT_TEST_TWIN(NarrowMultiMap, UseLLVM) {
-         ParallelProgTest(CreateNarrowMultiMap, UseLLVM, 1295145000ull * 2);
-     }
-     Y_UNIT_TEST_QUAD(SqueezeToSortedDict, WithPayload, UseLLVM) {
-         ParallelProgTest(CreateSqueezeToSortedDict<WithPayload>, UseLLVM, 125014500, 1000);
-     }
-     Y_UNIT_TEST_TWIN(MapJoin, UseLLVM) {
-         ParallelProgTest(CreateMapJoin, UseLLVM, 120000, 10'000);
-     }
- }
- Y_UNIT_TEST_SUITE(ComputationPatternCache) {
- // Stores ten patterns (each returning the string literal "qwerty") in the
- // cache under the keys "a".."j", then looks every key up again, clones a graph
- // from the cached pattern and checks the value it produces.
- Y_UNIT_TEST(Smoke) {
-     const ui32 cacheSize = 10'000'000;
-     const ui32 cacheItems = 10;
-     TComputationPatternLRUCache cache({cacheSize, cacheSize});
-     auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
-     // Cache key of the index-th entry: a single character starting at 'a'.
-     const auto keyFor = [](ui32 index) {
-         return TString(static_cast<char>('a' + index));
-     };
-     for (ui32 index = 0; index < cacheItems; ++index) {
-         auto newEntry = std::make_shared<TPatternCacheEntry>();
-         TScopedAlloc& entryAlloc = newEntry->Alloc;
-         TTypeEnvironment& entryEnv = newEntry->Env;
-         TProgramBuilder pb(entryEnv, *functionRegistry);
-         TRuntimeNode program;
-         with_lock(entryAlloc) {
-             program = pb.NewDataLiteral<NYql::NUdf::EDataSlot::String>("qwerty");
-         }
-         TExploringNodeVisitor explorer;
-         explorer.Walk(program.GetNode(), entryEnv);
-         TComputationPatternOpts patternOpts(entryAlloc.Ref(), entryEnv, GetBuiltinFactory(),
-             functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
-             "OFF", EGraphPerProcess::Multi);
-         {
-             auto guard = newEntry->Env.BindAllocator();
-             newEntry->Pattern = MakeComputationPattern(explorer, program, {}, patternOpts);
-         }
-         // XXX: There is no way to accurately define how the entry's
-         // allocator obtains the memory pages: using the free ones from the
-         // global page pool or the ones directly requested by <mmap>. At the
-         // same time, it is the total allocated bytes (not just the number
-         // of the borrowed pages) that is a good estimate of the memory
-         // consumed by the pattern cache entry for real life workload.
-         // Hence, to avoid undesired cache flushes, release the free pages
-         // of the allocator of the particular entry.
-         entryAlloc.ReleaseFreePages();
-         cache.EmplacePattern(keyFor(index), newEntry);
-     }
-     for (ui32 index = 0; index < cacheItems; ++index) {
-         auto key = keyFor(index);
-         auto randomProvider = CreateDeterministicRandomProvider(1);
-         auto timeProvider = CreateDeterministicTimeProvider(10000000);
-         TScopedAlloc graphAlloc(__LOCATION__);
-         auto cached = cache.Find(key);
-         UNIT_ASSERT(cached);
-         TComputationPatternOpts cloneOpts(cached->Alloc.Ref(), cached->Env, GetBuiltinFactory(),
-             functionRegistry.Get(), NUdf::EValidateMode::Lazy, NUdf::EValidatePolicy::Exception,
-             "OFF", EGraphPerProcess::Multi);
-         auto graph = cached->Pattern->Clone(cloneOpts.ToComputationOptions(*randomProvider, *timeProvider, &graphAlloc.Ref()));
-         auto produced = graph->GetValue();
-         UNIT_ASSERT_EQUAL(NYql::NUdf::TStringRef("qwerty"), produced.AsStringRef());
-     }
- }
- // Reports the same pattern as compiled cacheSize + 1 times, then stores a new
- // entry under the same key whose already compiled pattern reports a code size
- // of cacheSize + 1. The test has no explicit assertions: it passes as long as
- // none of the cache calls fails.
- // NOTE(review): presumably a regression test for repeated compile
- // notifications being accounted more than once - confirm against the cache.
- Y_UNIT_TEST(DoubleNotifyPatternCompiled) {
-     // Minimal IComputationPattern stub: tracks only a "compiled" flag and
-     // reports a fixed compiled code size.
-     class TMockComputationPattern final : public IComputationPattern {
-     public:
-         explicit TMockComputationPattern(size_t codeSize)
-             : Size_(codeSize)
-         {
-         }
-         void Compile(TString, IStatsRegistry*) override {
-             Compiled_ = true;
-         }
-         bool IsCompiled() const override {
-             return Compiled_;
-         }
-         size_t CompiledCodeSize() const override {
-             return Size_;
-         }
-         void RemoveCompiledCode() override {
-             Compiled_ = false;
-         }
-         THolder<IComputationGraph> Clone(const TComputationOptsFull&) override {
-             return {};
-         }
-         bool GetSuitableForCache() const override {
-             return true;
-         }
-     private:
-         const size_t Size_;
-         bool Compiled_ = false;
-     };
-     const TString key = "program";
-     const ui32 cacheSize = 2;
-     TComputationPatternLRUCache cache({cacheSize, cacheSize});
-     // First entry: code size 1, compiled and reported repeatedly.
-     auto entry = std::make_shared<TPatternCacheEntry>();
-     entry->Pattern = MakeIntrusive<TMockComputationPattern>(1u);
-     cache.EmplacePattern(key, entry);
-     for (ui32 round = 0; round <= cacheSize; ++round) {
-         entry->Pattern->Compile("", nullptr);
-         cache.NotifyPatternCompiled(key);
-     }
-     // Second entry under the same key: already compiled, code size cacheSize + 1.
-     entry = std::make_shared<TPatternCacheEntry>();
-     entry->Pattern = MakeIntrusive<TMockComputationPattern>(cacheSize + 1);
-     entry->Pattern->Compile("", nullptr);
-     cache.EmplacePattern(key, entry);
- }
- // Timing comparison (no assertions) of adding two ui64 constants 100'000 times
- // through: a computation graph, a std::function over plain integers, and a
- // std::function over TUnboxedValue. Each phase is wrapped in a TTimer labelled
- // with the phase name.
- Y_UNIT_TEST(AddPerf) {
-     TTimer totalTimer("all: ");
-     TScopedAlloc alloc(__LOCATION__);
-     TTypeEnvironment typeEnv(alloc);
-     auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
-     TProgramBuilder pb(typeEnv, *functionRegistry);
-     auto lhs = pb.NewDataLiteral<ui64>(123591592ULL);
-     auto rhs = pb.NewDataLiteral<ui64>(323591592ULL);
-     auto program = pb.Add(lhs, rhs);
-     TExploringNodeVisitor explorer;
-     explorer.Walk(program.GetNode(), typeEnv);
-     NUdf::EValidateMode validateMode = NUdf::EValidateMode::Lazy;
-     TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetBuiltinFactory(),
-         functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception,
-         "OFF", EGraphPerProcess::Multi);
-     // The timers live in unique_ptrs so they can be stopped right after the
-     // measured call while the call's result stays in scope.
-     auto patternTimer = std::make_unique<TTimer>("make_pattern: ");
-     auto pattern = MakeComputationPattern(explorer, program, {}, opts);
-     patternTimer.reset();
-     auto randomProvider = CreateDeterministicRandomProvider(1);
-     auto timeProvider = CreateDeterministicTimeProvider(10000000);
-     auto cloneTimer = std::make_unique<TTimer>("clone: ");
-     auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
-     cloneTimer.reset();
-     const ui64 repeats = 100'000;
-     {
-         // Phase 1: evaluate the computation graph.
-         TTimer graphTimer("graph: ");
-         ui64 sum = 0;
-         for (ui64 left = repeats; left > 0; --left) {
-             sum += graph->GetValue().Get<ui64>();
-         }
-         Y_DO_NOT_OPTIMIZE_AWAY(sum);
-     }
-     {
-         // Phase 2: the same addition through std::function on plain integers.
-         std::function<ui64(ui64, ui64)> plainAdd = [](ui64 a, ui64 b) {
-             return a + b;
-         };
-         TTimer lambdaTimer("lambda: ");
-         ui64 sum = 0;
-         for (ui64 left = repeats; left > 0; --left) {
-             sum += plainAdd(123591592ULL, 323591592ULL);
-         }
-         Y_DO_NOT_OPTIMIZE_AWAY(sum);
-     }
-     {
-         // Phase 3: the same addition through std::function on unboxed values.
-         std::function<TUnboxedValue(TUnboxedValue&, TUnboxedValue&)> unboxedAdd =
-             [](TUnboxedValue& a, TUnboxedValue& b) {
-                 return TUnboxedValuePod(a.Get<ui64>() + b.Get<ui64>());
-             };
-         Y_DO_NOT_OPTIMIZE_AWAY(unboxedAdd);
-         TTimer unboxedTimer("lambda unboxed value: ");
-         TUnboxedValue sum(TUnboxedValuePod(0));
-         TUnboxedValue first(TUnboxedValuePod(ui64{123591592UL}));
-         TUnboxedValue second(TUnboxedValuePod(ui64{323591592UL}));
-         for (ui64 left = repeats; left > 0; --left) {
-             auto partial = unboxedAdd(first, second);
-             sum = unboxedAdd(partial, sum);
-         }
-         Y_DO_NOT_OPTIMIZE_AWAY(sum.Get<ui64>());
-     }
- }
- // Compares the time needed to filter 100'000 values with "x % 128 == 0" and
- // sum the survivors using: the computation graph built by CreateFilter<Wide>,
- // a std::function predicate, and a plain lambda predicate. Every section runs
- // kIter iterations, prints its total time to Cerr and checks the sum and the
- // count of the surviving values. The timer is always read before the result
- // is checked, so assertion cost is excluded from all three measurements.
- Y_UNIT_TEST_TWIN(FilterPerf, Wide) {
-     TScopedAlloc alloc(__LOCATION__);
-     TTypeEnvironment typeEnv(alloc);
-     auto functionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone();
-     TProgramBuilder pb(typeEnv, *functionRegistry);
-     const ui64 vecSize = 100'000;
-     Cerr << "vecSize: " << vecSize << Endl;
-     const auto listType = pb.NewListType(pb.NewDataType(NUdf::TDataType<ui64>::Id));
-     const auto list = TCallableBuilder(pb.GetTypeEnvironment(), "TestList", listType).Build();
-     auto progReturn = CreateFilter<Wide>(pb, vecSize, list);
-     TExploringNodeVisitor explorer;
-     explorer.Walk(progReturn.GetNode(), typeEnv);
-     NUdf::EValidateMode validateMode = NUdf::EValidateMode::Max;
-     TComputationPatternOpts opts(alloc.Ref(), typeEnv, GetListTestFactory(),
-         functionRegistry.Get(), validateMode, NUdf::EValidatePolicy::Exception,
-         "OFF", EGraphPerProcess::Multi);
-     auto t_make_pattern = std::make_unique<TTimer>("make_pattern: ");
-     auto pattern = MakeComputationPattern(explorer, progReturn, {list}, opts);
-     t_make_pattern.reset();
-     auto randomProvider = CreateDeterministicRandomProvider(1);
-     auto timeProvider = CreateDeterministicTimeProvider(10000000);
-     // This clone exists only to time Clone(); the graph section below clones
-     // a fresh graph for every iteration.
-     auto t_clone = std::make_unique<TTimer>("clone: ");
-     auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
-     t_clone.reset();
-     // Input values: (i + 124515) % 6740234 for i in [0, vecSize).
-     auto genData = [&]() {
-         std::vector<ui64> data;
-         data.reserve(vecSize);
-         for (ui64 i = 0; i < vecSize; ++i) {
-             data.push_back((i + 124515) % 6740234);
-         }
-         return data;
-     };
-     // Checks the sum and count against the known answer for the configured
-     // vecSize; fails for any size without a recorded answer.
-     auto testResult = [&] (ui64 acc, ui64 count) {
-         if (vecSize == 100'000'000) {
-             UNIT_ASSERT_VALUES_EQUAL(acc, 2614128386688);
-             UNIT_ASSERT_VALUES_EQUAL(count, 781263);
-         } else if (vecSize == 10'000'000) {
-             UNIT_ASSERT_VALUES_EQUAL(acc, 222145217664);
-         } else if (vecSize == 100'000) {
-             UNIT_ASSERT_VALUES_EQUAL(acc, 136480896);
-             UNIT_ASSERT_VALUES_EQUAL(count, 782);
-         } else {
-             UNIT_FAIL("result is not checked");
-         }
-     };
-     const ui64 kIter = 2;
-     {
-         // Section 1: computation graph.
-         TDuration total;
-         for (ui64 i = 0; i < kIter; ++i) {
-             ui64 acc = 0;
-             ui64 count = 0;
-             auto graph = pattern->Clone(opts.ToComputationOptions(*randomProvider, *timeProvider));
-             auto data = genData();
-             TUnboxedValue* items = nullptr;
-             graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), graph->GetHolderFactory().CreateDirectArrayHolder(data.size(), items));
-             std::transform(data.cbegin(), data.cend(), items,
-                 [](const auto s) {
-                     return ToValue<ui64>(s);
-                 });
-             // Only draining the graph is timed; input setup is excluded.
-             TSimpleTimer t;
-             TUnboxedValue v = graph->GetValue();
-             while (v.HasValue()) {
-                 acc += v.Get<ui64>();
-                 ++count;
-                 v = graph->GetValue();
-             }
-             // Stop the clock before asserting, like the other two sections.
-             total += t.Get();
-             testResult(acc, count);
-         }
-         Cerr << "graph: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
-     }
-     {
-         // Section 2: std::function predicate.
-         auto data = genData();
-         std::function<bool(ui64)> predicate = [](ui64 a) {
-             return a % 128 == 0;
-         };
-         Y_DO_NOT_OPTIMIZE_AWAY(predicate);
-         TDuration total;
-         for (ui64 i = 0; i < kIter; ++i) {
-             TSimpleTimer t;
-             ui64 acc = 0;
-             ui64 count = 0;
-             for (ui64 j = 0; j < data.size(); ++j) {
-                 if (predicate(data[j])) {
-                     acc += data[j];
-                     ++count;
-                 }
-             }
-             total += t.Get();
-             testResult(acc, count);
-         }
-         Cerr << "std::function: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
-     }
-     {
-         // Section 3: plain lambda predicate.
-         auto data = genData();
-         static auto predicate = [](ui64 a) {
-             return a % 128 == 0;
-         };
-         Y_DO_NOT_OPTIMIZE_AWAY(predicate);
-         TDuration total;
-         for (ui64 i = 0; i < kIter; ++i) {
-             TSimpleTimer t;
-             ui64 acc = 0;
-             ui64 count = 0;
-             for (ui64 j = 0; j < data.size(); ++j) {
-                 if (predicate(data[j])) {
-                     acc += data[j];
-                     ++count;
-                 }
-             }
-             total += t.Get();
-             testResult(acc, count);
-         }
-         Cerr << "lambda: " << Sprintf("%.3f", total.SecondsFloat()) << "s" << Endl;
-     }
- }
- }
- }
- }
|