#include "mkql_block_impl.h"
#include "mkql_block_builder.h"
#include "mkql_block_reader.h"

// NOTE(review): in this copy of the file the header names of the following
// includes, and the template argument lists throughout the file, appear to be
// missing (every `<...>` span looks stripped). Restore them from version
// control before building.
#include
#include
#include
#include
#include
#include
#include
#include

// C-linkage helper: returns the row count stored in the scalar datum of a
// block-length value.
extern "C" uint64_t GetBlockCount(const NYql::NUdf::TUnboxedValuePod data) {
    return NKikimr::NMiniKQL::TArrowBlock::From(data).GetDatum().scalar_as().value;
}

// C-linkage helper: counts the set entries of a bitmap array block.
// Throws (via MKQL_ENSURE) if the block contains nulls.
// NOTE(review): GetSparseBitmapPopCount presumably expects one byte per row
// ("sparse" bitmap) — confirm against its definition.
extern "C" uint64_t GetBitmapPopCountCount(const NYql::NUdf::TUnboxedValuePod data) {
    const auto& arr = NKikimr::NMiniKQL::TArrowBlock::From(data).GetDatum().array();
    const size_t len = static_cast<size_t>(arr->length);
    MKQL_ENSURE(arr->GetNullCount() == 0, "Bitmap block should not have nulls");
    const ui8* src = arr->GetValues(1);
    return NKikimr::NMiniKQL::GetSparseBitmapPopCount(src, len);
}

// C-linkage helper: returns the value of a scalar bitmap block as uint8_t.
extern "C" uint8_t GetBitmapScalarValue(const NYql::NUdf::TUnboxedValuePod data) {
    return NKikimr::NMiniKQL::TArrowBlock::From(data).GetDatum().scalar_as().value;
}

namespace NKikimr::NMiniKQL {

namespace {

// Converts a MiniKQL value `value` of type `type` into an arrow scalar Datum.
// - An empty value becomes a typed arrow null scalar.
// - Nested optionals (and Optional<Pg>) are wrapped in a one-field struct scalar.
// - Structs and tuples are converted member-by-member into a struct scalar.
// - Data slots map to the matching primitive / binary / utf8 scalar; Tz* types
//   become a (value, timezone id) struct; Decimal is stored in a 16-byte buffer.
// - Pg types are delegated to NYql::MakePgScalar.
// Throws (MKQL_ENSURE) for types that cannot be converted.
template
arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& pool) {
    std::shared_ptr arrowType;
    MKQL_ENSURE(ConvertArrowType(type, arrowType), "Unsupported type of scalar " << *type);
    if (!value) {
        return arrow::MakeNullScalar(arrowType);
    }

    bool isOptional = false;
    if (type->IsOptional()) {
        type = AS_TYPE(TOptionalType, type)->GetItemType();
        isOptional = true;
    }

    if (type->IsOptional() || (isOptional && type->IsPg())) {
        // nested optionals
        std::vector> arrowValue;
        arrowValue.emplace_back(DoConvertScalar(type, value.GetOptionalValue(), pool).scalar());
        return arrow::Datum(std::make_shared(arrowValue, arrowType));
    }

    if (type->IsStruct()) {
        auto structType = AS_TYPE(TStructType, type);
        std::vector> arrowValue;
        for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
            arrowValue.emplace_back(DoConvertScalar(structType->GetMemberType(i), value.GetElement(i), pool).scalar());
        }

        return arrow::Datum(std::make_shared(arrowValue, arrowType));
    }

    if (type->IsTuple()) {
        auto tupleType = AS_TYPE(TTupleType, type);
        std::vector> arrowValue;
        for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
            arrowValue.emplace_back(DoConvertScalar(tupleType->GetElementType(i), value.GetElement(i), pool).scalar());
        }

