// NOTE(review): this file arrived collapsed onto two physical lines, with the
// targets of the angle-bracket #include directives and every template argument
// list missing. The line structure is restored below. Template arguments were
// reconstructed from how each value is used in this file and must be confirmed
// against the upstream copy. The header names cannot be inferred from this view
// and still have to be restored -- the ten bare #include directives are kept as
// placeholders, one per original directive.
#include "../mkql_time_order_recover.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace NKikimr::NMiniKQL {

namespace {

// Random provider built with a fixed seed (1).
TIntrusivePtr<IRandomProvider> CreateRandomProvider() {
    return CreateDeterministicRandomProvider(1);
}

// Time provider built with a fixed argument (10000000).
TIntrusivePtr<ITimeProvider> CreateTimeProvider() {
    return CreateDeterministicTimeProvider(10000000);
}

// Bundles everything needed to turn a program node into a computation graph:
// function registry, random/time providers, type environment and program builder.
// The allocator is borrowed from the caller, not owned.
struct TSetup {
    TSetup(TScopedAlloc& alloc)
        : Alloc(alloc)
    {
        FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry());
        RandomProvider = CreateRandomProvider();
        TimeProvider = CreateTimeProvider();

        Env.Reset(new TTypeEnvironment(Alloc));
        PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry));
    }

    // Walks the program rooted at `pgm`, builds a computation pattern from it
    // (stored in `Pattern`) and returns a graph cloned from that pattern.
    // `entryPoints` is forwarded to MakeComputationPattern unchanged.
    THolder<IComputationGraph> BuildGraph(
        TRuntimeNode pgm,
        const std::vector<TNode*>& entryPoints = std::vector<TNode*>())
    {
        Explorer.Walk(pgm.GetNode(), *Env);
        TComputationPatternOpts opts(
            Alloc.Ref(),
            *Env,
            GetBuiltinFactory(),
            FunctionRegistry.Get(),
            NUdf::EValidateMode::None,
            NUdf::EValidatePolicy::Fail,
            "OFF",
            EGraphPerProcess::Multi);
        Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts);
        TComputationOptsFull compOpts = opts.ToComputationOptions(*RandomProvider, *TimeProvider);
        return Pattern->Clone(compOpts);
    }

    TIntrusivePtr<IFunctionRegistry> FunctionRegistry;
    TIntrusivePtr<IRandomProvider> RandomProvider;
    TIntrusivePtr<ITimeProvider> TimeProvider;
    TScopedAlloc& Alloc;
    THolder<TTypeEnvironment> Env;
    THolder<TProgramBuilder> PgmBuilder;

    TExploringNodeVisitor Explorer;
    IComputationPattern::TPtr Pattern;
};

// One input row: (time, key, sum, part), matching the struct type declared in
// BuildGraph below (Int64, String, Uint32, String).
// NOTE(review): the string element type was lost in extraction; TString is used
// because the rest of the file uses TString -- confirm against upstream.
using TTestInputData = std::vector<std::tuple<i64, TString, ui32, TString>>;

