mkql_block_impl.h 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
#pragma once
#include "mkql_computation_node_impl.h"
#include "mkql_computation_node_holders.h"
#include <yql/essentials/minikql/arrow/arrow_util.h>
#include <yql/essentials/public/udf/arrow/block_item.h>
#include <arrow/array.h>
#include <arrow/scalar.h>
#include <arrow/datum.h>
#include <arrow/compute/kernel.h>

#include <cstdint>
#include <deque>
#include <memory>
#include <vector>
  10. extern "C" uint64_t GetBlockCount(const NYql::NUdf::TUnboxedValuePod data);
  11. extern "C" uint64_t GetBitmapPopCountCount(const NYql::NUdf::TUnboxedValuePod data);
  12. extern "C" uint8_t GetBitmapScalarValue(const NYql::NUdf::TUnboxedValuePod data);
  13. namespace NKikimr::NMiniKQL {
  14. arrow::Datum ConvertScalar(TType* type, const NUdf::TUnboxedValuePod& value, arrow::MemoryPool& pool);
  15. arrow::Datum ConvertScalar(TType* type, const NUdf::TBlockItem& value, arrow::MemoryPool& pool);
  16. arrow::Datum MakeArrayFromScalar(const arrow::Scalar& scalar, size_t len, TType* type, arrow::MemoryPool& pool);
  17. arrow::ValueDescr ToValueDescr(TType* type);
  18. std::vector<arrow::ValueDescr> ToValueDescr(const TVector<TType*>& types);
  19. std::vector<arrow::compute::InputType> ConvertToInputTypes(const TVector<TType*>& argTypes);
  20. arrow::compute::OutputType ConvertToOutputType(TType* output);
  21. NUdf::TUnboxedValuePod MakeBlockCount(const THolderFactory& holderFactory, const uint64_t count);
  22. class TBlockFuncNode : public TMutableComputationNode<TBlockFuncNode> {
  23. public:
  24. TBlockFuncNode(TComputationMutables& mutables, TStringBuf name, TComputationNodePtrVector&& argsNodes,
  25. const TVector<TType*>& argsTypes, const arrow::compute::ScalarKernel& kernel,
  26. std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder = {},
  27. const arrow::compute::FunctionOptions* functionOptions = nullptr);
  28. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const;
  29. private:
  30. class TArrowNode : public IArrowKernelComputationNode {
  31. public:
  32. TArrowNode(const TBlockFuncNode* parent);
  33. TStringBuf GetKernelName() const final;
  34. const arrow::compute::ScalarKernel& GetArrowKernel() const final;
  35. const std::vector<arrow::ValueDescr>& GetArgsDesc() const final;
  36. const IComputationNode* GetArgument(ui32 index) const final;
  37. private:
  38. const TBlockFuncNode* Parent_;
  39. };
  40. friend class TArrowNode;
  41. struct TState : public TComputationValue<TState> {
  42. using TComputationValue::TComputationValue;
  43. TState(TMemoryUsageInfo* memInfo, const arrow::compute::FunctionOptions* options,
  44. const arrow::compute::ScalarKernel& kernel, const std::vector<arrow::ValueDescr>& argsValuesDescr,
  45. TComputationContext& ctx)
  46. : TComputationValue(memInfo)
  47. , ExecContext(&ctx.ArrowMemoryPool, nullptr, nullptr)
  48. , KernelContext(&ExecContext)
  49. {
  50. if (kernel.init) {
  51. State = ARROW_RESULT(kernel.init(&KernelContext, { &kernel, argsValuesDescr, options }));
  52. KernelContext.SetState(State.get());
  53. }
  54. }
  55. arrow::compute::ExecContext ExecContext;
  56. arrow::compute::KernelContext KernelContext;
  57. std::unique_ptr<arrow::compute::KernelState> State;
  58. };
  59. void RegisterDependencies() const final;
  60. TState& GetState(TComputationContext& ctx) const;
  61. std::unique_ptr<IArrowKernelComputationNode> PrepareArrowKernelComputationNode(TComputationContext& ctx) const final;
  62. private:
  63. const ui32 StateIndex;
  64. const TComputationNodePtrVector ArgsNodes;
  65. const std::vector<arrow::ValueDescr> ArgsValuesDescr;
  66. const arrow::compute::ScalarKernel& Kernel;
  67. const std::shared_ptr<arrow::compute::ScalarKernel> KernelHolder;
  68. const arrow::compute::FunctionOptions* const Options;
  69. const bool ScalarOutput;
  70. const TString Name;
  71. };
  72. struct TBlockState : public TComputationValue<TBlockState> {
  73. using TBase = TComputationValue<TBlockState>;
  74. ui64 Count = 0;
  75. NUdf::TUnboxedValue* Pointer_ = nullptr;
  76. TUnboxedValueVector Values;
  77. std::vector<std::deque<std::shared_ptr<arrow::ArrayData>>> Deques;
  78. std::vector<std::shared_ptr<arrow::ArrayData>> Arrays;
  79. TBlockState(TMemoryUsageInfo* memInfo, size_t width);
  80. void ClearValues();
  81. void FillArrays();
  82. ui64 Slice();
  83. NUdf::TUnboxedValuePod Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const;
  84. };
  85. } //namespace NKikimr::NMiniKQL