        return arrow::Datum(std::make_shared(arrowValue, arrowType));
    }

    if (type->IsData()) {
        auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
        switch (slot) {
        case NUdf::EDataSlot::Int8:
            return arrow::Datum(static_cast(value.template Get()));
        case NUdf::EDataSlot::Bool:
        case NUdf::EDataSlot::Uint8:
            return arrow::Datum(static_cast(value.template Get()));
        case NUdf::EDataSlot::Int16:
            return arrow::Datum(static_cast(value.template Get()));
        case NUdf::EDataSlot::Uint16:
        case NUdf::EDataSlot::Date:
            return arrow::Datum(static_cast(value.template Get()));
        case NUdf::EDataSlot::Int32:
        case NUdf::EDataSlot::Date32:
            return arrow::Datum(static_cast(value.template Get()));
        case NUdf::EDataSlot::Uint32:
        case NUdf::EDataSlot::Datetime:
            return arrow::Datum(static_cast(value.template Get()));
        case NUdf::EDataSlot::Int64:
        case NUdf::EDataSlot::Interval:
        case NUdf::EDataSlot::Interval64:
        case NUdf::EDataSlot::Datetime64:
        case NUdf::EDataSlot::Timestamp64:
            return arrow::Datum(static_cast(value.template Get()));
        case NUdf::EDataSlot::Uint64:
        case NUdf::EDataSlot::Timestamp:
            return arrow::Datum(static_cast(value.template Get()));
        case NUdf::EDataSlot::Float:
            return arrow::Datum(static_cast(value.template Get()));
        case NUdf::EDataSlot::Double:
            return arrow::Datum(static_cast(value.template Get()));
        case NUdf::EDataSlot::String:
        case NUdf::EDataSlot::Utf8:
        case NUdf::EDataSlot::Yson:
        case NUdf::EDataSlot::Json:
        case NUdf::EDataSlot::JsonDocument: {
            // Copy the string bytes into a pool-allocated arrow buffer.
            const auto& str = value.AsStringRef();
            std::shared_ptr buffer(ARROW_RESULT(arrow::AllocateBuffer(str.Size(), &pool)));
            std::memcpy(buffer->mutable_data(), str.Data(), str.Size());
            // String/Yson/JsonDocument are exposed as arrow binary, Utf8/Json as utf8.
            // Named `strType` so it does not shadow the `type` parameter.
            auto strType = (slot == NUdf::EDataSlot::String || slot == NUdf::EDataSlot::Yson || slot == NUdf::EDataSlot::JsonDocument) ? arrow::binary() : arrow::utf8();
            std::shared_ptr scalar = std::make_shared(buffer, strType);
            return arrow::Datum(scalar);
        }
        // Tz* types: a struct scalar of (datetime value, timezone id).
        case NUdf::EDataSlot::TzDate: {
            auto items = arrow::StructScalar::ValueType{
                std::make_shared(value.template Get()),
                std::make_shared(value.GetTimezoneId())
            };
            return arrow::Datum(std::make_shared(items, MakeTzDateArrowType()));
        }
        case NUdf::EDataSlot::TzDatetime: {
            auto items = arrow::StructScalar::ValueType{
                std::make_shared(value.template Get()),
                std::make_shared(value.GetTimezoneId())
            };
            return arrow::Datum(std::make_shared(items, MakeTzDateArrowType()));
        }
        case NUdf::EDataSlot::TzTimestamp: {
            auto items = arrow::StructScalar::ValueType{
                std::make_shared(value.template Get()),
                std::make_shared(value.GetTimezoneId())
            };
            return arrow::Datum(std::make_shared(items, MakeTzDateArrowType()));
        }
        case NUdf::EDataSlot::TzDate32: {
            auto items = arrow::StructScalar::ValueType{
                std::make_shared(value.template Get()),
                std::make_shared(value.GetTimezoneId())
            };
            return arrow::Datum(std::make_shared(items, MakeTzDateArrowType()));
        }
        case NUdf::EDataSlot::TzDatetime64: {
            auto items = arrow::StructScalar::ValueType{
                std::make_shared(value.template Get()),
                std::make_shared(value.GetTimezoneId())
            };
            return arrow::Datum(std::make_shared(items, MakeTzDateArrowType()));
        }
        case NUdf::EDataSlot::TzTimestamp64: {
            auto items = arrow::StructScalar::ValueType{
                std::make_shared(value.template Get()),
                std::make_shared(value.GetTimezoneId())
            };
            return arrow::Datum(std::make_shared(items, MakeTzDateArrowType()));
        }
        case NUdf::EDataSlot::Decimal: {
            // Decimal is stored as a 16-byte (128-bit) value in its own buffer.
            std::shared_ptr buffer(ARROW_RESULT(arrow::AllocateBuffer(16, &pool)));
            *reinterpret_cast(buffer->mutable_data()) = value.GetInt128();
            return arrow::Datum(std::make_shared::TScalarResult>(buffer));
        }
        default:
            MKQL_ENSURE(false, "Unsupported data slot " << slot);
        }
    }

    if (type->IsPg()) {
        return NYql::MakePgScalar(AS_TYPE(TPgType, type), value, pool);
    }

    MKQL_ENSURE(false, "Unsupported type " << *type);
}

} // namespace

// Converts a boxed MiniKQL value into an arrow scalar Datum (see DoConvertScalar).
arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool) {
    return DoConvertScalar(type, value, pool);
}

// Converts a block item into an arrow scalar Datum (see DoConvertScalar).
arrow::Datum ConvertScalar(TType* type, const NUdf::TBlockItem& value, arrow::MemoryPool& pool) {
    return DoConvertScalar(type, value, pool);
}

