mkql_blocks_ut.cpp 31 KB


  1. #include "mkql_computation_node_ut.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  3. #include <arrow/compute/exec_internal.h>
  4. #include <arrow/array/builder_primitive.h>
  5. #include <yql/essentials/public/udf/udf_helpers.h>
  6. #include <yql/essentials/public/udf/arrow/udf_arrow_helpers.h>
  7. BEGIN_SIMPLE_ARROW_UDF(TInc, i32(i32)) {
  8. Y_UNUSED(valueBuilder);
  9. const i32 arg = args[0].Get<i32>();
  10. return NYql::NUdf::TUnboxedValuePod(arg + 1);
  11. }
  12. struct TIncKernelExec : public NYql::NUdf::TUnaryKernelExec<TIncKernelExec> {
  13. template <typename TSink>
  14. static void Process(const NYql::NUdf::IValueBuilder* valueBuilder, NYql::NUdf::TBlockItem arg, const TSink& sink) {
  15. Y_UNUSED(valueBuilder);
  16. sink(NYql::NUdf::TBlockItem(arg.As<i32>() + 1));
  17. }
  18. };
  19. END_SIMPLE_ARROW_UDF(TInc, TIncKernelExec::Do);
  20. SIMPLE_MODULE(TBlockUTModule,
  21. TInc
  22. )
  23. namespace NKikimr {
  24. namespace NMiniKQL {
  25. namespace {
  26. arrow::Datum ExecuteOneKernel(const IArrowKernelComputationNode* kernelNode,
  27. const std::vector<arrow::Datum>& argDatums, arrow::compute::ExecContext& execContext) {
  28. const auto& kernel = kernelNode->GetArrowKernel();
  29. arrow::compute::KernelContext kernelContext(&execContext);
  30. std::unique_ptr<arrow::compute::KernelState> state;
  31. if (kernel.init) {
  32. state = ARROW_RESULT(kernel.init(&kernelContext, { &kernel, kernelNode->GetArgsDesc(), nullptr }));
  33. kernelContext.SetState(state.get());
  34. }
  35. auto executor = arrow::compute::detail::KernelExecutor::MakeScalar();
  36. ARROW_OK(executor->Init(&kernelContext, { &kernel, kernelNode->GetArgsDesc(), nullptr }));
  37. auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>();
  38. ARROW_OK(executor->Execute(argDatums, listener.get()));
  39. return executor->WrapResults(argDatums, listener->values());
  40. }
  41. void ExecuteAllKernels(std::vector<arrow::Datum>& datums, const TArrowKernelsTopology* topology, arrow::compute::ExecContext& execContext) {
  42. for (ui32 i = 0; i < topology->Items.size(); ++i) {
  43. std::vector<arrow::Datum> argDatums;
  44. argDatums.reserve(topology->Items[i].Inputs.size());
  45. for (auto j : topology->Items[i].Inputs) {
  46. argDatums.emplace_back(datums[j]);
  47. }
  48. arrow::Datum output = ExecuteOneKernel(topology->Items[i].Node.get(), argDatums, execContext);
  49. datums[i + topology->InputArgsCount] = output;
  50. }
  51. }
  52. }
  53. Y_UNIT_TEST_SUITE(TMiniKQLBlocksTest) {
  54. Y_UNIT_TEST_LLVM(TestEmpty) {
  55. TSetup<LLVM> setup;
  56. auto& pb = *setup.PgmBuilder;
  57. const auto type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
  58. const auto list = pb.NewEmptyList(type);
  59. const auto sourceFlow = pb.ToFlow(list);
  60. const auto flowAfterBlocks = pb.FromBlocks(pb.ToBlocks(sourceFlow));
  61. const auto pgmReturn = pb.ForwardList(flowAfterBlocks);
  62. const auto graph = setup.BuildGraph(pgmReturn);
  63. const auto iterator = graph->GetValue().GetListIterator();
  64. NUdf::TUnboxedValue item;
  65. UNIT_ASSERT(!iterator.Next(item));
  66. }
  67. Y_UNIT_TEST_LLVM(TestSimple) {
  68. static const size_t dataCount = 1000;
  69. TSetup<LLVM> setup;
  70. auto& pb = *setup.PgmBuilder;
  71. TRuntimeNode::TList data;
  72. data.reserve(dataCount);
  73. for (ui64 i = 0ULL; i < dataCount; ++i) {
  74. data.push_back(pb.NewDataLiteral(i));
  75. }
  76. const auto type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
  77. const auto list = pb.NewList(type, data);
  78. const auto sourceFlow = pb.ToFlow(list);
  79. const auto flowAfterBlocks = pb.FromBlocks(pb.ToBlocks(sourceFlow));
  80. const auto pgmReturn = pb.ForwardList(flowAfterBlocks);
  81. const auto graph = setup.BuildGraph(pgmReturn);
  82. const auto iterator = graph->GetValue().GetListIterator();
  83. for (size_t i = 0; i < dataCount; ++i) {
  84. NUdf::TUnboxedValue item;
  85. UNIT_ASSERT(iterator.Next(item));
  86. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), i);
  87. }
  88. NUdf::TUnboxedValue item;
  89. UNIT_ASSERT(!iterator.Next(item));
  90. UNIT_ASSERT(!iterator.Next(item));
  91. }
  92. Y_UNIT_TEST_LLVM(TestWideToBlocks) {
  93. TSetup<LLVM> setup;
  94. TProgramBuilder& pb = *setup.PgmBuilder;
  95. const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
  96. const auto tupleType = pb.NewTupleType({ui64Type, ui64Type});
  97. const auto data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<ui64>(10)});
  98. const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<ui64>(20)});
  99. const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<ui64>(30)});
  100. const auto list = pb.NewList(tupleType, {data1, data2, data3});
  101. const auto flow = pb.ToFlow(list);
  102. const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
  103. return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
  104. });
  105. const auto wideBlocksFlow = pb.WideToBlocks(wideFlow);
  106. const auto narrowBlocksFlow = pb.NarrowMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
  107. return items[1];
  108. });
  109. const auto narrowFlow = pb.FromBlocks(narrowBlocksFlow);
  110. const auto pgmReturn = pb.ForwardList(narrowFlow);
  111. const auto graph = setup.BuildGraph(pgmReturn);
  112. const auto iterator = graph->GetValue().GetListIterator();
  113. NUdf::TUnboxedValue item;
  114. UNIT_ASSERT(iterator.Next(item));
  115. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 10);
  116. UNIT_ASSERT(iterator.Next(item));
  117. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 20);
  118. UNIT_ASSERT(iterator.Next(item));
  119. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30);
  120. UNIT_ASSERT(!iterator.Next(item));
  121. UNIT_ASSERT(!iterator.Next(item));
  122. }
  123. namespace {
  124. template<bool LLVM>
  125. void TestChunked(bool withBlockExpand) {
  126. TSetup<LLVM> setup;
  127. TProgramBuilder& pb = *setup.PgmBuilder;
  128. const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
  129. const auto boolType = pb.NewDataType(NUdf::TDataType<bool>::Id);
  130. const auto stringType = pb.NewDataType(NUdf::EDataSlot::String);
  131. const auto utf8Type = pb.NewDataType(NUdf::EDataSlot::Utf8);
  132. const auto tupleType = pb.NewTupleType({ui64Type, boolType, stringType, utf8Type});
  133. TRuntimeNode::TList items;
  134. const size_t bigStrSize = 1024 * 1024 + 100;
  135. const size_t smallStrSize = 256 * 1024;
  136. for (size_t i = 0; i < 20; ++i) {
  137. if (i % 2 == 0) {
  138. std::string big(bigStrSize, '0' + i);
  139. std::string small(smallStrSize, 'A' + i);
  140. items.push_back(pb.NewTuple(tupleType, { pb.NewDataLiteral<ui64>(i), pb.NewDataLiteral<bool>(true),
  141. pb.NewDataLiteral<NUdf::EDataSlot::String>(big),
  142. pb.NewDataLiteral<NUdf::EDataSlot::Utf8>(small),
  143. }));
  144. } else {
  145. items.push_back(pb.NewTuple(tupleType, { pb.NewDataLiteral<ui64>(i), pb.NewDataLiteral<bool>(false),
  146. pb.NewDataLiteral<NUdf::EDataSlot::String>(""),
  147. pb.NewDataLiteral<NUdf::EDataSlot::Utf8>(""),
  148. }));
  149. }
  150. }
  151. const auto list = pb.NewList(tupleType, std::move(items));
  152. auto node = pb.ToFlow(list);
  153. node = pb.ExpandMap(node, [&](TRuntimeNode item) -> TRuntimeNode::TList {
  154. return {pb.Nth(item, 0U), pb.Nth(item, 1U), pb.Nth(item, 2U), pb.Nth(item, 3U)};
  155. });
  156. node = pb.WideToBlocks(node);
  157. if (withBlockExpand) {
  158. node = pb.BlockExpandChunked(node);
  159. // WideTakeBlocks won't work on chunked blocks
  160. node = pb.WideTakeBlocks(node, pb.NewDataLiteral<ui64>(19));
  161. node = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(node)));
  162. } else {
  163. // WideFromBlocks should support chunked blocks
  164. node = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(node)));
  165. node = pb.Take(node, pb.NewDataLiteral<ui64>(19));
  166. }
  167. node = pb.NarrowMap(node, [&](TRuntimeNode::TList items) -> TRuntimeNode {
  168. return pb.NewTuple(tupleType, {items[0], items[1], items[2], items[3]});
  169. });
  170. const auto pgmReturn = pb.ForwardList(node);
  171. const auto graph = setup.BuildGraph(pgmReturn);
  172. const auto iterator = graph->GetValue().GetListIterator();
  173. for (size_t i = 0; i < 19; ++i) {
  174. NUdf::TUnboxedValue item;
  175. UNIT_ASSERT(iterator.Next(item));
  176. ui64 num = item.GetElement(0).Get<ui64>();
  177. bool bl = item.GetElement(1).Get<bool>();
  178. auto strVal = item.GetElement(2);
  179. auto utf8Val = item.GetElement(3);
  180. std::string_view str = strVal.AsStringRef();
  181. std::string_view utf8 = utf8Val.AsStringRef();
  182. UNIT_ASSERT_VALUES_EQUAL(num, i);
  183. UNIT_ASSERT_VALUES_EQUAL(bl, i % 2 == 0);
  184. if (i % 2 == 0) {
  185. std::string big(bigStrSize, '0' + i);
  186. std::string small(smallStrSize, 'A' + i);
  187. UNIT_ASSERT_VALUES_EQUAL(str, big);
  188. UNIT_ASSERT_VALUES_EQUAL(utf8, small);
  189. } else {
  190. UNIT_ASSERT_VALUES_EQUAL(str.size(), 0);
  191. UNIT_ASSERT_VALUES_EQUAL(utf8.size(), 0);
  192. }
  193. }
  194. NUdf::TUnboxedValue item;
  195. UNIT_ASSERT(!iterator.Next(item));
  196. UNIT_ASSERT(!iterator.Next(item));
  197. }
  198. } // namespace
  199. Y_UNIT_TEST_LLVM(TestBlockExpandChunked) {
  200. TestChunked<LLVM>(true);
  201. }
  202. Y_UNIT_TEST_LLVM(TestWideFromBlocksForChunked) {
  203. TestChunked<LLVM>(false);
  204. }
  205. Y_UNIT_TEST(TestScalar) {
  206. const ui64 testValue = 42;
  207. TSetup<false> setup;
  208. auto& pb = *setup.PgmBuilder;
  209. auto dataLiteral = pb.NewDataLiteral<ui64>(testValue);
  210. const auto dataAfterBlocks = pb.AsScalar(dataLiteral);
  211. const auto graph = setup.BuildGraph(dataAfterBlocks);
  212. const auto value = graph->GetValue();
  213. UNIT_ASSERT(value.HasValue() && value.IsBoxed());
  214. UNIT_ASSERT_VALUES_EQUAL(TArrowBlock::From(value).GetDatum().scalar_as<arrow::UInt64Scalar>().value, testValue);
  215. }
  216. Y_UNIT_TEST_LLVM(TestReplicateScalar) {
  217. const ui64 count = 1000;
  218. const ui32 value = 42;
  219. TSetup<LLVM> setup;
  220. TProgramBuilder& pb = *setup.PgmBuilder;
  221. const auto valueType = pb.NewDataType(NUdf::TDataType<ui32>::Id);
  222. const auto scalarValue = pb.AsScalar(pb.NewDataLiteral<ui32>(value));
  223. const auto scalarCount = pb.AsScalar(pb.NewDataLiteral<ui64>(count));
  224. const auto replicated = pb.ReplicateScalar(scalarValue, scalarCount);
  225. const auto replicatedType = pb.NewBlockType(valueType, TBlockType::EShape::Many);
  226. const auto listOfReplicated = pb.NewList(replicatedType, { replicated });
  227. const auto flowOfReplicated = pb.ToFlow(listOfReplicated);
  228. const auto flowAfterBlocks = pb.FromBlocks(flowOfReplicated);
  229. const auto pgmReturn = pb.ForwardList(flowAfterBlocks);
  230. const auto graph = setup.BuildGraph(pgmReturn);
  231. const auto iterator = graph->GetValue().GetListIterator();
  232. for (size_t i = 0; i < count; ++i) {
  233. NUdf::TUnboxedValue item;
  234. UNIT_ASSERT(iterator.Next(item));
  235. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui32>(), value);
  236. }
  237. NUdf::TUnboxedValue item;
  238. UNIT_ASSERT(!iterator.Next(item));
  239. }
  240. Y_UNIT_TEST_LLVM(TestBlockFunc) {
  241. TSetup<LLVM> setup;
  242. TProgramBuilder& pb = *setup.PgmBuilder;
  243. const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
  244. const auto tupleType = pb.NewTupleType({ui64Type, ui64Type});
  245. const auto ui64BlockType = pb.NewBlockType(ui64Type, TBlockType::EShape::Many);
  246. const auto data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<ui64>(10)});
  247. const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<ui64>(20)});
  248. const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<ui64>(30)});
  249. const auto list = pb.NewList(tupleType, {data1, data2, data3});
  250. const auto flow = pb.ToFlow(list);
  251. const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
  252. return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
  253. });
  254. const auto wideBlocksFlow = pb.WideToBlocks(wideFlow);
  255. const auto sumWideFlow = pb.WideMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
  256. return {pb.BlockFunc("Add", ui64BlockType, {items[0], items[1]})};
  257. });
  258. const auto sumNarrowFlow = pb.NarrowMap(sumWideFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
  259. return items[0];
  260. });
  261. const auto pgmReturn = pb.Collect(pb.FromBlocks(sumNarrowFlow));
  262. const auto graph = setup.BuildGraph(pgmReturn);
  263. const auto iterator = graph->GetValue().GetListIterator();
  264. NUdf::TUnboxedValue item;
  265. UNIT_ASSERT(iterator.Next(item));
  266. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 11);
  267. UNIT_ASSERT(iterator.Next(item));
  268. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 22);
  269. UNIT_ASSERT(iterator.Next(item));
  270. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 33);
  271. UNIT_ASSERT(!iterator.Next(item));
  272. UNIT_ASSERT(!iterator.Next(item));
  273. }
  274. Y_UNIT_TEST_LLVM(TestBlockFuncWithNullables) {
  275. TSetup<LLVM> setup;
  276. TProgramBuilder& pb = *setup.PgmBuilder;
  277. const auto optionalUi64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id, true);
  278. const auto tupleType = pb.NewTupleType({optionalUi64Type, optionalUi64Type});
  279. const auto emptyOptionalUi64 = pb.NewEmptyOptional(optionalUi64Type);
  280. const auto ui64OptBlockType = pb.NewBlockType(optionalUi64Type, TBlockType::EShape::Many);
  281. const auto data1 = pb.NewTuple(tupleType, {
  282. pb.NewOptional(pb.NewDataLiteral<ui64>(1)),
  283. emptyOptionalUi64
  284. });
  285. const auto data2 = pb.NewTuple(tupleType, {
  286. emptyOptionalUi64,
  287. pb.NewOptional(pb.NewDataLiteral<ui64>(20))
  288. });
  289. const auto data3 = pb.NewTuple(tupleType, {
  290. emptyOptionalUi64,
  291. emptyOptionalUi64
  292. });
  293. const auto data4 = pb.NewTuple(tupleType, {
  294. pb.NewOptional(pb.NewDataLiteral<ui64>(10)),
  295. pb.NewOptional(pb.NewDataLiteral<ui64>(20))
  296. });
  297. const auto list = pb.NewList(tupleType, {data1, data2, data3, data4});
  298. const auto flow = pb.ToFlow(list);
  299. const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
  300. return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
  301. });
  302. const auto wideBlocksFlow = pb.WideToBlocks(wideFlow);
  303. const auto sumWideFlow = pb.WideMap(wideBlocksFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode::TList {
  304. return {pb.BlockFunc("Add", ui64OptBlockType, {items[0], items[1]})};
  305. });
  306. const auto sumNarrowFlow = pb.NarrowMap(sumWideFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode {
  307. return items[0];
  308. });
  309. const auto pgmReturn = pb.Collect(pb.FromBlocks(sumNarrowFlow));
  310. const auto graph = setup.BuildGraph(pgmReturn);
  311. const auto iterator = graph->GetValue().GetListIterator();
  312. NUdf::TUnboxedValue item;
  313. UNIT_ASSERT(iterator.Next(item));
  314. UNIT_ASSERT(!item);
  315. UNIT_ASSERT(iterator.Next(item));
  316. UNIT_ASSERT(!item);
  317. UNIT_ASSERT(iterator.Next(item));
  318. UNIT_ASSERT(!item);
  319. UNIT_ASSERT(iterator.Next(item));
  320. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30);
  321. UNIT_ASSERT(!iterator.Next(item));
  322. UNIT_ASSERT(!iterator.Next(item));
  323. }
  324. Y_UNIT_TEST_LLVM(TestBlockFuncWithNullableScalar) {
  325. TSetup<LLVM> setup;
  326. TProgramBuilder& pb = *setup.PgmBuilder;
  327. const auto optionalUi64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id, true);
  328. const auto ui64OptBlockType = pb.NewBlockType(optionalUi64Type, TBlockType::EShape::Many);
  329. const auto emptyOptionalUi64 = pb.NewEmptyOptional(optionalUi64Type);
  330. const auto list = pb.NewList(optionalUi64Type, {
  331. pb.NewOptional(pb.NewDataLiteral<ui64>(10)),
  332. pb.NewOptional(pb.NewDataLiteral<ui64>(20)),
  333. pb.NewOptional(pb.NewDataLiteral<ui64>(30))
  334. });
  335. const auto flow = pb.ToFlow(list);
  336. const auto blocksFlow = pb.ToBlocks(flow);
  337. THolder<IComputationGraph> graph;
  338. auto map = [&](const TProgramBuilder::TUnaryLambda& func) {
  339. const auto pgmReturn = pb.Collect(pb.FromBlocks(pb.Map(blocksFlow, func)));
  340. graph = setup.BuildGraph(pgmReturn);
  341. return graph->GetValue().GetListIterator();
  342. };
  343. {
  344. const auto scalar = pb.AsScalar(emptyOptionalUi64);
  345. auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode {
  346. return {pb.BlockFunc("Add", ui64OptBlockType, {scalar, item})};
  347. });
  348. NUdf::TUnboxedValue item;
  349. UNIT_ASSERT(iterator.Next(item));
  350. UNIT_ASSERT(!item);
  351. UNIT_ASSERT(iterator.Next(item));
  352. UNIT_ASSERT(!item);
  353. UNIT_ASSERT(iterator.Next(item));
  354. UNIT_ASSERT(!item);
  355. UNIT_ASSERT(!iterator.Next(item));
  356. UNIT_ASSERT(!iterator.Next(item));
  357. }
  358. {
  359. const auto scalar = pb.AsScalar(emptyOptionalUi64);
  360. auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode {
  361. return {pb.BlockFunc("Add", ui64OptBlockType, {item, scalar})};
  362. });
  363. NUdf::TUnboxedValue item;
  364. UNIT_ASSERT(iterator.Next(item));
  365. UNIT_ASSERT(!item);
  366. UNIT_ASSERT(iterator.Next(item));
  367. UNIT_ASSERT(!item);
  368. UNIT_ASSERT(iterator.Next(item));
  369. UNIT_ASSERT(!item);
  370. UNIT_ASSERT(!iterator.Next(item));
  371. UNIT_ASSERT(!iterator.Next(item));
  372. }
  373. {
  374. const auto scalar = pb.AsScalar(pb.NewDataLiteral<ui64>(100));
  375. auto iterator = map([&](TRuntimeNode item) -> TRuntimeNode {
  376. return {pb.BlockFunc("Add", ui64OptBlockType, {item, scalar})};
  377. });
  378. NUdf::TUnboxedValue item;
  379. UNIT_ASSERT(iterator.Next(item));
  380. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 110);
  381. UNIT_ASSERT(iterator.Next(item));
  382. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 120);
  383. UNIT_ASSERT(iterator.Next(item));
  384. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 130);
  385. UNIT_ASSERT(!iterator.Next(item));
  386. UNIT_ASSERT(!iterator.Next(item));
  387. }
  388. }
  389. Y_UNIT_TEST_LLVM(TestBlockFuncWithScalar) {
  390. TSetup<LLVM> setup;
  391. TProgramBuilder& pb = *setup.PgmBuilder;
  392. const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
  393. const auto ui64BlockType = pb.NewBlockType(ui64Type, TBlockType::EShape::Many);
  394. const auto data1 = pb.NewDataLiteral<ui64>(10);
  395. const auto data2 = pb.NewDataLiteral<ui64>(20);
  396. const auto data3 = pb.NewDataLiteral<ui64>(30);
  397. const auto rightScalar = pb.AsScalar(pb.NewDataLiteral<ui64>(100));
  398. const auto leftScalar = pb.AsScalar(pb.NewDataLiteral<ui64>(1000));
  399. const auto list = pb.NewList(ui64Type, {data1, data2, data3});
  400. const auto flow = pb.ToFlow(list);
  401. const auto blocksFlow = pb.ToBlocks(flow);
  402. const auto sumBlocksFlow = pb.Map(blocksFlow, [&](TRuntimeNode item) -> TRuntimeNode {
  403. return {pb.BlockFunc("Add", ui64BlockType, { leftScalar, {pb.BlockFunc("Add", ui64BlockType, { item, rightScalar } )}})};
  404. });
  405. const auto pgmReturn = pb.Collect(pb.FromBlocks(sumBlocksFlow));
  406. const auto graph = setup.BuildGraph(pgmReturn);
  407. const auto iterator = graph->GetValue().GetListIterator();
  408. NUdf::TUnboxedValue item;
  409. UNIT_ASSERT(iterator.Next(item));
  410. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1110);
  411. UNIT_ASSERT(iterator.Next(item));
  412. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1120);
  413. UNIT_ASSERT(iterator.Next(item));
  414. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 1130);
  415. UNIT_ASSERT(!iterator.Next(item));
  416. UNIT_ASSERT(!iterator.Next(item));
  417. }
  418. Y_UNIT_TEST_LLVM(TestWideFromBlocks) {
  419. TSetup<LLVM> setup;
  420. TProgramBuilder& pb = *setup.PgmBuilder;
  421. const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
  422. const auto data1 = pb.NewDataLiteral<ui64>(10);
  423. const auto data2 = pb.NewDataLiteral<ui64>(20);
  424. const auto data3 = pb.NewDataLiteral<ui64>(30);
  425. const auto list = pb.NewList(ui64Type, {data1, data2, data3});
  426. const auto flow = pb.ToFlow(list);
  427. const auto blocksFlow = pb.ToBlocks(flow);
  428. const auto wideFlow = pb.ExpandMap(blocksFlow, [&](TRuntimeNode item) -> TRuntimeNode::TList { return {item, pb.AsScalar(pb.NewDataLiteral<ui64>(3ULL))}; });
  429. const auto wideFlow2 = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(wideFlow)));
  430. const auto narrowFlow = pb.NarrowMap(wideFlow2, [&](TRuntimeNode::TList items) -> TRuntimeNode { return items.front(); });
  431. const auto pgmReturn = pb.Collect(narrowFlow);
  432. const auto graph = setup.BuildGraph(pgmReturn);
  433. const auto iterator = graph->GetValue().GetListIterator();
  434. NUdf::TUnboxedValue item;
  435. UNIT_ASSERT(iterator.Next(item));
  436. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 10);
  437. UNIT_ASSERT(iterator.Next(item));
  438. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 20);
  439. UNIT_ASSERT(iterator.Next(item));
  440. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30);
  441. UNIT_ASSERT(!iterator.Next(item));
  442. UNIT_ASSERT(!iterator.Next(item));
  443. }
  444. Y_UNIT_TEST_LLVM(TestWideToAndFromBlocks) {
  445. TSetup<LLVM> setup;
  446. TProgramBuilder& pb = *setup.PgmBuilder;
  447. const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
  448. const auto tupleType = pb.NewTupleType({ui64Type, ui64Type});
  449. const auto data1 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(1), pb.NewDataLiteral<ui64>(10)});
  450. const auto data2 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(2), pb.NewDataLiteral<ui64>(20)});
  451. const auto data3 = pb.NewTuple(tupleType, {pb.NewDataLiteral<ui64>(3), pb.NewDataLiteral<ui64>(30)});
  452. const auto list = pb.NewList(tupleType, {data1, data2, data3});
  453. const auto flow = pb.ToFlow(list);
  454. const auto wideFlow = pb.ExpandMap(flow, [&](TRuntimeNode item) -> TRuntimeNode::TList {
  455. return {pb.Nth(item, 0U), pb.Nth(item, 1U)};
  456. });
  457. const auto wideBlocksFlow = pb.WideToBlocks(wideFlow);
  458. const auto wideFlow2 = pb.ToFlow(pb.WideFromBlocks(pb.FromFlow(wideBlocksFlow)));
  459. const auto narrowFlow = pb.NarrowMap(wideFlow2, [&](TRuntimeNode::TList items) -> TRuntimeNode {
  460. return items[1];
  461. });
  462. const auto pgmReturn = pb.ForwardList(narrowFlow);
  463. const auto graph = setup.BuildGraph(pgmReturn);
  464. const auto iterator = graph->GetValue().GetListIterator();
  465. NUdf::TUnboxedValue item;
  466. UNIT_ASSERT(iterator.Next(item));
  467. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 10);
  468. UNIT_ASSERT(iterator.Next(item));
  469. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 20);
  470. UNIT_ASSERT(iterator.Next(item));
  471. UNIT_ASSERT_VALUES_EQUAL(item.Get<ui64>(), 30);
  472. UNIT_ASSERT(!iterator.Next(item));
  473. UNIT_ASSERT(!iterator.Next(item));
  474. }
  475. }
  476. Y_UNIT_TEST_SUITE(TMiniKQLDirectKernelTest) {
  477. Y_UNIT_TEST(Simple) {
  478. TSetup<false> setup;
  479. auto& pb = *setup.PgmBuilder;
  480. const auto boolType = pb.NewDataType(NUdf::TDataType<bool>::Id);
  481. const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
  482. const auto boolBlocksType = pb.NewBlockType(boolType, TBlockType::EShape::Many);
  483. const auto ui64BlocksType = pb.NewBlockType(ui64Type, TBlockType::EShape::Many);
  484. const auto arg1 = pb.Arg(boolBlocksType);
  485. const auto arg2 = pb.Arg(ui64BlocksType);
  486. const auto arg3 = pb.Arg(ui64BlocksType);
  487. const auto ifNode = pb.BlockIf(arg1, arg2, arg3);
  488. const auto eqNode = pb.BlockFunc("Equals", boolBlocksType, { ifNode, arg2 });
  489. const auto graph = setup.BuildGraph(eqNode, {arg1.GetNode(), arg2.GetNode(), arg3.GetNode()});
  490. const auto topology = graph->GetKernelsTopology();
  491. UNIT_ASSERT(topology);
  492. UNIT_ASSERT_VALUES_EQUAL(topology->InputArgsCount, 3);
  493. UNIT_ASSERT_VALUES_EQUAL(topology->Items.size(), 2);
  494. UNIT_ASSERT_VALUES_EQUAL(topology->Items[0].Node->GetKernelName(), "If");
  495. const std::vector<ui32> expectedInputs1{{0, 1, 2}};
  496. UNIT_ASSERT_VALUES_EQUAL(topology->Items[0].Inputs, expectedInputs1);
  497. UNIT_ASSERT_VALUES_EQUAL(topology->Items[1].Node->GetKernelName(), "Equals");
  498. const std::vector<ui32> expectedInputs2{{3, 1}};
  499. UNIT_ASSERT_VALUES_EQUAL(topology->Items[1].Inputs, expectedInputs2);
  500. arrow::compute::ExecContext execContext;
  501. const size_t blockSize = 100000;
  502. std::vector<arrow::Datum> datums(topology->InputArgsCount + topology->Items.size());
  503. {
  504. arrow::UInt8Builder builder1(execContext.memory_pool());
  505. arrow::UInt64Builder builder2(execContext.memory_pool()), builder3(execContext.memory_pool());
  506. ARROW_OK(builder1.Reserve(blockSize));
  507. ARROW_OK(builder2.Reserve(blockSize));
  508. ARROW_OK(builder3.Reserve(blockSize));
  509. for (size_t i = 0; i < blockSize; ++i) {
  510. builder1.UnsafeAppend(i & 1);
  511. builder2.UnsafeAppend(i);
  512. builder3.UnsafeAppend(3 * i);
  513. }
  514. std::shared_ptr<arrow::ArrayData> data1;
  515. ARROW_OK(builder1.FinishInternal(&data1));
  516. std::shared_ptr<arrow::ArrayData> data2;
  517. ARROW_OK(builder2.FinishInternal(&data2));
  518. std::shared_ptr<arrow::ArrayData> data3;
  519. ARROW_OK(builder3.FinishInternal(&data3));
  520. datums[0] = data1;
  521. datums[1] = data2;
  522. datums[2] = data3;
  523. }
  524. ExecuteAllKernels(datums, topology, execContext);
  525. auto res = datums.back().array()->GetValues<ui8>(1);
  526. for (size_t i = 0; i < blockSize; ++i) {
  527. auto expected = (((i & 1) ? i : i * 3) == i) ? 1 : 0;
  528. UNIT_ASSERT_VALUES_EQUAL(res[i], expected);
  529. }
  530. }
  531. Y_UNIT_TEST(WithScalars) {
  532. TSetup<false> setup;
  533. auto& pb = *setup.PgmBuilder;
  534. const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
  535. const auto ui64BlocksType = pb.NewBlockType(ui64Type, TBlockType::EShape::Many);
  536. const auto scalar = pb.AsScalar(pb.NewDataLiteral(false));
  537. const auto arg1 = pb.Arg(ui64BlocksType);
  538. const auto arg2 = pb.Arg(ui64BlocksType);
  539. const auto ifNode = pb.BlockIf(scalar, arg1, arg2);
  540. const auto graph = setup.BuildGraph(ifNode, {arg1.GetNode(), arg2.GetNode()});
  541. const auto topology = graph->GetKernelsTopology();
  542. UNIT_ASSERT(topology);
  543. UNIT_ASSERT_VALUES_EQUAL(topology->InputArgsCount, 2);
  544. UNIT_ASSERT_VALUES_EQUAL(topology->Items.size(), 2);
  545. UNIT_ASSERT_VALUES_EQUAL(topology->Items[0].Node->GetKernelName(), "AsScalar");
  546. const std::vector<ui32> expectedInputs1;
  547. UNIT_ASSERT_VALUES_EQUAL(topology->Items[0].Inputs, expectedInputs1);
  548. UNIT_ASSERT_VALUES_EQUAL(topology->Items[1].Node->GetKernelName(), "If");
  549. const std::vector<ui32> expectedInputs2{{2, 0, 1}};
  550. UNIT_ASSERT_VALUES_EQUAL(topology->Items[1].Inputs, expectedInputs2);
  551. arrow::compute::ExecContext execContext;
  552. const size_t blockSize = 100000;
  553. std::vector<arrow::Datum> datums(topology->InputArgsCount + topology->Items.size());
  554. {
  555. arrow::UInt64Builder builder1(execContext.memory_pool()), builder2(execContext.memory_pool());
  556. ARROW_OK(builder1.Reserve(blockSize));
  557. ARROW_OK(builder2.Reserve(blockSize));
  558. for (size_t i = 0; i < blockSize; ++i) {
  559. builder1.UnsafeAppend(i);
  560. builder2.UnsafeAppend(3 * i);
  561. }
  562. std::shared_ptr<arrow::ArrayData> data1;
  563. ARROW_OK(builder1.FinishInternal(&data1));
  564. std::shared_ptr<arrow::ArrayData> data2;
  565. ARROW_OK(builder2.FinishInternal(&data2));
  566. datums[0] = data1;
  567. datums[1] = data2;
  568. }
  569. ExecuteAllKernels(datums, topology, execContext);
  570. auto res = datums.back().array()->GetValues<ui64>(1);
  571. for (size_t i = 0; i < blockSize; ++i) {
  572. auto expected = 3 * i;
  573. UNIT_ASSERT_VALUES_EQUAL(res[i], expected);
  574. }
  575. }
  576. Y_UNIT_TEST(Udf) {
  577. TVector<TUdfModuleInfo> modules;
  578. modules.emplace_back(TUdfModuleInfo{"", "BlockUT", new TBlockUTModule()});
  579. TSetup<false> setup(GetTestFactory(), std::move(modules));
  580. auto& pb = *setup.PgmBuilder;
  581. const auto i32Type = pb.NewDataType(NUdf::TDataType<i32>::Id);
  582. const auto i32BlocksType = pb.NewBlockType(i32Type, TBlockType::EShape::Many);
  583. const auto arg1 = pb.Arg(i32BlocksType);
  584. const auto userType = pb.NewTupleType({
  585. pb.NewTupleType({i32BlocksType}),
  586. pb.NewEmptyStructType(),
  587. pb.NewEmptyTupleType()});
  588. const auto udf = pb.Udf("BlockUT.Inc_BlocksImpl", pb.NewVoid(), userType);
  589. const auto apply = pb.Apply(udf, {arg1});
  590. const auto graph = setup.BuildGraph(apply, {arg1.GetNode() });
  591. const auto topology = graph->GetKernelsTopology();
  592. UNIT_ASSERT(topology);
  593. UNIT_ASSERT_VALUES_EQUAL(topology->InputArgsCount, 1);
  594. UNIT_ASSERT_VALUES_EQUAL(topology->Items.size(), 1);
  595. UNIT_ASSERT_VALUES_EQUAL(topology->Items[0].Node->GetKernelName(), "Apply");
  596. const std::vector<ui32> expectedInputs1{0};
  597. UNIT_ASSERT_VALUES_EQUAL(topology->Items[0].Inputs, expectedInputs1);
  598. arrow::compute::ExecContext execContext;
  599. const size_t blockSize = 10000;
  600. std::vector<arrow::Datum> datums(topology->InputArgsCount + topology->Items.size());
  601. {
  602. arrow::Int32Builder builder1(execContext.memory_pool());
  603. ARROW_OK(builder1.Reserve(blockSize));
  604. for (size_t i = 0; i < blockSize; ++i) {
  605. builder1.UnsafeAppend(i);
  606. }
  607. std::shared_ptr<arrow::ArrayData> data1;
  608. ARROW_OK(builder1.FinishInternal(&data1));
  609. datums[0] = data1;
  610. }
  611. ExecuteAllKernels(datums, topology, execContext);
  612. auto res = datums.back().array()->GetValues<i32>(1);
  613. for (size_t i = 0; i < blockSize; ++i) {
  614. auto expected = i + 1;
  615. UNIT_ASSERT_VALUES_EQUAL(res[i], expected);
  616. }
  617. }
  618. Y_UNIT_TEST(ScalarApply) {
  619. TSetup<false> setup;
  620. auto& pb = *setup.PgmBuilder;
  621. const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
  622. const auto ui64BlocksType = pb.NewBlockType(ui64Type, TBlockType::EShape::Many);
  623. const auto arg1 = pb.Arg(ui64BlocksType);
  624. const auto arg2 = pb.Arg(ui64BlocksType);
  625. const auto scalarApply = pb.ScalarApply({arg1,arg2}, [&](auto args){
  626. return pb.Add(args[0], args[1]);
  627. });
  628. const auto graph = setup.BuildGraph(scalarApply, {arg1.GetNode(), arg2.GetNode()});
  629. const auto topology = graph->GetKernelsTopology();
  630. UNIT_ASSERT(topology);
  631. UNIT_ASSERT_VALUES_EQUAL(topology->InputArgsCount, 2);
  632. UNIT_ASSERT_VALUES_EQUAL(topology->Items.size(), 1);
  633. UNIT_ASSERT_VALUES_EQUAL(topology->Items[0].Node->GetKernelName(), "ScalarApply");
  634. const std::vector<ui32> expectedInputs1{{0, 1}};
  635. UNIT_ASSERT_VALUES_EQUAL(topology->Items[0].Inputs, expectedInputs1);
  636. arrow::compute::ExecContext execContext;
  637. const size_t blockSize = 100000;
  638. std::vector<arrow::Datum> datums(topology->InputArgsCount + topology->Items.size());
  639. {
  640. arrow::UInt64Builder builder1(execContext.memory_pool()), builder2(execContext.memory_pool());
  641. ARROW_OK(builder1.Reserve(blockSize));
  642. ARROW_OK(builder2.Reserve(blockSize));
  643. for (size_t i = 0; i < blockSize; ++i) {
  644. builder1.UnsafeAppend(i);
  645. builder2.UnsafeAppend(2 * i);
  646. }
  647. std::shared_ptr<arrow::ArrayData> data1;
  648. ARROW_OK(builder1.FinishInternal(&data1));
  649. std::shared_ptr<arrow::ArrayData> data2;
  650. ARROW_OK(builder2.FinishInternal(&data2));
  651. datums[0] = data1;
  652. datums[1] = data2;
  653. }
  654. ExecuteAllKernels(datums, topology, execContext);
  655. auto res = datums.back().array()->GetValues<ui64>(1);
  656. for (size_t i = 0; i < blockSize; ++i) {
  657. auto expected = 3 * i;
  658. UNIT_ASSERT_VALUES_EQUAL(res[i], expected);
  659. }
  660. }
  661. }
  662. }
  663. }