mkql_block_agg_some.cpp 10 KB


  1. #include "mkql_block_agg_some.h"
  2. #include <yql/essentials/minikql/mkql_node_builder.h>
  3. #include <yql/essentials/minikql/mkql_node_cast.h>
  4. #include <yql/essentials/minikql/computation/mkql_block_reader.h>
  5. #include <yql/essentials/minikql/computation/mkql_block_builder.h>
  6. #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
  7. namespace NKikimr {
  8. namespace NMiniKQL {
  9. namespace {
// Per-aggregation state used by every SOME aggregator in this file: the raw state
// memory handed to the aggregators is treated as one NUdf::TUnboxedValuePod.
using TGenericState = NUdf::TUnboxedValuePod;
  11. void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row, IBlockReader& reader,
  12. IBlockItemConverter& converter, TComputationContext& ctx)
  13. {
  14. if (datum.is_scalar()) {
  15. if (datum.scalar()->is_valid) {
  16. auto item = reader.GetScalarItem(*datum.scalar());
  17. *typedState = converter.MakeValue(item, ctx.HolderFactory);
  18. }
  19. } else {
  20. const auto& array = datum.array();
  21. TBlockItem curr = reader.GetItem(*array, row);
  22. if (curr) {
  23. *typedState = converter.MakeValue(curr, ctx.HolderFactory);
  24. }
  25. }
  26. }
  27. class TGenericColumnBuilder : public IAggColumnBuilder {
  28. public:
  29. TGenericColumnBuilder(ui64 size, TType* columnType, TComputationContext& ctx)
  30. : Builder_(MakeArrayBuilder(TTypeInfoHelper(), columnType, ctx.ArrowMemoryPool, size, &ctx.Builder->GetPgBuilder()))
  31. , Ctx_(ctx)
  32. {
  33. }
  34. void Add(const void* state) final {
  35. Builder_->Add(*static_cast<const TGenericState*>(state));
  36. }
  37. NUdf::TUnboxedValue Build() final {
  38. return Ctx_.HolderFactory.CreateArrowBlock(Builder_->Build(true));
  39. }
  40. private:
  41. const std::unique_ptr<IArrayBuilder> Builder_;
  42. TComputationContext& Ctx_;
  43. };
// Primary template: declared only. The aggregator is provided through explicit
// specializations for TCombineAllTag, TCombineKeysTag and TFinalizeKeysTag in this file.
template<typename TTag>
class TSomeBlockGenericAggregator;
  46. template<>
  47. class TSomeBlockGenericAggregator<TCombineAllTag> : public TCombineAllTag::TBase {
  48. public:
  49. using TBase = TCombineAllTag::TBase;
  50. TSomeBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  51. : TBase(sizeof(TGenericState), filterColumn, ctx)
  52. , ArgColumn_(argColumn)
  53. , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
  54. , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder()))
  55. {
  56. }
  57. void InitState(void* state) final {
  58. new(state) TGenericState();
  59. }
  60. void DestroyState(void* state) noexcept final {
  61. auto typedState = static_cast<TGenericState*>(state);
  62. typedState->DeleteUnreferenced();
  63. *typedState = TGenericState();
  64. }
  65. void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
  66. TGenericState& typedState = *static_cast<TGenericState*>(state);
  67. if (typedState) {
  68. return;
  69. }
  70. Y_UNUSED(batchLength);
  71. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  72. if (datum.is_scalar()) {
  73. if (datum.scalar()->is_valid) {
  74. auto item = Reader_->GetScalarItem(*datum.scalar());
  75. typedState = Converter_->MakeValue(item, Ctx_.HolderFactory);
  76. }
  77. } else {
  78. const auto& array = datum.array();
  79. auto len = array->length;
  80. const ui8* filterBitmap = nullptr;
  81. if (filtered) {
  82. const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
  83. const auto& filterArray = filterDatum.array();
  84. MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
  85. filterBitmap = filterArray->template GetValues<uint8_t>(1);
  86. }
  87. for (auto i = 0; i < len; ++i) {
  88. TBlockItem curr = Reader_->GetItem(*array, i);
  89. if (curr && (!filterBitmap || filterBitmap[i])) {
  90. typedState = Converter_->MakeValue(curr, Ctx_.HolderFactory);
  91. break;
  92. }
  93. }
  94. }
  95. }
  96. NUdf::TUnboxedValue FinishOne(const void *state) final {
  97. auto typedState = *static_cast<const TGenericState *>(state);
  98. return typedState;
  99. }
  100. private:
  101. const ui32 ArgColumn_;
  102. const std::unique_ptr<IBlockReader> Reader_;
  103. const std::unique_ptr<IBlockItemConverter> Converter_;
  104. };
  105. template<>
  106. class TSomeBlockGenericAggregator<TCombineKeysTag> : public TCombineKeysTag::TBase {
  107. public:
  108. using TBase = TCombineKeysTag::TBase;
  109. TSomeBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  110. : TBase(sizeof(TGenericState), filterColumn, ctx)
  111. , ArgColumn_(argColumn)
  112. , Type_(type)
  113. , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
  114. , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder()))
  115. {
  116. }
  117. void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  118. new(state) TGenericState();
  119. UpdateKey(state, batchNum, columns, row);
  120. }
  121. void DestroyState(void* state) noexcept final {
  122. auto typedState = static_cast<TGenericState*>(state);
  123. typedState->DeleteUnreferenced();
  124. *typedState = TGenericState();
  125. }
  126. void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  127. Y_UNUSED(batchNum);
  128. auto typedState = static_cast<TGenericState*>(state);
  129. if (*typedState) {
  130. return;
  131. }
  132. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  133. PushValueToState(typedState, datum, row, *Reader_, *Converter_, Ctx_);
  134. }
  135. std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
  136. return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
  137. }
  138. private:
  139. const ui32 ArgColumn_;
  140. TType* const Type_;
  141. const std::unique_ptr<IBlockReader> Reader_;
  142. const std::unique_ptr<IBlockItemConverter> Converter_;
  143. };
  144. template<>
  145. class TSomeBlockGenericAggregator<TFinalizeKeysTag> : public TFinalizeKeysTag::TBase {
  146. public:
  147. using TBase = TFinalizeKeysTag::TBase;
  148. TSomeBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  149. : TBase(sizeof(TGenericState), filterColumn, ctx)
  150. , ArgColumn_(argColumn)
  151. , Type_(type)
  152. , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
  153. , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder()))
  154. , Packer_(false, type)
  155. {
  156. }
  157. void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  158. new(state) TGenericState();
  159. UpdateState(state, batchNum, columns, row);
  160. }
  161. void DestroyState(void* state) noexcept final {
  162. auto typedState = static_cast<TGenericState*>(state);
  163. typedState->DeleteUnreferenced();
  164. *typedState = TGenericState();
  165. }
  166. void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  167. Y_UNUSED(batchNum);
  168. auto typedState = static_cast<TGenericState*>(state);
  169. if (*typedState) {
  170. return;
  171. }
  172. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  173. PushValueToState(typedState, datum, row, *Reader_, *Converter_, Ctx_);
  174. }
  175. void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
  176. auto typedState = static_cast<TGenericState*>(state);
  177. buffer.PushString(Packer_.Pack(*typedState));
  178. }
  179. void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
  180. auto typedState = static_cast<TGenericState*>(state);
  181. *typedState = Packer_.Unpack(buffer.PopString(), Ctx_.HolderFactory).Release();
  182. }
  183. std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
  184. return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
  185. }
  186. private:
  187. const ui32 ArgColumn_;
  188. TType* const Type_;
  189. const std::unique_ptr<IBlockReader> Reader_;
  190. const std::unique_ptr<IBlockItemConverter> Converter_;
  191. TValuePacker Packer_;
  192. };
  193. template <typename TTag>
  194. class TPreparedSomeBlockGenericAggregator : public TTag::TPreparedAggregator {
  195. public:
  196. using TBase = typename TTag::TPreparedAggregator;
  197. TPreparedSomeBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn)
  198. : TBase(sizeof(TGenericState))
  199. , Type_(type)
  200. , FilterColumn_(filterColumn)
  201. , ArgColumn_(argColumn)
  202. {}
  203. std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
  204. return std::make_unique<TSomeBlockGenericAggregator<TTag>>(Type_, FilterColumn_, ArgColumn_, ctx);
  205. }
  206. private:
  207. TType* const Type_;
  208. const std::optional<ui32> FilterColumn_;
  209. const ui32 ArgColumn_;
  210. };
  211. template <typename TTag>
  212. std::unique_ptr<typename TTag::TPreparedAggregator> PrepareSome(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn) {
  213. const auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn));
  214. const auto argType = blockType->GetItemType();
  215. return std::make_unique<TPreparedSomeBlockGenericAggregator<TTag>>(argType, filterColumn, argColumn);
  216. }
  217. class TBlockSomeFactory : public IBlockAggregatorFactory {
  218. std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineAll>> PrepareCombineAll(
  219. TTupleType* tupleType,
  220. std::optional<ui32> filterColumn,
  221. const std::vector<ui32>& argsColumns,
  222. const TTypeEnvironment& env) const override {
  223. Y_UNUSED(env);
  224. return PrepareSome<TCombineAllTag>(tupleType, filterColumn, argsColumns[0]);
  225. }
  226. std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineKeys>> PrepareCombineKeys(
  227. TTupleType* tupleType,
  228. const std::vector<ui32>& argsColumns,
  229. const TTypeEnvironment& env) const override {
  230. Y_UNUSED(env);
  231. return PrepareSome<TCombineKeysTag>(tupleType, std::optional<ui32>(), argsColumns[0]);
  232. }
  233. std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorFinalizeKeys>> PrepareFinalizeKeys(
  234. TTupleType* tupleType,
  235. const std::vector<ui32>& argsColumns,
  236. const TTypeEnvironment& env,
  237. TType* returnType,
  238. ui32 hint) const override {
  239. Y_UNUSED(env);
  240. Y_UNUSED(returnType);
  241. Y_UNUSED(hint);
  242. return PrepareSome<TFinalizeKeysTag>(tupleType, std::optional<ui32>(), argsColumns[0]);
  243. }
  244. };
  245. }
  246. std::unique_ptr<IBlockAggregatorFactory> MakeBlockSomeFactory() {
  247. return std::make_unique<TBlockSomeFactory>();
  248. }
  249. }
  250. }