// Block MIN/MAX aggregation: shared comparison helpers and initial-state selection.
// NOTE(review): in this view the file is collapsed onto a few physical lines, template
// argument lists are not visible, and 12 of the #include directives show no target -
// confirm against the repository copy before editing the code itself.
// NOTE(review): the "// biggest fp value ..." comments below are followed by code on the
// same physical line; verify that a line break follows each of them in the real file.
//
// Contents of this line, in order:
//  * AggLess(a, b): strict "less" used for aggregation. For floating-point types, when
//    the operands are unordered (std::isunordered) it compares
//    std::isnan(a) < std::isnan(b), so NaN ranks above every other value; otherwise it
//    is plain a < b.
//  * UpdateMinMax(x, y): for IsMin returns x if AggLess(x, y), else y; otherwise returns
//    x if AggLess(y, x), else y.
//  * UpdateMinMax(TMaybe& state, bool& stateUpdated, value): stores value into state and
//    sets stateUpdated when state is empty or value is strictly better per AggLess.
//  * UpdateMinMax(comparator, TBlockItem& state, ...): same update rule, using
//    comparator.Less and treating a falsy state as "no value yet".
//  * Forward declarations of the string / fixed / generic aggregator templates and TState.
//  * InitialStateValue(): seed for fixed-size states. Floating point: quiet NaN for min
//    (the biggest value in AggLess ordering), -infinity for max. The std::is_same_v
//    branch: NYql::NDecimal::Nan() for min, -NYql::NDecimal::Inf() for max. Other
//    arithmetic types: numeric_limits max() for min, min() for max. The final else
//    branch continues on the next line.
#include "mkql_block_agg_minmax.h" #include "mkql_block_agg_state_helper.h" #include #include #include #include #include #include #include #include #include #include #include #include namespace NKikimr { namespace NMiniKQL { namespace { template inline bool AggLess(T a, T b) { if constexpr (std::is_floating_point::value) { if (std::isunordered(a, b)) { // biggest fp value in agg ordering is NaN return std::isnan(a) < std::isnan(b); } } return a < b; } template inline T UpdateMinMax(T x, T y) { if constexpr (IsMin) { return AggLess(x, y) ? x : y; } else { return AggLess(y, x) ? x : y; } } template inline void UpdateMinMax(TMaybe& state, bool& stateUpdated, T value) { if constexpr (IsMin) { if (!state || AggLess(value, *state)) { state = value; stateUpdated = true; } } else { if (!state || AggLess(*state, value)) { state = value; stateUpdated = true; } } } template inline void UpdateMinMax(NYql::NUdf::IBlockItemComparator& comparator, TBlockItem& state, bool& stateUpdated, TBlockItem value) { if constexpr (IsMin) { if (!state || comparator.Less(value, state)) { state = value; stateUpdated = true; } } else { if (!state || comparator.Less(state, value)) { state = value; stateUpdated = true; } } } template class TMinMaxBlockStringAggregator; template class TMinMaxBlockFixedAggregator; template class TMinMaxBlockGenericAggregator; template struct TState; template constexpr TIn InitialStateValue() { if constexpr (std::is_floating_point::value) { static_assert(std::numeric_limits::has_infinity && std::numeric_limits::has_quiet_NaN); if constexpr (IsMin) { // biggest fp value in agg ordering is NaN return std::numeric_limits::quiet_NaN(); } else { return -std::numeric_limits::infinity(); } } else if constexpr (std::is_same_v) { if constexpr (IsMin) { return NYql::NDecimal::Nan(); } else { return -NYql::NDecimal::Inf(); } } else if constexpr (std::is_arithmetic::value) { if constexpr (IsMin) { return std::numeric_limits::max(); } else { return std::numeric_limits::min(); } } 
// Continuation of InitialStateValue(): the last else branch is a static_assert on
// std::is_arithmetic, i.e. any type not handled by the earlier branches is rejected at
// compile time.
// Then, in order:
//  * TState (two definitions): one holds Value plus a ui8 IsValid flag initialised to 0,
//    the other holds only Value; both seed Value with InitialStateValue().
//    NOTE(review): presumably the nullable / non-nullable specialisations - the template
//    arguments are not visible here.
//  * TGenericState: alias of NUdf::TUnboxedValuePod.
//  * TColumnBuilder: IAggColumnBuilder backed by a TFixedSizeArrayBuilder. Add() appends
//    an empty TBlockItem when IsNullable and the state is not valid, otherwise the
//    state's Value; Build() wraps Builder_.Build(true) into an Arrow block.
//  * TGenericColumnBuilder: same interface over a builder from MakeArrayBuilder(); Add()
//    appends the state object itself.
//  * PushValueToState (generic variant, begins here): for a scalar datum, a valid scalar
//    becomes the new state item without comparing against the old state; for an array
//    datum the existing state (if any) is converted to a TBlockItem before the row item
//    is read (continues on the next line).
else { static_assert(std::is_arithmetic::value); } } template struct TState { TIn Value = InitialStateValue(); ui8 IsValid = 0; }; template struct TState { TIn Value = InitialStateValue(); }; using TGenericState = NUdf::TUnboxedValuePod; template class TColumnBuilder : public IAggColumnBuilder { using TBuilder = typename NYql::NUdf::TFixedSizeArrayBuilder; using TStateType = TState; public: TColumnBuilder(ui64 size, TType* type, TComputationContext& ctx) : Builder_(type, TTypeInfoHelper(), ctx.ArrowMemoryPool, size) , Ctx_(ctx) { } void Add(const void* state) final { auto typedState = MakeStateWrapper(state); if constexpr (IsNullable) { if (!typedState->IsValid) { Builder_.Add(TBlockItem()); return; } } Builder_.Add(TBlockItem(typedState->Value)); } NUdf::TUnboxedValue Build() final { return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true)); } private: TBuilder Builder_; TComputationContext& Ctx_; }; class TGenericColumnBuilder : public IAggColumnBuilder { public: TGenericColumnBuilder(ui64 size, TType* columnType, TComputationContext& ctx) : Builder_(MakeArrayBuilder(TTypeInfoHelper(), columnType, ctx.ArrowMemoryPool, size, &ctx.Builder->GetPgBuilder())) , Ctx_(ctx) { } void Add(const void* state) final { Builder_->Add(*static_cast(state)); } NUdf::TUnboxedValue Build() final { return Ctx_.HolderFactory.CreateArrowBlock(Builder_->Build(true)); } private: const std::unique_ptr Builder_; TComputationContext& Ctx_; }; template void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row, IBlockReader& reader, IBlockItemConverter& converter, NYql::NUdf::IBlockItemComparator& comparator, TComputationContext& ctx) { TBlockItem stateItem; bool stateChanged = false; if (datum.is_scalar()) { if (datum.scalar()->is_valid) { stateItem = reader.GetScalarItem(*datum.scalar()); stateChanged = true; } } else { if (*typedState) { stateItem = converter.MakeItem(*typedState); } const auto& array = datum.array(); TBlockItem curr = 
// Continuation of the generic PushValueToState: a non-null row item is merged via
// UpdateMinMax(comparator, ...); if the state changed, the old value is released with
// DeleteUnreferenced() and replaced by converter.MakeValue(stateItem, ctx.HolderFactory).
//
// TMinMaxBlockGenericAggregator on TCombineAllTag::TBase (begins here):
//  * owns two block readers (ReaderOne_, ReaderTwo_), an item converter and a
//    comparator, all created for the same type.
//  * InitState() placement-constructs an empty TGenericState; DestroyState() calls
//    DeleteUnreferenced() and resets the state to an empty TGenericState.
//  * AddMany(): a valid scalar datum becomes the state item directly; for an array datum
//    the current state is converted to a TBlockItem, and when `filtered` is set the
//    filter column's values (buffer 1) are fetched after ensuring it has no nulls.
//    The per-row loop continues on the next line.
reader.GetItem(*array, row); if (curr) { UpdateMinMax(comparator, stateItem, stateChanged, curr); } } if (stateChanged) { typedState->DeleteUnreferenced(); *typedState = converter.MakeValue(stateItem, ctx.HolderFactory); } } template class TMinMaxBlockGenericAggregator : public TCombineAllTag::TBase { public: using TBase = TCombineAllTag::TBase; TMinMaxBlockGenericAggregator(TType* type, std::optional filterColumn, ui32 argColumn, TComputationContext& ctx) : TBase(sizeof(TGenericState), filterColumn, ctx) , ArgColumn_(argColumn) , ReaderOne_(MakeBlockReader(TTypeInfoHelper(), type)) , ReaderTwo_(MakeBlockReader(TTypeInfoHelper(), type)) , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder())) , Compare_(TBlockTypeHelper().MakeComparator(type)) { } void InitState(void* state) final { new(state) TGenericState(); } void DestroyState(void* state) noexcept final { auto typedState = static_cast(state); typedState->DeleteUnreferenced(); *typedState = TGenericState(); } void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional filtered) final { TGenericState& typedState = *static_cast(state); Y_UNUSED(batchLength); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); IBlockReader* currReader = ReaderOne_.get(); IBlockReader* stateReader = ReaderTwo_.get(); TBlockItem stateItem; bool stateChanged = false; if (datum.is_scalar()) { if (datum.scalar()->is_valid) { stateItem = currReader->GetScalarItem(*datum.scalar()); stateChanged = true; } } else { if (typedState) { stateItem = Converter_->MakeItem(typedState); } const auto& array = datum.array(); auto len = array->length; const ui8* filterBitmap = nullptr; if (filtered) { const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum(); const auto& filterArray = filterDatum.array(); MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column"); filterBitmap = filterArray->template GetValues(1); } auto& 
// Continuation of AddMany() for the generic CombineAll aggregator: every non-null row
// that passes the filter (if any) is merged into stateItem with UpdateMinMax(). The row
// index is int64_t, matching the other per-row loops over array->length in this file.
// After each change currReader and stateReader are swapped.
// NOTE(review): presumably so the item kept as state is not invalidated by the next
// GetItem() call on the same reader - confirm against IBlockReader's contract.
// When anything changed, the old state is released and replaced by
// Converter_->MakeValue(stateItem, Ctx_.HolderFactory).
//  * FinishOne() returns a copy of the stored state value.
//
// TMinMaxBlockGenericAggregator on TCombineKeysTag::TBase (begins here):
//  * InitKey() placement-constructs an empty state and then calls UpdateKey().
//  * DestroyState() calls DeleteUnreferenced() and resets the state.
//  * UpdateKey() forwards the argument column's datum and the row to PushValueToState().
//  * MakeStateBuilder() creates a builder via std::make_unique(size, Type_, Ctx_).
comparator = *Compare_; for (int64_t i = 0; i < len; ++i) { TBlockItem curr = currReader->GetItem(*array, i); if (curr && (!filterBitmap || filterBitmap[i])) { bool changed = false; UpdateMinMax(comparator, stateItem, changed, curr); if (changed) { std::swap(currReader, stateReader); stateChanged = true; } } } } if (stateChanged) { typedState.DeleteUnreferenced(); typedState = Converter_->MakeValue(stateItem, Ctx_.HolderFactory); } } NUdf::TUnboxedValue FinishOne(const void *state) final { auto typedState = *static_cast(state); return typedState; } private: const ui32 ArgColumn_; const std::unique_ptr ReaderOne_; const std::unique_ptr ReaderTwo_; const std::unique_ptr Converter_; const NYql::NUdf::IBlockItemComparator::TPtr Compare_; }; template class TMinMaxBlockGenericAggregator : public TCombineKeysTag::TBase { public: using TBase = TCombineKeysTag::TBase; TMinMaxBlockGenericAggregator(TType* type, std::optional filterColumn, ui32 argColumn, TComputationContext& ctx) : TBase(sizeof(TGenericState), filterColumn, ctx) , ArgColumn_(argColumn) , Type_(type) , Reader_(MakeBlockReader(TTypeInfoHelper(), type)) , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder())) , Compare_(TBlockTypeHelper().MakeComparator(type)) { } void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TGenericState(); UpdateKey(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { auto typedState = static_cast(state); typedState->DeleteUnreferenced(); *typedState = TGenericState(); } void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { Y_UNUSED(batchNum); auto typedState = static_cast(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_); } std::unique_ptr MakeStateBuilder(ui64 size) final { return std::make_unique(size, Type_, Ctx_); } 
// Private members of the generic CombineKeys aggregator, followed by:
//  * TMinMaxBlockGenericAggregator on TFinalizeKeysTag::TBase: LoadState() placement-
//    constructs an empty state and calls UpdateState(); UpdateState() forwards the
//    argument column's datum and the row to PushValueToState(); DestroyState() releases
//    and resets the state; MakeResultBuilder() creates the output builder via
//    std::make_unique(size, Type_, Ctx_).
//  * PushValueToState (string variant, begins here): TOffset is the offset_type of
//    TPrimitiveDataType::TResult; the current state, if set, is viewed through
//    AsStringRef() as the starting candidate (continues on the next line).
private: const ui32 ArgColumn_; TType* const Type_; const std::unique_ptr Reader_; const std::unique_ptr Converter_; const NYql::NUdf::IBlockItemComparator::TPtr Compare_; }; template class TMinMaxBlockGenericAggregator : public TFinalizeKeysTag::TBase { public: using TBase = TFinalizeKeysTag::TBase; TMinMaxBlockGenericAggregator(TType* type, std::optional filterColumn, ui32 argColumn, TComputationContext& ctx) : TBase(sizeof(TGenericState), filterColumn, ctx) , ArgColumn_(argColumn) , Type_(type) , Reader_(MakeBlockReader(TTypeInfoHelper(), type)) , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type, ctx.Builder->GetPgBuilder())) , Compare_(TBlockTypeHelper().MakeComparator(type)) { } void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TGenericState(); UpdateState(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { auto typedState = static_cast(state); typedState->DeleteUnreferenced(); *typedState = TGenericState(); } void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { Y_UNUSED(batchNum); auto typedState = static_cast(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState(typedState, datum, row, *Reader_, *Converter_, *Compare_, Ctx_); } std::unique_ptr MakeResultBuilder(ui64 size) final { return std::make_unique(size, Type_, Ctx_); } private: const ui32 ArgColumn_; TType* const Type_; const std::unique_ptr Reader_; const std::unique_ptr Converter_; const NYql::NUdf::IBlockItemComparator::TPtr Compare_; }; template void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row) { using TOffset = typename TPrimitiveDataType::TResult::offset_type; TMaybe currentState; if (*typedState) { currentState = typedState->AsStringRef(); } bool stateUpdated = false; if (datum.is_scalar()) { if (datum.scalar()->is_valid) { auto buffer = 
// Continuation of the string PushValueToState:
//  * scalar datum: a valid scalar's buffer is wrapped in a TStringRef and merged with
//    UpdateMinMax().
//  * array datum: the row's bytes are data + offsets[row] with length
//    offsets[row + 1] - offsets[row]; when the array has nulls, the validity bit at
//    row + array->offset is tested first (bit (index & 7) of byte index >> 3).
//  * if the state was updated, a new string is made with MakeString() before the old
//    state is released, then moved into place.
//
// TMinMaxBlockStringAggregator on TCombineAllTag::TBase (begins here): InitState() /
// DestroyState() manage a TGenericState; AddMany() starts from the current state viewed
// via AsStringRef() and handles the scalar case first (continues on the next line).
arrow::internal::checked_cast(*datum.scalar()).value; const char* data = reinterpret_cast(buffer->data()); auto value = NUdf::TStringRef(data, buffer->size()); UpdateMinMax(currentState, stateUpdated, value); } } else { const auto& array = datum.array(); const TOffset* offsets = array->GetValues(1); const char* data = array->GetValues(2, 0); if (array->GetNullCount() == 0) { auto value = NUdf::TStringRef(data + offsets[row], offsets[row + 1] - offsets[row]); UpdateMinMax(currentState, stateUpdated, value); } else { auto nullBitmapPtr = array->GetValues(0, 0); ui64 fullIndex = row + array->offset; if ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) { auto value = NUdf::TStringRef(data + offsets[row], offsets[row + 1] - offsets[row]); UpdateMinMax(currentState, stateUpdated, value); } } } if (stateUpdated) { auto newState = MakeString(*currentState); typedState->DeleteUnreferenced(); *typedState = std::move(newState); } } template class TMinMaxBlockStringAggregator : public TCombineAllTag::TBase { public: using TBase = TCombineAllTag::TBase; using TOffset = typename TPrimitiveDataType::TResult::offset_type; TMinMaxBlockStringAggregator(TType* type, std::optional filterColumn, ui32 argColumn, TComputationContext& ctx) : TBase(sizeof(TGenericState), filterColumn, ctx) , ArgColumn_(argColumn) { Y_UNUSED(type); } void InitState(void* state) final { new(state) TGenericState(); } void DestroyState(void* state) noexcept final { auto typedState = static_cast(state); typedState->DeleteUnreferenced(); *typedState = TGenericState(); } void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional filtered) final { TGenericState& typedState = *static_cast(state); Y_UNUSED(batchLength); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); TMaybe currentState; if (typedState) { currentState = typedState.AsStringRef(); } bool stateUpdated = false; if (datum.is_scalar()) { if (datum.scalar()->is_valid) { auto buffer = 
// Continuation of AddMany() for the string CombineAll aggregator:
//  * scalar datum: the valid scalar's buffer is merged with UpdateMinMax().
//  * array datum: returns early when length minus null count is zero; otherwise four
//    loops cover {unfiltered, filtered} x {no nulls, has nulls}. Each row's string is
//    data + offsets[i] with length offsets[i + 1] - offsets[i]; validity is read from
//    the bitmap at i + array->offset, and the filter is read as one ui8 per row after
//    ensuring the filter column has no nulls.
//  * if the state was updated, MakeString() builds the replacement before the old state
//    is released.
// FinishOne() begins at the end of this line.
arrow::internal::checked_cast(*datum.scalar()).value; const char* data = reinterpret_cast(buffer->data()); auto value = NUdf::TStringRef(data, buffer->size()); UpdateMinMax(currentState, stateUpdated, value); } } else { const auto& array = datum.array(); auto len = array->length; auto count = len - array->GetNullCount(); if (!count) { return; } const TOffset* offsets = array->GetValues(1); const char* data = array->GetValues(2, 0); if (!filtered) { if (array->GetNullCount() == 0) { for (int64_t i = 0; i < len; ++i) { NUdf::TStringRef value(data + offsets[i], offsets[i + 1] - offsets[i]); UpdateMinMax(currentState, stateUpdated, value); } } else { auto nullBitmapPtr = array->GetValues(0, 0); for (int64_t i = 0; i < len; ++i) { ui64 fullIndex = i + array->offset; if ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) { NUdf::TStringRef value(data + offsets[i], offsets[i + 1] - offsets[i]); UpdateMinMax(currentState, stateUpdated, value); } } } } else { const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum(); const auto& filterArray = filterDatum.array(); MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column"); const ui8* filterBitmap = filterArray->template GetValues(1); if (array->GetNullCount() == 0) { for (int64_t i = 0; i < len; ++i) { if (filterBitmap[i]) { NUdf::TStringRef value(data + offsets[i], offsets[i + 1] - offsets[i]); UpdateMinMax(currentState, stateUpdated, value); } } } else { auto nullBitmapPtr = array->GetValues(0, 0); for (int64_t i = 0; i < len; ++i) { ui64 fullIndex = i + array->offset; if (filterBitmap[i] && ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1)) { NUdf::TStringRef value(data + offsets[i], offsets[i + 1] - offsets[i]); UpdateMinMax(currentState, stateUpdated, value); } } } } } if (stateUpdated) { auto newState = MakeString(*currentState); typedState.DeleteUnreferenced(); typedState = std::move(newState); } } NUdf::TUnboxedValue FinishOne(const void* state) final { 
// End of FinishOne() for the string CombineAll aggregator (returns a copy of the state).
// Then:
//  * TMinMaxBlockStringAggregator on TCombineKeysTag::TBase: InitKey() placement-
//    constructs an empty state and calls UpdateKey(); DestroyState() releases and resets
//    the state; UpdateKey() forwards the argument column's datum and the row to
//    PushValueToState(); MakeStateBuilder() creates a builder via
//    std::make_unique(size, Type_, Ctx_).
//  * TMinMaxBlockStringAggregator on TFinalizeKeysTag::TBase (begins here): LoadState()
//    placement-constructs an empty state and calls UpdateState(), whose body continues
//    on the next line.
auto typedState = *static_cast(state); return typedState; } private: const ui32 ArgColumn_; }; template class TMinMaxBlockStringAggregator : public TCombineKeysTag::TBase { public: using TBase = TCombineKeysTag::TBase; TMinMaxBlockStringAggregator(TType* type, std::optional filterColumn, ui32 argColumn, TComputationContext& ctx) : TBase(sizeof(TGenericState), filterColumn, ctx) , ArgColumn_(argColumn) , Type_(type) { } void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TGenericState(); UpdateKey(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { auto typedState = static_cast(state); typedState->DeleteUnreferenced(); *typedState = TGenericState(); } void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { Y_UNUSED(batchNum); auto typedState = static_cast(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState(typedState, datum, row); } std::unique_ptr MakeStateBuilder(ui64 size) final { return std::make_unique(size, Type_, Ctx_); } private: const ui32 ArgColumn_; TType* const Type_; }; template class TMinMaxBlockStringAggregator : public TFinalizeKeysTag::TBase { public: using TBase = TFinalizeKeysTag::TBase; TMinMaxBlockStringAggregator(TType* type, std::optional filterColumn, ui32 argColumn, TComputationContext& ctx) : TBase(sizeof(TGenericState), filterColumn, ctx) , ArgColumn_(argColumn) , Type_(type) { } void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { new(state) TGenericState(); UpdateState(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { auto typedState = static_cast(state); typedState->DeleteUnreferenced(); *typedState = TGenericState(); } void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { Y_UNUSED(batchNum); auto typedState = static_cast(state); const auto& datum = 
// End of the string FinalizeKeys aggregator: UpdateState() forwards to
// PushValueToState(); MakeResultBuilder() creates the output builder via
// std::make_unique(size, Type_, Ctx_).
//
// TMinMaxBlockFixedAggregator on TCombineAllTag::TBase (begins here):
//  * InitState() writes a default-constructed TStateType with WriteUnaligned();
//    DestroyState() does nothing beyond a trivially-destructible static_assert.
//  * AddMany(), IsScalar case: the datum must be a scalar (Y_ENSURE); its value is
//    assigned to the state (for IsNullable only when the scalar is valid, which also
//    sets IsValid).
//  * AddMany(), array case: reads the values buffer and the length; the null count is
//    only queried for IsNullable (continues on the next line).
TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState(typedState, datum, row); } std::unique_ptr MakeResultBuilder(ui64 size) final { return std::make_unique(size, Type_, Ctx_); } private: const ui32 ArgColumn_; TType* const Type_; }; template class TMinMaxBlockFixedAggregator : public TCombineAllTag::TBase { public: using TBase = TCombineAllTag::TBase; using TStateType = TState; using TInScalar = typename TPrimitiveDataType::TScalarResult; TMinMaxBlockFixedAggregator(TType* type, std::optional filterColumn, ui32 argColumn, TComputationContext& ctx) : TBase(sizeof(TStateType), filterColumn, ctx) , ArgColumn_(argColumn) { Y_UNUSED(type); } void InitState(void* ptr) final { TStateType state; WriteUnaligned(ptr, state); } void DestroyState(void* state) noexcept final { static_assert(std::is_trivially_destructible::value); Y_UNUSED(state); } void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional filtered) final { auto typedState = MakeStateWrapper(state); Y_UNUSED(batchLength); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); if constexpr (IsScalar) { Y_ENSURE(datum.is_scalar()); if constexpr (IsNullable) { if (datum.scalar()->is_valid) { typedState->Value = TIn(Cast(datum.scalar_as().value)); typedState->IsValid = 1; } } else { typedState->Value = TIn(Cast(datum.scalar_as().value)); } } else { const auto& array = datum.array(); auto ptr = array->GetValues(1); auto len = array->length; auto nullCount = IsNullable ? 
// Continuation of AddMany() for the fixed-size CombineAll aggregator (array case):
//  * returns early when length minus null count is zero.
//  * unfiltered: marks a nullable state valid, then folds all rows into the running
//    value; with nulls present each row goes through SelectArg(notNull, ptr[i], value).
//    NOTE(review): SelectArg is defined elsewhere - presumably it yields ptr[i] when the
//    flag is set and the running value otherwise; confirm.
//  * filtered: the filter column must have no nulls; each row contributes through
//    SelectArg() keyed by its filter byte (AND-ed with the validity bit when nulls are
//    present), and validCount accumulates those flags. The per-row filter byte is named
//    `selected` so it does not shadow the `filtered` parameter of AddMany().
//    For a nullable state IsValid is OR-ed with whether validCount is non-zero
//    (continues on the next line).
array->GetNullCount() : 0; auto count = len - nullCount; if (!count) { return; } if (!filtered) { TIn value = typedState->Value; if constexpr (IsNullable) { typedState->IsValid = 1; } if (IsNullable && nullCount != 0) { auto nullBitmapPtr = array->GetValues(0, 0); for (int64_t i = 0; i < len; ++i) { ui64 fullIndex = i + array->offset; ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1; value = UpdateMinMax(value, SelectArg(notNull, ptr[i], value)); } } else { for (int64_t i = 0; i < len; ++i) { value = UpdateMinMax(value, ptr[i]); } } typedState->Value = value; } else { const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum(); const auto& filterArray = filterDatum.array(); MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column"); const ui8* filterBitmap = filterArray->template GetValues(1); TIn value = typedState->Value; ui64 validCount = 0; if (IsNullable && nullCount != 0) { auto nullBitmapPtr = array->GetValues(0, 0); for (int64_t i = 0; i < len; ++i) { ui64 fullIndex = i + array->offset; ui8 notNullAndFiltered = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) & filterBitmap[i]; value = UpdateMinMax(value, SelectArg(notNullAndFiltered, ptr[i], value)); validCount += notNullAndFiltered; } } else { for (int64_t i = 0; i < len; ++i) { ui8 selected = filterBitmap[i]; value = UpdateMinMax(value, SelectArg(selected, ptr[i], value)); validCount += selected; } } if constexpr (IsNullable) { typedState->IsValid |= validCount ? 
// End of AddMany() for the fixed-size CombineAll aggregator, then:
//  * FinishOne(): returns an empty TUnboxedValuePod for an invalid nullable state,
//    otherwise a TUnboxedValuePod holding Value.
//  * PushValueToState (fixed-size variant): IsScalar assigns the scalar's value (for
//    IsNullable only when valid, also setting IsValid); otherwise merges ptr[row] with
//    UpdateMinMax(). For a nullable array with nulls, the validity bit at
//    row + array->offset is passed to SelectArg() together with ptr[row] and the
//    current Value, and is OR-ed into IsValid.
//  * TMinMaxBlockFixedAggregator on TCombineKeysTag::TBase (begins here): InitKey()
//    writes a default state with WriteUnaligned() and calls UpdateKey(); DestroyState()
//    starts here and finishes on the next line.
1 : 0; } typedState->Value = value; } } } NUdf::TUnboxedValue FinishOne(const void* state) final { auto typedState = MakeStateWrapper(state); if constexpr (IsNullable) { if (!typedState->IsValid) { return NUdf::TUnboxedValuePod(); } } return NUdf::TUnboxedValuePod(typedState->Value); } private: const ui32 ArgColumn_; }; template static void PushValueToState(TState* typedState, const arrow::Datum& datum, ui64 row) { using TInScalar = typename TPrimitiveDataType::TScalarResult; if constexpr (IsScalar) { Y_ENSURE(datum.is_scalar()); if constexpr (IsNullable) { if (datum.scalar()->is_valid) { typedState->Value = TIn(Cast(datum.scalar_as().value)); typedState->IsValid = 1; } } else { typedState->Value = TIn(Cast(datum.scalar_as().value)); } } else { const auto &array = datum.array(); auto ptr = array->GetValues(1); if constexpr (IsNullable) { if (array->GetNullCount() == 0) { typedState->IsValid = 1; typedState->Value = UpdateMinMax(typedState->Value, ptr[row]); } else { auto nullBitmapPtr = array->GetValues(0, 0); ui64 fullIndex = row + array->offset; ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1; typedState->Value = UpdateMinMax(typedState->Value, SelectArg(notNull, ptr[row], typedState->Value)); typedState->IsValid |= notNull; } } else { typedState->Value = UpdateMinMax(typedState->Value, ptr[row]); } } } template class TMinMaxBlockFixedAggregator : public TCombineKeysTag::TBase { public: using TBase = TCombineKeysTag::TBase; using TStateType = TState; TMinMaxBlockFixedAggregator(TType* type, std::optional filterColumn, ui32 argColumn, TComputationContext& ctx) : TBase(sizeof(TStateType), filterColumn, ctx) , ArgColumn_(argColumn) , Type_(type) { } void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { TStateType st; WriteUnaligned(state, st); UpdateKey(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { static_assert(std::is_trivially_destructible::value); 
// End of DestroyState() for the fixed-size CombineKeys aggregator, then UpdateKey()
// (forwards the argument column's datum and the row to PushValueToState()) and
// MakeStateBuilder(). Its private state is just the argument column index and Type_.
//  * TMinMaxBlockFixedAggregator on TFinalizeKeysTag::TBase: LoadState() writes a default
//    state with WriteUnaligned() and calls UpdateState(); UpdateState() forwards to
//    PushValueToState(); DestroyState() does nothing beyond a trivially-destructible
//    static_assert; MakeResultBuilder() creates the output builder.
//  * TPreparedMinMaxBlockStringAggregator (begins here): stores the type, the optional
//    filter column and the argument column; Make() continues on the next line.
Y_UNUSED(state); } void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { Y_UNUSED(batchNum); auto typedState = MakeStateWrapper(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState(typedState.Get(), datum, row); } std::unique_ptr MakeStateBuilder(ui64 size) final { return std::make_unique>(size, Type_, Ctx_); } private: const ui32 ArgColumn_; TType* const Type_; }; template class TMinMaxBlockFixedAggregator : public TFinalizeKeysTag::TBase { public: using TBase = TFinalizeKeysTag::TBase; using TStateType = TState; TMinMaxBlockFixedAggregator(TType* type, std::optional filterColumn, ui32 argColumn, TComputationContext& ctx) : TBase(sizeof(TStateType), filterColumn, ctx) , ArgColumn_(argColumn) , Type_(type) { } void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { TStateType st; WriteUnaligned(state, st); UpdateState(state, batchNum, columns, row); } void DestroyState(void* state) noexcept final { static_assert(std::is_trivially_destructible::value); Y_UNUSED(state); } void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final { Y_UNUSED(batchNum); auto typedState = MakeStateWrapper(state); const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum(); PushValueToState(typedState.Get(), datum, row); } std::unique_ptr MakeResultBuilder(ui64 size) final { return std::make_unique>(size, Type_, Ctx_); } private: const ui32 ArgColumn_; TType* const Type_; }; template class TPreparedMinMaxBlockStringAggregator : public TTag::TPreparedAggregator { public: using TBase = typename TTag::TPreparedAggregator; TPreparedMinMaxBlockStringAggregator(TType* type, std::optional filterColumn, ui32 argColumn) : TBase(sizeof(TGenericState)) , Type_(type) , FilterColumn_(filterColumn) , ArgColumn_(argColumn) {} std::unique_ptr Make(TComputationContext& ctx) const final { 
// End of TPreparedMinMaxBlockStringAggregator::Make(), then:
//  * TPreparedMinMaxBlockFixedAggregator / TPreparedMinMaxBlockGenericAggregator: same
//    shape - they keep the type, filter column and argument column, pass the state size
//    to the base, and Make() builds the aggregator for a given TComputationContext.
//  * PrepareMinMaxFixed(): returns one of four prepared aggregators chosen by the
//    isScalar / isOptional flags.
//  * PrepareMinMax() (begins here): takes the block type of the argument column, notes
//    whether its shape is Scalar, and reads the item type (continues on the next line).
return std::make_unique>(Type_, FilterColumn_, ArgColumn_, ctx); } private: TType* const Type_; const std::optional FilterColumn_; const ui32 ArgColumn_; }; template class TPreparedMinMaxBlockFixedAggregator : public TTag::TPreparedAggregator { public: using TBase = typename TTag::TPreparedAggregator; using TStateType = TState; TPreparedMinMaxBlockFixedAggregator(TType* type, std::optional filterColumn, ui32 argColumn) : TBase(sizeof(TStateType)) , Type_(type) , FilterColumn_(filterColumn) , ArgColumn_(argColumn) {} std::unique_ptr Make(TComputationContext& ctx) const final { return std::make_unique>(Type_, FilterColumn_, ArgColumn_, ctx); } private: TType* const Type_; const std::optional FilterColumn_; const ui32 ArgColumn_; }; template class TPreparedMinMaxBlockGenericAggregator : public TTag::TPreparedAggregator { public: using TBase = typename TTag::TPreparedAggregator; TPreparedMinMaxBlockGenericAggregator(TType* type, std::optional filterColumn, ui32 argColumn) : TBase(sizeof(TGenericState)) , Type_(type) , FilterColumn_(filterColumn) , ArgColumn_(argColumn) {} std::unique_ptr Make(TComputationContext& ctx) const final { return std::make_unique>(Type_, FilterColumn_, ArgColumn_, ctx); } private: TType* const Type_; const std::optional FilterColumn_; const ui32 ArgColumn_; }; template std::unique_ptr PrepareMinMaxFixed(TType* type, bool isOptional, bool isScalar, std::optional filterColumn, ui32 argColumn) { if (isScalar) { if (isOptional) { return std::make_unique>(type, filterColumn, argColumn); } return std::make_unique>(type, filterColumn, argColumn); } if (isOptional) { return std::make_unique>(type, filterColumn, argColumn); } return std::make_unique>(type, filterColumn, argColumn); } template std::unique_ptr PrepareMinMax(TTupleType* tupleType, std::optional filterColumn, ui32 argColumn) { auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn)); const bool isScalar = blockType->GetShape() == TBlockType::EShape::Scalar; auto argType = 
// Continuation of PrepareMinMax():
//  * the item type is unpacked from Optional; non-data item types get the generic
//    prepared aggregator, built with the original (possibly optional) argType.
//  * String and Utf8 slots get the string prepared aggregator (TStringType is char* and
//    NUdf::TUtf8 respectively).
//  * remaining slots are dispatched to PrepareMinMaxFixed() with the unpacked data type;
//    several slots share a case (e.g. Bool with Uint8, Date with Uint16, Timestamp with
//    Uint64).
//    NOTE(review): the concrete template arguments of each call are not visible here.
// The switch continues on the next line.
blockType->GetItemType(); bool isOptional; auto unpacked = UnpackOptional(argType, isOptional); if (!unpacked->IsData()) { return std::make_unique>(argType, filterColumn, argColumn); } auto dataType = AS_TYPE(TDataType, unpacked); const auto slot = *dataType->GetDataSlot(); if (slot == NUdf::EDataSlot::String) { using TStringType = char*; return std::make_unique>(argType, filterColumn, argColumn); } else if (slot == NUdf::EDataSlot::Utf8) { using TStringType = NUdf::TUtf8; return std::make_unique>(argType, filterColumn, argColumn); } switch (slot) { case NUdf::EDataSlot::Int8: return PrepareMinMaxFixed(dataType, isOptional, isScalar, filterColumn, argColumn); case NUdf::EDataSlot::Bool: case NUdf::EDataSlot::Uint8: return PrepareMinMaxFixed(dataType, isOptional, isScalar, filterColumn, argColumn); case NUdf::EDataSlot::Int16: return PrepareMinMaxFixed(dataType, isOptional, isScalar, filterColumn, argColumn); case NUdf::EDataSlot::Uint16: case NUdf::EDataSlot::Date: return PrepareMinMaxFixed(dataType, isOptional, isScalar, filterColumn, argColumn); case NUdf::EDataSlot::Int32: case NUdf::EDataSlot::Date32: return PrepareMinMaxFixed(dataType, isOptional, isScalar, filterColumn, argColumn); case NUdf::EDataSlot::Uint32: case NUdf::EDataSlot::Datetime: return PrepareMinMaxFixed(dataType, isOptional, isScalar, filterColumn, argColumn); case NUdf::EDataSlot::Int64: case NUdf::EDataSlot::Interval: case NUdf::EDataSlot::Interval64: case NUdf::EDataSlot::Timestamp64: case NUdf::EDataSlot::Datetime64: return PrepareMinMaxFixed(dataType, isOptional, isScalar, filterColumn, argColumn); case NUdf::EDataSlot::Uint64: case NUdf::EDataSlot::Timestamp: return PrepareMinMaxFixed(dataType, isOptional, isScalar, filterColumn, argColumn); case NUdf::EDataSlot::Float: return PrepareMinMaxFixed(dataType, isOptional, isScalar, filterColumn, argColumn); case NUdf::EDataSlot::Double: return PrepareMinMaxFixed(dataType, isOptional, isScalar, filterColumn, argColumn); case 
// End of PrepareMinMax(): the Decimal case, and a yexception
// ("Unsupported MIN/MAX input type") for any other slot.
//  * TBlockMinMaxFactory: PrepareCombineAll() passes the filter column through, while
//    PrepareCombineKeys() and PrepareFinalizeKeys() pass an empty optional; all three use
//    argsColumns[0] as the argument column and ignore env (PrepareFinalizeKeys() also
//    ignores returnType and hint).
//  * MakeBlockMinFactory() / MakeBlockMaxFactory(): return a newly created factory
//    through std::make_unique.
// NOTE(review): "// namespace" below is followed by code on the same physical line in
// this view; verify that a line break follows it in the real file.
NUdf::EDataSlot::Decimal: return PrepareMinMaxFixed(dataType, isOptional, isScalar, filterColumn, argColumn); default: throw yexception() << "Unsupported MIN/MAX input type"; } } template class TBlockMinMaxFactory : public IBlockAggregatorFactory { public: std::unique_ptr PrepareCombineAll( TTupleType* tupleType, std::optional filterColumn, const std::vector& argsColumns, const TTypeEnvironment& env) const final { Y_UNUSED(env); return PrepareMinMax(tupleType, filterColumn, argsColumns[0]); } std::unique_ptr PrepareCombineKeys( TTupleType* tupleType, const std::vector& argsColumns, const TTypeEnvironment& env) const final { Y_UNUSED(env); return PrepareMinMax(tupleType, std::optional(), argsColumns[0]); } std::unique_ptr PrepareFinalizeKeys( TTupleType* tupleType, const std::vector& argsColumns, const TTypeEnvironment& env, TType* returnType, ui32 hint) const final { Y_UNUSED(env); Y_UNUSED(returnType); Y_UNUSED(hint); return PrepareMinMax(tupleType, std::optional(), argsColumns[0]); } }; } // namespace std::unique_ptr MakeBlockMinFactory() { return std::make_unique>(); } std::unique_ptr MakeBlockMaxFactory() { return std::make_unique>(); } } }