mkql_group_ut.cpp 9.5 KB


  1. #include "mkql_computation_node_ut.h"
  2. #include <yql/essentials/minikql/mkql_node_cast.h>
  3. #include <yql/essentials/minikql/mkql_string_util.h>
  4. namespace NKikimr {
  5. namespace NMiniKQL {
  6. namespace {
  7. template<bool UseLLVM>
  8. TRuntimeNode MakeStream(TSetup<UseLLVM>& setup, ui64 count = 9U) {
  9. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  10. TCallableBuilder callableBuilder(*setup.Env, "TestStream",
  11. pgmBuilder.NewStreamType(
  12. pgmBuilder.NewDataType(NUdf::EDataSlot::Uint64)
  13. )
  14. );
  15. callableBuilder.Add(pgmBuilder.NewDataLiteral(count));
  16. return TRuntimeNode(callableBuilder.Build(), false);
  17. }
  18. template<bool UseLLVM>
  19. TRuntimeNode Group(TSetup<UseLLVM>& setup, TRuntimeNode stream, const std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)>& groupSwitch,
  20. const std::function<TRuntimeNode(TRuntimeNode)>& handler = {})
  21. {
  22. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  23. auto keyExtractor = [&](TRuntimeNode item) {
  24. return item;
  25. };
  26. stream = pgmBuilder.GroupingCore(stream, groupSwitch, keyExtractor, handler);
  27. return pgmBuilder.FlatMap(stream, [&](TRuntimeNode grpItem) {
  28. return pgmBuilder.Squeeze(pgmBuilder.Nth(grpItem, 1),
  29. pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>("*"),
  30. [&] (TRuntimeNode item, TRuntimeNode state) {
  31. auto res = pgmBuilder.Concat(pgmBuilder.ToString(pgmBuilder.Nth(grpItem, 0)), pgmBuilder.ToString(item));
  32. res = pgmBuilder.Concat(state, res);
  33. res = pgmBuilder.Concat(res, pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>("*"));
  34. return res;
  35. },
  36. {}, {});
  37. });
  38. }
  39. template<bool UseLLVM>
  40. TRuntimeNode GroupKeys(TSetup<UseLLVM>& setup, TRuntimeNode stream, const std::function<TRuntimeNode(TRuntimeNode, TRuntimeNode)>& groupSwitch) {
  41. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  42. auto keyExtractor = [&](TRuntimeNode item) {
  43. return item;
  44. };
  45. stream = pgmBuilder.GroupingCore(stream, groupSwitch, keyExtractor);
  46. return pgmBuilder.Map(stream, [&](TRuntimeNode grpItem) {
  47. return pgmBuilder.ToString(pgmBuilder.Nth(grpItem, 0));
  48. });
  49. }
  50. template<bool UseLLVM>
  51. TRuntimeNode StreamToString(TSetup<UseLLVM>& setup, TRuntimeNode stream) {
  52. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  53. return pgmBuilder.Squeeze(stream, pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>("|"), [&] (TRuntimeNode item, TRuntimeNode state) {
  54. return pgmBuilder.Concat(pgmBuilder.Concat(state, item), pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>("|"));
  55. }, {}, {});
  56. }
  57. } // unnamed
  58. Y_UNIT_TEST_SUITE(TMiniKQLGroupingTest) {
  59. Y_UNIT_TEST_LLVM(TestGrouping) {
  60. TSetup<LLVM> setup;
  61. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  62. auto stream = MakeStream(setup);
  63. stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  64. Y_UNUSED(key);
  65. return pgmBuilder.Equals(item, pgmBuilder.NewDataLiteral<ui64>(0));
  66. });
  67. auto pgm = StreamToString(setup, stream);
  68. auto graph = setup.BuildGraph(pgm);
  69. auto streamVal = graph->GetValue();
  70. NUdf::TUnboxedValue result;
  71. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  72. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*|*00*01*|*00*|*00*|*00*01*02*03*|");
  73. }
  74. Y_UNIT_TEST_LLVM(TestGroupingKeyNotEquals) {
  75. TSetup<LLVM> setup;
  76. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  77. auto stream = MakeStream(setup);
  78. stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  79. return pgmBuilder.NotEquals(item, key);
  80. });
  81. auto pgm = StreamToString(setup, stream);
  82. auto graph = setup.BuildGraph(pgm);
  83. auto streamVal = graph->GetValue();
  84. NUdf::TUnboxedValue result;
  85. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  86. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*00*|*11*|*00*00*00*|*11*|*22*|*33*|");
  87. }
  88. Y_UNIT_TEST_LLVM(TestGroupingWithEmptyInput) {
  89. TSetup<LLVM> setup;
  90. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  91. auto stream = MakeStream(setup, 0);
  92. stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  93. Y_UNUSED(key);
  94. return pgmBuilder.Equals(item, pgmBuilder.NewDataLiteral<ui64>(0));
  95. });
  96. auto pgm = StreamToString(setup, stream);
  97. auto graph = setup.BuildGraph(pgm);
  98. auto streamVal = graph->GetValue();
  99. NUdf::TUnboxedValue result;
  100. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  101. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|");
  102. }
  103. Y_UNIT_TEST_LLVM(TestSingleGroup) {
  104. TSetup<LLVM> setup;
  105. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  106. auto stream = MakeStream(setup);
  107. stream = Group(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  108. Y_UNUSED(key);
  109. Y_UNUSED(item);
  110. return pgmBuilder.NewDataLiteral<bool>(false);
  111. });
  112. auto pgm = StreamToString(setup, stream);
  113. auto graph = setup.BuildGraph(pgm);
  114. auto streamVal = graph->GetValue();
  115. NUdf::TUnboxedValue result;
  116. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  117. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*00*01*00*00*00*01*02*03*|");
  118. }
  119. Y_UNIT_TEST_LLVM(TestGroupingWithYield) {
  120. TSetup<LLVM> setup;
  121. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  122. auto stream = MakeStream(setup);
  123. TSwitchInput switchInput;
  124. switchInput.Indicies.push_back(0);
  125. switchInput.InputType = stream.GetStaticType();
  126. stream = pgmBuilder.Switch(stream,
  127. MakeArrayRef(&switchInput, 1),
  128. [&](ui32 /*index*/, TRuntimeNode item1) {
  129. return Group(setup, item1, [&](TRuntimeNode key, TRuntimeNode item2) {
  130. Y_UNUSED(key);
  131. return pgmBuilder.Equals(item2, pgmBuilder.NewDataLiteral<ui64>(0));
  132. });
  133. },
  134. 1,
  135. pgmBuilder.NewStreamType(pgmBuilder.NewDataType(NUdf::EDataSlot::String))
  136. );
  137. auto pgm = StreamToString(setup, stream);
  138. auto graph = setup.BuildGraph(pgm);
  139. auto streamVal = graph->GetValue();
  140. NUdf::TUnboxedValue result;
  141. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  142. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*00*|*00*01*|*00*|*00*|*00*01*02*03*|");
  143. }
  144. Y_UNIT_TEST_LLVM(TestGroupingWithoutFetchingSubStreams) {
  145. TSetup<LLVM> setup;
  146. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  147. auto stream = MakeStream(setup);
  148. stream = GroupKeys(setup, stream, [&](TRuntimeNode key, TRuntimeNode item) {
  149. Y_UNUSED(key);
  150. return pgmBuilder.Equals(item, pgmBuilder.NewDataLiteral<ui64>(0));
  151. });
  152. auto pgm = StreamToString(setup, stream);
  153. auto graph = setup.BuildGraph(pgm);
  154. auto streamVal = graph->GetValue();
  155. NUdf::TUnboxedValue result;
  156. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  157. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|");
  158. }
  159. Y_UNIT_TEST_LLVM(TestGroupingWithYieldAndWithoutFetchingSubStreams) {
  160. TSetup<LLVM> setup;
  161. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  162. auto stream = MakeStream(setup);
  163. TSwitchInput switchInput;
  164. switchInput.Indicies.push_back(0);
  165. switchInput.InputType = stream.GetStaticType();
  166. stream = pgmBuilder.Switch(stream,
  167. MakeArrayRef(&switchInput, 1),
  168. [&](ui32 /*index*/, TRuntimeNode item1) {
  169. return GroupKeys(setup, item1, [&](TRuntimeNode key, TRuntimeNode item2) {
  170. Y_UNUSED(key);
  171. return pgmBuilder.Equals(item2, pgmBuilder.NewDataLiteral<ui64>(0));
  172. });
  173. },
  174. 1,
  175. pgmBuilder.NewStreamType(pgmBuilder.NewDataType(NUdf::EDataSlot::String))
  176. );
  177. auto pgm = StreamToString(setup, stream);
  178. auto graph = setup.BuildGraph(pgm);
  179. auto streamVal = graph->GetValue();
  180. NUdf::TUnboxedValue result;
  181. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  182. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|");
  183. }
  184. Y_UNIT_TEST_LLVM(TestGroupingWithHandler) {
  185. TSetup<LLVM> setup;
  186. TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
  187. auto stream = MakeStream(setup);
  188. stream = Group(setup, stream,
  189. [&](TRuntimeNode key, TRuntimeNode item) {
  190. Y_UNUSED(key);
  191. return pgmBuilder.Equals(item, pgmBuilder.NewDataLiteral<ui64>(0));
  192. },
  193. [&](TRuntimeNode item) {
  194. return pgmBuilder.Add(pgmBuilder.Convert(item, pgmBuilder.NewDataType(NUdf::EDataSlot::Int32)), pgmBuilder.NewDataLiteral<ui64>(1));
  195. }
  196. );
  197. auto pgm = StreamToString(setup, stream);
  198. auto graph = setup.BuildGraph(pgm);
  199. auto streamVal = graph->GetValue();
  200. NUdf::TUnboxedValue result;
  201. UNIT_ASSERT_EQUAL(streamVal.Fetch(result), NUdf::EFetchStatus::Ok);
  202. UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|*01*|*01*02*|*01*|*01*|*01*02*03*04*|");
  203. }
  204. }
  205. } // NMiniKQL
  206. } // NKikimr