// Builds a graph that feeds `input` as a flow into MatchRecognizeCore.
//
// The call below passes: a lambda producing a one-element tuple from the "part"
// member, a single output named "key" whose lambda always returns the literal 56,
// a single pattern factor "A", and a single predicate for "A" that is always true.
// `streamingMode` is forwarded to MatchRecognizeCore as-is.
THolder<IComputationGraph> BuildGraph(
    TSetup& setup,
    bool streamingMode,
    const TTestInputData& input)
{
    TProgramBuilder& pgmBuilder = *setup.PgmBuilder;

    const auto structType = pgmBuilder.NewStructType({
        {"time", pgmBuilder.NewDataType(NUdf::EDataSlot::Int64)},
        {"key", pgmBuilder.NewDataType(NUdf::EDataSlot::String)},
        {"sum", pgmBuilder.NewDataType(NUdf::EDataSlot::Uint32)},
        {"part", pgmBuilder.NewDataType(NUdf::EDataSlot::String)}
    });

    TVector<TRuntimeNode> items;
    items.reserve(input.size());
    for (const auto& [time, key, sum, part] : input) {
        items.push_back(pgmBuilder.NewStruct({
            {"time", pgmBuilder.NewDataLiteral<i64>(time)},
            {"key", pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(key)},
            {"sum", pgmBuilder.NewDataLiteral<ui32>(sum)},
            {"part", pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(part)},
        }));
    }

    const auto list = pgmBuilder.NewList(structType, std::move(items));
    auto inputFlow = pgmBuilder.ToFlow(list);

    auto pgmReturn = pgmBuilder.MatchRecognizeCore(
        inputFlow,
        [&](TRuntimeNode item) {
            return pgmBuilder.NewTuple({pgmBuilder.Member(item, "part")});
        },
        {},
        {"key"sv},
        {[&](TRuntimeNode /*measureInputDataArg*/, TRuntimeNode /*matchedVarsArg*/) {
            return pgmBuilder.NewDataLiteral<ui32>(56);
        }},
        {
            // Presumably {name, min, max, ...}: exactly three rows bound to "A",
            // consistent with the "match end" marker in the test input -- confirm
            // against the TRowPatternFactor declaration.
            {NYql::NMatchRecognize::TRowPatternFactor{"A", 3, 3, false, false, false}}
        },
        {"A"sv},
        {[&](TRuntimeNode /*inputDataArg*/, TRuntimeNode /*matchedVarsArg*/, TRuntimeNode /*currentRowIndexArg*/) {
            return pgmBuilder.NewDataLiteral<bool>(true);
        }},
        streamingMode,
        {NYql::NMatchRecognize::EAfterMatchSkipTo::NextRow, ""},
        NYql::NMatchRecognize::ERowsPerMatch::OneRow
    );

    return setup.BuildGraph(pgmReturn);
}

} // namespace

Y_UNIT_TEST_SUITE(MatchRecognizeSaveLoadTest) {
    // Pulls one result row from a graph fed with four rows, saves the graph state,
    // then loads that state into a second graph fed with only the last row and
    // expects it to produce a result row as well. Both rows must carry 56 in
    // element 0, the constant returned by the only measure lambda.
    void TestWithSaveLoadImpl(bool streamingMode) {
        TScopedAlloc alloc(__LOCATION__);
        TSetup setup1(alloc);

        const TTestInputData input = {
            {1000, "A", 101, "P"},
            {1001, "B", 102, "P"},
            {1002, "C", 103, "P"}, // <- match end
            {1003, "D", 103, "P"}}; // <- not processed

        auto graph1 = BuildGraph(setup1, streamingMode, input);

        auto value = graph1->GetValue();
        UNIT_ASSERT(!value.IsFinish() && value);
        auto v = value.GetElement(0).Get<ui32>();
        // Previously this value was read and then overwritten without being checked.
        UNIT_ASSERT_VALUES_EQUAL(56, v);

        TString graphState = graph1->SaveGraphState();
        graph1.Reset();

        // The second graph only sees the row the first one had not consumed; the
        // rest of the matcher state has to come from the saved snapshot.
        TSetup setup2(alloc);
        auto graph2 = BuildGraph(setup2, streamingMode, TTestInputData{{1003, "D", 103, "P"}});
        graph2->LoadGraphState(graphState);

        value = graph2->GetValue();
        UNIT_ASSERT(!value.IsFinish() && value);
        v = value.GetElement(0).Get<ui32>();
        UNIT_ASSERT_VALUES_EQUAL(56, v);
    }

    Y_UNIT_TEST(StreamingMode) {
        TestWithSaveLoadImpl(true);
    }

    Y_UNIT_TEST(NotStreamingMode) {
        TestWithSaveLoadImpl(false);
    }
}

} // namespace NKikimr::NMiniKQL