// mkql_block_agg_count.cpp (14 KB)
  1. #include "mkql_block_agg_count.h"
  2. #include <yql/essentials/minikql/arrow/arrow_defs.h>
  3. #include <yql/essentials/minikql/computation/mkql_block_builder.h>
  4. namespace NKikimr {
  5. namespace NMiniKQL {
  6. namespace {
  7. struct TState {
  8. ui64 Count_ = 0;
  9. };
  10. class TColumnBuilder : public IAggColumnBuilder {
  11. public:
  12. TColumnBuilder(ui64 size, TComputationContext& ctx)
  13. : Builder_(TTypeInfoHelper(), arrow::uint64(), ctx.ArrowMemoryPool, size)
  14. , Ctx_(ctx)
  15. {
  16. }
  17. void Add(const void* state) final {
  18. auto typedState = static_cast<const TState*>(state);
  19. Builder_.Add(TBlockItem(typedState->Count_));
  20. }
  21. NUdf::TUnboxedValue Build() final {
  22. return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true));
  23. }
  24. private:
  25. NYql::NUdf::TFixedSizeArrayBuilder<ui64, false> Builder_;
  26. TComputationContext& Ctx_;
  27. };
  // Primary templates are only declared; each aggregation stage (tag) gets an
  // explicit specialization below.
  template <typename TTag> class TCountAllAggregator;
  template <typename TTag> class TCountAggregator;
  32. template <>
  33. class TCountAllAggregator<TCombineAllTag> : public TCombineAllTag::TBase {
  34. public:
  35. using TBase = TCombineAllTag::TBase;
  36. TCountAllAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  37. : TBase(sizeof(TState), filterColumn, ctx)
  38. {
  39. Y_UNUSED(argColumn);
  40. }
  41. void InitState(void* state) final {
  42. new(state) TState();
  43. }
  44. void DestroyState(void* state) noexcept final {
  45. static_assert(std::is_trivially_destructible<TState>::value);
  46. Y_UNUSED(state);
  47. }
  48. void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
  49. auto typedState = static_cast<TState*>(state);
  50. Y_UNUSED(columns);
  51. if (filtered) {
  52. typedState->Count_ += *filtered;
  53. }
  54. else {
  55. typedState->Count_ += batchLength;
  56. }
  57. }
  58. NUdf::TUnboxedValue FinishOne(const void* state) final {
  59. auto typedState = static_cast<const TState*>(state);
  60. return NUdf::TUnboxedValuePod(typedState->Count_);
  61. }
  62. };
  63. template <>
  64. class TCountAllAggregator<TCombineKeysTag> : public TCombineKeysTag::TBase {
  65. public:
  66. using TBase = TCombineKeysTag::TBase;
  67. TCountAllAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  68. : TBase(sizeof(TState), filterColumn, ctx)
  69. {
  70. Y_UNUSED(argColumn);
  71. }
  72. void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  73. new(state) TState();
  74. UpdateKey(state, batchNum, columns, row);
  75. }
  76. void DestroyState(void* state) noexcept final {
  77. static_assert(std::is_trivially_destructible<TState>::value);
  78. Y_UNUSED(state);
  79. }
  80. void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  81. Y_UNUSED(batchNum);
  82. Y_UNUSED(columns);
  83. Y_UNUSED(row);
  84. auto typedState = static_cast<TState*>(state);
  85. typedState->Count_ += 1;
  86. }
  87. std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
  88. return std::make_unique<TColumnBuilder>(size, Ctx_);
  89. }
  90. };
  91. template <>
  92. class TCountAllAggregator<TFinalizeKeysTag> : public TFinalizeKeysTag::TBase {
  93. public:
  94. using TBase = TFinalizeKeysTag::TBase;
  95. TCountAllAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  96. : TBase(sizeof(TState), filterColumn, ctx)
  97. , ArgColumn_(argColumn)
  98. {
  99. }
  100. void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  101. new(state) TState();
  102. UpdateState(state, batchNum, columns, row);
  103. }
  104. void DestroyState(void* state) noexcept final {
  105. static_assert(std::is_trivially_destructible<TState>::value);
  106. Y_UNUSED(state);
  107. }
  108. void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  109. Y_UNUSED(batchNum);
  110. auto typedState = static_cast<TState*>(state);
  111. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  112. if (datum.is_scalar()) {
  113. MKQL_ENSURE(datum.scalar()->is_valid, "Expected not null");
  114. typedState->Count_ += datum.scalar_as<arrow::UInt64Scalar>().value;
  115. } else {
  116. const auto& array = datum.array();
  117. auto ptr = array->GetValues<ui64>(1);
  118. MKQL_ENSURE(array->GetNullCount() == 0, "Expected not null");
  119. typedState->Count_ += ptr[row];
  120. }
  121. }
  122. void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
  123. auto typedState = static_cast<TState*>(state);
  124. buffer.PushNumber(typedState->Count_);
  125. }
  126. void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
  127. auto typedState = static_cast<TState*>(state);
  128. buffer.PopNumber(typedState->Count_);
  129. }
  130. std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
  131. return std::make_unique<TColumnBuilder>(size, Ctx_);
  132. }
  133. private:
  134. const ui32 ArgColumn_;
  135. };
  136. template <>
  137. class TCountAggregator<TCombineAllTag> : public TCombineAllTag::TBase {
  138. public:
  139. using TBase = TCombineAllTag::TBase;
  140. TCountAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  141. : TBase(sizeof(TState), filterColumn, ctx)
  142. , ArgColumn_(argColumn)
  143. {
  144. }
  145. void InitState(void* state) final {
  146. new(state) TState();
  147. }
  148. void DestroyState(void* state) noexcept final {
  149. static_assert(std::is_trivially_destructible<TState>::value);
  150. Y_UNUSED(state);
  151. }
  152. void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
  153. auto typedState = static_cast<TState*>(state);
  154. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  155. if (datum.is_scalar()) {
  156. if (datum.scalar()->is_valid) {
  157. typedState->Count_ += filtered ? *filtered : batchLength;
  158. }
  159. } else {
  160. const auto& array = datum.array();
  161. if (!filtered) {
  162. typedState->Count_ += array->length - array->GetNullCount();
  163. } else if (array->GetNullCount() == array->length) {
  164. // all nulls
  165. return;
  166. } else if (array->GetNullCount() == 0) {
  167. // no nulls
  168. typedState->Count_ += *filtered;
  169. } else {
  170. const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
  171. // intersect masks from nulls and filter column
  172. const auto& filterArray = filterDatum.array();
  173. MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
  174. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  175. const ui8* filterBitmap = filterArray->GetValues<uint8_t>(1);
  176. auto state = typedState->Count_;
  177. for (ui32 i = 0; i < array->length; ++i) {
  178. ui64 fullIndex = i + array->offset;
  179. auto bit1 = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1);
  180. auto bit2 = filterBitmap[i];
  181. state += bit1 & bit2;
  182. }
  183. typedState->Count_ = state;
  184. }
  185. }
  186. }
  187. NUdf::TUnboxedValue FinishOne(const void* state) final {
  188. auto typedState = static_cast<const TState*>(state);
  189. return NUdf::TUnboxedValuePod(typedState->Count_);
  190. }
  191. private:
  192. const ui32 ArgColumn_;
  193. };
  194. template <>
  195. class TCountAggregator<TCombineKeysTag> : public TCombineKeysTag::TBase {
  196. public:
  197. using TBase = TCombineKeysTag::TBase;
  198. TCountAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  199. : TBase(sizeof(TState), filterColumn, ctx)
  200. , ArgColumn_(argColumn)
  201. {
  202. }
  203. void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  204. new(state) TState();
  205. UpdateKey(state, batchNum, columns, row);
  206. }
  207. void DestroyState(void* state) noexcept final {
  208. static_assert(std::is_trivially_destructible<TState>::value);
  209. Y_UNUSED(state);
  210. }
  211. void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  212. Y_UNUSED(batchNum);
  213. auto typedState = static_cast<TState*>(state);
  214. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  215. if (datum.is_scalar()) {
  216. if (datum.scalar()->is_valid) {
  217. typedState->Count_ += 1;
  218. }
  219. } else {
  220. const auto& array = datum.array();
  221. if (array->GetNullCount() == 0) {
  222. typedState->Count_ += 1;
  223. } else {
  224. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  225. auto fullIndex = row + array->offset;
  226. auto bit = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1);
  227. typedState->Count_ += bit;
  228. }
  229. }
  230. }
  231. std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
  232. return std::make_unique<TColumnBuilder>(size, Ctx_);
  233. }
  234. private:
  235. const ui32 ArgColumn_;
  236. };
  237. template <>
  238. class TCountAggregator<TFinalizeKeysTag> : public TCountAllAggregator<TFinalizeKeysTag>
  239. {
  240. public:
  241. using TBase = TCountAllAggregator<TFinalizeKeysTag>;
  242. TCountAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  243. : TBase(filterColumn, argColumn, ctx)
  244. {}
  245. };
  246. template <typename TTag>
  247. class TPreparedCountAll : public TTag::TPreparedAggregator {
  248. public:
  249. using TBase = typename TTag::TPreparedAggregator;
  250. TPreparedCountAll(std::optional<ui32> filterColumn, ui32 argColumn)
  251. : TBase(sizeof(TState))
  252. , FilterColumn_(filterColumn)
  253. , ArgColumn_(argColumn)
  254. {}
  255. std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
  256. return std::make_unique<TCountAllAggregator<TTag>>(FilterColumn_, ArgColumn_, ctx);
  257. }
  258. private:
  259. const std::optional<ui32> FilterColumn_;
  260. const ui32 ArgColumn_;
  261. };
  262. template <typename TTag>
  263. class TPreparedCount : public TTag::TPreparedAggregator {
  264. public:
  265. using TBase = typename TTag::TPreparedAggregator;
  266. TPreparedCount(std::optional<ui32> filterColumn, ui32 argColumn)
  267. : TBase(sizeof(TState))
  268. , FilterColumn_(filterColumn)
  269. , ArgColumn_(argColumn)
  270. {}
  271. std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
  272. return std::make_unique<TCountAggregator<TTag>>(FilterColumn_, ArgColumn_, ctx);
  273. }
  274. private:
  275. const std::optional<ui32> FilterColumn_;
  276. const ui32 ArgColumn_;
  277. };
  278. template <typename TTag>
  279. std::unique_ptr<typename TTag::TPreparedAggregator> PrepareCountAll(std::optional<ui32> filterColumn, ui32 argColumn) {
  280. return std::make_unique<TPreparedCountAll<TTag>>(filterColumn, argColumn);
  281. }
  282. template <typename TTag>
  283. std::unique_ptr<typename TTag::TPreparedAggregator> PrepareCount(std::optional<ui32> filterColumn, ui32 argColumn) {
  284. return std::make_unique<TPreparedCount<TTag>>(filterColumn, argColumn);
  285. }
  286. class TBlockCountAllFactory : public IBlockAggregatorFactory {
  287. public:
  288. std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll(
  289. TTupleType* tupleType,
  290. std::optional<ui32> filterColumn,
  291. const std::vector<ui32>& argsColumns,
  292. const TTypeEnvironment& env) const final {
  293. Y_UNUSED(tupleType);
  294. Y_UNUSED(argsColumns);
  295. Y_UNUSED(env);
  296. return PrepareCountAll<TCombineAllTag>(filterColumn, 0);
  297. }
  298. std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys(
  299. TTupleType* tupleType,
  300. const std::vector<ui32>& argsColumns,
  301. const TTypeEnvironment& env) const final {
  302. Y_UNUSED(tupleType);
  303. Y_UNUSED(argsColumns);
  304. Y_UNUSED(env);
  305. return PrepareCountAll<TCombineKeysTag>(std::optional<ui32>(), 0);
  306. }
  307. std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys(
  308. TTupleType* tupleType,
  309. const std::vector<ui32>& argsColumns,
  310. const TTypeEnvironment& env,
  311. TType* returnType,
  312. ui32 hint) const final {
  313. Y_UNUSED(tupleType);
  314. Y_UNUSED(argsColumns);
  315. Y_UNUSED(env);
  316. Y_UNUSED(returnType);
  317. Y_UNUSED(hint);
  318. return PrepareCountAll<TFinalizeKeysTag>(std::optional<ui32>(), argsColumns[0]);
  319. }
  320. };
  321. class TBlockCountFactory : public IBlockAggregatorFactory {
  322. public:
  323. std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll(
  324. TTupleType* tupleType,
  325. std::optional<ui32> filterColumn,
  326. const std::vector<ui32>& argsColumns,
  327. const TTypeEnvironment& env) const final {
  328. Y_UNUSED(tupleType);
  329. Y_UNUSED(env);
  330. return PrepareCount<TCombineAllTag>(filterColumn, argsColumns[0]);
  331. }
  332. std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys(
  333. TTupleType* tupleType,
  334. const std::vector<ui32>& argsColumns,
  335. const TTypeEnvironment& env) const final {
  336. Y_UNUSED(tupleType);
  337. Y_UNUSED(argsColumns);
  338. Y_UNUSED(env);
  339. return PrepareCount<TCombineKeysTag>(std::optional<ui32>(), argsColumns[0]);
  340. }
  341. std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys(
  342. TTupleType* tupleType,
  343. const std::vector<ui32>& argsColumns,
  344. const TTypeEnvironment& env,
  345. TType* returnType,
  346. ui32 hint) const final {
  347. Y_UNUSED(tupleType);
  348. Y_UNUSED(argsColumns);
  349. Y_UNUSED(env);
  350. Y_UNUSED(returnType);
  351. Y_UNUSED(hint);
  352. return PrepareCount<TFinalizeKeysTag>(std::optional<ui32>(), argsColumns[0]);
  353. }
  354. };
  355. }
  356. std::unique_ptr<IBlockAggregatorFactory> MakeBlockCountAllFactory() {
  357. return std::make_unique<TBlockCountAllFactory>();
  358. }
  359. std::unique_ptr<IBlockAggregatorFactory> MakeBlockCountFactory() {
  360. return std::make_unique<TBlockCountFactory>();
  361. }
  362. }
  363. }