// Builds an array of `len` copies of `scalar` (interpreted as `type`).
// Throws if `len` is zero.
arrow::Datum MakeArrayFromScalar(const arrow::Scalar& scalar, size_t len, TType* type, arrow::MemoryPool& pool) {
    MKQL_ENSURE(len > 0, "Invalid block size");
    auto reader = MakeBlockReader(TTypeInfoHelper(), type);
    auto builder = MakeArrayBuilder(TTypeInfoHelper(), type, pool, len, nullptr);

    auto scalarItem = reader->GetScalarItem(scalar);
    builder->Add(scalarItem, len);
    return builder->Build(true);
}

// Returns the arrow value descriptor for a MiniKQL block input type; throws if unsupported.
arrow::ValueDescr ToValueDescr(TType* type) {
    arrow::ValueDescr ret;
    MKQL_ENSURE(ConvertInputArrowType(type, ret), "can't get arrow type");
    return ret;
}

// Element-wise ToValueDescr over a list of types, preserving order.
std::vector ToValueDescr(const TVector& types) {
    std::vector res;
    res.reserve(types.size());
    for (const auto& type : types) {
        res.emplace_back(ToValueDescr(type));
    }

    return res;
}

// Value descriptors for kernel argument types; identical to the vector ToValueDescr overload.
std::vector ConvertToInputTypes(const TVector& argTypes) {
    return ToValueDescr(argTypes);
}

// Wraps the value descriptor of `output` as an arrow kernel output type.
arrow::compute::OutputType ConvertToOutputType(TType* output) {
    return arrow::compute::OutputType(ToValueDescr(output));
}

// Creates a block-length value: an arrow block holding `count` as a scalar datum.
NUdf::TUnboxedValuePod MakeBlockCount(const THolderFactory& holderFactory, const uint64_t count) {
    return holderFactory.CreateArrowBlock(arrow::Datum(count));
}

// Computation node that applies an arrow scalar kernel to its block arguments.
// `Name` drops a leading "Block" prefix from `name`; `ScalarOutput` records
// whether the argument shapes make the result a scalar.
TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes,
    const TVector& argsTypes, const arrow::compute::ScalarKernel& kernel,
    std::shared_ptr kernelHolder,
    const arrow::compute::FunctionOptions* functionOptions)
    : TMutableComputationNode(mutables)
    , StateIndex(mutables.CurValueIndex++)
    , ArgsNodes(std::move(argsNodes))
    , ArgsValuesDescr(ToValueDescr(argsTypes))
    , Kernel(kernel)
    , KernelHolder(std::move(kernelHolder))
    , Options(functionOptions)
    , ScalarOutput(GetResultShape(argsTypes) == TBlockType::EShape::Scalar)
    , Name(name.starts_with("Block") ? name.substr(5) : name)
{
}

// Evaluates all argument nodes and runs the kernel on their datums.
// Scalar-shaped results are produced by a single kernel execution; otherwise
// the arguments are dechunked and the kernel runs once per aligned chunk, and
// the resulting array pieces are combined with MakeArray.
NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) const {
    auto& state = GetState(ctx);

    std::vector argDatums;
    argDatums.reserve(ArgsNodes.size());
    for (ui32 i = 0; i < ArgsNodes.size(); ++i) {
        const auto& value = ArgsNodes[i]->GetValue(ctx);
        argDatums.emplace_back(TArrowBlock::From(value).GetDatum());
        ARROW_DEBUG_CHECK_DATUM_TYPES(ArgsValuesDescr[i], argDatums.back().descr());
    }

    if (ScalarOutput) {
        auto executor = arrow::compute::detail::KernelExecutor::MakeScalar();
        ARROW_OK(executor->Init(&state.KernelContext, { &Kernel, ArgsValuesDescr, Options }));

        auto listener = std::make_shared();
        ARROW_OK(executor->Execute(argDatums, listener.get()));
        auto output = executor->WrapResults(argDatums, listener->values());
        return ctx.HolderFactory.CreateArrowBlock(std::move(output));
    }

    NYql::NUdf::TArgsDechunker dechunker(std::move(argDatums));
    std::vector chunk;
    TVector> arrays;

    while (dechunker.Next(chunk)) {
        auto executor = arrow::compute::detail::KernelExecutor::MakeScalar();
        ARROW_OK(executor->Init(&state.KernelContext, { &Kernel, ArgsValuesDescr, Options }));

        arrow::compute::detail::DatumAccumulator listener;
        ARROW_OK(executor->Execute(chunk, &listener));
        auto output = executor->WrapResults(chunk, listener.values());

        ForEachArrayData(output, [&](const auto& arr) { arrays.push_back(arr); });
    }

    return ctx.HolderFactory.CreateArrowBlock(MakeArray(arrays));
}

// Declares a dependency on every argument node.
void TBlockFuncNode::RegisterDependencies() const {
    for (const auto& arg : ArgsNodes) {
        DependsOn(arg);
    }
}

