// mkql_block_coalesce.cpp (6.8 KB)
  1. #include "mkql_block_coalesce.h"
  2. #include <yql/essentials/minikql/arrow/arrow_defs.h>
  3. #include <yql/essentials/minikql/mkql_type_builder.h>
  4. #include <yql/essentials/minikql/computation/mkql_block_impl.h>
  5. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  6. #include <yql/essentials/minikql/mkql_node_builder.h>
  7. #include <yql/essentials/minikql/mkql_node_cast.h>
  8. #include <yql/essentials/public/udf/arrow/block_builder.h>
  9. #include <yql/essentials/public/udf/arrow/block_reader.h>
  10. #include <yql/essentials/public/udf/arrow/util.h>
  11. #include <arrow/util/bitmap_ops.h>
  12. namespace NKikimr {
  13. namespace NMiniKQL {
  14. namespace {
  15. class TCoalesceBlockExec {
  16. public:
  17. TCoalesceBlockExec(const std::shared_ptr<arrow::DataType>& returnArrowType, TType* firstItemType, TType* secondItemType, bool needUnwrapFirst)
  18. : ReturnArrowType_(returnArrowType)
  19. , FirstItemType_(firstItemType)
  20. , SecondItemType_(secondItemType)
  21. , NeedUnwrapFirst_(needUnwrapFirst)
  22. {}
  23. arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const {
  24. const auto& first = batch.values[0];
  25. const auto& second = batch.values[1];
  26. MKQL_ENSURE(!first.is_scalar() || !second.is_scalar(), "Expected at least one array");
  27. size_t length = Max(first.length(), second.length());
  28. auto firstReader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), FirstItemType_);
  29. auto secondReader = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), SecondItemType_);
  30. if (first.is_scalar()) {
  31. auto firstValue = firstReader->GetScalarItem(*first.scalar());
  32. if (firstValue) {
  33. auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), SecondItemType_, *ctx->memory_pool(), length, nullptr);
  34. builder->Add(NeedUnwrapFirst_ ? firstValue.GetOptionalValue() : firstValue, length);
  35. *res = builder->Build(true);
  36. } else {
  37. *res = second;
  38. }
  39. } else if (second.is_scalar()) {
  40. const auto& firstArray = *first.array();
  41. if (firstArray.GetNullCount() == 0) {
  42. *res = NeedUnwrapFirst_ ? Unwrap(firstArray, FirstItemType_) : first;
  43. } else if ((size_t)firstArray.GetNullCount() == length) {
  44. auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), SecondItemType_, *ctx->memory_pool(), length, nullptr);
  45. auto secondValue = secondReader->GetScalarItem(*second.scalar());
  46. builder->Add(secondValue, length);
  47. *res = builder->Build(true);
  48. } else {
  49. auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), SecondItemType_, *ctx->memory_pool(), length, nullptr);
  50. auto secondValue = secondReader->GetScalarItem(*second.scalar());
  51. for (size_t i = 0; i < length; ++i) {
  52. auto firstItem = firstReader->GetItem(firstArray, i);
  53. if (firstItem) {
  54. builder->Add(NeedUnwrapFirst_ ? firstItem.GetOptionalValue() : firstItem);
  55. } else {
  56. builder->Add(secondValue);
  57. }
  58. }
  59. *res = builder->Build(true);
  60. }
  61. } else {
  62. const auto& firstArray = *first.array();
  63. const auto& secondArray = *second.array();
  64. if (firstArray.GetNullCount() == 0) {
  65. *res = NeedUnwrapFirst_ ? Unwrap(firstArray, FirstItemType_) : first;
  66. } else if ((size_t)firstArray.GetNullCount() == length) {
  67. *res = second;
  68. } else {
  69. auto builder = NYql::NUdf::MakeArrayBuilder(TTypeInfoHelper(), SecondItemType_, *ctx->memory_pool(), length, nullptr);
  70. for (size_t i = 0; i < length; ++i) {
  71. auto firstItem = firstReader->GetItem(firstArray, i);
  72. if (firstItem) {
  73. builder->Add(NeedUnwrapFirst_ ? firstItem.GetOptionalValue() : firstItem);
  74. } else {
  75. auto secondItem = secondReader->GetItem(secondArray, i);
  76. builder->Add(secondItem);
  77. }
  78. }
  79. *res = builder->Build(true);
  80. }
  81. }
  82. return arrow::Status::OK();
  83. }
  84. private:
  85. const std::shared_ptr<arrow::DataType> ReturnArrowType_;
  86. TType* const FirstItemType_;
  87. TType* const SecondItemType_;
  88. const bool NeedUnwrapFirst_;
  89. };
  90. std::shared_ptr<arrow::compute::ScalarKernel> MakeBlockCoalesceKernel(const TVector<TType*>& argTypes, TType* resultType, bool needUnwrapFirst) {
  91. using TExec = TCoalesceBlockExec;
  92. std::shared_ptr<arrow::DataType> returnArrowType;
  93. MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), returnArrowType), "Unsupported arrow type");
  94. auto exec = std::make_shared<TExec>(
  95. returnArrowType,
  96. AS_TYPE(TBlockType, argTypes[0])->GetItemType(),
  97. AS_TYPE(TBlockType, argTypes[1])->GetItemType(),
  98. needUnwrapFirst);
  99. auto kernel = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType),
  100. [exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
  101. return exec->Exec(ctx, batch, res);
  102. });
  103. kernel->null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE;
  104. return kernel;
  105. }
  106. } // namespace
  107. IComputationNode* WrapBlockCoalesce(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  108. MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args");
  109. auto first = callable.GetInput(0);
  110. auto second = callable.GetInput(1);
  111. auto firstType = AS_TYPE(TBlockType, first.GetStaticType());
  112. auto secondType = AS_TYPE(TBlockType, second.GetStaticType());
  113. auto firstItemType = firstType->GetItemType();
  114. auto secondItemType = secondType->GetItemType();
  115. MKQL_ENSURE(firstItemType->IsOptional() || firstItemType->IsPg(), "Expecting Optional or Pg type as first argument");
  116. bool needUnwrapFirst = false;
  117. if (!firstItemType->IsSameType(*secondItemType)) {
  118. needUnwrapFirst = true;
  119. bool firstOptional;
  120. firstItemType = UnpackOptional(firstItemType, firstOptional);
  121. MKQL_ENSURE(firstItemType->IsSameType(*secondItemType), "Uncompatible arguemnt types");
  122. }
  123. auto firstCompute = LocateNode(ctx.NodeLocator, callable, 0);
  124. auto secondCompute = LocateNode(ctx.NodeLocator, callable, 1);
  125. TComputationNodePtrVector argsNodes = { firstCompute, secondCompute };
  126. TVector<TType*> argsTypes = { firstType, secondType };
  127. auto kernel = MakeBlockCoalesceKernel(argsTypes, secondType, needUnwrapFirst);
  128. return new TBlockFuncNode(ctx.Mutables, "Coalesce", std::move(argsNodes), argsTypes, *kernel, kernel);
  129. }
  130. }
  131. }