mkql_block_impl.cpp 16 KB


  1. #include "mkql_block_impl.h"
  2. #include "mkql_block_builder.h"
  3. #include "mkql_block_reader.h"
  4. #include <yql/essentials/minikql/arrow/mkql_functions.h>
  5. #include <yql/essentials/minikql/mkql_node_builder.h>
  6. #include <yql/essentials/minikql/mkql_node_cast.h>
  7. #include <yql/essentials/minikql/arrow/arrow_util.h>
  8. #include <yql/essentials/minikql/arrow/mkql_bit_utils.h>
  9. #include <yql/essentials/public/udf/arrow/args_dechunker.h>
  10. #include <yql/essentials/parser/pg_wrapper/interface/arrow.h>
  11. #include <arrow/compute/exec_internal.h>
  12. extern "C" uint64_t GetBlockCount(const NYql::NUdf::TUnboxedValuePod data) {
  13. return NKikimr::NMiniKQL::TArrowBlock::From(data).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
  14. }
  15. extern "C" uint64_t GetBitmapPopCountCount(const NYql::NUdf::TUnboxedValuePod data) {
  16. const auto& arr = NKikimr::NMiniKQL::TArrowBlock::From(data).GetDatum().array();
  17. const size_t len = (size_t)arr->length;
  18. MKQL_ENSURE(arr->GetNullCount() == 0, "Bitmap block should not have nulls");
  19. const ui8* src = arr->GetValues<ui8>(1);
  20. return NKikimr::NMiniKQL::GetSparseBitmapPopCount(src, len);
  21. }
  22. extern "C" uint8_t GetBitmapScalarValue(const NYql::NUdf::TUnboxedValuePod data) {
  23. return NKikimr::NMiniKQL::TArrowBlock::From(data).GetDatum().scalar_as<arrow::UInt8Scalar>().value;
  24. }
  25. namespace NKikimr::NMiniKQL {
  26. namespace {
  27. template<typename T>
  28. arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& pool) {
  29. std::shared_ptr<arrow::DataType> arrowType;
  30. MKQL_ENSURE(ConvertArrowType(type, arrowType), "Unsupported type of scalar " << *type);
  31. if (!value) {
  32. return arrow::MakeNullScalar(arrowType);
  33. }
  34. bool isOptional = false;
  35. if (type->IsOptional()) {
  36. type = AS_TYPE(TOptionalType, type)->GetItemType();
  37. isOptional = true;
  38. }
  39. if (type->IsOptional() || (isOptional && type->IsPg())) {
  40. // nested optionals
  41. std::vector<std::shared_ptr<arrow::Scalar>> arrowValue;
  42. arrowValue.emplace_back(DoConvertScalar(type, value.GetOptionalValue(), pool).scalar());
  43. return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType));
  44. }
  45. if (type->IsStruct()) {
  46. auto structType = AS_TYPE(TStructType, type);
  47. std::vector<std::shared_ptr<arrow::Scalar>> arrowValue;
  48. for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
  49. arrowValue.emplace_back(DoConvertScalar(structType->GetMemberType(i), value.GetElement(i), pool).scalar());
  50. }
  51. return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType));
  52. }
  53. if (type->IsTuple()) {
  54. auto tupleType = AS_TYPE(TTupleType, type);
  55. std::vector<std::shared_ptr<arrow::Scalar>> arrowValue;
  56. for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
  57. arrowValue.emplace_back(DoConvertScalar(tupleType->GetElementType(i), value.GetElement(i), pool).scalar());
  58. }
  59. return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType));
  60. }
  61. if (type->IsData()) {
  62. auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
  63. switch (slot) {
  64. case NUdf::EDataSlot::Int8:
  65. return arrow::Datum(static_cast<int8_t>(value.template Get<i8>()));
  66. case NUdf::EDataSlot::Bool:
  67. case NUdf::EDataSlot::Uint8:
  68. return arrow::Datum(static_cast<uint8_t>(value.template Get<ui8>()));
  69. case NUdf::EDataSlot::Int16:
  70. return arrow::Datum(static_cast<int16_t>(value.template Get<i16>()));
  71. case NUdf::EDataSlot::Uint16:
  72. case NUdf::EDataSlot::Date:
  73. return arrow::Datum(static_cast<uint16_t>(value.template Get<ui16>()));
  74. case NUdf::EDataSlot::Int32:
  75. case NUdf::EDataSlot::Date32:
  76. return arrow::Datum(static_cast<int32_t>(value.template Get<i32>()));
  77. case NUdf::EDataSlot::Uint32:
  78. case NUdf::EDataSlot::Datetime:
  79. return arrow::Datum(static_cast<uint32_t>(value.template Get<ui32>()));
  80. case NUdf::EDataSlot::Int64:
  81. case NUdf::EDataSlot::Interval:
  82. case NUdf::EDataSlot::Interval64:
  83. case NUdf::EDataSlot::Datetime64:
  84. case NUdf::EDataSlot::Timestamp64:
  85. return arrow::Datum(static_cast<int64_t>(value.template Get<i64>()));
  86. case NUdf::EDataSlot::Uint64:
  87. case NUdf::EDataSlot::Timestamp:
  88. return arrow::Datum(static_cast<uint64_t>(value.template Get<ui64>()));
  89. case NUdf::EDataSlot::Float:
  90. return arrow::Datum(static_cast<float>(value.template Get<float>()));
  91. case NUdf::EDataSlot::Double:
  92. return arrow::Datum(static_cast<double>(value.template Get<double>()));
  93. case NUdf::EDataSlot::String:
  94. case NUdf::EDataSlot::Utf8:
  95. case NUdf::EDataSlot::Yson:
  96. case NUdf::EDataSlot::Json:
  97. case NUdf::EDataSlot::JsonDocument: {
  98. const auto& str = value.AsStringRef();
  99. std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(str.Size(), &pool)));
  100. std::memcpy(buffer->mutable_data(), str.Data(), str.Size());
  101. std::shared_ptr<arrow::Scalar> scalar;
  102. if (slot == NUdf::EDataSlot::String || slot == NUdf::EDataSlot::Yson || slot == NUdf::EDataSlot::JsonDocument) {
  103. scalar = std::make_shared<arrow::BinaryScalar>(buffer, arrow::binary());
  104. } else {
  105. // NOTE: Do not use |arrow::BinaryScalar| for utf8 and json types directly.
  106. // This is necessary so that the type of the scalar is clearly preserved at runtime.
  107. scalar = std::make_shared<arrow::StringScalar>(buffer);
  108. }
  109. return arrow::Datum(scalar);
  110. }
  111. case NUdf::EDataSlot::TzDate: {
  112. auto items = arrow::StructScalar::ValueType{
  113. std::make_shared<arrow::UInt16Scalar>(value.template Get<ui16>()),
  114. std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
  115. };
  116. return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate>()));
  117. }
  118. case NUdf::EDataSlot::TzDatetime: {
  119. auto items = arrow::StructScalar::ValueType{
  120. std::make_shared<arrow::UInt32Scalar>(value.template Get<ui32>()),
  121. std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
  122. };
  123. return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime>()));
  124. }
  125. case NUdf::EDataSlot::TzTimestamp: {
  126. auto items = arrow::StructScalar::ValueType{
  127. std::make_shared<arrow::UInt64Scalar>(value.template Get<ui64>()),
  128. std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
  129. };
  130. return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp>()));
  131. }
  132. case NUdf::EDataSlot::TzDate32: {
  133. auto items = arrow::StructScalar::ValueType{
  134. std::make_shared<arrow::Int32Scalar>(value.template Get<i32>()),
  135. std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
  136. };
  137. return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate32>()));
  138. }
  139. case NUdf::EDataSlot::TzDatetime64: {
  140. auto items = arrow::StructScalar::ValueType{
  141. std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()),
  142. std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
  143. };
  144. return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime64>()));
  145. }
  146. case NUdf::EDataSlot::TzTimestamp64: {
  147. auto items = arrow::StructScalar::ValueType{
  148. std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()),
  149. std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
  150. };
  151. return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp64>()));
  152. }
  153. case NUdf::EDataSlot::Decimal: {
  154. std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(16, &pool)));
  155. *reinterpret_cast<NYql::NDecimal::TInt128*>(buffer->mutable_data()) = value.GetInt128();
  156. return arrow::Datum(std::make_shared<TPrimitiveDataType<NYql::NDecimal::TInt128>::TScalarResult>(buffer));
  157. }
  158. default:
  159. MKQL_ENSURE(false, "Unsupported data slot " << slot);
  160. }
  161. }
  162. if (type->IsPg()) {
  163. return NYql::MakePgScalar(AS_TYPE(TPgType, type), value, pool);
  164. }
  165. MKQL_ENSURE(false, "Unsupported type " << *type);
  166. }
  167. } // namespace
  168. arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool) {
  169. return DoConvertScalar(type, value, pool);
  170. }
  171. arrow::Datum ConvertScalar(TType* type, const NUdf::TBlockItem& value, arrow::MemoryPool& pool) {
  172. return DoConvertScalar(type, value, pool);
  173. }
  174. arrow::Datum MakeArrayFromScalar(const arrow::Scalar& scalar, size_t len, TType* type, arrow::MemoryPool& pool) {
  175. MKQL_ENSURE(len > 0, "Invalid block size");
  176. auto reader = MakeBlockReader(TTypeInfoHelper(), type);
  177. auto builder = MakeArrayBuilder(TTypeInfoHelper(), type, pool, len, nullptr);
  178. auto scalarItem = reader->GetScalarItem(scalar);
  179. builder->Add(scalarItem, len);
  180. return builder->Build(true);
  181. }
  182. arrow::ValueDescr ToValueDescr(TType* type) {
  183. arrow::ValueDescr ret;
  184. MKQL_ENSURE(ConvertInputArrowType(type, ret), "can't get arrow type");
  185. return ret;
  186. }
  187. std::vector<arrow::ValueDescr> ToValueDescr(const TVector<TType*>& types) {
  188. std::vector<arrow::ValueDescr> res;
  189. res.reserve(types.size());
  190. for (const auto& type : types) {
  191. res.emplace_back(ToValueDescr(type));
  192. }
  193. return res;
  194. }
  195. std::vector<arrow::compute::InputType> ConvertToInputTypes(const TVector<TType*>& argTypes) {
  196. std::vector<arrow::compute::InputType> result;
  197. result.reserve(argTypes.size());
  198. for (auto& type : argTypes) {
  199. result.emplace_back(ToValueDescr(type));
  200. }
  201. return result;
  202. }
  203. arrow::compute::OutputType ConvertToOutputType(TType* output) {
  204. return arrow::compute::OutputType(ToValueDescr(output));
  205. }
  206. NUdf::TUnboxedValuePod MakeBlockCount(const THolderFactory& holderFactory, const uint64_t count) {
  207. return holderFactory.CreateArrowBlock(arrow::Datum(count));
  208. }
  209. TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes,
  210. const TVector<TType*>& argsTypes, const arrow::compute::ScalarKernel& kernel,
  211. std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder,
  212. const arrow::compute::FunctionOptions* functionOptions)
  213. : TMutableComputationNode(mutables)
  214. , StateIndex(mutables.CurValueIndex++)
  215. , ArgsNodes(std::move(argsNodes))
  216. , ArgsValuesDescr(ToValueDescr(argsTypes))
  217. , Kernel(kernel)
  218. , KernelHolder(std::move(kernelHolder))
  219. , Options(functionOptions)
  220. , ScalarOutput(GetResultShape(argsTypes) == TBlockType::EShape::Scalar)
  221. , Name(name.starts_with("Block") ? name.substr(5) : name)
  222. {
  223. }
  224. NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) const {
  225. auto& state = GetState(ctx);
  226. std::vector<arrow::Datum> argDatums;
  227. for (ui32 i = 0; i < ArgsNodes.size(); ++i) {
  228. const auto& value = ArgsNodes[i]->GetValue(ctx);
  229. argDatums.emplace_back(TArrowBlock::From(value).GetDatum());
  230. ARROW_DEBUG_CHECK_DATUM_TYPES(ArgsValuesDescr[i], argDatums.back().descr());
  231. }
  232. if (ScalarOutput) {
  233. auto executor = arrow::compute::detail::KernelExecutor::MakeScalar();
  234. ARROW_OK(executor->Init(&state.KernelContext, { &Kernel, ArgsValuesDescr, Options }));
  235. auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>();
  236. ARROW_OK(executor->Execute(argDatums, listener.get()));
  237. auto output = executor->WrapResults(argDatums, listener->values());
  238. return ctx.HolderFactory.CreateArrowBlock(std::move(output));
  239. }
  240. NYql::NUdf::TArgsDechunker dechunker(std::move(argDatums));
  241. std::vector<arrow::Datum> chunk;
  242. TVector<std::shared_ptr<arrow::ArrayData>> arrays;
  243. while (dechunker.Next(chunk)) {
  244. auto executor = arrow::compute::detail::KernelExecutor::MakeScalar();
  245. ARROW_OK(executor->Init(&state.KernelContext, { &Kernel, ArgsValuesDescr, Options }));
  246. arrow::compute::detail::DatumAccumulator listener;
  247. ARROW_OK(executor->Execute(chunk, &listener));
  248. auto output = executor->WrapResults(chunk, listener.values());
  249. ForEachArrayData(output, [&](const auto& arr) { arrays.push_back(arr); });
  250. }
  251. return ctx.HolderFactory.CreateArrowBlock(MakeArray(arrays));
  252. }
  253. void TBlockFuncNode::RegisterDependencies() const {
  254. for (const auto& arg : ArgsNodes) {
  255. DependsOn(arg);
  256. }
  257. }
  258. TBlockFuncNode::TState& TBlockFuncNode::GetState(TComputationContext& ctx) const {
  259. auto& result = ctx.MutableValues[StateIndex];
  260. if (!result.HasValue()) {
  261. result = ctx.HolderFactory.Create<TState>(Options, Kernel, ArgsValuesDescr, ctx);
  262. }
  263. return *static_cast<TState*>(result.AsBoxed().Get());
  264. }
  265. std::unique_ptr<IArrowKernelComputationNode> TBlockFuncNode::PrepareArrowKernelComputationNode(TComputationContext&) const {
  266. return std::make_unique<TArrowNode>(this);
  267. }
  268. TBlockFuncNode::TArrowNode::TArrowNode(const TBlockFuncNode* parent)
  269. : Parent_(parent)
  270. {}
  271. TStringBuf TBlockFuncNode::TArrowNode::GetKernelName() const {
  272. return Parent_->Name;
  273. }
  274. const arrow::compute::ScalarKernel& TBlockFuncNode::TArrowNode::GetArrowKernel() const {
  275. return Parent_->Kernel;
  276. }
  277. const std::vector<arrow::ValueDescr>& TBlockFuncNode::TArrowNode::GetArgsDesc() const {
  278. return Parent_->ArgsValuesDescr;
  279. }
  280. const IComputationNode* TBlockFuncNode::TArrowNode::GetArgument(ui32 index) const {
  281. MKQL_ENSURE(index < Parent_->ArgsNodes.size(), "Wrong index");
  282. return Parent_->ArgsNodes[index];
  283. }
  284. TBlockState::TBlockState(TMemoryUsageInfo* memInfo, size_t width)
  285. : TBase(memInfo), Values(width), Deques(width - 1ULL), Arrays(width - 1ULL)
  286. {
  287. Pointer_ = Values.data();
  288. }
  289. void TBlockState::ClearValues() {
  290. Values.assign(Values.size(), NUdf::TUnboxedValuePod());
  291. }
  292. void TBlockState::FillArrays() {
  293. MKQL_ENSURE(Count == 0, "All existing arrays have to be processed");
  294. auto& counterDatum = TArrowBlock::From(Values.back()).GetDatum();
  295. MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)");
  296. Count = counterDatum.scalar_as<arrow::UInt64Scalar>().value;
  297. if (!Count)
  298. return;
  299. for (size_t i = 0U; i < Deques.size(); ++i) {
  300. Deques[i].clear();
  301. if (const auto& value = Values[i]) {
  302. const auto& datum = TArrowBlock::From(value).GetDatum();
  303. if (datum.is_scalar()) {
  304. return;
  305. }
  306. MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
  307. ForEachArrayData(datum, [this, i](const auto& arrayData) {
  308. Deques[i].push_back(arrayData);
  309. });
  310. }
  311. }
  312. }
  313. ui64 TBlockState::Slice() {
  314. auto sliceSize = Count;
  315. for (size_t i = 0; i < Deques.size(); ++i) {
  316. const auto& arr = Deques[i];
  317. if (arr.empty())
  318. continue;
  319. Y_ABORT_UNLESS(ui64(arr.front()->length) <= Count);
  320. MKQL_ENSURE(ui64(arr.front()->length) <= Count, "Unexpected array length at column #" << i);
  321. sliceSize = std::min<ui64>(sliceSize, arr.front()->length);
  322. }
  323. for (size_t i = 0; i < Arrays.size(); ++i) {
  324. auto& arr = Deques[i];
  325. if (arr.empty())
  326. continue;
  327. if (auto& array = arr.front(); ui64(array->length) == sliceSize) {
  328. Arrays[i] = std::move(array);
  329. Deques[i].pop_front();
  330. } else
  331. Arrays[i] = Chop(array, sliceSize);
  332. }
  333. Count -= sliceSize;
  334. return sliceSize;
  335. }
  336. NUdf::TUnboxedValuePod TBlockState::Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const {
  337. if (idx >= Deques.size())
  338. return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
  339. if (auto array = Arrays[idx])
  340. return holderFactory.CreateArrowBlock(std::move(array));
  341. else
  342. return Values[idx];
  343. }
  344. }