mkql_chopper_ut.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. #include "mkql_computation_node_ut.h"
  2. #include <yql/essentials/minikql/mkql_node_cast.h>
  3. #include <yql/essentials/minikql/mkql_string_util.h>
  4. namespace NKikimr {
  5. namespace NMiniKQL {
  6. namespace {
  7. template<bool UseLLVM>
  8. TRuntimeNode MakeStream(TSetup<UseLLVM>& setup, ui64 count = 9U) {
  9. TProgramBuilder& pb = *setup.PgmBuilder;
  10. TCallableBuilder callableBuilder(*setup.Env, "TestStream",
  11. pb.NewStreamType(
  12. pb.NewDataType(NUdf::EDataSlot::Uint64)
  13. )
  14. );
  15. callableBuilder.Add(pb.NewDataLiteral(count));
  16. return TRuntimeNode(callableBuilder.Build(), false);
  17. }
  18. template<bool UseLLVM>
  19. TRuntimeNode MakeFlow(TSetup<UseLLVM>& setup, ui64 count = 9U) {
  20. TProgramBuilder& pb = *setup.PgmBuilder;
  21. return pb.ToFlow(MakeStream<UseLLVM>(setup, count));
  22. }
  23. template<bool UseLLVM>
  24. TRuntimeNode GroupWithBomb(TSetup<UseLLVM>& setup, TRuntimeNode stream) {
  25. TProgramBuilder& pb = *setup.PgmBuilder;
  26. const auto keyExtractor = [&](TRuntimeNode item) { return item; };
  27. const auto groupSwitch = [&](TRuntimeNode, TRuntimeNode) { return pb.NewDataLiteral<bool>(false); };
  28. return pb.Chopper(stream, keyExtractor, groupSwitch, [&](TRuntimeNode, TRuntimeNode group) {
  29. const auto bomb = pb.NewDataLiteral<NUdf::EDataSlot::String>("BOMB");
  30. return pb.Ensure(pb.Map(group, [&] (TRuntimeNode) { return bomb; }), pb.NewDataLiteral<bool>(false), bomb, "", 0, 0);
  31. });
  32. }
  33. template<bool UseLLVM>
  34. TRuntimeNode Group(TSetup<UseLLVM>& setup, TRuntimeNode stream, const std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)>& groupSwitch) {
  35. TProgramBuilder& pb = *setup.PgmBuilder;
  36. const auto keyExtractor = [&](TRuntimeNode item) { return item; };
  37. return pb.Chopper(stream, keyExtractor, groupSwitch, [&](TRuntimeNode key, TRuntimeNode grpItem) {
  38. return pb.Condense(grpItem, pb.NewDataLiteral<NUdf::EDataSlot::String>("*"),
  39. [&] (TRuntimeNode, TRuntimeNode) { return pb.NewDataLiteral<bool>(false); },
  40. [&] (TRuntimeNode item, TRuntimeNode state) {
  41. auto res = pb.Concat(pb.ToString(key), pb.ToString(item));
  42. res = pb.Concat(state, res);
  43. return pb.Concat(res, pb.NewDataLiteral<NUdf::EDataSlot::String>("*"));
  44. });
  45. });
  46. }
  47. template<bool UseLLVM>
  48. TRuntimeNode GroupGetKeysFirst(TSetup<UseLLVM>& setup, TRuntimeNode stream, const std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)>& groupSwitch) {
  49. TProgramBuilder& pb = *setup.PgmBuilder;
  50. const auto keyExtractor = [&](TRuntimeNode item) { return item; };
  51. const bool isFlow = stream.GetStaticType()->IsFlow();
  52. return pb.Chopper(stream, keyExtractor, groupSwitch, [&](TRuntimeNode key, TRuntimeNode grpItem) {
  53. auto list = pb.ToList(pb.AggrConcat(
  54. pb.NewOptional(pb.Concat(pb.ToString(key), pb.NewDataLiteral<NUdf::EDataSlot::String>(":"))),
  55. pb.ToOptional(pb.Collect(pb.Condense1(grpItem,
  56. [&] (TRuntimeNode item) { return pb.ToString(item); },
  57. [&] (TRuntimeNode, TRuntimeNode) { return pb.NewDataLiteral<bool>(false); },
  58. [&] (TRuntimeNode item, TRuntimeNode state) {
  59. return pb.Concat(state, pb.ToString(item));
  60. }
  61. )))
  62. ));
  63. return isFlow ? pb.ToFlow(list) : pb.Iterator(list, {});
  64. });
  65. }
  66. template<bool UseLLVM>
  67. TRuntimeNode GroupKeys(TSetup<UseLLVM>& setup, TRuntimeNode stream, const std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)>& groupSwitch) {
  68. TProgramBuilder& pb = *setup.PgmBuilder;
  69. const auto keyExtractor = [&](TRuntimeNode item) { return item; };
  70. return pb.Chopper(stream, keyExtractor, groupSwitch,
  71. [&](TRuntimeNode key, TRuntimeNode group) {
  72. return pb.Map(pb.Take(group, pb.NewDataLiteral<ui64>(1ULL)), [&](TRuntimeNode) { return pb.ToString(key); });
  73. }
  74. );
  75. }
  76. template<bool UseLLVM>
  77. TRuntimeNode StreamToString(TSetup<UseLLVM>& setup, TRuntimeNode stream) {
  78. TProgramBuilder& pb = *setup.PgmBuilder;
  79. stream = pb.Condense(stream, pb.NewDataLiteral<NUdf::EDataSlot::String>("|"),
  80. [&] (TRuntimeNode, TRuntimeNode) { return pb.NewDataLiteral(false); },
  81. [&] (TRuntimeNode item, TRuntimeNode state) {
  82. return pb.Concat(pb.Concat(state, item), pb.NewDataLiteral<NUdf::EDataSlot::String>("|"));
  83. }
  84. );
  85. if (stream.GetStaticType()->IsFlow()) {
  86. stream = pb.FromFlow(stream);
  87. }
  88. return stream;
  89. }
  90. } // unnamed
  91. Y_UNIT_TEST_SUITE(TMiniKQLChopperStreamTest) {
  92. Y_UNIT_TEST_LLVM(TestEmpty) {
  93. TSetup<LLVM> setup;
  94. const auto stream = GroupWithBomb(setup, MakeStream(setup, 0U));
  95. const auto pgm = StreamToString(setup, stream);
  96. const auto graph = setup.BuildGraph(pgm);
  97. const auto streamVal = graph->GetValue();
  98. NUdf::TUnboxedValue result;
  99. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  100. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|");
  101. }
  102. Y_UNIT_TEST_LLVM(TestGrouping) {
  103. TSetup<LLVM> setup;
  104. TProgramBuilder& pb = *setup.PgmBuilder;
  105. auto stream = MakeStream(setup);
  106. stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  107. Y_UNUSED(key);
  108. return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
  109. });
  110. const auto pgm = StreamToString(setup, stream);
  111. const auto graph = setup.BuildGraph(pgm);
  112. const auto streamVal = graph->GetValue();
  113. NUdf::TUnboxedValue result;
  114. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  115. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*|*00*01*|*00*|*00*|*00*01*02*03*|");
  116. }
  117. Y_UNIT_TEST_LLVM(TestGroupingGetKeysFirst) {
  118. TSetup<LLVM> setup;
  119. TProgramBuilder& pb = *setup.PgmBuilder;
  120. auto stream = MakeStream(setup);
  121. stream = GroupGetKeysFirst(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  122. Y_UNUSED(key);
  123. return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
  124. });
  125. const auto pgm = StreamToString(setup, stream);
  126. const auto graph = setup.BuildGraph(pgm);
  127. const auto streamVal = graph->GetValue();
  128. NUdf::TUnboxedValue result;
  129. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  130. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0:0|0:01|0:0|0:0|0:0123|");
  131. }
  132. Y_UNIT_TEST_LLVM(TestGroupingKeyNotEquals) {
  133. TSetup<LLVM> setup;
  134. TProgramBuilder& pb = *setup.PgmBuilder;
  135. auto stream = MakeStream(setup);
  136. stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  137. return pb.NotEquals(item, key);
  138. });
  139. const auto pgm = StreamToString(setup, stream);
  140. const auto graph = setup.BuildGraph(pgm);
  141. const auto streamVal = graph->GetValue();
  142. NUdf::TUnboxedValue result;
  143. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  144. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*00*|*11*|*00*00*00*|*11*|*22*|*33*|");
  145. }
  146. Y_UNIT_TEST_LLVM(TestGroupingWithEmptyInput) {
  147. TSetup<LLVM> setup;
  148. TProgramBuilder& pb = *setup.PgmBuilder;
  149. auto stream = MakeStream(setup, 0);
  150. stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  151. Y_UNUSED(key);
  152. return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
  153. });
  154. const auto pgm = StreamToString(setup, stream);
  155. const auto graph = setup.BuildGraph(pgm);
  156. const auto streamVal = graph->GetValue();
  157. NUdf::TUnboxedValue result;
  158. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  159. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|");
  160. }
  161. Y_UNIT_TEST_LLVM(TestSingleGroup) {
  162. TSetup<LLVM> setup;
  163. TProgramBuilder& pb = *setup.PgmBuilder;
  164. auto stream = MakeStream(setup);
  165. stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  166. Y_UNUSED(key);
  167. Y_UNUSED(item);
  168. return pb.NewDataLiteral<bool>(false);
  169. });
  170. const auto pgm = StreamToString(setup, stream);
  171. const auto graph = setup.BuildGraph(pgm);
  172. const auto streamVal = graph->GetValue();
  173. NUdf::TUnboxedValue result;
  174. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  175. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*00*01*00*00*00*01*02*03*|");
  176. }
  177. Y_UNIT_TEST_LLVM(TestGroupingWithYield) {
  178. TSetup<LLVM> setup;
  179. TProgramBuilder& pb = *setup.PgmBuilder;
  180. auto stream = MakeStream(setup);
  181. TSwitchInput switchInput;
  182. switchInput.Indicies.push_back(0);
  183. switchInput.InputType = stream.GetStaticType();
  184. stream = pb.Switch(stream,
  185. MakeArrayRef(&switchInput, 1),
  186. [&](ui32 /*index*/, TRuntimeNode item1) {
  187. return Group(setup, item1, [&](TRuntimeNode key, TRuntimeNode item2) {
  188. Y_UNUSED(key);
  189. return pb.Equals(item2, pb.NewDataLiteral<ui64>(0));
  190. });
  191. },
  192. 1,
  193. pb.NewStreamType(pb.NewDataType(NUdf::EDataSlot::String))
  194. );
  195. const auto pgm = StreamToString(setup, stream);
  196. const auto graph = setup.BuildGraph(pgm);
  197. const auto streamVal = graph->GetValue();
  198. NUdf::TUnboxedValue result;
  199. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  200. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*|*00*01*|*00*|*00*|*00*01*02*03*|");
  201. }
  202. Y_UNIT_TEST_LLVM(TestGroupingWithCutSubStreams) {
  203. TSetup<LLVM> setup;
  204. TProgramBuilder& pb = *setup.PgmBuilder;
  205. auto stream = MakeStream(setup);
  206. stream = GroupKeys(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  207. Y_UNUSED(key);
  208. return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
  209. });
  210. const auto pgm = StreamToString(setup, stream);
  211. const auto graph = setup.BuildGraph(pgm);
  212. const auto streamVal = graph->GetValue();
  213. NUdf::TUnboxedValue result;
  214. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  215. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|");
  216. }
  217. Y_UNIT_TEST_LLVM(TestGroupingWithYieldAndCutSubStreams) {
  218. TSetup<LLVM> setup;
  219. TProgramBuilder& pb = *setup.PgmBuilder;
  220. auto stream = MakeStream(setup);
  221. TSwitchInput switchInput;
  222. switchInput.Indicies.push_back(0);
  223. switchInput.InputType = stream.GetStaticType();
  224. stream = pb.Switch(stream,
  225. MakeArrayRef(&switchInput, 1),
  226. [&](ui32 /*index*/, TRuntimeNode item1) {
  227. return GroupKeys(setup, item1, [&](TRuntimeNode key, TRuntimeNode item2) {
  228. Y_UNUSED(key);
  229. return pb.Equals(item2, pb.NewDataLiteral<ui64>(0));
  230. });
  231. },
  232. 1,
  233. pb.NewStreamType(pb.NewDataType(NUdf::EDataSlot::String))
  234. );
  235. const auto pgm = StreamToString(setup, stream);
  236. const auto graph = setup.BuildGraph(pgm);
  237. const auto streamVal = graph->GetValue();
  238. NUdf::TUnboxedValue result;
  239. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  240. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|");
  241. }
  242. }
  243. #if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 9u
  244. Y_UNIT_TEST_SUITE(TMiniKQLChopperFlowTest) {
  245. Y_UNIT_TEST_LLVM(TestEmpty) {
  246. TSetup<LLVM> setup;
  247. const auto stream = GroupWithBomb(setup, MakeFlow(setup, 0U));
  248. const auto pgm = StreamToString(setup, stream);
  249. const auto graph = setup.BuildGraph(pgm);
  250. const auto streamVal = graph->GetValue();
  251. NUdf::TUnboxedValue result;
  252. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  253. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|");
  254. }
  255. Y_UNIT_TEST_LLVM(TestGrouping) {
  256. TSetup<LLVM> setup;
  257. TProgramBuilder& pb = *setup.PgmBuilder;
  258. auto stream = MakeFlow(setup);
  259. stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  260. Y_UNUSED(key);
  261. return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
  262. });
  263. const auto pgm = StreamToString(setup, stream);
  264. const auto graph = setup.BuildGraph(pgm);
  265. const auto streamVal = graph->GetValue();
  266. NUdf::TUnboxedValue result;
  267. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  268. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*|*00*01*|*00*|*00*|*00*01*02*03*|");
  269. }
  270. Y_UNIT_TEST_LLVM(TestGroupingGetKeysFirst) {
  271. TSetup<LLVM> setup;
  272. TProgramBuilder& pb = *setup.PgmBuilder;
  273. auto stream = MakeFlow(setup);
  274. stream = GroupGetKeysFirst(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  275. Y_UNUSED(key);
  276. return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
  277. });
  278. const auto pgm = StreamToString(setup, stream);
  279. const auto graph = setup.BuildGraph(pgm);
  280. const auto streamVal = graph->GetValue();
  281. NUdf::TUnboxedValue result;
  282. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  283. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0:0|0:01|0:0|0:0|0:0123|");
  284. }
  285. Y_UNIT_TEST_LLVM(TestGroupingKeyNotEquals) {
  286. TSetup<LLVM> setup;
  287. TProgramBuilder& pb = *setup.PgmBuilder;
  288. auto stream = MakeFlow(setup);
  289. stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  290. return pb.NotEquals(item, key);
  291. });
  292. const auto pgm = StreamToString(setup, stream);
  293. const auto graph = setup.BuildGraph(pgm);
  294. const auto streamVal = graph->GetValue();
  295. NUdf::TUnboxedValue result;
  296. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  297. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*00*|*11*|*00*00*00*|*11*|*22*|*33*|");
  298. }
  299. Y_UNIT_TEST_LLVM(TestGroupingWithEmptyInput) {
  300. TSetup<LLVM> setup;
  301. TProgramBuilder& pb = *setup.PgmBuilder;
  302. auto stream = MakeFlow(setup, 0);
  303. stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  304. Y_UNUSED(key);
  305. return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
  306. });
  307. const auto pgm = StreamToString(setup, stream);
  308. const auto graph = setup.BuildGraph(pgm);
  309. const auto streamVal = graph->GetValue();
  310. NUdf::TUnboxedValue result;
  311. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  312. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|");
  313. }
  314. Y_UNIT_TEST_LLVM(TestSingleGroup) {
  315. TSetup<LLVM> setup;
  316. TProgramBuilder& pb = *setup.PgmBuilder;
  317. auto stream = MakeFlow(setup);
  318. stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  319. Y_UNUSED(key);
  320. Y_UNUSED(item);
  321. return pb.NewDataLiteral<bool>(false);
  322. });
  323. const auto pgm = StreamToString(setup, stream);
  324. const auto graph = setup.BuildGraph(pgm);
  325. const auto streamVal = graph->GetValue();
  326. NUdf::TUnboxedValue result;
  327. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  328. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*00*01*00*00*00*01*02*03*|");
  329. }
  330. Y_UNIT_TEST_LLVM(TestGroupingWithYield) {
  331. TSetup<LLVM> setup;
  332. TProgramBuilder& pb = *setup.PgmBuilder;
  333. auto stream = MakeFlow(setup);
  334. TSwitchInput switchInput;
  335. switchInput.Indicies.push_back(0);
  336. switchInput.InputType = stream.GetStaticType();
  337. stream = pb.Switch(stream,
  338. MakeArrayRef(&switchInput, 1),
  339. [&](ui32 /*index*/, TRuntimeNode item1) {
  340. return Group(setup, item1, [&](TRuntimeNode key, TRuntimeNode item2) {
  341. Y_UNUSED(key);
  342. return pb.Equals(item2, pb.NewDataLiteral<ui64>(0));
  343. });
  344. },
  345. 1,
  346. pb.NewFlowType(pb.NewDataType(NUdf::EDataSlot::String))
  347. );
  348. const auto pgm = StreamToString(setup, stream);
  349. const auto graph = setup.BuildGraph(pgm);
  350. const auto streamVal = graph->GetValue();
  351. NUdf::TUnboxedValue result;
  352. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  353. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*|*00*01*|*00*|*00*|*00*01*02*03*|");
  354. }
  355. Y_UNIT_TEST_LLVM(TestGroupingWithCutSubStreams) {
  356. TSetup<LLVM> setup;
  357. TProgramBuilder& pb = *setup.PgmBuilder;
  358. auto stream = MakeFlow(setup);
  359. stream = GroupKeys(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  360. Y_UNUSED(key);
  361. return pb.Equals(item, pb.NewDataLiteral<ui64>(0));
  362. });
  363. const auto pgm = StreamToString(setup, stream);
  364. const auto graph = setup.BuildGraph(pgm);
  365. const auto streamVal = graph->GetValue();
  366. NUdf::TUnboxedValue result;
  367. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  368. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|");
  369. }
  370. Y_UNIT_TEST_LLVM(TestGroupingWithYieldAndCutSubStreams) {
  371. TSetup<LLVM> setup;
  372. TProgramBuilder& pb = *setup.PgmBuilder;
  373. auto stream = MakeFlow(setup);
  374. TSwitchInput switchInput;
  375. switchInput.Indicies.push_back(0);
  376. switchInput.InputType = stream.GetStaticType();
  377. stream = pb.Switch(stream,
  378. MakeArrayRef(&switchInput, 1),
  379. [&](ui32 /*index*/, TRuntimeNode item1) {
  380. return GroupKeys(setup, item1, [&](TRuntimeNode key, TRuntimeNode item2) {
  381. Y_UNUSED(key);
  382. return pb.Equals(item2, pb.NewDataLiteral<ui64>(0));
  383. });
  384. },
  385. 1,
  386. pb.NewFlowType(pb.NewDataType(NUdf::EDataSlot::String))
  387. );
  388. const auto pgm = StreamToString(setup, stream);
  389. const auto graph = setup.BuildGraph(pgm);
  390. const auto streamVal = graph->GetValue();
  391. NUdf::TUnboxedValue result;
  392. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  393. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|");
  394. }
  395. }
  396. #endif
  397. } // NMiniKQL
  398. } // NKikimr