// Returns the per-context kernel state, creating it on first use in ctx.MutableValues[StateIndex].
TBlockFuncNode::TState& TBlockFuncNode::GetState(TComputationContext& ctx) const {
    auto& result = ctx.MutableValues[StateIndex];
    if (!result.HasValue()) {
        result = ctx.HolderFactory.Create(Options, Kernel, ArgsValuesDescr, ctx);
    }

    return *static_cast(result.AsBoxed().Get());
}

// Exposes this node's kernel to the arrow-kernel computation machinery via a TArrowNode view.
std::unique_ptr TBlockFuncNode::PrepareArrowKernelComputationNode(TComputationContext&) const {
    return std::make_unique(this);
}

// Non-owning view over a TBlockFuncNode; `parent` must outlive this object.
TBlockFuncNode::TArrowNode::TArrowNode(const TBlockFuncNode* parent)
    : Parent_(parent)
{}

TStringBuf TBlockFuncNode::TArrowNode::GetKernelName() const {
    return Parent_->Name;
}

const arrow::compute::ScalarKernel& TBlockFuncNode::TArrowNode::GetArrowKernel() const {
    return Parent_->Kernel;
}

const std::vector& TBlockFuncNode::TArrowNode::GetArgsDesc() const {
    return Parent_->ArgsValuesDescr;
}

// Returns argument node `index`; throws if out of range.
const IComputationNode* TBlockFuncNode::TArrowNode::GetArgument(ui32 index) const {
    MKQL_ENSURE(index < Parent_->ArgsNodes.size(), "Wrong index");
    return Parent_->ArgsNodes[index];
}

// Holds `width` values: `width - 1` data columns plus a trailing block-length
// value, with one array deque and one current-slice array per data column.
TBlockState::TBlockState(TMemoryUsageInfo* memInfo, size_t width)
    : TBase(memInfo), Values(width), Deques(width - 1ULL), Arrays(width - 1ULL)
{
    Pointer_ = Values.data();
}

// Resets every stored value to an empty TUnboxedValuePod.
void TBlockState::ClearValues() {
    Values.assign(Values.size(), NUdf::TUnboxedValuePod());
}

// Loads a new batch: reads the row count from the trailing block-length value
// into Count and queues the array data of every array-shaped column into its
// deque. Requires the previous batch to be fully consumed (Count == 0).
void TBlockState::FillArrays() {
    MKQL_ENSURE(Count == 0, "All existing arrays have to be processed");
    auto& counterDatum = TArrowBlock::From(Values.back()).GetDatum();
    MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)");
    Count = counterDatum.scalar_as().value;
    if (!Count)
        return;

    for (size_t i = 0U; i < Deques.size(); ++i) {
        Deques[i].clear();
        if (const auto& value = Values[i]) {
            const auto& datum = TArrowBlock::From(value).GetDatum();
            if (datum.is_scalar()) {
                // A scalar column has no arrays to slice. Skip only this column:
                // returning here would leave every later column's deque unfilled,
                // so Slice() would keep serving that column's previous-batch array.
                continue;
            }
            MKQL_ENSURE(datum.is_arraylike(), "Unexpected block type (expecting array or chunked array)");
            ForEachArrayData(datum, [this, i](const auto& arrayData) { Deques[i].push_back(arrayData); });
        }
    }
}

// Cuts the next slice: its size is the shortest front array across non-empty
// deques (bounded by Count). Each such column's Arrays[i] receives either the
// whole front array (which is then popped) or a Chop of that length.
// Decreases Count by the slice size and returns it.
ui64 TBlockState::Slice() {
    auto sliceSize = Count;
    for (size_t i = 0; i < Deques.size(); ++i) {
        const auto& arr = Deques[i];
        if (arr.empty())
            continue;

        MKQL_ENSURE(ui64(arr.front()->length) <= Count, "Unexpected array length at column #" << i);
        sliceSize = std::min(sliceSize, ui64(arr.front()->length));
    }

    for (size_t i = 0; i < Arrays.size(); ++i) {
        auto& arr = Deques[i];
        if (arr.empty())
            continue;
        if (auto& array = arr.front(); ui64(array->length) == sliceSize) {
            Arrays[i] = std::move(array);
            Deques[i].pop_front();
        } else
            Arrays[i] = Chop(array, sliceSize);
    }

    Count -= sliceSize;
    return sliceSize;
}

// Returns column `idx` for the current slice. An index past the data columns
// yields a block-length value holding `sliceSize`. A column with a current
// array yields that array; otherwise the stored value is returned as-is.
NUdf::TUnboxedValuePod TBlockState::Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const {
    if (idx >= Deques.size())
        return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared(sliceSize)));

    if (auto array = Arrays[idx])
        return holderFactory.CreateArrowBlock(std::move(array));
    else
        return Values[idx];
}

} // namespace NKikimr::NMiniKQL