#include "mkql_block_just.h" #include #include #include #include #include namespace NKikimr { namespace NMiniKQL { namespace { template class TJustBlockExec { public: TJustBlockExec(const std::shared_ptr& returnArrowType) : ReturnArrowType(returnArrowType) {} arrow::Status Exec(arrow::compute::KernelContext*, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const { arrow::Datum inputDatum = batch.values[0]; if (Trivial) { *res = inputDatum; return arrow::Status::OK(); } if (inputDatum.is_scalar()) { std::vector> arrowValue; arrowValue.emplace_back(inputDatum.scalar()); *res = arrow::Datum(std::make_shared(arrowValue, ReturnArrowType)); } else { auto array = inputDatum.array(); auto newArrayData = arrow::ArrayData::Make(ReturnArrowType, array->length, { nullptr }, 0, 0); newArrayData->child_data.push_back(array); *res = arrow::Datum(newArrayData); } return arrow::Status::OK(); } private: const std::shared_ptr ReturnArrowType; }; template std::shared_ptr MakeBlockJustKernel(const TVector& argTypes, TType* resultType) { using TExec = TJustBlockExec; std::shared_ptr returnArrowType; MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), returnArrowType), "Unsupported arrow type"); auto exec = std::make_shared(returnArrowType); auto kernel = std::make_shared(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType), [exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { return exec->Exec(ctx, batch, res); }); kernel->null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE; return kernel; } } // namespace IComputationNode* WrapBlockJust(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args"); auto data = callable.GetInput(0); auto dataType = AS_TYPE(TBlockType, data.GetStaticType()); auto itemType = dataType->GetItemType(); auto dataCompute = LocateNode(ctx.NodeLocator, callable, 0); TComputationNodePtrVector argsNodes = { dataCompute }; TVector argsTypes = { dataType }; std::shared_ptr kernel; if (itemType->IsOptional() || itemType->IsVariant()) { kernel = MakeBlockJustKernel(argsTypes, callable.GetType()->GetReturnType()); } else { kernel = MakeBlockJustKernel(argsTypes, callable.GetType()->GetReturnType()); } return new TBlockFuncNode(ctx.Mutables, callable.GetType()->GetName(), std::move(argsNodes), argsTypes, *kernel, kernel); } } }