mkql_computation_node_graph_saveload_ut.cpp 15 KB


  1. #include <yql/essentials/minikql/mkql_node.h>
  2. #include <yql/essentials/minikql/mkql_node_cast.h>
  3. #include <yql/essentials/minikql/mkql_program_builder.h>
  4. #include <yql/essentials/minikql/mkql_function_registry.h>
  5. #include <yql/essentials/minikql/computation/mkql_computation_node.h>
  6. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  7. #include <yql/essentials/minikql/computation/mkql_computation_node_graph_saveload.h>
  8. #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
  9. #include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
  10. #include <library/cpp/testing/unittest/registar.h>
  11. namespace NKikimr {
  12. namespace NMiniKQL {
  13. namespace {
  14. TIntrusivePtr<IRandomProvider> CreateRandomProvider() {
  15. return CreateDeterministicRandomProvider(1);
  16. }
  17. TIntrusivePtr<ITimeProvider> CreateTimeProvider() {
  18. return CreateDeterministicTimeProvider(10000000);
  19. }
  20. TComputationNodeFactory GetAuxCallableFactory() {
  21. return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
  22. if (callable.GetType()->GetName() == "OneYieldStream") {
  23. return new TExternalComputationNode(ctx.Mutables);
  24. }
  25. return GetBuiltinFactory()(callable, ctx);
  26. };
  27. }
  28. struct TSetup {
  29. TSetup(TScopedAlloc& alloc)
  30. : Alloc(alloc)
  31. {
  32. FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry());
  33. RandomProvider = CreateRandomProvider();
  34. TimeProvider = CreateTimeProvider();
  35. Env.Reset(new TTypeEnvironment(Alloc));
  36. PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry));
  37. }
  38. THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) {
  39. Explorer.Walk(pgm.GetNode(), *Env);
  40. TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(),
  41. FunctionRegistry.Get(),
  42. NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi);
  43. Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts);
  44. TComputationOptsFull compOpts = opts.ToComputationOptions(*RandomProvider, *TimeProvider);
  45. return Pattern->Clone(compOpts);
  46. }
  47. TIntrusivePtr<IFunctionRegistry> FunctionRegistry;
  48. TIntrusivePtr<IRandomProvider> RandomProvider;
  49. TIntrusivePtr<ITimeProvider> TimeProvider;
  50. TScopedAlloc& Alloc;
  51. THolder<TTypeEnvironment> Env;
  52. THolder<TProgramBuilder> PgmBuilder;
  53. TExploringNodeVisitor Explorer;
  54. IComputationPattern::TPtr Pattern;
  55. };
  56. struct TStreamWithYield : public NUdf::TBoxedValue {
  57. TStreamWithYield(const TUnboxedValueVector& items, ui32 yieldPos, ui32 index)
  58. : Items(items)
  59. , YieldPos(yieldPos)
  60. , Index(index)
  61. {}
  62. private:
  63. TUnboxedValueVector Items;
  64. ui32 YieldPos;
  65. ui32 Index;
  66. ui32 GetTraverseCount() const override {
  67. return 0;
  68. }
  69. NUdf::TUnboxedValue Save() const override {
  70. return NUdf::TUnboxedValue::Zero();
  71. }
  72. bool Load2(const NUdf::TUnboxedValue& state) override {
  73. Y_UNUSED(state);
  74. return false;
  75. }
  76. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final {
  77. if (Index >= Items.size()) {
  78. return NUdf::EFetchStatus::Finish;
  79. }
  80. if (Index == YieldPos) {
  81. return NUdf::EFetchStatus::Yield;
  82. }
  83. result = Items[Index++];
  84. return NUdf::EFetchStatus::Ok;
  85. }
  86. };
  87. }
  88. Y_UNIT_TEST_SUITE(TMiniKQLSaveLoadTest) {
  89. Y_UNIT_TEST(TestSqueezeSaveLoad) {
  90. TScopedAlloc alloc(__LOCATION__);
  91. const std::vector<ui32> items = {2, 3, 4, 5, 6, 7, 8};
  92. auto buildGraph = [&items] (TSetup& setup, ui32 yieldPos, ui32 startIndex) -> THolder<IComputationGraph> {
  93. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  94. auto dataType = pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id);
  95. auto streamType = pgmBuilder.NewStreamType(dataType);
  96. TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", streamType);
  97. auto streamNode = inStream.Build();
  98. auto pgmReturn = pgmBuilder.Squeeze(
  99. TRuntimeNode(streamNode, false),
  100. pgmBuilder.NewDataLiteral<ui32>(1),
  101. [&](TRuntimeNode item, TRuntimeNode state) {
  102. return pgmBuilder.Add(item, state);
  103. },
  104. [](TRuntimeNode state) {
  105. return state;
  106. },
  107. [](TRuntimeNode state) {
  108. return state;
  109. });
  110. TUnboxedValueVector streamItems;
  111. for (auto item : items) {
  112. streamItems.push_back(NUdf::TUnboxedValuePod(item));
  113. }
  114. auto graph = setup.BuildGraph(pgmReturn, {streamNode});
  115. auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex));
  116. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue));
  117. return graph;
  118. };
  119. for (ui32 yieldPos = 0; yieldPos < items.size(); ++yieldPos) {
  120. TSetup setup1(alloc);
  121. auto graph1 = buildGraph(setup1, yieldPos, 0);
  122. auto root1 = graph1->GetValue();
  123. NUdf::TUnboxedValue res;
  124. auto status = root1.Fetch(res);
  125. UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield);
  126. TString graphState;
  127. SaveGraphState(&root1, 1, 0ULL, graphState);
  128. TSetup setup2(alloc);
  129. auto graph2 = buildGraph(setup2, -1, yieldPos);
  130. auto root2 = graph2->GetValue();
  131. LoadGraphState(&root2, 1, 0ULL, graphState);
  132. status = root2.Fetch(res);
  133. UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Ok);
  134. UNIT_ASSERT_EQUAL(res.Get<ui32>(), 36);
  135. status = root2.Fetch(res);
  136. UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish);
  137. }
  138. }
  139. Y_UNIT_TEST(TestSqueeze1SaveLoad) {
  140. TScopedAlloc alloc(__LOCATION__);
  141. const std::vector<ui32> items = {1, 2, 3, 4, 5, 6, 7, 8};
  142. auto buildGraph = [&items] (TSetup& setup, ui32 yieldPos, ui32 startIndex) -> THolder<IComputationGraph> {
  143. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  144. auto dataType = pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id);
  145. auto streamType = pgmBuilder.NewStreamType(dataType);
  146. TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", streamType);
  147. auto streamNode = inStream.Build();
  148. auto pgmReturn = pgmBuilder.Squeeze1(
  149. TRuntimeNode(streamNode, false),
  150. [](TRuntimeNode item) {
  151. return item;
  152. },
  153. [&](TRuntimeNode item, TRuntimeNode state) {
  154. return pgmBuilder.Add(item, state);
  155. },
  156. [](TRuntimeNode state) {
  157. return state;
  158. },
  159. [](TRuntimeNode state) {
  160. return state;
  161. });
  162. TUnboxedValueVector streamItems;
  163. for (auto item : items) {
  164. streamItems.push_back(NUdf::TUnboxedValuePod(item));
  165. }
  166. auto graph = setup.BuildGraph(pgmReturn, {streamNode});
  167. auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex));
  168. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue));
  169. return graph;
  170. };
  171. for (ui32 yieldPos = 0; yieldPos < items.size(); ++yieldPos) {
  172. TSetup setup1(alloc);
  173. auto graph1 = buildGraph(setup1, yieldPos, 0);
  174. auto root1 = graph1->GetValue();
  175. NUdf::TUnboxedValue res;
  176. auto status = root1.Fetch(res);
  177. UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield);
  178. TString graphState;
  179. SaveGraphState(&root1, 1, 0ULL, graphState);
  180. TSetup setup2(alloc);
  181. auto graph2 = buildGraph(setup2, -1, yieldPos);
  182. auto root2 = graph2->GetValue();
  183. LoadGraphState(&root2, 1, 0ULL, graphState);
  184. status = root2.Fetch(res);
  185. UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Ok);
  186. UNIT_ASSERT_EQUAL(res.Get<ui32>(), 36);
  187. status = root2.Fetch(res);
  188. UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish);
  189. }
  190. }
  191. Y_UNIT_TEST(TestHoppingSaveLoad) {
  192. TScopedAlloc alloc(__LOCATION__);
  193. const std::vector<std::pair<i64, ui32>> items = {
  194. {1, 2},
  195. {2, 3},
  196. {15, 4},
  197. {23, 6},
  198. {24, 5},
  199. {25, 7},
  200. {40, 2},
  201. {47, 1},
  202. {51, 6},
  203. {59, 2},
  204. {85, 8},
  205. {55, 1000},
  206. {200, 0}
  207. };
  208. auto buildGraph = [&items] (TSetup& setup, ui32 yieldPos, ui32 startIndex) -> THolder<IComputationGraph> {
  209. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  210. auto structType = pgmBuilder.NewEmptyStructType();
  211. structType = pgmBuilder.NewStructType(structType, "time",
  212. pgmBuilder.NewDataType(NUdf::TDataType<NUdf::TTimestamp>::Id));
  213. structType = pgmBuilder.NewStructType(structType, "sum",
  214. pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id));
  215. auto timeIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("time");
  216. auto sumIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("sum");
  217. auto inStreamType = pgmBuilder.NewStreamType(structType);
  218. TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", inStreamType);
  219. auto streamNode = inStream.Build();
  220. ui64 hop = 10, interval = 30, delay = 20;
  221. auto pgmReturn = pgmBuilder.HoppingCore(
  222. TRuntimeNode(streamNode, false),
  223. [&](TRuntimeNode item) { // timeExtractor
  224. return pgmBuilder.Member(item, "time");
  225. },
  226. [&](TRuntimeNode item) { // init
  227. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  228. members.emplace_back("sum", pgmBuilder.Member(item, "sum"));
  229. return pgmBuilder.NewStruct(members);
  230. },
  231. [&](TRuntimeNode item, TRuntimeNode state) { // update
  232. auto add = pgmBuilder.AggrAdd(
  233. pgmBuilder.Member(item, "sum"),
  234. pgmBuilder.Member(state, "sum"));
  235. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  236. members.emplace_back("sum", add);
  237. return pgmBuilder.NewStruct(members);
  238. },
  239. [&](TRuntimeNode state) { // save
  240. return pgmBuilder.Member(state, "sum");
  241. },
  242. [&](TRuntimeNode savedState) { // load
  243. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  244. members.emplace_back("sum", savedState);
  245. return pgmBuilder.NewStruct(members);
  246. },
  247. [&](TRuntimeNode state1, TRuntimeNode state2) { // merge
  248. auto add = pgmBuilder.AggrAdd(
  249. pgmBuilder.Member(state1, "sum"),
  250. pgmBuilder.Member(state2, "sum"));
  251. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  252. members.emplace_back("sum", add);
  253. return pgmBuilder.NewStruct(members);
  254. },
  255. [&](TRuntimeNode state, TRuntimeNode time) { // finish
  256. Y_UNUSED(time);
  257. std::vector<std::pair<std::string_view, TRuntimeNode>> members;
  258. members.emplace_back("sum", pgmBuilder.Member(state, "sum"));
  259. return pgmBuilder.NewStruct(members);
  260. },
  261. pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop
  262. pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval
  263. pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))) // delay
  264. );
  265. auto graph = setup.BuildGraph(pgmReturn, {streamNode});
  266. TUnboxedValueVector streamItems;
  267. for (size_t i = 0; i < items.size(); ++i) {
  268. NUdf::TUnboxedValue* itemsPtr;
  269. auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(2, itemsPtr);
  270. itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(items[i].first);
  271. itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(items[i].second);
  272. streamItems.push_back(std::move(structValues));
  273. }
  274. auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex));
  275. graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue));
  276. return graph;
  277. };
  278. for (ui32 yieldPos = 0; yieldPos < items.size(); ++yieldPos) {
  279. std::vector<ui32> result;
  280. TSetup setup1(alloc);
  281. auto graph1 = buildGraph(setup1, yieldPos, 0);
  282. auto root1 = graph1->GetValue();
  283. NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok;
  284. while (status == NUdf::EFetchStatus::Ok) {
  285. NUdf::TUnboxedValue val;
  286. status = root1.Fetch(val);
  287. if (status == NUdf::EFetchStatus::Ok) {
  288. result.push_back(val.GetElement(0).Get<ui32>());
  289. }
  290. }
  291. UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield);
  292. TString graphState;
  293. SaveGraphState(&root1, 1, 0ULL, graphState);
  294. TSetup setup2(alloc);
  295. auto graph2 = buildGraph(setup2, -1, yieldPos);
  296. auto root2 = graph2->GetValue();
  297. LoadGraphState(&root2, 1, 0ULL, graphState);
  298. status = NUdf::EFetchStatus::Ok;
  299. while (status == NUdf::EFetchStatus::Ok) {
  300. NUdf::TUnboxedValue val;
  301. status = root2.Fetch(val);
  302. if (status == NUdf::EFetchStatus::Ok) {
  303. result.push_back(val.GetElement(0).Get<ui32>());
  304. }
  305. }
  306. UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish);
  307. const std::vector<ui32> resultCompare = {5, 9, 27, 22, 21, 11, 11, 8, 8, 8, 8};
  308. UNIT_ASSERT_EQUAL(result, resultCompare);
  309. }
  310. }
  311. }
  312. } // namespace NMiniKQL
  313. } // namespace NKikimr