mkql_block_agg_minmax.cpp 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028
  1. #include "mkql_block_agg_minmax.h"
  2. #include "mkql_block_agg_state_helper.h"
  3. #include <yql/essentials/minikql/mkql_node_cast.h>
  4. #include <yql/essentials/minikql/mkql_node_builder.h>
  5. #include <yql/essentials/minikql/mkql_string_util.h>
  6. #include <yql/essentials/minikql/computation/mkql_block_builder.h>
  7. #include <yql/essentials/minikql/computation/mkql_block_reader.h>
  8. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  9. #include <yql/essentials/minikql/arrow/arrow_defs.h>
  10. #include <yql/essentials/minikql/arrow/arrow_util.h>
  11. #include <yql/essentials/minikql/arrow/mkql_bit_utils.h>
  12. #include <yql/essentials/public/udf/arrow/block_item_comparator.h>
  13. #include <arrow/scalar.h>
  14. #include <arrow/array/builder_primitive.h>
  15. namespace NKikimr {
  16. namespace NMiniKQL {
  17. namespace {
  /// Strict "less than" in min/max aggregation order.
  ///
  /// For non-floating-point types this is plain `a < b`.
  /// For floating-point types NaN is ordered above every other value (including +inf),
  /// and two NaNs are equivalent (neither is less than the other).
  template <typename T>
  inline bool AggLess(T a, T b) {
      if constexpr (std::is_floating_point_v<T>) {
          const bool aIsNan = std::isnan(a);
          const bool bIsNan = std::isnan(b);
          if (aIsNan || bIsNan) {
              // Once a NaN is involved, only "number < NaN" holds.
              return !aIsNan && bIsNan;
          }
      }
      return a < b;
  }
  /// Returns the winner of @p x and @p y under aggregation order (see AggLess).
  ///
  /// For IsMin, @p x is kept only when it is strictly less than @p y. For max, @p x is kept only
  /// when @p y is strictly less than @p x. On ties @p y is returned.
  template <bool IsMin, typename T>
  inline T UpdateMinMax(T x, T y) {
      const bool keepX = IsMin ? AggLess(x, y) : AggLess(y, x);
      return keepX ? x : y;
  }
  36. template<bool IsMin, typename T>
  37. inline void UpdateMinMax(TMaybe<T>& state, bool& stateUpdated, T value) {
  38. if constexpr (IsMin) {
  39. if (!state || AggLess(value, *state)) {
  40. state = value;
  41. stateUpdated = true;
  42. }
  43. } else {
  44. if (!state || AggLess(*state, value)) {
  45. state = value;
  46. stateUpdated = true;
  47. }
  48. }
  49. }
  50. template<bool IsMin>
  51. inline void UpdateMinMax(NYql::NUdf::IBlockItemComparator& comparator, TBlockItem& state, bool& stateUpdated, TBlockItem value) {
  52. if constexpr (IsMin) {
  53. if (!state || comparator.Less(value, state)) {
  54. state = value;
  55. stateUpdated = true;
  56. }
  57. } else {
  58. if (!state || comparator.Less(state, value)) {
  59. state = value;
  60. stateUpdated = true;
  61. }
  62. }
  63. }
  // Forward declarations. The aggregator families are specialized below per aggregation tag
  // (TCombineAllTag / TCombineKeysTag / TFinalizeKeysTag). TState is specialized on nullability.
  template <bool IsNullable, typename TIn, bool IsMin>
  struct TState;

  template <typename TTag, bool IsMin>
  class TMinMaxBlockGenericAggregator;

  template <typename TTag, typename TStringType, bool IsMin>
  class TMinMaxBlockStringAggregator;

  template <typename TTag, bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
  class TMinMaxBlockFixedAggregator;
  72. template<typename TIn, bool IsMin>
  73. constexpr TIn InitialStateValue() {
  74. if constexpr (std::is_floating_point<TIn>::value) {
  75. static_assert(std::numeric_limits<TIn>::has_infinity && std::numeric_limits<TIn>::has_quiet_NaN);
  76. if constexpr (IsMin) {
  77. // biggest fp value in agg ordering is NaN
  78. return std::numeric_limits<TIn>::quiet_NaN();
  79. } else {
  80. return -std::numeric_limits<TIn>::infinity();
  81. }
  82. } else if constexpr (std::is_same_v<TIn, NYql::NDecimal::TInt128>) {
  83. if constexpr (IsMin) {
  84. return NYql::NDecimal::Nan();
  85. } else {
  86. return -NYql::NDecimal::Inf();
  87. }
  88. } else if constexpr (std::is_arithmetic<TIn>::value) {
  89. if constexpr (IsMin) {
  90. return std::numeric_limits<TIn>::max();
  91. } else {
  92. return std::numeric_limits<TIn>::min();
  93. }
  94. } else {
  95. static_assert(std::is_arithmetic<TIn>::value);
  96. }
  97. }
  98. template <typename TIn, bool IsMin>
  99. struct TState<true, TIn, IsMin> {
  100. TIn Value = InitialStateValue<TIn, IsMin>();
  101. ui8 IsValid = 0;
  102. };
  103. template <typename TIn, bool IsMin>
  104. struct TState<false, TIn, IsMin> {
  105. TIn Value = InitialStateValue<TIn, IsMin>();
  106. };
  107. using TGenericState = NUdf::TUnboxedValuePod;
  108. template <bool IsNullable, typename TIn, bool IsMin>
  109. class TColumnBuilder : public IAggColumnBuilder {
  110. using TBuilder = typename NYql::NUdf::TFixedSizeArrayBuilder<TIn, IsNullable>;
  111. using TStateType = TState<IsNullable, TIn, IsMin>;
  112. public:
  113. TColumnBuilder(ui64 size, TType* type, TComputationContext& ctx)
  114. : Builder_(type, TTypeInfoHelper(), ctx.ArrowMemoryPool, size)
  115. , Ctx_(ctx)
  116. {
  117. }
  118. void Add(const void* state) final {
  119. auto typedState = MakeStateWrapper<TStateType>(state);
  120. if constexpr (IsNullable) {
  121. if (!typedState->IsValid) {
  122. Builder_.Add(TBlockItem());
  123. return;
  124. }
  125. }
  126. Builder_.Add(TBlockItem(typedState->Value));
  127. }
  128. NUdf::TUnboxedValue Build() final {
  129. return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true));
  130. }
  131. private:
  132. TBuilder Builder_;
  133. TComputationContext& Ctx_;
  134. };
  135. class TGenericColumnBuilder : public IAggColumnBuilder {
  136. public:
  137. TGenericColumnBuilder(ui64 size, TType* columnType, TComputationContext& ctx)
  138. : Builder_(MakeArrayBuilder(TTypeInfoHelper(), columnType, ctx.ArrowMemoryPool, size, &ctx.Builder->GetPgBuilder()))
  139. , Ctx_(ctx)
  140. {
  141. }
  142. void Add(const void* state) final {
  143. Builder_->Add(*static_cast<const TGenericState*>(state));
  144. }
  145. NUdf::TUnboxedValue Build() final {
  146. return Ctx_.HolderFactory.CreateArrowBlock(Builder_->Build(true));
  147. }
  148. private:
  149. const std::unique_ptr<IArrayBuilder> Builder_;
  150. TComputationContext& Ctx_;
  151. };
  152. template <bool IsMin>
  153. void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row, IBlockReader& reader,
  154. IBlockItemConverter& converter, NYql::NUdf::IBlockItemComparator& comparator, TComputationContext& ctx)
  155. {
  156. TBlockItem stateItem;
  157. bool stateChanged = false;
  158. if (datum.is_scalar()) {
  159. if (datum.scalar()->is_valid) {
  160. stateItem = reader.GetScalarItem(*datum.scalar());
  161. stateChanged = true;
  162. }
  163. } else {
  164. if (*typedState) {
  165. stateItem = converter.MakeItem(*typedState);
  166. }
  167. const auto& array = datum.array();
  168. TBlockItem curr = reader.GetItem(*array, row);
  169. if (curr) {
  170. UpdateMinMax<IsMin>(comparator, stateItem, stateChanged, curr);
  171. }
  172. }
  173. if (stateChanged) {
  174. typedState->DeleteUnreferenced();
  175. *typedState = converter.MakeValue(stateItem, ctx.HolderFactory);
  176. }
  177. }
  178. template<bool IsMin>
  179. class TMinMaxBlockGenericAggregator<TCombineAllTag, IsMin> : public TCombineAllTag::TBase {
  180. public:
  181. using TBase = TCombineAllTag::TBase;
  182. TMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  183. : TBase(sizeof(TGenericState), filterColumn, ctx)
  184. , ArgColumn_(argColumn)
  185. , ReaderOne_(MakeBlockReader(TTypeInfoHelper(), type))
  186. , ReaderTwo_(MakeBlockReader(TTypeInfoHelper(), type))
  187. , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder()))
  188. , Compare_(TBlockTypeHelper().MakeComparator(type))
  189. {
  190. }
  191. void InitState(void* state) final {
  192. new(state) TGenericState();
  193. }
  194. void DestroyState(void* state) noexcept final {
  195. auto typedState = static_cast<TGenericState*>(state);
  196. typedState->DeleteUnreferenced();
  197. *typedState = TGenericState();
  198. }
  199. void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
  200. TGenericState& typedState = *static_cast<TGenericState*>(state);
  201. Y_UNUSED(batchLength);
  202. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  203. IBlockReader* currReader = ReaderOne_.get();
  204. IBlockReader* stateReader = ReaderTwo_.get();
  205. TBlockItem stateItem;
  206. bool stateChanged = false;
  207. if (datum.is_scalar()) {
  208. if (datum.scalar()->is_valid) {
  209. stateItem = currReader->GetScalarItem(*datum.scalar());
  210. stateChanged = true;
  211. }
  212. } else {
  213. if (typedState) {
  214. stateItem = Converter_->MakeItem(typedState);
  215. }
  216. const auto& array = datum.array();
  217. auto len = array->length;
  218. const ui8* filterBitmap = nullptr;
  219. if (filtered) {
  220. const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
  221. const auto& filterArray = filterDatum.array();
  222. MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
  223. filterBitmap = filterArray->template GetValues<uint8_t>(1);
  224. }
  225. auto& comparator = *Compare_;
  226. for (auto i = 0; i < len; ++i) {
  227. TBlockItem curr = currReader->GetItem(*array, i);
  228. if (curr && (!filterBitmap || filterBitmap[i])) {
  229. bool changed = false;
  230. UpdateMinMax<IsMin>(comparator, stateItem, changed, curr);
  231. if (changed) {
  232. std::swap(currReader, stateReader);
  233. stateChanged = true;
  234. }
  235. }
  236. }
  237. }
  238. if (stateChanged) {
  239. typedState.DeleteUnreferenced();
  240. typedState = Converter_->MakeValue(stateItem, Ctx_.HolderFactory);
  241. }
  242. }
  243. NUdf::TUnboxedValue FinishOne(const void *state) final {
  244. auto typedState = *static_cast<const TGenericState *>(state);
  245. return typedState;
  246. }
  247. private:
  248. const ui32 ArgColumn_;
  249. const std::unique_ptr<IBlockReader> ReaderOne_;
  250. const std::unique_ptr<IBlockReader> ReaderTwo_;
  251. const std::unique_ptr<IBlockItemConverter> Converter_;
  252. const NYql::NUdf::IBlockItemComparator::TPtr Compare_;
  253. };
  254. template<bool IsMin>
  255. class TMinMaxBlockGenericAggregator<TCombineKeysTag, IsMin> : public TCombineKeysTag::TBase {
  256. public:
  257. using TBase = TCombineKeysTag::TBase;
  258. TMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  259. : TBase(sizeof(TGenericState), filterColumn, ctx)
  260. , ArgColumn_(argColumn)
  261. , Type_(type)
  262. , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
  263. , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder()))
  264. , Compare_(TBlockTypeHelper().MakeComparator(type))
  265. {
  266. }
  267. void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  268. new(state) TGenericState();
  269. UpdateKey(state, batchNum, columns, row);
  270. }
  271. void DestroyState(void* state) noexcept final {
  272. auto typedState = static_cast<TGenericState*>(state);
  273. typedState->DeleteUnreferenced();
  274. *typedState = TGenericState();
  275. }
  276. void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  277. Y_UNUSED(batchNum);
  278. auto typedState = static_cast<TGenericState*>(state);
  279. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  280. PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_);
  281. }
  282. std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
  283. return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
  284. }
  285. private:
  286. const ui32 ArgColumn_;
  287. TType* const Type_;
  288. const std::unique_ptr<IBlockReader> Reader_;
  289. const std::unique_ptr<IBlockItemConverter> Converter_;
  290. const NYql::NUdf::IBlockItemComparator::TPtr Compare_;
  291. };
  292. template<bool IsMin>
  293. class TMinMaxBlockGenericAggregator<TFinalizeKeysTag, IsMin> : public TFinalizeKeysTag::TBase {
  294. public:
  295. using TBase = TFinalizeKeysTag::TBase;
  296. TMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  297. : TBase(sizeof(TGenericState), filterColumn, ctx)
  298. , ArgColumn_(argColumn)
  299. , Type_(type)
  300. , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
  301. , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder()))
  302. , Compare_(TBlockTypeHelper().MakeComparator(type))
  303. {
  304. }
  305. void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  306. new(state) TGenericState();
  307. UpdateState(state, batchNum, columns, row);
  308. }
  309. void DestroyState(void* state) noexcept final {
  310. auto typedState = static_cast<TGenericState*>(state);
  311. typedState->DeleteUnreferenced();
  312. *typedState = TGenericState();
  313. }
  314. void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  315. Y_UNUSED(batchNum);
  316. auto typedState = static_cast<TGenericState*>(state);
  317. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  318. PushValueToState<IsMin>(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_);
  319. }
  320. std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
  321. return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
  322. }
  323. private:
  324. const ui32 ArgColumn_;
  325. TType* const Type_;
  326. const std::unique_ptr<IBlockReader> Reader_;
  327. const std::unique_ptr<IBlockItemConverter> Converter_;
  328. const NYql::NUdf::IBlockItemComparator::TPtr Compare_;
  329. };
  330. template <typename TStringType, bool IsMin>
  331. void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row) {
  332. using TOffset = typename TPrimitiveDataType<TStringType>::TResult::offset_type;;
  333. TMaybe<NUdf::TStringRef> currentState;
  334. if (*typedState) {
  335. currentState = typedState->AsStringRef();
  336. }
  337. bool stateUpdated = false;
  338. if (datum.is_scalar()) {
  339. if (datum.scalar()->is_valid) {
  340. auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(*datum.scalar()).value;
  341. const char* data = reinterpret_cast<const char*>(buffer->data());
  342. auto value = NUdf::TStringRef(data, buffer->size());
  343. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  344. }
  345. } else {
  346. const auto& array = datum.array();
  347. const TOffset* offsets = array->GetValues<TOffset>(1);
  348. const char* data = array->GetValues<char>(2, 0);
  349. if (array->GetNullCount() == 0) {
  350. auto value = NUdf::TStringRef(data + offsets[row], offsets[row + 1] - offsets[row]);
  351. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  352. } else {
  353. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  354. ui64 fullIndex = row + array->offset;
  355. if ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) {
  356. auto value = NUdf::TStringRef(data + offsets[row], offsets[row + 1] - offsets[row]);
  357. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  358. }
  359. }
  360. }
  361. if (stateUpdated) {
  362. auto newState = MakeString(*currentState);
  363. typedState->DeleteUnreferenced();
  364. *typedState = std::move(newState);
  365. }
  366. }
  367. template<typename TStringType, bool IsMin>
  368. class TMinMaxBlockStringAggregator<TCombineAllTag, TStringType, IsMin> : public TCombineAllTag::TBase {
  369. public:
  370. using TBase = TCombineAllTag::TBase;
  371. using TOffset = typename TPrimitiveDataType<TStringType>::TResult::offset_type;
  372. TMinMaxBlockStringAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  373. : TBase(sizeof(TGenericState), filterColumn, ctx)
  374. , ArgColumn_(argColumn)
  375. {
  376. Y_UNUSED(type);
  377. }
  378. void InitState(void* state) final {
  379. new(state) TGenericState();
  380. }
  381. void DestroyState(void* state) noexcept final {
  382. auto typedState = static_cast<TGenericState*>(state);
  383. typedState->DeleteUnreferenced();
  384. *typedState = TGenericState();
  385. }
  386. void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
  387. TGenericState& typedState = *static_cast<TGenericState*>(state);
  388. Y_UNUSED(batchLength);
  389. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  390. TMaybe<NUdf::TStringRef> currentState;
  391. if (typedState) {
  392. currentState = typedState.AsStringRef();
  393. }
  394. bool stateUpdated = false;
  395. if (datum.is_scalar()) {
  396. if (datum.scalar()->is_valid) {
  397. auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(*datum.scalar()).value;
  398. const char* data = reinterpret_cast<const char*>(buffer->data());
  399. auto value = NUdf::TStringRef(data, buffer->size());
  400. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  401. }
  402. } else {
  403. const auto& array = datum.array();
  404. auto len = array->length;
  405. auto count = len - array->GetNullCount();
  406. if (!count) {
  407. return;
  408. }
  409. const TOffset* offsets = array->GetValues<TOffset>(1);
  410. const char* data = array->GetValues<char>(2, 0);
  411. if (!filtered) {
  412. if (array->GetNullCount() == 0) {
  413. for (int64_t i = 0; i < len; ++i) {
  414. NUdf::TStringRef value(data + offsets[i], offsets[i + 1] - offsets[i]);
  415. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  416. }
  417. } else {
  418. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  419. for (int64_t i = 0; i < len; ++i) {
  420. ui64 fullIndex = i + array->offset;
  421. if ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) {
  422. NUdf::TStringRef value(data + offsets[i], offsets[i + 1] - offsets[i]);
  423. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  424. }
  425. }
  426. }
  427. } else {
  428. const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
  429. const auto& filterArray = filterDatum.array();
  430. MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
  431. const ui8* filterBitmap = filterArray->template GetValues<uint8_t>(1);
  432. if (array->GetNullCount() == 0) {
  433. for (int64_t i = 0; i < len; ++i) {
  434. if (filterBitmap[i]) {
  435. NUdf::TStringRef value(data + offsets[i], offsets[i + 1] - offsets[i]);
  436. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  437. }
  438. }
  439. } else {
  440. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  441. for (int64_t i = 0; i < len; ++i) {
  442. ui64 fullIndex = i + array->offset;
  443. if (filterBitmap[i] && ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1)) {
  444. NUdf::TStringRef value(data + offsets[i], offsets[i + 1] - offsets[i]);
  445. UpdateMinMax<IsMin>(currentState, stateUpdated, value);
  446. }
  447. }
  448. }
  449. }
  450. }
  451. if (stateUpdated) {
  452. auto newState = MakeString(*currentState);
  453. typedState.DeleteUnreferenced();
  454. typedState = std::move(newState);
  455. }
  456. }
  457. NUdf::TUnboxedValue FinishOne(const void* state) final {
  458. auto typedState = *static_cast<const TGenericState*>(state);
  459. return typedState;
  460. }
  461. private:
  462. const ui32 ArgColumn_;
  463. };
  464. template<typename TStringType, bool IsMin>
  465. class TMinMaxBlockStringAggregator<TCombineKeysTag, TStringType, IsMin> : public TCombineKeysTag::TBase {
  466. public:
  467. using TBase = TCombineKeysTag::TBase;
  468. TMinMaxBlockStringAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  469. : TBase(sizeof(TGenericState), filterColumn, ctx)
  470. , ArgColumn_(argColumn)
  471. , Type_(type)
  472. {
  473. }
  474. void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  475. new(state) TGenericState();
  476. UpdateKey(state, batchNum, columns, row);
  477. }
  478. void DestroyState(void* state) noexcept final {
  479. auto typedState = static_cast<TGenericState*>(state);
  480. typedState->DeleteUnreferenced();
  481. *typedState = TGenericState();
  482. }
  483. void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  484. Y_UNUSED(batchNum);
  485. auto typedState = static_cast<TGenericState*>(state);
  486. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  487. PushValueToState<TStringType, IsMin>(typedState, datum, row);
  488. }
  489. std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
  490. return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
  491. }
  492. private:
  493. const ui32 ArgColumn_;
  494. TType* const Type_;
  495. };
  496. template<typename TStringType, bool IsMin>
  497. class TMinMaxBlockStringAggregator<TFinalizeKeysTag, TStringType, IsMin> : public TFinalizeKeysTag::TBase {
  498. public:
  499. using TBase = TFinalizeKeysTag::TBase;
  500. TMinMaxBlockStringAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  501. : TBase(sizeof(TGenericState), filterColumn, ctx)
  502. , ArgColumn_(argColumn)
  503. , Type_(type)
  504. {
  505. }
  506. void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  507. new(state) TGenericState();
  508. UpdateState(state, batchNum, columns, row);
  509. }
  510. void DestroyState(void* state) noexcept final {
  511. auto typedState = static_cast<TGenericState*>(state);
  512. typedState->DeleteUnreferenced();
  513. *typedState = TGenericState();
  514. }
  515. void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  516. Y_UNUSED(batchNum);
  517. auto typedState = static_cast<TGenericState*>(state);
  518. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  519. PushValueToState<TStringType, IsMin>(typedState, datum, row);
  520. }
  521. std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
  522. return std::make_unique<TGenericColumnBuilder>(size, Type_, Ctx_);
  523. }
  524. private:
  525. const ui32 ArgColumn_;
  526. TType* const Type_;
  527. };
  528. template <bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
  529. class TMinMaxBlockFixedAggregator<TCombineAllTag, IsNullable, IsScalar, TIn, IsMin> : public TCombineAllTag::TBase {
  530. public:
  531. using TBase = TCombineAllTag::TBase;
  532. using TStateType = TState<IsNullable, TIn, IsMin>;
  533. using TInScalar = typename TPrimitiveDataType<TIn>::TScalarResult;
  534. TMinMaxBlockFixedAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  535. : TBase(sizeof(TStateType), filterColumn, ctx)
  536. , ArgColumn_(argColumn)
  537. {
  538. Y_UNUSED(type);
  539. }
  540. void InitState(void* ptr) final {
  541. TStateType state;
  542. WriteUnaligned<TStateType>(ptr, state);
  543. }
  544. void DestroyState(void* state) noexcept final {
  545. static_assert(std::is_trivially_destructible<TStateType>::value);
  546. Y_UNUSED(state);
  547. }
  548. void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
  549. auto typedState = MakeStateWrapper<TStateType>(state);
  550. Y_UNUSED(batchLength);
  551. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  552. if constexpr (IsScalar) {
  553. Y_ENSURE(datum.is_scalar());
  554. if constexpr (IsNullable) {
  555. if (datum.scalar()->is_valid) {
  556. typedState->Value = TIn(Cast(datum.scalar_as<TInScalar>().value));
  557. typedState->IsValid = 1;
  558. }
  559. } else {
  560. typedState->Value = TIn(Cast(datum.scalar_as<TInScalar>().value));
  561. }
  562. } else {
  563. const auto& array = datum.array();
  564. auto ptr = array->GetValues<TIn>(1);
  565. auto len = array->length;
  566. auto nullCount = IsNullable ? array->GetNullCount() : 0;
  567. auto count = len - nullCount;
  568. if (!count) {
  569. return;
  570. }
  571. if (!filtered) {
  572. TIn value = typedState->Value;
  573. if constexpr (IsNullable) {
  574. typedState->IsValid = 1;
  575. }
  576. if (IsNullable && nullCount != 0) {
  577. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  578. for (int64_t i = 0; i < len; ++i) {
  579. ui64 fullIndex = i + array->offset;
  580. ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
  581. value = UpdateMinMax<IsMin>(value, SelectArg(notNull, ptr[i], value));
  582. }
  583. } else {
  584. for (int64_t i = 0; i < len; ++i) {
  585. value = UpdateMinMax<IsMin>(value, ptr[i]);
  586. }
  587. }
  588. typedState->Value = value;
  589. } else {
  590. const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
  591. const auto& filterArray = filterDatum.array();
  592. MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
  593. const ui8* filterBitmap = filterArray->template GetValues<uint8_t>(1);
  594. TIn value = typedState->Value;
  595. ui64 validCount = 0;
  596. if (IsNullable && nullCount != 0) {
  597. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  598. for (int64_t i = 0; i < len; ++i) {
  599. ui64 fullIndex = i + array->offset;
  600. ui8 notNullAndFiltered = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) & filterBitmap[i];
  601. value = UpdateMinMax<IsMin>(value, SelectArg(notNullAndFiltered, ptr[i], value));
  602. validCount += notNullAndFiltered;
  603. }
  604. } else {
  605. for (int64_t i = 0; i < len; ++i) {
  606. ui8 filtered = filterBitmap[i];
  607. value = UpdateMinMax<IsMin>(value, SelectArg(filtered, ptr[i], value));
  608. validCount += filtered;
  609. }
  610. }
  611. if constexpr (IsNullable) {
  612. typedState->IsValid |= validCount ? 1 : 0;
  613. }
  614. typedState->Value = value;
  615. }
  616. }
  617. }
  618. NUdf::TUnboxedValue FinishOne(const void* state) final {
  619. auto typedState = MakeStateWrapper<TStateType>(state);
  620. if constexpr (IsNullable) {
  621. if (!typedState->IsValid) {
  622. return NUdf::TUnboxedValuePod();
  623. }
  624. }
  625. return NUdf::TUnboxedValuePod(typedState->Value);
  626. }
  627. private:
  628. const ui32 ArgColumn_;
  629. };
  630. template <bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
  631. static void PushValueToState(TState<IsNullable, TIn, IsMin>* typedState, const arrow::Datum& datum, ui64 row) {
  632. using TInScalar = typename TPrimitiveDataType<TIn>::TScalarResult;
  633. if constexpr (IsScalar) {
  634. Y_ENSURE(datum.is_scalar());
  635. if constexpr (IsNullable) {
  636. if (datum.scalar()->is_valid) {
  637. typedState->Value = TIn(Cast(datum.scalar_as<TInScalar>().value));
  638. typedState->IsValid = 1;
  639. }
  640. } else {
  641. typedState->Value = TIn(Cast(datum.scalar_as<TInScalar>().value));
  642. }
  643. } else {
  644. const auto &array = datum.array();
  645. auto ptr = array->GetValues<TIn>(1);
  646. if constexpr (IsNullable) {
  647. if (array->GetNullCount() == 0) {
  648. typedState->IsValid = 1;
  649. typedState->Value = UpdateMinMax<IsMin>(typedState->Value, ptr[row]);
  650. } else {
  651. auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
  652. ui64 fullIndex = row + array->offset;
  653. ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
  654. typedState->Value = UpdateMinMax<IsMin>(typedState->Value, SelectArg(notNull, ptr[row], typedState->Value));
  655. typedState->IsValid |= notNull;
  656. }
  657. } else {
  658. typedState->Value = UpdateMinMax<IsMin>(typedState->Value, ptr[row]);
  659. }
  660. }
  661. }
  662. template <bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
  663. class TMinMaxBlockFixedAggregator<TCombineKeysTag, IsNullable, IsScalar, TIn, IsMin> : public TCombineKeysTag::TBase {
  664. public:
  665. using TBase = TCombineKeysTag::TBase;
  666. using TStateType = TState<IsNullable, TIn, IsMin>;
  667. TMinMaxBlockFixedAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  668. : TBase(sizeof(TStateType), filterColumn, ctx)
  669. , ArgColumn_(argColumn)
  670. , Type_(type)
  671. {
  672. }
  673. void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  674. TStateType st;
  675. WriteUnaligned<TStateType>(state, st);
  676. UpdateKey(state, batchNum, columns, row);
  677. }
  678. void DestroyState(void* state) noexcept final {
  679. static_assert(std::is_trivially_destructible<TStateType>::value);
  680. Y_UNUSED(state);
  681. }
  682. void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  683. Y_UNUSED(batchNum);
  684. auto typedState = MakeStateWrapper<TStateType>(state);
  685. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  686. PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState.Get(), datum, row);
  687. }
  688. std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
  689. return std::make_unique<TColumnBuilder<IsNullable, TIn, IsMin>>(size, Type_, Ctx_);
  690. }
  691. private:
  692. const ui32 ArgColumn_;
  693. const std::shared_ptr<arrow::DataType> BuilderDataType_;
  694. TType* const Type_;
  695. };
  696. template <bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
  697. class TMinMaxBlockFixedAggregator<TFinalizeKeysTag, IsNullable, IsScalar, TIn, IsMin> : public TFinalizeKeysTag::TBase {
  698. public:
  699. using TBase = TFinalizeKeysTag::TBase;
  700. using TStateType = TState<IsNullable, TIn, IsMin>;
  701. TMinMaxBlockFixedAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
  702. : TBase(sizeof(TStateType), filterColumn, ctx)
  703. , ArgColumn_(argColumn)
  704. , Type_(type)
  705. {
  706. }
  707. void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  708. TStateType st;
  709. WriteUnaligned<TStateType>(state, st);
  710. UpdateState(state, batchNum, columns, row);
  711. }
  712. void DestroyState(void* state) noexcept final {
  713. static_assert(std::is_trivially_destructible<TStateType>::value);
  714. Y_UNUSED(state);
  715. }
  716. void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
  717. Y_UNUSED(batchNum);
  718. auto typedState = MakeStateWrapper<TStateType>(state);
  719. const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
  720. PushValueToState<IsNullable, IsScalar, TIn, IsMin>(typedState.Get(), datum, row);
  721. }
  722. std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
  723. return std::make_unique<TColumnBuilder<IsNullable, TIn, IsMin>>(size, Type_, Ctx_);
  724. }
  725. private:
  726. const ui32 ArgColumn_;
  727. TType* const Type_;
  728. };
  729. template<typename TTag, typename TStringType, bool IsMin>
  730. class TPreparedMinMaxBlockStringAggregator : public TTag::TPreparedAggregator {
  731. public:
  732. using TBase = typename TTag::TPreparedAggregator;
  733. TPreparedMinMaxBlockStringAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn)
  734. : TBase(sizeof(TGenericState))
  735. , Type_(type)
  736. , FilterColumn_(filterColumn)
  737. , ArgColumn_(argColumn)
  738. {}
  739. std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
  740. return std::make_unique<TMinMaxBlockStringAggregator<TTag, TStringType, IsMin>>(Type_, FilterColumn_, ArgColumn_, ctx);
  741. }
  742. private:
  743. TType* const Type_;
  744. const std::optional<ui32> FilterColumn_;
  745. const ui32 ArgColumn_;
  746. };
  747. template <typename TTag, bool IsNullable, bool IsScalar, typename TIn, bool IsMin>
  748. class TPreparedMinMaxBlockFixedAggregator : public TTag::TPreparedAggregator {
  749. public:
  750. using TBase = typename TTag::TPreparedAggregator;
  751. using TStateType = TState<IsNullable, TIn, IsMin>;
  752. TPreparedMinMaxBlockFixedAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn)
  753. : TBase(sizeof(TStateType))
  754. , Type_(type)
  755. , FilterColumn_(filterColumn)
  756. , ArgColumn_(argColumn)
  757. {}
  758. std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
  759. return std::make_unique<TMinMaxBlockFixedAggregator<TTag, IsNullable, IsScalar, TIn, IsMin>>(Type_, FilterColumn_, ArgColumn_, ctx);
  760. }
  761. private:
  762. TType* const Type_;
  763. const std::optional<ui32> FilterColumn_;
  764. const ui32 ArgColumn_;
  765. };
  766. template <typename TTag, bool IsMin>
  767. class TPreparedMinMaxBlockGenericAggregator : public TTag::TPreparedAggregator {
  768. public:
  769. using TBase = typename TTag::TPreparedAggregator;
  770. TPreparedMinMaxBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn)
  771. : TBase(sizeof(TGenericState))
  772. , Type_(type)
  773. , FilterColumn_(filterColumn)
  774. , ArgColumn_(argColumn)
  775. {}
  776. std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
  777. return std::make_unique<TMinMaxBlockGenericAggregator<TTag, IsMin>>(Type_, FilterColumn_, ArgColumn_, ctx);
  778. }
  779. private:
  780. TType* const Type_;
  781. const std::optional<ui32> FilterColumn_;
  782. const ui32 ArgColumn_;
  783. };
  784. template<typename TTag, typename TIn, bool IsMin>
  785. std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMaxFixed(TType* type, bool isOptional, bool isScalar, std::optional<ui32> filterColumn, ui32 argColumn) {
  786. if (isScalar) {
  787. if (isOptional) {
  788. return std::make_unique<TPreparedMinMaxBlockFixedAggregator<TTag, true, true, TIn, IsMin>>(type, filterColumn, argColumn);
  789. }
  790. return std::make_unique<TPreparedMinMaxBlockFixedAggregator<TTag, false, true, TIn, IsMin>>(type, filterColumn, argColumn);
  791. }
  792. if (isOptional) {
  793. return std::make_unique<TPreparedMinMaxBlockFixedAggregator<TTag, true, false, TIn, IsMin>>(type, filterColumn, argColumn);
  794. }
  795. return std::make_unique<TPreparedMinMaxBlockFixedAggregator<TTag, false, false, TIn, IsMin>>(type, filterColumn, argColumn);
  796. }
  797. template <typename TTag, bool IsMin>
  798. std::unique_ptr<typename TTag::TPreparedAggregator> PrepareMinMax(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn) {
  799. auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn));
  800. const bool isScalar = blockType->GetShape() == TBlockType::EShape::Scalar;
  801. auto argType = blockType->GetItemType();
  802. bool isOptional;
  803. auto unpacked = UnpackOptional(argType, isOptional);
  804. if (!unpacked->IsData()) {
  805. return std::make_unique<TPreparedMinMaxBlockGenericAggregator<TTag, IsMin>>(argType, filterColumn, argColumn);
  806. }
  807. auto dataType = AS_TYPE(TDataType, unpacked);
  808. const auto slot = *dataType->GetDataSlot();
  809. if (slot == NUdf::EDataSlot::String) {
  810. using TStringType = char*;
  811. return std::make_unique<TPreparedMinMaxBlockStringAggregator<TTag, TStringType, IsMin>>(argType, filterColumn, argColumn);
  812. } else if (slot == NUdf::EDataSlot::Utf8) {
  813. using TStringType = NUdf::TUtf8;
  814. return std::make_unique<TPreparedMinMaxBlockStringAggregator<TTag, TStringType, IsMin>>(argType, filterColumn, argColumn);
  815. }
  816. switch (slot) {
  817. case NUdf::EDataSlot::Int8:
  818. return PrepareMinMaxFixed<TTag, i8, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  819. case NUdf::EDataSlot::Bool:
  820. case NUdf::EDataSlot::Uint8:
  821. return PrepareMinMaxFixed<TTag, ui8, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  822. case NUdf::EDataSlot::Int16:
  823. return PrepareMinMaxFixed<TTag, i16, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  824. case NUdf::EDataSlot::Uint16:
  825. case NUdf::EDataSlot::Date:
  826. return PrepareMinMaxFixed<TTag, ui16, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  827. case NUdf::EDataSlot::Int32:
  828. case NUdf::EDataSlot::Date32:
  829. return PrepareMinMaxFixed<TTag, i32, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  830. case NUdf::EDataSlot::Uint32:
  831. case NUdf::EDataSlot::Datetime:
  832. return PrepareMinMaxFixed<TTag, ui32, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  833. case NUdf::EDataSlot::Int64:
  834. case NUdf::EDataSlot::Interval:
  835. case NUdf::EDataSlot::Interval64:
  836. case NUdf::EDataSlot::Timestamp64:
  837. case NUdf::EDataSlot::Datetime64:
  838. return PrepareMinMaxFixed<TTag, i64, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  839. case NUdf::EDataSlot::Uint64:
  840. case NUdf::EDataSlot::Timestamp:
  841. return PrepareMinMaxFixed<TTag, ui64, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  842. case NUdf::EDataSlot::Float:
  843. return PrepareMinMaxFixed<TTag, float, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  844. case NUdf::EDataSlot::Double:
  845. return PrepareMinMaxFixed<TTag, double, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  846. case NUdf::EDataSlot::Decimal:
  847. return PrepareMinMaxFixed<TTag, NYql::NDecimal::TInt128, IsMin>(dataType, isOptional, isScalar, filterColumn, argColumn);
  848. default:
  849. throw yexception() << "Unsupported MIN/MAX input type";
  850. }
  851. }
  852. template <bool IsMin>
  853. class TBlockMinMaxFactory : public IBlockAggregatorFactory {
  854. public:
  855. std::unique_ptr<TCombineAllTag::TPreparedAggregator> PrepareCombineAll(
  856. TTupleType* tupleType,
  857. std::optional<ui32> filterColumn,
  858. const std::vector<ui32>& argsColumns,
  859. const TTypeEnvironment& env) const final {
  860. Y_UNUSED(env);
  861. return PrepareMinMax<TCombineAllTag, IsMin>(tupleType, filterColumn, argsColumns[0]);
  862. }
  863. std::unique_ptr<TCombineKeysTag::TPreparedAggregator> PrepareCombineKeys(
  864. TTupleType* tupleType,
  865. const std::vector<ui32>& argsColumns,
  866. const TTypeEnvironment& env) const final {
  867. Y_UNUSED(env);
  868. return PrepareMinMax<TCombineKeysTag, IsMin>(tupleType, std::optional<ui32>(), argsColumns[0]);
  869. }
  870. std::unique_ptr<TFinalizeKeysTag::TPreparedAggregator> PrepareFinalizeKeys(
  871. TTupleType* tupleType,
  872. const std::vector<ui32>& argsColumns,
  873. const TTypeEnvironment& env,
  874. TType* returnType,
  875. ui32 hint) const final {
  876. Y_UNUSED(env);
  877. Y_UNUSED(returnType);
  878. Y_UNUSED(hint);
  879. return PrepareMinMax<TFinalizeKeysTag, IsMin>(tupleType, std::optional<ui32>(), argsColumns[0]);
  880. }
  881. };
  882. } // namespace
  883. std::unique_ptr<IBlockAggregatorFactory> MakeBlockMinFactory() {
  884. return std::make_unique<TBlockMinMaxFactory<true>>();
  885. }
  886. std::unique_ptr<IBlockAggregatorFactory> MakeBlockMaxFactory() {
  887. return std::make_unique<TBlockMinMaxFactory<false>>();
  888. }
  889. }
  890. }