mkql_block_agg_sum.cpp 37 KB


  1. #include "mkql_block_agg_sum.h"
  2. #include "mkql_block_agg_state_helper.h"
  3. #include <yql/essentials/minikql/mkql_node_builder.h>
  4. #include <yql/essentials/minikql/mkql_node_cast.h>
  5. #include <yql/essentials/minikql/mkql_node_printer.h>
  6. #include <yql/essentials/minikql/computation/mkql_block_builder.h>
  7. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  8. #include <yql/essentials/minikql/arrow/arrow_defs.h>
  9. #include <yql/essentials/minikql/arrow/arrow_util.h>
  10. #include <yql/essentials/minikql/arrow/mkql_bit_utils.h>
  11. #include <arrow/scalar.h>
  12. #include <arrow/array/builder_primitive.h>
  13. namespace NKikimr {
  14. namespace NMiniKQL {
  15. namespace {
  16. template<bool IsNullable, typename TSum>
  17. struct TSumState;
  18. template<typename TSum>
  19. struct TSumState<true, TSum> {
  20. typename TPrimitiveDataType<TSum>::TArithmetic Sum_ = 0;
  21. ui8 IsValid_ = 0;
  22. };
  23. template<typename TSum>
  24. struct TSumState<false, TSum> {
  25. typename TPrimitiveDataType<TSum>::TArithmetic Sum_ = 0;
  26. };
  27. template<typename TOut>
  28. struct TAvgState {
  29. typename TPrimitiveDataType<TOut>::TArithmetic Sum_ = 0;
  30. ui64 Count_ = 0;
  31. };
  32. template <bool IsNullable, typename TSum>
  33. class TSumColumnBuilder : public IAggColumnBuilder {
  34. public:
  35. using TStateType = TSumState<IsNullable, TSum>;
  36. TSumColumnBuilder(ui64 size, TType* dataType, TComputationContext& ctx)
  37. : Builder_(dataType, TTypeInfoHelper(), ctx.ArrowMemoryPool, size)
  38. , Ctx_(ctx)
  39. {
  40. }
  41. void Add(const void* state) final {
  42. auto typedState = MakeStateWrapper<TStateType>(state);
  43. if constexpr (IsNullable) {
  44. if (!typedState->IsValid_) {
  45. Builder_.Add(TBlockItem());
  46. return;
  47. }
  48. }
  49. Builder_.Add(TBlockItem(TSum(typedState->Sum_)));
  50. }
  51. NUdf::TUnboxedValue Build() final {
  52. return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true));
  53. }
  54. private:
  55. NYql::NUdf::TFixedSizeArrayBuilder<TSum, IsNullable> Builder_;
  56. TComputationContext& Ctx_;
  57. };
  58. template<typename TOut>
  59. class TAvgStateColumnBuilder : public IAggColumnBuilder {
  60. public:
  61. TAvgStateColumnBuilder(ui64 size, TType* outputType, TComputationContext& ctx)
  62. : Ctx_(ctx)
  63. , Builder_(MakeArrayBuilder(TTypeInfoHelper(), outputType, ctx.ArrowMemoryPool, size, &ctx.Builder->GetPgBuilder()))
  64. {
  65. }
  66. void Add(const void* state) final {
  67. auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
  68. auto tupleBuilder = static_cast<NUdf::TTupleArrayBuilder<true>*>(Builder_.get());
  69. if (typedState->Count_) {
  70. TBlockItem tupleItems[] = { TBlockItem(TOut(typedState->Sum_)), TBlockItem(typedState->Count_)} ;
  71. tupleBuilder->Add(TBlockItem(tupleItems));
  72. } else {
  73. tupleBuilder->Add(TBlockItem());
  74. }
  75. }
  76. NUdf::TUnboxedValue Build() final {
  77. return Ctx_.HolderFactory.CreateArrowBlock(Builder_->Build(true));
  78. }
  79. private:
  80. TComputationContext& Ctx_;
  81. const std::unique_ptr<IArrayBuilder> Builder_;
  82. };
  83. template<typename TOut>
  84. class TAvgResultColumnBuilder : public IAggColumnBuilder {
  85. public:
  86. TAvgResultColumnBuilder(ui64 size, TComputationContext& ctx)
  87. : Ctx_(ctx)
  88. , Builder_(TTypeInfoHelper(), arrow::TypeTraits<typename TPrimitiveDataType<TOut>::TResult>::type_singleton(), ctx.ArrowMemoryPool, size)
  89. {
  90. }
  91. void Add(const void* state) final {
  92. auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
  93. if (typedState->Count_) {
  94. Builder_.Add(TBlockItem(TOut(typedState->Sum_ / typedState->Count_)));
  95. } else {
  96. Builder_.Add(TBlockItem());
  97. }
  98. }
  99. NUdf::TUnboxedValue Build() final {
  100. return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true));
  101. }
  102. private:
  103. TComputationContext& Ctx_;
  104. NYql::NUdf::TFixedSizeArrayBuilder<TOut, /*Nullable=*/true> Builder_;
  105. };
  106. template <typename TTag, bool IsNullable, bool IsScalar, typename TIn, typename TSum>
  107. class TSumBlockAggregator;
  108. template <typename TTag, typename TIn, typename TOut>
  109. class TAvgBlockAggregator;
  110. template <bool IsNullable, bool IsScalar, typename TIn, typename TSum>
  111. class TSumBlockAggregator<TCombineAllTag, IsNullable, IsScalar, TIn, TSum> : public TCombineAllTag::TBase {
  112. public:
  113. using TBase = TCombineAllTag::TBase;
  114. using TStateType = TSumState<IsNullable, TSum>;
  115. using TInScalar = typename TPrimitiveDataType<TIn>::TScalarResult;
  116. TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TType* dataType, TComputationContext& ctx)
  117. : TBase(sizeof(TStateType), filterColumn, ctx)
  118. , ArgColumn_(argColumn)
  119. {
  120. Y_UNUSED(dataType);
  121. }
  122. void InitState(void* state) final {
  123. TStateType st;
  124. WriteUnaligned<TStateType>(state, st);
  125. }
  126. void DestroyState(void* state) noexcept final {
  127. static_assert(std::is_trivially_destructible<TStateType>::value);
  128. Y_UNUSED(state);
  129. }
  130. void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
  131. auto typedState = MakeStateWrapper<TStateType>(state);
  132. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  133. if constexpr (IsScalar) {
  134. Y_ENSURE(datum.is_scalar());
  135. if constexpr (IsNullable) {
  136. if (datum.scalar()->is_valid) {
  137. typedState->Sum_ += (filtered ? *filtered : batchLength) * Cast(datum.scalar_as<TInScalar>().value);
  138. typedState->IsValid_ = 1;
  139. }
  140. } else {
  141. typedState->Sum_ += (filtered ? *filtered : batchLength) * Cast(datum.scalar_as<TInScalar>().value);
  142. }
  143. } else {
  144. const auto& array = datum.array();
  145. auto ptr = array->GetValues<TIn>(1);
  146. auto len = array->length;
  147. auto nullCount = IsNullable ? array->GetNullCount() : 0;
  148. auto count = len - nullCount;
  149. if (!count) {
  150. return;
  151. }
  152. if (!filtered) {
  153. if constexpr (IsNullable) {
  154. typedState->IsValid_ = 1;
  155. }
  156. auto sum = typedState->Sum_;
  157. if (IsNullable && nullCount != 0) {
  158. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  159. for (int64_t i = 0; i < len; ++i) {
  160. ui64 fullIndex = i + array->offset;
  161. ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
  162. sum += SelectArg<TIn>(notNull, ptr[i], 0);
  163. }
  164. } else {
  165. for (int64_t i = 0; i < len; ++i) {
  166. sum += ptr[i];
  167. }
  168. }
  169. typedState->Sum_ = sum;
  170. } else {
  171. const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
  172. const auto& filterArray = filterDatum.array();
  173. MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
  174. const ui8* filterBitmap = filterArray->template GetValues<uint8_t>(1);
  175. auto sum = typedState->Sum_;
  176. if (IsNullable && nullCount != 0) {
  177. ui64 count = 0;
  178. auto nullBitmapPtr = array->template GetValues<uint8_t>(0, 0);
  179. for (int64_t i = 0; i < len; ++i) {
  180. ui64 fullIndex = i + array->offset;
  181. ui8 notNullAndFiltered = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) & filterBitmap[i];
  182. sum += SelectArg<TIn>(notNullAndFiltered, ptr[i], 0);
  183. count += notNullAndFiltered;
  184. }
  185. if constexpr (IsNullable) {
  186. typedState->IsValid_ |= count ? 1 : 0;
  187. }
  188. } else {
  189. for (int64_t i = 0; i < len; ++i) {
  190. sum += SelectArg<TIn>(filterBitmap[i], ptr[i], 0);
  191. }
  192. if constexpr (IsNullable) {
  193. typedState->IsValid_ = 1;
  194. }
  195. }
  196. typedState->Sum_ = sum;
  197. }
  198. }
  199. }
  200. NUdf::TUnboxedValue FinishOne(const void* state) final {
  201. auto typedState = MakeStateWrapper<TStateType>(state);
  202. if constexpr (IsNullable) {
  203. if (!typedState->IsValid_) {
  204. return NUdf::TUnboxedValuePod();
  205. }
  206. }
  207. return NUdf::TUnboxedValuePod(TSum(typedState->Sum_));
  208. }
  209. private:
  210. const ui32 ArgColumn_;
  211. };
  212. template <bool IsNullable, bool IsScalar, typename TIn, typename TSum>
  213. void PushValueToState(TSumState<IsNullable, TSum>* typedState, const arrow::Datum& datum, ui64 row) {
  214. using TInScalar = typename TPrimitiveDataType<TIn>::TScalarResult;
  215. if constexpr (IsScalar) {
  216. Y_ENSURE(datum.is_scalar());
  217. if constexpr (IsNullable) {
  218. if (datum.scalar()->is_valid) {
  219. typedState->Sum_ += Cast(datum.scalar_as<TInScalar>().value);
  220. typedState->IsValid_ = 1;
  221. }
  222. } else {
  223. typedState->Sum_ += Cast(datum.scalar_as<TInScalar>().value);
  224. }
  225. } else {
  226. const auto& array = datum.array();
  227. auto ptr = array->GetValues<TIn>(1);
  228. if constexpr (IsNullable) {
  229. if (array->GetNullCount() == 0) {
  230. typedState->IsValid_ = 1;
  231. typedState->Sum_ += ptr[row];
  232. } else {
  233. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  234. ui64 fullIndex = row + array->offset;
  235. ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
  236. typedState->Sum_ += SelectArg<TIn>(notNull, ptr[row], 0);
  237. typedState->IsValid_ |= notNull;
  238. }
  239. } else {
  240. typedState->Sum_ += ptr[row];
  241. }
  242. }
  243. }
  244. template <bool IsNullable, bool IsScalar, typename TIn, typename TSum>
  245. class TSumBlockAggregator<TCombineKeysTag, IsNullable, IsScalar, TIn, TSum> : public TCombineKeysTag::TBase {
  246. public:
  247. using TBase = TCombineKeysTag::TBase;
  248. using TStateType = TSumState<IsNullable, TSum>;
  249. TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TType* dataType, TComputationContext& ctx)
  250. : TBase(sizeof(TStateType), filterColumn, ctx)
  251. , ArgColumn_(argColumn)
  252. , DataType_(dataType)
  253. {
  254. }
  255. void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  256. TStateType st;
  257. WriteUnaligned<TStateType>(state, st);
  258. UpdateKey(state, batchNum, columns, row);
  259. }
  260. void DestroyState(void* state) noexcept final {
  261. static_assert(std::is_trivially_destructible<TStateType>::value);
  262. Y_UNUSED(state);
  263. }
  264. void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  265. Y_UNUSED(batchNum);
  266. auto typedState = MakeStateWrapper<TStateType>(state);
  267. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  268. PushValueToState<IsNullable, IsScalar, TIn, TSum>(typedState.Get(), datum, row);
  269. }
  270. std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
  271. return std::make_unique<TSumColumnBuilder<IsNullable, TSum>>(size, DataType_, Ctx_);
  272. }
  273. private:
  274. const ui32 ArgColumn_;
  275. TType* const DataType_;
  276. };
  277. template <bool IsNullable, bool IsScalar, typename TIn, typename TSum>
  278. class TSumBlockAggregator<TFinalizeKeysTag, IsNullable, IsScalar, TIn, TSum> : public TFinalizeKeysTag::TBase {
  279. public:
  280. using TBase = TFinalizeKeysTag::TBase;
  281. using TStateType = TSumState<IsNullable, TSum>;
  282. TSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TType* dataType, TComputationContext& ctx)
  283. : TBase(sizeof(TStateType), filterColumn, ctx)
  284. , ArgColumn_(argColumn)
  285. , DataType_(dataType)
  286. {
  287. }
  288. void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  289. TStateType st;
  290. WriteUnaligned<TStateType>(state, st);
  291. UpdateState(state, batchNum, columns, row);
  292. }
  293. void DestroyState(void* state) noexcept final {
  294. static_assert(std::is_trivially_destructible<TStateType>::value);
  295. Y_UNUSED(state);
  296. }
  297. void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  298. Y_UNUSED(batchNum);
  299. auto typedState = MakeStateWrapper<TStateType>(state);
  300. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  301. PushValueToState<IsNullable, IsScalar, TIn, TSum>(typedState.Get(), datum, row);
  302. }
  303. void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
  304. auto typedState = MakeStateWrapper<TStateType>(state);
  305. if constexpr (IsNullable) {
  306. buffer.PushNumber(typedState->IsValid_);
  307. }
  308. buffer.PushNumber(typedState->Sum_);
  309. }
  310. void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
  311. auto typedState = MakeStateWrapper<TStateType>(state);
  312. buffer.PopNumber(typedState->Sum_);
  313. if constexpr (IsNullable) {
  314. buffer.PopNumber(typedState->IsValid_);
  315. }
  316. }
  317. std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
  318. return std::make_unique<TSumColumnBuilder<IsNullable, TSum>>(size, DataType_, Ctx_);
  319. }
  320. private:
  321. const ui32 ArgColumn_;
  322. TType* const DataType_;
  323. };
  324. template<typename TIn, typename TOut>
  325. class TAvgBlockAggregator<TCombineAllTag, TIn, TOut> : public TCombineAllTag::TBase {
  326. public:
  327. using TBase = TCombineAllTag::TBase;
  328. using TInScalar = typename TPrimitiveDataType<TIn>::TScalarResult;
  329. TAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TType* outputType, TComputationContext& ctx)
  330. : TBase(sizeof(TAvgState<TOut>), filterColumn, ctx)
  331. , ArgColumn_(argColumn)
  332. {
  333. Y_UNUSED(outputType);
  334. }
  335. void InitState(void* state) final {
  336. TAvgState<TOut> st;
  337. WriteUnaligned<TAvgState<TOut>>(state, st);
  338. }
  339. void DestroyState(void* state) noexcept final {
  340. static_assert(std::is_trivially_destructible<TAvgState<TOut>>::value);
  341. Y_UNUSED(state);
  342. }
  343. void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
  344. auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
  345. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  346. if (datum.is_scalar()) {
  347. if (datum.scalar()->is_valid) {
  348. typedState->Sum_ += (filtered ? *filtered : batchLength) * Cast(datum.scalar_as<TInScalar>().value);
  349. typedState->Count_ += batchLength;
  350. }
  351. } else {
  352. const auto& array = datum.array();
  353. auto ptr = array->GetValues<TIn>(1);
  354. auto len = array->length;
  355. auto count = len - array->GetNullCount();
  356. if (!count) {
  357. return;
  358. }
  359. if (!filtered) {
  360. typedState->Count_ += count;
  361. auto sum = typedState->Sum_;
  362. if (array->GetNullCount() == 0) {
  363. for (int64_t i = 0; i < len; ++i) {
  364. sum += ptr[i];
  365. }
  366. } else {
  367. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  368. for (int64_t i = 0; i < len; ++i) {
  369. ui64 fullIndex = i + array->offset;
  370. // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
  371. ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
  372. sum += SelectArg<TIn>(notNull, ptr[i], 0);
  373. }
  374. }
  375. typedState->Sum_ = sum;
  376. } else {
  377. const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
  378. const auto& filterArray = filterDatum.array();
  379. MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
  380. const ui8* filterBitmap = filterArray->template GetValues<uint8_t>(1);
  381. auto sum = typedState->Sum_;
  382. ui64 count = typedState->Count_;
  383. if (array->GetNullCount() == 0) {
  384. for (int64_t i = 0; i < len; ++i) {
  385. ui8 filtered = filterBitmap[i];
  386. sum += SelectArg<TIn>(filterBitmap[i], ptr[i], 0);
  387. count += filtered;
  388. }
  389. } else {
  390. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  391. for (int64_t i = 0; i < len; ++i) {
  392. ui64 fullIndex = i + array->offset;
  393. ui8 notNullAndFiltered = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) & filterBitmap[i];
  394. sum += SelectArg<TIn>(notNullAndFiltered, ptr[i], 0);
  395. count += notNullAndFiltered;
  396. }
  397. }
  398. typedState->Sum_ = sum;
  399. typedState->Count_ = count;
  400. }
  401. }
  402. }
  403. NUdf::TUnboxedValue FinishOne(const void* state) final {
  404. auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
  405. if (!typedState->Count_) {
  406. return NUdf::TUnboxedValuePod();
  407. }
  408. NUdf::TUnboxedValue* items;
  409. auto arr = Ctx_.HolderFactory.CreateDirectArrayHolder(2, items);
  410. items[0] = NUdf::TUnboxedValuePod(TOut(typedState->Sum_));
  411. items[1] = NUdf::TUnboxedValuePod(typedState->Count_);
  412. return arr;
  413. }
  414. private:
  415. ui32 ArgColumn_;
  416. };
  417. template <typename TIn, typename TOut>
  418. class TAvgBlockAggregator<TCombineKeysTag, TIn, TOut> : public TCombineKeysTag::TBase {
  419. public:
  420. using TBase = TCombineKeysTag::TBase;
  421. using TInScalar = typename TPrimitiveDataType<TIn>::TScalarResult;
  422. TAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TType* outputType, TComputationContext& ctx)
  423. : TBase(sizeof(TAvgState<TOut>), filterColumn, ctx)
  424. , ArgColumn_(argColumn)
  425. , OutputType_(outputType)
  426. {
  427. }
  428. void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  429. TAvgState<TOut> st;
  430. WriteUnaligned<TAvgState<TOut>>(state, st);
  431. UpdateKey(state, batchNum, columns, row);
  432. }
  433. void DestroyState(void* state) noexcept final {
  434. static_assert(std::is_trivially_destructible<TAvgState<TOut>>::value);
  435. Y_UNUSED(state);
  436. }
  437. void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  438. Y_UNUSED(batchNum);
  439. auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
  440. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  441. if (datum.is_scalar()) {
  442. if (datum.scalar()->is_valid) {
  443. typedState->Sum_ += Cast(datum.scalar_as<TInScalar>().value);
  444. typedState->Count_ += 1;
  445. }
  446. } else {
  447. const auto& array = datum.array();
  448. auto ptr = array->GetValues<TIn>(1);
  449. if (array->GetNullCount() == 0) {
  450. typedState->Sum_ += ptr[row];
  451. typedState->Count_ += 1;
  452. } else {
  453. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  454. ui64 fullIndex = row + array->offset;
  455. ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
  456. typedState->Sum_ += SelectArg<TIn>(notNull, ptr[row], 0);
  457. typedState->Count_ += notNull;
  458. }
  459. }
  460. }
  461. std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
  462. return std::make_unique<TAvgStateColumnBuilder<TOut>>(size, OutputType_, Ctx_);
  463. }
  464. private:
  465. const ui32 ArgColumn_;
  466. TType* const OutputType_;
  467. };
  468. template<typename TOut>
  469. class TAvgBlockAggregatorOverState : public TFinalizeKeysTag::TBase {
  470. public:
  471. using TBase = TFinalizeKeysTag::TBase;
  472. using TInScalar = typename TPrimitiveDataType<TOut>::TScalarResult;
  473. TAvgBlockAggregatorOverState(ui32 argColumn, TComputationContext& ctx)
  474. : TBase(sizeof(TAvgState<TOut>), {}, ctx)
  475. , ArgColumn_(argColumn)
  476. {
  477. }
  478. void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  479. TAvgState<TOut> st;
  480. WriteUnaligned<TAvgState<TOut>>(state, st);
  481. UpdateState(state, batchNum, columns, row);
  482. }
  483. void DestroyState(void* state) noexcept final {
  484. static_assert(std::is_trivially_destructible<TAvgState<TOut>>::value);
  485. Y_UNUSED(state);
  486. }
  487. void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  488. Y_UNUSED(batchNum);
  489. auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
  490. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  491. if (datum.is_scalar()) {
  492. if (datum.scalar()->is_valid) {
  493. const auto& structScalar = arrow::internal::checked_cast<const arrow::StructScalar&>(*datum.scalar());
  494. typedState->Sum_ += Cast(arrow::internal::checked_cast<const TInScalar&>(*structScalar.value[0]).value);
  495. typedState->Count_ += arrow::internal::checked_cast<const arrow::UInt64Scalar&>(*structScalar.value[1]).value;
  496. }
  497. } else {
  498. const auto& array = datum.array();
  499. auto sumPtr = array->child_data[0]->GetValues<TOut>(1);
  500. auto countPtr = array->child_data[1]->GetValues<ui64>(1);
  501. if (array->GetNullCount() == 0) {
  502. typedState->Sum_ += sumPtr[row];
  503. typedState->Count_ += countPtr[row];
  504. } else {
  505. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  506. ui64 fullIndex = row + array->offset;
  507. // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
  508. auto bit = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
  509. ui64 mask = -ui64(bit);
  510. typedState->Sum_ += sumPtr[row] * bit;
  511. typedState->Count_ += mask & countPtr[row];
  512. }
  513. }
  514. }
  515. void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
  516. auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
  517. buffer.PushNumber(typedState->Sum_);
  518. buffer.PushNumber(typedState->Count_);
  519. }
  520. void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
  521. auto typedState = MakeStateWrapper<TAvgState<TOut>>(state);
  522. buffer.PopNumber(typedState->Count_);
  523. buffer.PopNumber(typedState->Sum_);
  524. }
  525. std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
  526. return std::make_unique<TAvgResultColumnBuilder<TOut>>(size, Ctx_);
  527. }
  528. private:
  529. const ui32 ArgColumn_;
  530. };
  531. template <typename TTag, bool IsNullable, bool IsScalar, typename TIn, typename TSum>
  532. class TPreparedSumBlockAggregator : public TTag::TPreparedAggregator {
  533. public:
  534. using TBase = typename TTag::TPreparedAggregator;
  535. using TStateType = TSumState<IsNullable, TSum>;
  536. TPreparedSumBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TType* dataType)
  537. : TBase(sizeof(TStateType))
  538. , FilterColumn_(filterColumn)
  539. , ArgColumn_(argColumn)
  540. , DataType_(dataType)
  541. {}
  542. std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
  543. return std::make_unique<TSumBlockAggregator<TTag, IsNullable, IsScalar, TIn, TSum>>(FilterColumn_, ArgColumn_, DataType_, ctx);
  544. }
  545. private:
  546. const std::optional<ui32> FilterColumn_;
  547. const ui32 ArgColumn_;
  548. TType* const DataType_;
  549. };
  550. template<typename TTag, typename TIn, typename TSum>
  551. std::unique_ptr<typename TTag::TPreparedAggregator> PrepareSumFixed(TType* type, bool isOptional, bool isScalar, std::optional<ui32> filterColumn, ui32 argColumn) {
  552. if (isScalar) {
  553. if (isOptional) {
  554. return std::make_unique<TPreparedSumBlockAggregator<TTag, true, true, TIn, TSum>>(filterColumn, argColumn, type);
  555. }
  556. return std::make_unique<TPreparedSumBlockAggregator<TTag, false, true, TIn, TSum>>(filterColumn, argColumn, type);
  557. }
  558. if (isOptional) {
  559. return std::make_unique<TPreparedSumBlockAggregator<TTag, true, false, TIn, TSum>>(filterColumn, argColumn, type);
  560. }
  561. return std::make_unique<TPreparedSumBlockAggregator<TTag, false, false, TIn, TSum>>(filterColumn, argColumn, type);
  562. }
  563. template <typename TTag>
  564. std::unique_ptr<typename TTag::TPreparedAggregator> PrepareSum(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
  565. auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn));
  566. auto argType = blockType->GetItemType();
  567. bool isOptional;
  568. auto dataType = UnpackOptionalData(argType, isOptional);
  569. bool isScalar = blockType->GetShape() == TBlockType::EShape::Scalar;
  570. TType* sumRetType = nullptr;
  571. const auto& typeInfo = NYql::NUdf::GetDataTypeInfo(*dataType->GetDataSlot());
  572. if (typeInfo.Features & NYql::NUdf::EDataTypeFeatures::SignedIntegralType) {
  573. sumRetType = TDataType::Create(NUdf::TDataType<i64>::Id, env);
  574. } else if (typeInfo.Features & NYql::NUdf::EDataTypeFeatures::UnsignedIntegralType) {
  575. sumRetType = TDataType::Create(NUdf::TDataType<ui64>::Id, env);
  576. } else if (*dataType->GetDataSlot() == NUdf::EDataSlot::Decimal) {
  577. auto decimalType = static_cast<TDataDecimalType*>(dataType);
  578. auto [_, scale] = decimalType->GetParams();
  579. sumRetType = TDataDecimalType::Create(NYql::NDecimal::MaxPrecision, scale, env);
  580. } else if (*dataType->GetDataSlot() == NUdf::EDataSlot::Interval) {
  581. sumRetType = TDataDecimalType::Create(NYql::NDecimal::MaxPrecision, 0, env);
  582. } else {
  583. Y_ENSURE(typeInfo.Features & NYql::NUdf::EDataTypeFeatures::FloatType);
  584. sumRetType = dataType;
  585. }
  586. sumRetType = TOptionalType::Create(sumRetType, env);
  587. switch (*dataType->GetDataSlot()) {
  588. case NUdf::EDataSlot::Int8:
  589. return PrepareSumFixed<TTag, i8, i64>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
  590. case NUdf::EDataSlot::Uint8:
  591. return PrepareSumFixed<TTag, ui8, ui64>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
  592. case NUdf::EDataSlot::Int16:
  593. return PrepareSumFixed<TTag, i16, i64>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
  594. case NUdf::EDataSlot::Uint16:
  595. return PrepareSumFixed<TTag, ui16, ui64>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
  596. case NUdf::EDataSlot::Int32:
  597. return PrepareSumFixed<TTag, i32, i64>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
  598. case NUdf::EDataSlot::Uint32:
  599. return PrepareSumFixed<TTag, ui32, ui64>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
  600. case NUdf::EDataSlot::Int64:
  601. return PrepareSumFixed<TTag, i64, i64>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
  602. case NUdf::EDataSlot::Uint64:
  603. return PrepareSumFixed<TTag, ui64, ui64>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
  604. case NUdf::EDataSlot::Float:
  605. return PrepareSumFixed<TTag, float, float>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
  606. case NUdf::EDataSlot::Double:
  607. return PrepareSumFixed<TTag, double, double>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
  608. case NUdf::EDataSlot::Interval:
  609. return PrepareSumFixed<TTag, i64, NYql::NDecimal::TInt128>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
  610. case NUdf::EDataSlot::Decimal:
  611. return PrepareSumFixed<TTag, NYql::NDecimal::TInt128, NYql::NDecimal::TInt128>(sumRetType, isOptional, isScalar, filterColumn, argColumn);
  612. default:
  613. throw yexception() << "Unsupported SUM input type";
  614. }
  615. }
  616. class TBlockSumFactory : public IBlockAggregatorFactory {
  617. public:
  618. std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll(
  619. TTupleType* tupleType,
  620. std::optional<ui32> filterColumn,
  621. const std::vector<ui32>& argsColumns,
  622. const TTypeEnvironment& env) const final
  623. {
  624. return PrepareSum<TCombineAllTag>(tupleType, filterColumn, argsColumns[0], env);
  625. }
  626. std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys(
  627. TTupleType* tupleType,
  628. const std::vector<ui32>& argsColumns,
  629. const TTypeEnvironment& env) const final
  630. {
  631. return PrepareSum<TCombineKeysTag>(tupleType, std::optional<ui32>(), argsColumns[0], env);
  632. }
  633. std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys(
  634. TTupleType* tupleType,
  635. const std::vector<ui32>& argsColumns,
  636. const TTypeEnvironment& env,
  637. TType* returnType,
  638. ui32 hint) const final
  639. {
  640. Y_UNUSED(returnType);
  641. Y_UNUSED(hint);
  642. return PrepareSum<TFinalizeKeysTag>(tupleType, std::optional<ui32>(), argsColumns[0], env);
  643. }
  644. };
  645. template <typename TTag, typename TIn, typename TOut>
  646. class TPreparedAvgBlockAggregator : public TTag::TPreparedAggregator {
  647. public:
  648. using TBase = typename TTag::TPreparedAggregator;
  649. TPreparedAvgBlockAggregator(std::optional<ui32> filterColumn, ui32 argColumn, TType* outputType)
  650. : TBase(sizeof(TAvgState<TOut>))
  651. , FilterColumn_(filterColumn)
  652. , ArgColumn_(argColumn)
  653. , OutputType_(outputType)
  654. {}
  655. std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
  656. return std::make_unique<TAvgBlockAggregator<TTag, TIn, TOut>>(FilterColumn_, ArgColumn_, OutputType_, ctx);
  657. }
  658. private:
  659. const std::optional<ui32> FilterColumn_;
  660. const ui32 ArgColumn_;
  661. TType* const OutputType_;
  662. };
  663. template<typename TOut>
  664. class TPreparedAvgBlockAggregatorOverState : public TFinalizeKeysTag::TPreparedAggregator {
  665. public:
  666. using TBase = TFinalizeKeysTag::TPreparedAggregator;
  667. TPreparedAvgBlockAggregatorOverState(ui32 argColumn)
  668. : TBase(sizeof(TAvgState<TOut>))
  669. , ArgColumn_(argColumn)
  670. {}
  671. std::unique_ptr<typename TFinalizeKeysTag::TAggregator> Make(TComputationContext& ctx) const final {
  672. return std::make_unique<TAvgBlockAggregatorOverState<TOut>>(ArgColumn_, ctx);
  673. }
  674. private:
  675. const ui32 ArgColumn_;
  676. };
  677. template <typename TTag>
  678. std::unique_ptr<typename TTag::TPreparedAggregator> PrepareAvg(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env);
  679. template <typename TTag>
  680. std::unique_ptr<typename TTag::TPreparedAggregator> PrepareAvgOverInput(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
  681. auto doubleType = TDataType::Create(NUdf::TDataType<double>::Id, env);
  682. auto ui64Type = TDataType::Create(NUdf::TDataType<ui64>::Id, env);
  683. TVector<TType*> tupleElements = { doubleType, ui64Type };
  684. auto avgRetType = TOptionalType::Create(TTupleType::Create(2, tupleElements.data(), env), env);
  685. auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn))->GetItemType();
  686. bool isOptional;
  687. auto dataType = UnpackOptionalData(argType, isOptional);
  688. switch (*dataType->GetDataSlot()) {
  689. case NUdf::EDataSlot::Int8:
  690. return std::make_unique<TPreparedAvgBlockAggregator<TTag, i8, double>>(filterColumn, argColumn, avgRetType);
  691. case NUdf::EDataSlot::Uint8:
  692. case NUdf::EDataSlot::Bool:
  693. return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui8, double>>(filterColumn, argColumn, avgRetType);
  694. case NUdf::EDataSlot::Int16:
  695. return std::make_unique<TPreparedAvgBlockAggregator<TTag, i16, double>>(filterColumn, argColumn, avgRetType);
  696. case NUdf::EDataSlot::Uint16:
  697. return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui16, double>>(filterColumn, argColumn, avgRetType);
  698. case NUdf::EDataSlot::Int32:
  699. return std::make_unique<TPreparedAvgBlockAggregator<TTag, i32, double>>(filterColumn, argColumn, avgRetType);
  700. case NUdf::EDataSlot::Uint32:
  701. return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui32, double>>(filterColumn, argColumn, avgRetType);
  702. case NUdf::EDataSlot::Int64:
  703. return std::make_unique<TPreparedAvgBlockAggregator<TTag, i64, double>>(filterColumn, argColumn, avgRetType);
  704. case NUdf::EDataSlot::Uint64:
  705. return std::make_unique<TPreparedAvgBlockAggregator<TTag, ui64, double>>(filterColumn, argColumn, avgRetType);
  706. case NUdf::EDataSlot::Float:
  707. return std::make_unique<TPreparedAvgBlockAggregator<TTag, float, double>>(filterColumn, argColumn, avgRetType);
  708. case NUdf::EDataSlot::Double:
  709. return std::make_unique<TPreparedAvgBlockAggregator<TTag, double, double>>(filterColumn, argColumn, avgRetType);
  710. case NUdf::EDataSlot::Interval: {
  711. auto decimalType = TDataDecimalType::Create(NYql::NDecimal::MaxPrecision, 0, env);
  712. TVector<TType*> tupleDecimalElements = { decimalType, ui64Type };
  713. auto avgRetDecimalType = TOptionalType::Create(TTupleType::Create(2, tupleDecimalElements.data(), env), env);
  714. return std::make_unique<TPreparedAvgBlockAggregator<TTag, i64, NYql::NDecimal::TInt128>>(filterColumn, argColumn, avgRetDecimalType);
  715. }
  716. case NUdf::EDataSlot::Decimal: {
  717. auto [precision, scale] = static_cast<TDataDecimalType*>(dataType)->GetParams();
  718. auto decimalType = TDataDecimalType::Create(precision, scale, env);
  719. TVector<TType*> tupleDecimalElements = { decimalType, ui64Type };
  720. auto avgRetDecimalType = TOptionalType::Create(TTupleType::Create(2, tupleDecimalElements.data(), env), env);
  721. return std::make_unique<TPreparedAvgBlockAggregator<TTag, NYql::NDecimal::TInt128, NYql::NDecimal::TInt128>>(filterColumn, argColumn, avgRetDecimalType);
  722. }
  723. default:
  724. throw yexception() << "Unsupported AVG input type";
  725. }
  726. }
  727. template <>
  728. std::unique_ptr<typename TCombineAllTag::TPreparedAggregator> PrepareAvg<TCombineAllTag>(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
  729. return PrepareAvgOverInput<TCombineAllTag>(tupleType, filterColumn, argColumn, env);
  730. }
  731. template <>
  732. std::unique_ptr<typename TCombineKeysTag::TPreparedAggregator> PrepareAvg<TCombineKeysTag>(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
  733. return PrepareAvgOverInput<TCombineKeysTag>(tupleType, filterColumn, argColumn, env);
  734. }
  735. template <>
  736. std::unique_ptr<typename TFinalizeKeysTag::TPreparedAggregator> PrepareAvg<TFinalizeKeysTag>(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
  737. Y_UNUSED(filterColumn);
  738. Y_UNUSED(env);
  739. auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn))->GetItemType();
  740. bool isOptional;
  741. auto aggTupleType = UnpackOptional(argType, isOptional);
  742. MKQL_ENSURE(aggTupleType->IsTuple(),
  743. "Expected tuple or optional of tuple, actual: " << PrintNode(argType, true));
  744. auto dataType = UnpackOptionalData(AS_TYPE(TTupleType, aggTupleType)->GetElementType(0), isOptional);
  745. switch (*dataType->GetDataSlot()) {
  746. case NUdf::EDataSlot::Decimal:
  747. return std::make_unique<TPreparedAvgBlockAggregatorOverState<NYql::NDecimal::TInt128>>(argColumn);
  748. case NUdf::EDataSlot::Double:
  749. return std::make_unique<TPreparedAvgBlockAggregatorOverState<double>>(argColumn);
  750. default:
  751. throw yexception() << "Unsupported Finalize input type";
  752. }
  753. }
  754. class TBlockAvgFactory : public IBlockAggregatorFactory {
  755. public:
  756. std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll(
  757. TTupleType* tupleType,
  758. std::optional<ui32> filterColumn,
  759. const std::vector<ui32>& argsColumns,
  760. const TTypeEnvironment& env) const final {
  761. return PrepareAvg<TCombineAllTag>(tupleType, filterColumn, argsColumns[0], env);
  762. }
  763. std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys(
  764. TTupleType* tupleType,
  765. const std::vector<ui32>& argsColumns,
  766. const TTypeEnvironment& env) const final {
  767. return PrepareAvg<TCombineKeysTag>(tupleType, std::optional<ui32>(), argsColumns[0], env);
  768. }
  769. std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys(
  770. TTupleType* tupleType,
  771. const std::vector<ui32>& argsColumns,
  772. const TTypeEnvironment& env,
  773. TType* returnType,
  774. ui32 hint) const final {
  775. Y_UNUSED(returnType);
  776. Y_UNUSED(hint);
  777. return PrepareAvg<TFinalizeKeysTag>(tupleType, std::optional<ui32>(), argsColumns[0], env);
  778. }
  779. };
  780. }
  781. std::unique_ptr<IBlockAggregatorFactory> MakeBlockSumFactory() {
  782. return std::make_unique<TBlockSumFactory>();
  783. }
  784. std::unique_ptr<IBlockAggregatorFactory> MakeBlockAvgFactory() {
  785. return std::make_unique<TBlockAvgFactory>();
  786. }
  787. }
  788. }