mkql_block_agg_minmax.cpp 40 KB


  1. #include "mkql_block_agg_minmax.h"
  2. #include "mkql_block_agg_state_helper.h"
  3. #include <yql/essentials/minikql/mkql_node_cast.h>
  4. #include <yql/essentials/minikql/mkql_node_builder.h>
  5. #include <yql/essentials/minikql/mkql_string_util.h>
  6. #include <yql/essentials/minikql/computation/mkql_block_builder.h>
  7. #include <yql/essentials/minikql/computation/mkql_block_reader.h>
  8. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  9. #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
  10. #include <yql/essentials/minikql/arrow/arrow_defs.h>
  11. #include <yql/essentials/minikql/arrow/arrow_util.h>
  12. #include <yql/essentials/minikql/arrow/mkql_bit_utils.h>
  13. #include <yql/essentials/public/udf/arrow/block_item_comparator.h>
  14. #include <arrow/scalar.h>
  15. #include <arrow/array/builder_primitive.h>
  16. namespace NKikimr {
  17. namespace NMiniKQL {
  18. namespace {
  // Strict "less than" used by the min/max aggregation helpers.
  // For floating-point T a NaN is ordered above every non-NaN value and is
  // never less than another NaN; every other case falls back to operator<.
  template <typename T>
  inline bool AggLess(T a, T b) {
      if constexpr (std::is_floating_point_v<T>) {
          const bool lhsNan = std::isnan(a);
          const bool rhsNan = std::isnan(b);
          if (lhsNan || rhsNan) {
              // biggest fp value in agg ordering is NaN
              return rhsNan && !lhsNan;
          }
      }
      return a < b;
  }
  // Returns the minimum (IsMin == true) or maximum (IsMin == false) of x and y
  // under the AggLess ordering. When neither argument is strictly preferred,
  // y is returned.
  template <bool IsMin, typename T>
  inline T UpdateMinMax(T x, T y) {
      const bool keepFirst = IsMin ? AggLess(x, y) : AggLess(y, x);
      return keepFirst ? x : y;
  }
  37. template<bool IsMin, typename T>
  38. inline void UpdateMinMax(TMaybe<T>& state, bool& stateUpdated, T value) {
  39. if constexpr (IsMin) {
  40. if (!state || AggLess(value, *state)) {
  41. state = value;
  42. stateUpdated = true;
  43. }
  44. } else {
  45. if (!state || AggLess(*state, value)) {
  46. state = value;
  47. stateUpdated = true;
  48. }
  49. }
  50. }
  51. template<bool IsMin>
  52. inline void UpdateMinMax(NYql::NUdf::IBlockItemComparator& comparator, TBlockItem& state, bool& stateUpdated, TBlockItem value) {
  53. if constexpr (IsMin) {
  54. if (!state || comparator.Less(value, state)) {
  55. state = value;
  56. stateUpdated = true;
  57. }
  58. } else {
  59. if (!state || comparator.Less(state, value)) {
  60. state = value;
  61. stateUpdated = true;
  62. }
  63. }
  64. }
  65. template<typename TTag, typename TString, bool IsMin>
  66. class TMinMaxBlockStringAggregator;
  67. template<typename TTag, bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
  68. class TMinMaxBlockFixedAggregator;
  69. template<typename TTag, bool IsMin>
  70. class TMinMaxBlockGenericAggregator;
  71. template <bool IsNullable, typename TIn, bool IsMin>
  72. struct TState;
  73. template<typename TIn, bool IsMin>
  74. constexpr TIn InitialStateValue() {
  75. if constexpr (std::is_floating_point<TIn>::value) {
  76. static_assert(std::numeric_limits<TIn>::has_infinity && std::numeric_limits<TIn>::has_quiet_NaN);
  77. if constexpr (IsMin) {
  78. // biggest fp value in agg ordering is NaN
  79. return std::numeric_limits<TIn>::quiet_NaN();
  80. } else {
  81. return -std::numeric_limits<TIn>::infinity();
  82. }
  83. } else if constexpr (std::is_same_v<TIn, NYql::NDecimal::TInt128>) {
  84. if constexpr (IsMin) {
  85. return NYql::NDecimal::Nan();
  86. } else {
  87. return -NYql::NDecimal::Inf();
  88. }
  89. } else if constexpr (std::is_arithmetic<TIn>::value) {
  90. if constexpr (IsMin) {
  91. return std::numeric_limits<TIn>::max();
  92. } else {
  93. return std::numeric_limits<TIn>::min();
  94. }
  95. } else {
  96. static_assert(std::is_arithmetic<TIn>::value);
  97. }
  98. }
  99. template <typename TIn, bool IsMin>
  100. struct TState<true, TIn, IsMin> {
  101. TIn Value = InitialStateValue<TIn, IsMin>();
  102. ui8 IsValid = 0;
  103. };
  104. template <typename TIn, bool IsMin>
  105. struct TState<false, TIn, IsMin> {
  106. TIn Value = InitialStateValue<TIn, IsMin>();
  107. };
  108. using TGenericState = NUdf::TUnboxedValuePod;
  109. template <bool IsNullable, typename TIn, bool IsMin>
  110. class TColumnBuilder : public IAggColumnBuilder {
  111. using TBuilder = typename NYql::NUdf::TFixedSizeArrayBuilder<TIn, IsNullable>;
  112. using TStateType = TState<IsNullable, TIn, IsMin>;
  113. public:
  114. TColumnBuilder(ui64 size, TType* type, TComputationContext& ctx)
  115. : Builder_(type, TTypeInfoHelper(), ctx.ArrowMemoryPool, size)
  116. , Ctx_(ctx)
  117. {
  118. }
  119. void Add(const void* state) final {
  120. auto typedState = MakeStateWrapper<TStateType>(state);
  121. if constexpr (IsNullable) {
  122. if (!typedState->IsValid) {
  123. Builder_.Add(TBlockItem());
  124. return;
  125. }
  126. }
  127. Builder_.Add(TBlockItem(typedState->Value));
  128. }
  129. NUdf::TUnboxedValue Build() final {
  130. return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true));
  131. }
  132. private:
  133. TBuilder Builder_;
  134. TComputationContext& Ctx_;
  135. };
  136. class TGenericColumnBuilder : public IAggColumnBuilder {
  137. public:
  138. TGenericColumnBuilder(ui64 size, TType* columnType, TComputationContext& ctx)
  139. : Builder_(MakeArrayBuilder(TTypeInfoHelper(), columnType, ctx.ArrowMemoryPool, size, &ctx.Builder->GetPgBuilder()))
  140. , Ctx_(ctx)
  141. {
  142. }
  143. void Add(const void* state) final {
  144. Builder_->Add(*static_cast<const TGenericState*>(state));
  145. }
  146. NUdf::TUnboxedValue Build() final {
  147. return Ctx_.HolderFactory.CreateArrowBlock(Builder_->Build(true));
  148. }
  149. private:
  150. const std::unique_ptr<IArrayBuilder> Builder_;
  151. TComputationContext& Ctx_;
  152. };
  153. template <bool IsMin>
  154. void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row, IBlockReader& reader,
  155. IBlockItemConverter& converter, NYql::NUdf::IBlockItemComparator& comparator, TComputationContext& ctx)
  156. {
  157. TBlockItem stateItem;
  158. bool stateChanged = false;
  159. if (datum.is_scalar()) {
  160. if (datum.scalar()->is_valid) {
  161. stateItem = reader.GetScalarItem(*datum.scalar());
  162. stateChanged = true;
  163. }
  164. } else {
  165. if (*typedState) {
  166. stateItem = converter.MakeItem(*typedState);
  167. }
  168. const auto& array = datum.array();
  169. TBlockItem curr = reader.GetItem(*array, row);
  170. if (curr) {
  171. UpdateMinMax<IsMin>(comparator, stateItem, stateChanged, curr);
  172. }
  173. }
  174. if (stateChanged) {
  175. typedState->DeleteUnreferenced();
  176. *typedState = converter.MakeValue(stateItem, ctx.HolderFactory);
  177. }
  178. }
  179. template<bool IsMin>
  180. class TMinMaxBlockGenericAggregator<TCombineAllTag, IsMin> : public TCombineAllTag::TBase {
  181. public:
  182. using TBase = TCombineAllTag::TBase;
  183. TMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  184. : TBase(sizeof(TGenericState), filterColumn, ctx)
  185. , ArgColumn_(argColumn)
  186. , ReaderOne_(MakeBlockReader(TTypeInfoHelper(), type))
  187. , ReaderTwo_(MakeBlockReader(TTypeInfoHelper(), type))
  188. , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder()))
  189. , Compare_(TBlockTypeHelper().MakeComparator(type))
  190. {
  191. }
  192. void InitState(void* state) final {
  193. new(state) TGenericState();
  194. }
  195. void DestroyState(void* state) noexcept final {
  196. auto typedState = static_cast<TGenericState*>(state);
  197. typedState->DeleteUnreferenced();
  198. *typedState = TGenericState();
  199. }
  200. void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
  201. TGenericState& typedState = *static_cast<TGenericState*>(state);
  202. Y_UNUSED(batchLength);
  203. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  204. IBlockReader* currReader = ReaderOne_.get();
  205. IBlockReader* stateReader = ReaderTwo_.get();
  206. TBlockItem stateItem;
  207. bool stateChanged = false;
  208. if (datum.is_scalar()) {
  209. if (datum.scalar()->is_valid) {
  210. stateItem = currReader->GetScalarItem(*datum.scalar());
  211. stateChanged = true;
  212. }
  213. } else {
  214. if (typedState) {
  215. stateItem = Converter_->MakeItem(typedState);
  216. }
  217. const auto& array = datum.array();
  218. auto len = array->length;
  219. const ui8* filterBitmap = nullptr;
  220. if (filtered) {
  221. const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
  222. const auto& filterArray = filterDatum.array();
  223. MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
  224. filterBitmap = filterArray->template GetValues<uint8_t>(1);
  225. }
  226. auto& comparator = *Compare_;
  227. for (auto i = 0; i < len; ++i) {
  228. TBlockItem curr = currReader->GetItem(*array, i);
  229. if (curr && (!filterBitmap || filterBitmap[i])) {
  230. bool changed = false;
  231. UpdateMinMax<IsMin>(comparator, stateItem, changed, curr);
  232. if (changed) {
  233. std::swap(currReader, stateReader);
  234. stateChanged = true;
  235. }
  236. }
  237. }
  238. }
  239. if (stateChanged) {
  240. typedState.DeleteUnreferenced();
  241. typedState = Converter_->MakeValue(stateItem, Ctx_.HolderFactory);
  242. }
  243. }
  244. NUdf::TUnboxedValue FinishOne(const void *state) final {
  245. auto typedState = *static_cast<const TGenericState *>(state);
  246. return typedState;
  247. }
  248. private:
  249. const ui32 ArgColumn_;
  250. const std::unique_ptr<IBlockReader> ReaderOne_;
  251. const std::unique_ptr<IBlockReader> ReaderTwo_;
  252. const std::unique_ptr<IBlockItemConverter> Converter_;
  253. const NYql::NUdf::IBlockItemComparator::TPtr Compare_;
  254. };
  255. template<bool IsMin>
  256. class TMinMaxBlockGenericAggregator<TCombineKeysTag, IsMin> : public TCombineKeysTag::TBase {
  257. public:
  258. using TBase = TCombineKeysTag::TBase;
  259. TMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  260. : TBase(sizeof(TGenericState), filterColumn, ctx)
  261. , ArgColumn_(argColumn)
  262. , Type_(type)
  263. , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
  264. , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder()))
  265. , Compare_(TBlockTypeHelper().MakeComparator(type))
  266. {
  267. }
  268. void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  269. new(state) TGenericState();
  270. UpdateKey(state, batchNum, columns, row);
  271. }
  272. void DestroyState(void* state) noexcept final {
  273. auto typedState = static_cast<TGenericState*>(state);
  274. typedState->DeleteUnreferenced();
  275. *typedState = TGenericState();
  276. }
  277. void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  278. Y_UNUSED(batchNum);
  279. auto typedState = static_cast<TGenericState*>(state);
  280. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  281. PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_);
  282. }
  283. std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
  284. return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
  285. }
  286. private:
  287. const ui32 ArgColumn_;
  288. TType* const Type_;
  289. const std::unique_ptr<IBlockReader> Reader_;
  290. const std::unique_ptr<IBlockItemConverter> Converter_;
  291. const NYql::NUdf::IBlockItemComparator::TPtr Compare_;
  292. };
  293. template<bool IsMin>
  294. class TMinMaxBlockGenericAggregator<TFinalizeKeysTag, IsMin> : public TFinalizeKeysTag::TBase {
  295. public:
  296. using TBase = TFinalizeKeysTag::TBase;
  297. TMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  298. : TBase(sizeof(TGenericState), filterColumn, ctx)
  299. , ArgColumn_(argColumn)
  300. , Type_(type)
  301. , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
  302. , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder()))
  303. , Compare_(TBlockTypeHelper().MakeComparator(type))
  304. , Packer_(false, type)
  305. {
  306. }
  307. void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  308. new(state) TGenericState();
  309. UpdateState(state, batchNum, columns, row);
  310. }
  311. void DestroyState(void* state) noexcept final {
  312. auto typedState = static_cast<TGenericState*>(state);
  313. typedState->DeleteUnreferenced();
  314. *typedState = TGenericState();
  315. }
  316. void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  317. Y_UNUSED(batchNum);
  318. auto typedState = static_cast<TGenericState*>(state);
  319. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  320. PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_);
  321. }
  322. void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
  323. auto typedState = static_cast<TGenericState*>(state);
  324. buffer.PushString(Packer_.Pack(*typedState));
  325. }
  326. void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
  327. auto typedState = static_cast<TGenericState*>(state);
  328. *typedState = Packer_.Unpack(buffer.PopString(), Ctx_.HolderFactory).Release();
  329. }
  330. std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
  331. return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
  332. }
  333. private:
  334. const ui32 ArgColumn_;
  335. TType* const Type_;
  336. const std::unique_ptr<IBlockReader> Reader_;
  337. const std::unique_ptr<IBlockItemConverter> Converter_;
  338. const NYql::NUdf::IBlockItemComparator::TPtr Compare_;
  339. const TValuePacker Packer_;
  340. };
  341. template <typename TStringType, bool IsMin>
  342. void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row) {
  343. using TOffset = typename TPrimitiveDataType<TStringType>::TResult::offset_type;;
  344. TMaybe<NUdf::TStringRef> currentState;
  345. if (*typedState) {
  346. currentState = typedState->AsStringRef();
  347. }
  348. bool stateUpdated = false;
  349. if (datum.is_scalar()) {
  350. if (datum.scalar()->is_valid) {
  351. auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(*datum.scalar()).value;
  352. const char* data = reinterpret_cast<const char*>(buffer->data());
  353. auto value = NUdf::TStringRef(data, buffer->size());
  354. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  355. }
  356. } else {
  357. const auto& array = datum.array();
  358. const TOffset* offsets = array->GetValues<TOffset>(1);
  359. const char* data = array->GetValues<char>(2, 0);
  360. if (array->GetNullCount() == 0) {
  361. auto value = NUdf::TStringRef(data + offsets[row], offsets[row + 1] - offsets[row]);
  362. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  363. } else {
  364. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  365. ui64 fullIndex = row + array->offset;
  366. if ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) {
  367. auto value = NUdf::TStringRef(data + offsets[row], offsets[row + 1] - offsets[row]);
  368. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  369. }
  370. }
  371. }
  372. if (stateUpdated) {
  373. auto newState = MakeString(*currentState);
  374. typedState->DeleteUnreferenced();
  375. *typedState = std::move(newState);
  376. }
  377. }
  378. template<typename TStringType, bool IsMin>
  379. class TMinMaxBlockStringAggregator<TCombineAllTag, TStringType, IsMin> : public TCombineAllTag::TBase {
  380. public:
  381. using TBase = TCombineAllTag::TBase;
  382. using TOffset = typename TPrimitiveDataType<TStringType>::TResult::offset_type;
  383. TMinMaxBlockStringAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  384. : TBase(sizeof(TGenericState), filterColumn, ctx)
  385. , ArgColumn_(argColumn)
  386. {
  387. Y_UNUSED(type);
  388. }
  389. void InitState(void* state) final {
  390. new(state) TGenericState();
  391. }
  392. void DestroyState(void* state) noexcept final {
  393. auto typedState = static_cast<TGenericState*>(state);
  394. typedState->DeleteUnreferenced();
  395. *typedState = TGenericState();
  396. }
  397. void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
  398. TGenericState& typedState = *static_cast<TGenericState*>(state);
  399. Y_UNUSED(batchLength);
  400. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  401. TMaybe<NUdf::TStringRef> currentState;
  402. if (typedState) {
  403. currentState = typedState.AsStringRef();
  404. }
  405. bool stateUpdated = false;
  406. if (datum.is_scalar()) {
  407. if (datum.scalar()->is_valid) {
  408. auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(*datum.scalar()).value;
  409. const char* data = reinterpret_cast<const char*>(buffer->data());
  410. auto value = NUdf::TStringRef(data, buffer->size());
  411. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  412. }
  413. } else {
  414. const auto& array = datum.array();
  415. auto len = array->length;
  416. auto count = len - array->GetNullCount();
  417. if (!count) {
  418. return;
  419. }
  420. const TOffset* offsets = array->GetValues<TOffset>(1);
  421. const char* data = array->GetValues<char>(2, 0);
  422. if (!filtered) {
  423. if (array->GetNullCount() == 0) {
  424. for (int64_t i = 0; i < len; ++i) {
  425. NUdf::TStringRef value(data + offsets[i], offsets[i + 1] - offsets[i]);
  426. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  427. }
  428. } else {
  429. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  430. for (int64_t i = 0; i < len; ++i) {
  431. ui64 fullIndex = i + array->offset;
  432. if ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) {
  433. NUdf::TStringRef value(data + offsets[i], offsets[i + 1] - offsets[i]);
  434. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  435. }
  436. }
  437. }
  438. } else {
  439. const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
  440. const auto& filterArray = filterDatum.array();
  441. MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
  442. const ui8* filterBitmap = filterArray->template GetValues<uint8_t>(1);
  443. if (array->GetNullCount() == 0) {
  444. for (int64_t i = 0; i < len; ++i) {
  445. if (filterBitmap[i]) {
  446. NUdf::TStringRef value(data + offsets[i], offsets[i + 1] - offsets[i]);
  447. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  448. }
  449. }
  450. } else {
  451. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  452. for (int64_t i = 0; i < len; ++i) {
  453. ui64 fullIndex = i + array->offset;
  454. if (filterBitmap[i] && ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1)) {
  455. NUdf::TStringRef value(data + offsets[i], offsets[i + 1] - offsets[i]);
  456. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  457. }
  458. }
  459. }
  460. }
  461. }
  462. if (stateUpdated) {
  463. auto newState = MakeString(*currentState);
  464. typedState.DeleteUnreferenced();
  465. typedState = std::move(newState);
  466. }
  467. }
  468. NUdf::TUnboxedValue FinishOne(const void* state) final {
  469. auto typedState = *static_cast<const TGenericState*>(state);
  470. return typedState;
  471. }
  472. private:
  473. const ui32 ArgColumn_;
  474. };
  475. template<typename TStringType, bool IsMin>
  476. class TMinMaxBlockStringAggregator<TCombineKeysTag, TStringType, IsMin> : public TCombineKeysTag::TBase {
  477. public:
  478. using TBase = TCombineKeysTag::TBase;
  479. TMinMaxBlockStringAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  480. : TBase(sizeof(TGenericState), filterColumn, ctx)
  481. , ArgColumn_(argColumn)
  482. , Type_(type)
  483. {
  484. }
  485. void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  486. new(state) TGenericState();
  487. UpdateKey(state, batchNum, columns, row);
  488. }
  489. void DestroyState(void* state) noexcept final {
  490. auto typedState = static_cast<TGenericState*>(state);
  491. typedState->DeleteUnreferenced();
  492. *typedState = TGenericState();
  493. }
  494. void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  495. Y_UNUSED(batchNum);
  496. auto typedState = static_cast<TGenericState*>(state);
  497. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  498. PushValueToState<TStringType, IsMin>(typedState, datum, row);
  499. }
  500. std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
  501. return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
  502. }
  503. private:
  504. const ui32 ArgColumn_;
  505. TType* const Type_;
  506. };
  507. template<typename TStringType, bool IsMin>
  508. class TMinMaxBlockStringAggregator<TFinalizeKeysTag, TStringType, IsMin> : public TFinalizeKeysTag::TBase {
  509. public:
  510. using TBase = TFinalizeKeysTag::TBase;
  511. TMinMaxBlockStringAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  512. : TBase(sizeof(TGenericState), filterColumn, ctx)
  513. , ArgColumn_(argColumn)
  514. , Type_(type)
  515. {
  516. }
  517. void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  518. new(state) TGenericState();
  519. UpdateState(state, batchNum, columns, row);
  520. }
  521. void DestroyState(void* state) noexcept final {
  522. auto typedState = static_cast<TGenericState*>(state);
  523. typedState->DeleteUnreferenced();
  524. *typedState = TGenericState();
  525. }
  526. void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  527. Y_UNUSED(batchNum);
  528. auto typedState = static_cast<TGenericState*>(state);
  529. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  530. PushValueToState<TStringType, IsMin>(typedState, datum, row);
  531. }
  532. void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
  533. auto typedState = static_cast<TGenericState*>(state);
  534. buffer.PushString(typedState->AsStringRef());
  535. }
  536. void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
  537. auto typedState = static_cast<TGenericState*>(state);
  538. *typedState = std::move(MakeString(buffer.PopString()));
  539. }
  540. std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
  541. return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
  542. }
  543. private:
  544. const ui32 ArgColumn_;
  545. TType* const Type_;
  546. };
  547. template <bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
  548. class TMinMaxBlockFixedAggregator<TCombineAllTag, IsNullable, IsScalar, TIn, IsMin> : public TCombineAllTag::TBase {
  549. public:
  550. using TBase = TCombineAllTag::TBase;
  551. using TStateType = TState<IsNullable, TIn, IsMin>;
  552. using TInScalar = typename TPrimitiveDataType<TIn>::TScalarResult;
  553. TMinMaxBlockFixedAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  554. : TBase(sizeof(TStateType), filterColumn, ctx)
  555. , ArgColumn_(argColumn)
  556. {
  557. Y_UNUSED(type);
  558. }
  559. void InitState(void* ptr) final {
  560. TStateType state;
  561. WriteUnaligned<TStateType>(ptr, state);
  562. }
  563. void DestroyState(void* state) noexcept final {
  564. static_assert(std::is_trivially_destructible<TStateType>::value);
  565. Y_UNUSED(state);
  566. }
  567. void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
  568. auto typedState = MakeStateWrapper<TStateType>(state);
  569. Y_UNUSED(batchLength);
  570. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  571. if constexpr (IsScalar) {
  572. Y_ENSURE(datum.is_scalar());
  573. if constexpr (IsNullable) {
  574. if (datum.scalar()->is_valid) {
  575. typedState->Value = TIn(Cast(datum.scalar_as<TInScalar>().value));
  576. typedState->IsValid = 1;
  577. }
  578. } else {
  579. typedState->Value = TIn(Cast(datum.scalar_as<TInScalar>().value));
  580. }
  581. } else {
  582. const auto& array = datum.array();
  583. auto ptr = array->GetValues<TIn>(1);
  584. auto len = array->length;
  585. auto nullCount = IsNullable ? array->GetNullCount() : 0;
  586. auto count = len - nullCount;
  587. if (!count) {
  588. return;
  589. }
  590. if (!filtered) {
  591. TIn value = typedState->Value;
  592. if constexpr (IsNullable) {
  593. typedState->IsValid = 1;
  594. }
  595. if (IsNullable && nullCount != 0) {
  596. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  597. for (int64_t i = 0; i < len; ++i) {
  598. ui64 fullIndex = i + array->offset;
  599. ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
  600. value = UpdateMinMax<IsMin>(value, SelectArg(notNull, ptr[i], value));
  601. }
  602. } else {
  603. for (int64_t i = 0; i < len; ++i) {
  604. value = UpdateMinMax<IsMin>(value, ptr[i]);
  605. }
  606. }
  607. typedState->Value = value;
  608. } else {
  609. const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
  610. const auto& filterArray = filterDatum.array();
  611. MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
  612. const ui8* filterBitmap = filterArray->template GetValues<uint8_t>(1);
  613. TIn value = typedState->Value;
  614. ui64 validCount = 0;
  615. if (IsNullable && nullCount != 0) {
  616. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  617. for (int64_t i = 0; i < len; ++i) {
  618. ui64 fullIndex = i + array->offset;
  619. ui8 notNullAndFiltered = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) & filterBitmap[i];
  620. value = UpdateMinMax<IsMin>(value, SelectArg(notNullAndFiltered, ptr[i], value));
  621. validCount += notNullAndFiltered;
  622. }
  623. } else {
  624. for (int64_t i = 0; i < len; ++i) {
  625. ui8 filtered = filterBitmap[i];
  626. value = UpdateMinMax<IsMin>(value, SelectArg(filtered, ptr[i], value));
  627. validCount += filtered;
  628. }
  629. }
  630. if constexpr (IsNullable) {
  631. typedState->IsValid |= validCount ? 1 : 0;
  632. }
  633. typedState->Value = value;
  634. }
  635. }
  636. }
  637. NUdf::TUnboxedValue FinishOne(const void* state) final {
  638. auto typedState = MakeStateWrapper<TStateType>(state);
  639. if constexpr (IsNullable) {
  640. if (!typedState->IsValid) {
  641. return NUdf::TUnboxedValuePod();
  642. }
  643. }
  644. return NUdf::TUnboxedValuePod(typedState->Value);
  645. }
  646. private:
  647. const ui32 ArgColumn_;
  648. };
  649. template <bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
  650. static void PushValueToState(TState<IsNullable, TIn, IsMin>* typedState, const arrow::Datum& datum, ui64 row) {
  651. using TInScalar = typename TPrimitiveDataType<TIn>::TScalarResult;
  652. if constexpr (IsScalar) {
  653. Y_ENSURE(datum.is_scalar());
  654. if constexpr (IsNullable) {
  655. if (datum.scalar()->is_valid) {
  656. typedState->Value = TIn(Cast(datum.scalar_as<TInScalar>().value));
  657. typedState->IsValid = 1;
  658. }
  659. } else {
  660. typedState->Value = TIn(Cast(datum.scalar_as<TInScalar>().value));
  661. }
  662. } else {
  663. const auto &array = datum.array();
  664. auto ptr = array->GetValues<TIn>(1);
  665. if constexpr (IsNullable) {
  666. if (array->GetNullCount() == 0) {
  667. typedState->IsValid = 1;
  668. typedState->Value = UpdateMinMax<IsMin>(typedState->Value, ptr[row]);
  669. } else {
  670. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  671. ui64 fullIndex = row + array->offset;
  672. ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
  673. typedState->Value = UpdateMinMax<IsMin>(typedState->Value, SelectArg(notNull, ptr[row], typedState->Value));
  674. typedState->IsValid |= notNull;
  675. }
  676. } else {
  677. typedState->Value = UpdateMinMax<IsMin>(typedState->Value, ptr[row]);
  678. }
  679. }
  680. }
  681. template <bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
  682. class TMinMaxBlockFixedAggregator<TCombineKeysTag, IsNullable, IsScalar, TIn, IsMin> : public TCombineKeysTag::TBase {
  683. public:
  684. using TBase = TCombineKeysTag::TBase;
  685. using TStateType = TState<IsNullable, TIn, IsMin>;
  686. TMinMaxBlockFixedAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  687. : TBase(sizeof(TStateType), filterColumn, ctx)
  688. , ArgColumn_(argColumn)
  689. , Type_(type)
  690. {
  691. }
  692. void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  693. TStateType st;
  694. WriteUnaligned<TStateType>(state, st);
  695. UpdateKey(state, batchNum, columns, row);
  696. }
  697. void DestroyState(void* state) noexcept final {
  698. static_assert(std::is_trivially_destructible<TStateType>::value);
  699. Y_UNUSED(state);
  700. }
  701. void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  702. Y_UNUSED(batchNum);
  703. auto typedState = MakeStateWrapper<TStateType>(state);
  704. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  705. PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState.Get(), datum, row);
  706. }
  707. std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
  708. return std::make_unique<TColumnBuilder<IsNullable, TIn, IsMin>>(size, Type_, Ctx_);
  709. }
  710. private:
  711. const ui32 ArgColumn_;
  712. const std::shared_ptr<arrow::DataType> BuilderDataType_;
  713. TType* const Type_;
  714. };
  715. template <bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
  716. class TMinMaxBlockFixedAggregator<TFinalizeKeysTag, IsNullable, IsScalar, TIn, IsMin> : public TFinalizeKeysTag::TBase {
  717. public:
  718. using TBase = TFinalizeKeysTag::TBase;
  719. using TStateType = TState<IsNullable, TIn, IsMin>;
  720. TMinMaxBlockFixedAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  721. : TBase(sizeof(TStateType), filterColumn, ctx)
  722. , ArgColumn_(argColumn)
  723. , Type_(type)
  724. {
  725. }
  726. void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  727. TStateType st;
  728. WriteUnaligned<TStateType>(state, st);
  729. UpdateState(state, batchNum, columns, row);
  730. }
  731. void DestroyState(void* state) noexcept final {
  732. static_assert(std::is_trivially_destructible<TStateType>::value);
  733. Y_UNUSED(state);
  734. }
  735. void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  736. Y_UNUSED(batchNum);
  737. auto typedState = MakeStateWrapper<TStateType>(state);
  738. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  739. PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState.Get(), datum, row);
  740. }
  741. void SerializeState(void* state, NUdf::TOutputBuffer& buffer) final {
  742. auto typedState = MakeStateWrapper<TStateType>(state);
  743. if constexpr (IsNullable) {
  744. buffer.PushNumber(typedState->IsValid);
  745. }
  746. buffer.PushNumber(typedState->Value);
  747. }
  748. void DeserializeState(void* state, NUdf::TInputBuffer& buffer) final {
  749. auto typedState = MakeStateWrapper<TStateType>(state);
  750. buffer.PopNumber(typedState->Value);
  751. if constexpr (IsNullable) {
  752. buffer.PopNumber(typedState->IsValid);
  753. }
  754. }
  755. std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
  756. return std::make_unique<TColumnBuilder<IsNullable, TIn, IsMin>>(size, Type_, Ctx_);
  757. }
  758. private:
  759. const ui32 ArgColumn_;
  760. TType* const Type_;
  761. };
  762. template<typename TTag, typename TStringType, bool IsMin>
  763. class TPreparedMinMaxBlockStringAggregator : public TTag::TPreparedAggregator {
  764. public:
  765. using TBase = typename TTag::TPreparedAggregator;
  766. TPreparedMinMaxBlockStringAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn)
  767. : TBase(sizeof(TGenericState))
  768. , Type_(type)
  769. , FilterColumn_(filterColumn)
  770. , ArgColumn_(argColumn)
  771. {}
  772. std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
  773. return std::make_unique<TMinMaxBlockStringAggregator<TTag, TStringType, IsMin>>(Type_, FilterColumn_, ArgColumn_, ctx);
  774. }
  775. private:
  776. TType* const Type_;
  777. const std::optional<ui32> FilterColumn_;
  778. const ui32 ArgColumn_;
  779. };
  780. template <typename TTag, bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
  781. class TPreparedMinMaxBlockFixedAggregator : public TTag::TPreparedAggregator {
  782. public:
  783. using TBase = typename TTag::TPreparedAggregator;
  784. using TStateType = TState<IsNullable, TIn, IsMin>;
  785. TPreparedMinMaxBlockFixedAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn)
  786. : TBase(sizeof(TStateType))
  787. , Type_(type)
  788. , FilterColumn_(filterColumn)
  789. , ArgColumn_(argColumn)
  790. {}
  791. std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
  792. return std::make_unique<TMinMaxBlockFixedAggregator<TTag, IsNullable, IsScalar, TIn, IsMin>>(Type_, FilterColumn_, ArgColumn_, ctx);
  793. }
  794. private:
  795. TType* const Type_;
  796. const std::optional<ui32> FilterColumn_;
  797. const ui32 ArgColumn_;
  798. };
  799. template <typename TTag, bool IsMin>
  800. class TPreparedMinMaxBlockGenericAggregator : public TTag::TPreparedAggregator {
  801. public:
  802. using TBase = typename TTag::TPreparedAggregator;
  803. TPreparedMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn)
  804. : TBase(sizeof(TGenericState))
  805. , Type_(type)
  806. , FilterColumn_(filterColumn)
  807. , ArgColumn_(argColumn)
  808. {}
  809. std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
  810. return std::make_unique<TMinMaxBlockGenericAggregator<TTag, IsMin>>(Type_, FilterColumn_, ArgColumn_, ctx);
  811. }
  812. private:
  813. TType* const Type_;
  814. const std::optional<ui32> FilterColumn_;
  815. const ui32 ArgColumn_;
  816. };
  817. template<typename TTag, typename TIn, bool IsMin>
  818. std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMaxFixed(TType* type, bool isOptional, bool isScalar, std::optional<ui32> filterColumn, ui32 argColumn) {
  819. if (isScalar) {
  820. if (isOptional) {
  821. return std::make_unique<TPreparedMinMaxBlockFixedAggregator<TTag, true, true, TIn, IsMin>>(type, filterColumn, argColumn);
  822. }
  823. return std::make_unique<TPreparedMinMaxBlockFixedAggregator<TTag, false, true, TIn, IsMin>>(type, filterColumn, argColumn);
  824. }
  825. if (isOptional) {
  826. return std::make_unique<TPreparedMinMaxBlockFixedAggregator<TTag, true, false, TIn, IsMin>>(type, filterColumn, argColumn);
  827. }
  828. return std::make_unique<TPreparedMinMaxBlockFixedAggregator<TTag, false, false, TIn, IsMin>>(type, filterColumn, argColumn);
  829. }
  830. template <typename TTag, bool IsMin>
  831. std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMax(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn) {
  832. auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn));
  833. const bool isScalar = blockType->GetShape() == TBlockType::EShape::Scalar;
  834. auto argType = blockType->GetItemType();
  835. bool isOptional;
  836. auto unpacked = UnpackOptional(argType, isOptional);
  837. if (!unpacked->IsData()) {
  838. return std::make_unique<TPreparedMinMaxBlockGenericAggregator<TTag, IsMin>>(argType, filterColumn, argColumn);
  839. }
  840. auto dataType = AS_TYPE(TDataType, unpacked);
  841. const auto slot = *dataType->GetDataSlot();
  842. if (slot == NUdf::EDataSlot::String) {
  843. using TStringType = char*;
  844. return std::make_unique<TPreparedMinMaxBlockStringAggregator<TTag, TStringType, IsMin>>(argType, filterColumn, argColumn);
  845. } else if (slot == NUdf::EDataSlot::Utf8) {
  846. using TStringType = NUdf::TUtf8;
  847. return std::make_unique<TPreparedMinMaxBlockStringAggregator<TTag, TStringType, IsMin>>(argType, filterColumn, argColumn);
  848. }
  849. switch (slot) {
  850. case NUdf::EDataSlot::Int8:
  851. return PrepareMinMaxFixed<TTag, i8, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  852. case NUdf::EDataSlot::Bool:
  853. case NUdf::EDataSlot::Uint8:
  854. return PrepareMinMaxFixed<TTag, ui8, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  855. case NUdf::EDataSlot::Int16:
  856. return PrepareMinMaxFixed<TTag, i16, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  857. case NUdf::EDataSlot::Uint16:
  858. case NUdf::EDataSlot::Date:
  859. return PrepareMinMaxFixed<TTag, ui16, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  860. case NUdf::EDataSlot::Int32:
  861. case NUdf::EDataSlot::Date32:
  862. return PrepareMinMaxFixed<TTag, i32, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  863. case NUdf::EDataSlot::Uint32:
  864. case NUdf::EDataSlot::Datetime:
  865. return PrepareMinMaxFixed<TTag, ui32, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  866. case NUdf::EDataSlot::Int64:
  867. case NUdf::EDataSlot::Interval:
  868. case NUdf::EDataSlot::Interval64:
  869. case NUdf::EDataSlot::Timestamp64:
  870. case NUdf::EDataSlot::Datetime64:
  871. return PrepareMinMaxFixed<TTag, i64, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  872. case NUdf::EDataSlot::Uint64:
  873. case NUdf::EDataSlot::Timestamp:
  874. return PrepareMinMaxFixed<TTag, ui64, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  875. case NUdf::EDataSlot::Float:
  876. return PrepareMinMaxFixed<TTag, float, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  877. case NUdf::EDataSlot::Double:
  878. return PrepareMinMaxFixed<TTag, double, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  879. case NUdf::EDataSlot::Decimal:
  880. return PrepareMinMaxFixed<TTag, NYql::NDecimal::TInt128, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  881. default:
  882. throw yexception() << "Unsupported MIN/MAX input type";
  883. }
  884. }
  885. template <bool IsMin>
  886. class TBlockMinMaxFactory : public IBlockAggregatorFactory {
  887. public:
  888. std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll(
  889. TTupleType* tupleType,
  890. std::optional<ui32> filterColumn,
  891. const std::vector<ui32>& argsColumns,
  892. const TTypeEnvironment& env) const final {
  893. Y_UNUSED(env);
  894. return PrepareMinMax<TCombineAllTag, IsMin>(tupleType, filterColumn, argsColumns[0]);
  895. }
  896. std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys(
  897. TTupleType* tupleType,
  898. const std::vector<ui32>& argsColumns,
  899. const TTypeEnvironment& env) const final {
  900. Y_UNUSED(env);
  901. return PrepareMinMax<TCombineKeysTag, IsMin>(tupleType, std::optional<ui32>(), argsColumns[0]);
  902. }
  903. std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys(
  904. TTupleType* tupleType,
  905. const std::vector<ui32>& argsColumns,
  906. const TTypeEnvironment& env,
  907. TType* returnType,
  908. ui32 hint) const final {
  909. Y_UNUSED(env);
  910. Y_UNUSED(returnType);
  911. Y_UNUSED(hint);
  912. return PrepareMinMax<TFinalizeKeysTag, IsMin>(tupleType, std::optional<ui32>(), argsColumns[0]);
  913. }
  914. };
  915. } // namespace
  916. std::unique_ptr<IBlockAggregatorFactory> MakeBlockMinFactory() {
  917. return std::make_unique<TBlockMinMaxFactory<true>>();
  918. }
  919. std::unique_ptr<IBlockAggregatorFactory> MakeBlockMaxFactory() {
  920. return std::make_unique<TBlockMinMaxFactory<false>>();
  921. }
  922. }
  923. }