123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- #include "mkql_block_coalesce.h"
- #include <yql/essentials/minikql/arrow/arrow_defs.h>
- #include <yql/essentials/minikql/mkql_type_builder.h>
- #include <yql/essentials/minikql/computation/mkql_block_impl.h>
- #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
- #include <yql/essentials/minikql/mkql_node_builder.h>
- #include <yql/essentials/minikql/mkql_node_cast.h>
- #include <yql/essentials/public/udf/arrow/block_builder.h>
- #include <yql/essentials/public/udf/arrow/block_reader.h>
- #include <yql/essentials/public/udf/arrow/util.h>
- #include <arrow/util/bitmap_ops.h>
- namespace NKikimr {
- namespace NMiniKQL {
- namespace {
- class TCoalesceBlockExec {
- public:
- TCoalesceBlockExec(const std::shared_ptr<arrow::DataType>& returnArrowType, TType* firstItemType, TType* secondItemType, bool needUnwrapFirst)
- : ReturnArrowType_(returnArrowType)
- , FirstItemType_(firstItemType)
- , SecondItemType_(secondItemType)
- , NeedUnwrapFirst_(needUnwrapFirst)
- {}
- arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const {
- const auto& first = batch.values[0];
- const auto& second = batch.values[1];
- MKQL_ENSURE(!first.is_scalar() || !second.is_scalar(), "Expected at least one array");
- size_t length = Max(first.length(), second.length());
- auto firstReader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), FirstItemType_);
- auto secondReader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), SecondItemType_);
- if (first.is_scalar()) {
- auto firstValue = firstReader->GetScalarItem(*first.scalar());
- if (firstValue) {
- auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), SecondItemType_, *ctx->memory_pool(), length, nullptr);
- builder->Add(NeedUnwrapFirst_ ? firstValue.GetOptionalValue() : firstValue, length);
- *res = builder->Build(true);
- } else {
- *res = second;
- }
- } else if (second.is_scalar()) {
- const auto& firstArray = *first.array();
- if (firstArray.GetNullCount() == 0) {
- *res = NeedUnwrapFirst_ ? Unwrap(firstArray, FirstItemType_) : first;
- } else if ((size_t)firstArray.GetNullCount() == length) {
- auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), SecondItemType_, *ctx->memory_pool(), length, nullptr);
- auto secondValue = secondReader->GetScalarItem(*second.scalar());
- builder->Add(secondValue, length);
- *res = builder->Build(true);
- } else {
- auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), SecondItemType_, *ctx->memory_pool(), length, nullptr);
- auto secondValue = secondReader->GetScalarItem(*second.scalar());
- for (size_t i = 0; i < length; ++i) {
- auto firstItem = firstReader->GetItem(firstArray, i);
- if (firstItem) {
- builder->Add(NeedUnwrapFirst_ ? firstItem.GetOptionalValue() : firstItem);
- } else {
- builder->Add(secondValue);
- }
- }
- *res = builder->Build(true);
- }
- } else {
- const auto& firstArray = *first.array();
- const auto& secondArray = *second.array();
- if (firstArray.GetNullCount() == 0) {
- *res = NeedUnwrapFirst_ ? Unwrap(firstArray, FirstItemType_) : first;
- } else if ((size_t)firstArray.GetNullCount() == length) {
- *res = second;
- } else {
- auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), SecondItemType_, *ctx->memory_pool(), length, nullptr);
- for (size_t i = 0; i < length; ++i) {
- auto firstItem = firstReader->GetItem(firstArray, i);
- if (firstItem) {
- builder->Add(NeedUnwrapFirst_ ? firstItem.GetOptionalValue() : firstItem);
- } else {
- auto secondItem = secondReader->GetItem(secondArray, i);
- builder->Add(secondItem);
- }
- }
- *res = builder->Build(true);
- }
- }
- return arrow::Status::OK();
- }
- private:
- const std::shared_ptr<arrow::DataType> ReturnArrowType_;
- TType* const FirstItemType_;
- TType* const SecondItemType_;
- const bool NeedUnwrapFirst_;
- };
- std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockCoalesceKernel(const TVector<TType*>& argTypes, TType* resultType, bool needUnwrapFirst) {
- using TExec = TCoalesceBlockExec;
- std::shared_ptr<arrow::DataType> returnArrowType;
- MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), returnArrowType), "Unsupported arrow type");
- auto exec = std::make_shared<TExec>(
- returnArrowType,
- AS_TYPE(TBlockType, argTypes[0])->GetItemType(),
- AS_TYPE(TBlockType, argTypes[1])->GetItemType(),
- needUnwrapFirst);
- auto kernel = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType),
- [exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
- return exec->Exec(ctx, batch, res);
- });
- kernel->null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE;
- return kernel;
- }
- } // namespace
- IComputationNode* WrapBlockCoalesce(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args");
- auto first = callable.GetInput(0);
- auto second = callable.GetInput(1);
- auto firstType = AS_TYPE(TBlockType, first.GetStaticType());
- auto secondType = AS_TYPE(TBlockType, second.GetStaticType());
- auto firstItemType = firstType->GetItemType();
- auto secondItemType = secondType->GetItemType();
- MKQL_ENSURE(firstItemType->IsOptional() || firstItemType->IsPg(), "Expecting Optional or Pg type as first argument");
- bool needUnwrapFirst = false;
- if (!firstItemType->IsSameType(*secondItemType)) {
- needUnwrapFirst = true;
- bool firstOptional;
- firstItemType = UnpackOptional(firstItemType, firstOptional);
- MKQL_ENSURE(firstItemType->IsSameType(*secondItemType), "Uncompatible arguemnt types");
- }
- auto firstCompute = LocateNode(ctx.NodeLocator, callable, 0);
- auto secondCompute = LocateNode(ctx.NodeLocator, callable, 1);
- TComputationNodePtrVector argsNodes = { firstCompute, secondCompute };
- TVector<TType*> argsTypes = { firstType, secondType };
- auto kernel = MakeBlockCoalesceKernel(argsTypes, secondType, needUnwrapFirst);
- return new TBlockFuncNode(ctx.Mutables, "Coalesce", std::move(argsNodes), argsTypes, *kernel, kernel);
- }
- }
- }
|