// Unit tests (suite TMiniKQLSaveLoadTest) that save the state of one computation graph with
// SaveGraphState and restore it into a second, freshly built graph with LoadGraphState.
//
// NOTE(review): this text looks damaged by extraction -- the #include directives below have no
// targets, template argument lists appear to be missing (bare `THolder`, `std::vector`,
// `res.Get()`, `NewDataLiteral(...)`), and the file is collapsed so that every `//` comment
// swallows the remainder of its physical line. Restore the file from its origin before building;
// the code is deliberately left untouched here rather than reconstructed by guesswork.
//
// Helpers in the anonymous namespace:
//  * CreateRandomProvider / CreateTimeProvider return the deterministic providers created with
//    the arguments 1 and 10000000 respectively.
//  * GetAuxCallableFactory maps the callable named "OneYieldStream" to a new
//    TExternalComputationNode and forwards every other callable to GetBuiltinFactory().
//  * TSetup holds the function registry, both providers, the type environment and the program
//    builder. BuildGraph walks the program with Explorer, makes a computation pattern from it and
//    returns Pattern->Clone(...) using the stored providers.
//  * TStreamWithYield is a boxed stream over a vector of items. Fetch returns Finish once
//    Index >= Items.size(), returns Yield (without advancing Index) while Index == YieldPos, and
//    otherwise stores Items[Index++] into the result and returns Ok. Save() returns
//    TUnboxedValue::Zero() and Load2() ignores its argument and returns false.
//
// Shape shared by the three tests, repeated for every yieldPos in [0, items.size()):
//  1. Graph 1 reads the stream from index 0 and is fetched until the stream yields at yieldPos.
//  2. The state of graph 1's root value is serialized into a TString via SaveGraphState.
//  3. Graph 2 is built with yieldPos = -1 and startIndex = yieldPos, and the saved state is
//     loaded into its root value. (-1 is passed to a ui32 parameter; presumably it wraps to the
//     largest ui32 value, so Fetch reaches Finish before Index == YieldPos can match -- confirm.)
//  4. The output of graph 2 is checked:
//     - TestSqueezeSaveLoad (items 2..8, initial state literal 1) and TestSqueeze1SaveLoad
//       (items 1..8) expect exactly one Ok result equal to 36, followed by Finish.
//     - TestHoppingSaveLoad (HoppingCore with hop = 10, interval = 30, delay = 20) appends the
//       values fetched from graph 1 and graph 2 to one vector and compares it with
//       {5, 9, 27, 22, 21, 11, 11, 8, 8, 8, 8}.
#include #include #include #include #include #include #include #include #include #include namespace NKikimr { namespace NMiniKQL { namespace { TIntrusivePtr CreateRandomProvider() { return CreateDeterministicRandomProvider(1); } TIntrusivePtr CreateTimeProvider() { return CreateDeterministicTimeProvider(10000000); } TComputationNodeFactory GetAuxCallableFactory() { return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { if (callable.GetType()->GetName() == "OneYieldStream") { return new TExternalComputationNode(ctx.Mutables); } return GetBuiltinFactory()(callable, ctx); }; } struct TSetup { TSetup(TScopedAlloc& alloc) : Alloc(alloc) { FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry()); RandomProvider = CreateRandomProvider(); TimeProvider = CreateTimeProvider(); Env.Reset(new TTypeEnvironment(Alloc)); PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry)); } THolder BuildGraph(TRuntimeNode pgm, const std::vector& entryPoints = std::vector()) { Explorer.Walk(pgm.GetNode(), *Env); TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(), FunctionRegistry.Get(), NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi); Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts); TComputationOptsFull compOpts = opts.ToComputationOptions(*RandomProvider, *TimeProvider); return Pattern->Clone(compOpts); } TIntrusivePtr FunctionRegistry; TIntrusivePtr RandomProvider; TIntrusivePtr TimeProvider; TScopedAlloc& Alloc; THolder Env; THolder PgmBuilder; TExploringNodeVisitor Explorer; IComputationPattern::TPtr Pattern; }; struct TStreamWithYield : public NUdf::TBoxedValue { TStreamWithYield(const TUnboxedValueVector& items, ui32 yieldPos, ui32 index) : Items(items) , YieldPos(yieldPos) , Index(index) {} private: TUnboxedValueVector Items; ui32 YieldPos; ui32 Index; ui32 GetTraverseCount() const override { return 0; } NUdf::TUnboxedValue Save() const override { 
return NUdf::TUnboxedValue::Zero(); } bool Load2(const NUdf::TUnboxedValue& state) override { Y_UNUSED(state); return false; } NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final { if (Index >= Items.size()) { return NUdf::EFetchStatus::Finish; } if (Index == YieldPos) { return NUdf::EFetchStatus::Yield; } result = Items[Index++]; return NUdf::EFetchStatus::Ok; } }; } Y_UNIT_TEST_SUITE(TMiniKQLSaveLoadTest) { Y_UNIT_TEST(TestSqueezeSaveLoad) { TScopedAlloc alloc(__LOCATION__); const std::vector items = {2, 3, 4, 5, 6, 7, 8}; auto buildGraph = [&items] (TSetup& setup, ui32 yieldPos, ui32 startIndex) -> THolder { TProgramBuilder& pgmBuilder = *setup.PgmBuilder; auto dataType = pgmBuilder.NewDataType(NUdf::TDataType::Id); auto streamType = pgmBuilder.NewStreamType(dataType); TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", streamType); auto streamNode = inStream.Build(); auto pgmReturn = pgmBuilder.Squeeze( TRuntimeNode(streamNode, false), pgmBuilder.NewDataLiteral(1), [&](TRuntimeNode item, TRuntimeNode state) { return pgmBuilder.Add(item, state); }, [](TRuntimeNode state) { return state; }, [](TRuntimeNode state) { return state; }); TUnboxedValueVector streamItems; for (auto item : items) { streamItems.push_back(NUdf::TUnboxedValuePod(item)); } auto graph = setup.BuildGraph(pgmReturn, {streamNode}); auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex)); graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); return graph; }; for (ui32 yieldPos = 0; yieldPos < items.size(); ++yieldPos) { TSetup setup1(alloc); auto graph1 = buildGraph(setup1, yieldPos, 0); auto root1 = graph1->GetValue(); NUdf::TUnboxedValue res; auto status = root1.Fetch(res); UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield); TString graphState; SaveGraphState(&root1, 1, 0ULL, graphState); TSetup setup2(alloc); auto graph2 = buildGraph(setup2, -1, yieldPos); auto root2 = 
graph2->GetValue(); LoadGraphState(&root2, 1, 0ULL, graphState); status = root2.Fetch(res); UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Ok); UNIT_ASSERT_EQUAL(res.Get(), 36); status = root2.Fetch(res); UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish); } } Y_UNIT_TEST(TestSqueeze1SaveLoad) { TScopedAlloc alloc(__LOCATION__); const std::vector items = {1, 2, 3, 4, 5, 6, 7, 8}; auto buildGraph = [&items] (TSetup& setup, ui32 yieldPos, ui32 startIndex) -> THolder { TProgramBuilder& pgmBuilder = *setup.PgmBuilder; auto dataType = pgmBuilder.NewDataType(NUdf::TDataType::Id); auto streamType = pgmBuilder.NewStreamType(dataType); TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", streamType); auto streamNode = inStream.Build(); auto pgmReturn = pgmBuilder.Squeeze1( TRuntimeNode(streamNode, false), [](TRuntimeNode item) { return item; }, [&](TRuntimeNode item, TRuntimeNode state) { return pgmBuilder.Add(item, state); }, [](TRuntimeNode state) { return state; }, [](TRuntimeNode state) { return state; }); TUnboxedValueVector streamItems; for (auto item : items) { streamItems.push_back(NUdf::TUnboxedValuePod(item)); } auto graph = setup.BuildGraph(pgmReturn, {streamNode}); auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex)); graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); return graph; }; for (ui32 yieldPos = 0; yieldPos < items.size(); ++yieldPos) { TSetup setup1(alloc); auto graph1 = buildGraph(setup1, yieldPos, 0); auto root1 = graph1->GetValue(); NUdf::TUnboxedValue res; auto status = root1.Fetch(res); UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield); TString graphState; SaveGraphState(&root1, 1, 0ULL, graphState); TSetup setup2(alloc); auto graph2 = buildGraph(setup2, -1, yieldPos); auto root2 = graph2->GetValue(); LoadGraphState(&root2, 1, 0ULL, graphState); status = root2.Fetch(res); UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Ok); 
UNIT_ASSERT_EQUAL(res.Get(), 36); status = root2.Fetch(res); UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish); } } Y_UNIT_TEST(TestHoppingSaveLoad) { TScopedAlloc alloc(__LOCATION__); const std::vector> items = { {1, 2}, {2, 3}, {15, 4}, {23, 6}, {24, 5}, {25, 7}, {40, 2}, {47, 1}, {51, 6}, {59, 2}, {85, 8}, {55, 1000}, {200, 0} }; auto buildGraph = [&items] (TSetup& setup, ui32 yieldPos, ui32 startIndex) -> THolder { TProgramBuilder& pgmBuilder = *setup.PgmBuilder; auto structType = pgmBuilder.NewEmptyStructType(); structType = pgmBuilder.NewStructType(structType, "time", pgmBuilder.NewDataType(NUdf::TDataType::Id)); structType = pgmBuilder.NewStructType(structType, "sum", pgmBuilder.NewDataType(NUdf::TDataType::Id)); auto timeIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("time"); auto sumIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("sum"); auto inStreamType = pgmBuilder.NewStreamType(structType); TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", inStreamType); auto streamNode = inStream.Build(); ui64 hop = 10, interval = 30, delay = 20; auto pgmReturn = pgmBuilder.HoppingCore( TRuntimeNode(streamNode, false), [&](TRuntimeNode item) { // timeExtractor return pgmBuilder.Member(item, "time"); }, [&](TRuntimeNode item) { // init std::vector> members; members.emplace_back("sum", pgmBuilder.Member(item, "sum")); return pgmBuilder.NewStruct(members); }, [&](TRuntimeNode item, TRuntimeNode state) { // update auto add = pgmBuilder.AggrAdd( pgmBuilder.Member(item, "sum"), pgmBuilder.Member(state, "sum")); std::vector> members; members.emplace_back("sum", add); return pgmBuilder.NewStruct(members); }, [&](TRuntimeNode state) { // save return pgmBuilder.Member(state, "sum"); }, [&](TRuntimeNode savedState) { // load std::vector> members; members.emplace_back("sum", savedState); return pgmBuilder.NewStruct(members); }, [&](TRuntimeNode state1, TRuntimeNode state2) { // merge auto add = pgmBuilder.AggrAdd( 
pgmBuilder.Member(state1, "sum"), pgmBuilder.Member(state2, "sum")); std::vector> members; members.emplace_back("sum", add); return pgmBuilder.NewStruct(members); }, [&](TRuntimeNode state, TRuntimeNode time) { // finish Y_UNUSED(time); std::vector> members; members.emplace_back("sum", pgmBuilder.Member(state, "sum")); return pgmBuilder.NewStruct(members); }, pgmBuilder.NewDataLiteral(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop pgmBuilder.NewDataLiteral(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval pgmBuilder.NewDataLiteral(NUdf::TStringRef((const char*)&delay, sizeof(delay))) // delay ); auto graph = setup.BuildGraph(pgmReturn, {streamNode}); TUnboxedValueVector streamItems; for (size_t i = 0; i < items.size(); ++i) { NUdf::TUnboxedValue* itemsPtr; auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(2, itemsPtr); itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(items[i].first); itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(items[i].second); streamItems.push_back(std::move(structValues)); } auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex)); graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue)); return graph; }; for (ui32 yieldPos = 0; yieldPos < items.size(); ++yieldPos) { std::vector result; TSetup setup1(alloc); auto graph1 = buildGraph(setup1, yieldPos, 0); auto root1 = graph1->GetValue(); NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok; while (status == NUdf::EFetchStatus::Ok) { NUdf::TUnboxedValue val; status = root1.Fetch(val); if (status == NUdf::EFetchStatus::Ok) { result.push_back(val.GetElement(0).Get()); } } UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield); TString graphState; SaveGraphState(&root1, 1, 0ULL, graphState); TSetup setup2(alloc); auto graph2 = buildGraph(setup2, -1, yieldPos); auto root2 = graph2->GetValue(); LoadGraphState(&root2, 1, 0ULL, graphState); status = NUdf::EFetchStatus::Ok; while (status == 
NUdf::EFetchStatus::Ok) { NUdf::TUnboxedValue val; status = root2.Fetch(val); if (status == NUdf::EFetchStatus::Ok) { result.push_back(val.GetElement(0).Get()); } } UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish); const std::vector resultCompare = {5, 9, 27, 22, 21, 11, 11, 8, 8, 8, 8}; UNIT_ASSERT_EQUAL(result, resultCompare); } } } } // namespace NMiniKQL } // namespace NKikimr