// Block (Arrow-based) SUM and AVG aggregators for MiniKQL.
//
// NOTE(review): in this copy of the file every #include target and every template
// parameter / template argument list (the text inside angle brackets) appears to have
// been stripped -- see the bare `#include` lines and e.g. `template class TSumColumnBuilder`
// below. The code cannot build as-is; restore those lists from version control. The
// comments below refer to template parameters only by the names the bodies use
// (TSum, TOut, IsNullable, IsScalar, TTag, TInScalar).
#include "mkql_block_agg_sum.h"
#include "mkql_block_agg_state_helper.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace NKikimr {
namespace NMiniKQL {

namespace {

// Running SUM state. Declared generically and specialized below.
template
struct TSumState;

// SUM state for nullable input (the only variant whose IsValid_ is read, under
// `if constexpr (IsNullable)`). IsValid_ becomes 1 once any non-null value has been
// added, so an all-null input finishes as NULL instead of 0.
template
struct TSumState {
    typename TPrimitiveDataType::TArithmetic Sum_ = 0;
    ui8 IsValid_ = 0;
};

// SUM state for non-nullable input: only the running sum is needed.
template
struct TSumState {
    typename TPrimitiveDataType::TArithmetic Sum_ = 0;
};

// Running AVG state: sum of the non-null values seen so far and how many there were.
// Count_ == 0 means "no values" and is turned into NULL on output.
template
struct TAvgState {
    typename TPrimitiveDataType::TArithmetic Sum_ = 0;
    ui64 Count_ = 0;
};

// Builds an output block column from SUM states, one Add() per state.
// For nullable input, a state that never saw a value is emitted as a null item.
template
class TSumColumnBuilder : public IAggColumnBuilder {
public:
    using TStateType = TSumState;

    TSumColumnBuilder(ui64 size, TType* dataType, TComputationContext& ctx)
        : Builder_(dataType, TTypeInfoHelper(), ctx.ArrowMemoryPool, size)
        , Ctx_(ctx)
    {
    }

    void Add(const void* state) final {
        auto typedState = MakeStateWrapper(state);
        if constexpr (IsNullable) {
            if (!typedState->IsValid_) {
                Builder_.Add(TBlockItem());
                return;
            }
        }
        Builder_.Add(TBlockItem(TSum(typedState->Sum_)));
    }

    NUdf::TUnboxedValue Build() final {
        return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true));
    }

private:
    NYql::NUdf::TFixedSizeArrayBuilder Builder_;
    TComputationContext& Ctx_;
};

// Builds the intermediate AVG state column: each item is a (sum, count) tuple,
// or a null item when the state has Count_ == 0.
template
class TAvgStateColumnBuilder : public IAggColumnBuilder {
public:
    TAvgStateColumnBuilder(ui64 size, TType* outputType, TComputationContext& ctx)
        : Ctx_(ctx)
        , Builder_(MakeArrayBuilder(TTypeInfoHelper(), outputType, ctx.ArrowMemoryPool, size, &ctx.Builder->GetPgBuilder()))
    {
    }

    void Add(const void* state) final {
        auto typedState = MakeStateWrapper>(state);
        auto tupleBuilder = static_cast*>(Builder_.get());
        if (typedState->Count_) {
            TBlockItem tupleItems[] = { TBlockItem(TOut(typedState->Sum_)), TBlockItem(typedState->Count_)} ;
            tupleBuilder->Add(TBlockItem(tupleItems));
        } else {
            tupleBuilder->Add(TBlockItem());
        }
    }

    NUdf::TUnboxedValue Build() final {
        return Ctx_.HolderFactory.CreateArrowBlock(Builder_->Build(true));
    }

private:
    TComputationContext& Ctx_;
    const std::unique_ptr Builder_;
};

// Builds the final AVG result column: Sum_ / Count_ converted to TOut,
// or a null item when the state has Count_ == 0.
template
class TAvgResultColumnBuilder : public IAggColumnBuilder {
public:
    TAvgResultColumnBuilder(ui64 size, TComputationContext& ctx)
        : Ctx_(ctx)
        , Builder_(TTypeInfoHelper(), arrow::TypeTraits::TResult>::type_singleton(), ctx.ArrowMemoryPool, size)
    {
    }

    void Add(const void* state) final {
        auto typedState = MakeStateWrapper>(state);
        if (typedState->Count_) {
            Builder_.Add(TBlockItem(TOut(typedState->Sum_ / typedState->Count_)));
        } else {
            Builder_.Add(TBlockItem());
        }
    }

    NUdf::TUnboxedValue Build() final {
        return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true));
    }

private:
    TComputationContext& Ctx_;
    NYql::NUdf::TFixedSizeArrayBuilder Builder_;
};

// Primary templates; the specializations below cover the combine-all,
// combine-keys and finalize-keys stages.
template
class TSumBlockAggregator;

template
class TAvgBlockAggregator;

// SUM over all rows (no grouping keys): whole input batches are folded into one state.
template
class TSumBlockAggregator : public TCombineAllTag::TBase {
public:
    using TBase = TCombineAllTag::TBase;
    using TStateType = TSumState;
    using TInScalar = typename TPrimitiveDataType::TScalarResult;

    TSumBlockAggregator(std::optional filterColumn, ui32 argColumn, TType* dataType, TComputationContext& ctx)
        : TBase(sizeof(TStateType), filterColumn, ctx)
        , ArgColumn_(argColumn)
    {
        Y_UNUSED(dataType);
    }

    void InitState(void* state) final {
        TStateType st;
        WriteUnaligned(state, st);
    }

    void DestroyState(void* state) noexcept final {
        static_assert(std::is_trivially_destructible::value);
        Y_UNUSED(state);
    }

    // Adds one batch to the state. `filtered`, when set, is used as the number of rows
    // selected by the filter column; otherwise all batchLength rows participate.
    void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional filtered) final {
        auto typedState = MakeStateWrapper(state);
        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
        if constexpr (IsScalar) {
            Y_ENSURE(datum.is_scalar());
            // A scalar stands for the same value in every row: add value * rowCount.
            if constexpr (IsNullable) {
                if (datum.scalar()->is_valid) {
                    typedState->Sum_ += (filtered ? *filtered : batchLength) * Cast(datum.scalar_as().value);
                    typedState->IsValid_ = 1;
                }
            } else {
                typedState->Sum_ += (filtered ? *filtered : batchLength) * Cast(datum.scalar_as().value);
            }
        } else {
            const auto& array = datum.array();
            auto ptr = array->GetValues(1);
            auto len = array->length;
            auto nullCount = IsNullable ? array->GetNullCount() : 0;
            auto count = len - nullCount;
            if (!count) {
                // Every row is null: nothing to add, and IsValid_ must stay untouched.
                return;
            }

            if (!filtered) {
                if constexpr (IsNullable) {
                    typedState->IsValid_ = 1;
                }
                // Accumulate into a local to keep the hot loop off the (unaligned) state.
                auto sum = typedState->Sum_;
                if (IsNullable && nullCount != 0) {
                    // The validity bitmap is fetched without applying the array offset
                    // (absolute offset 0), so the offset is added to each bit index by hand.
                    auto nullBitmapPtr = array->GetValues(0, 0);
                    for (int64_t i = 0; i < len; ++i) {
                        ui64 fullIndex = i + array->offset;
                        ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
                        sum += SelectArg(notNull, ptr[i], 0);
                    }
                } else {
                    for (int64_t i = 0; i < len; ++i) {
                        sum += ptr[i];
                    }
                }
                typedState->Sum_ = sum;
            } else {
                // The filter column is read as one byte per row; a row contributes only
                // when its byte is non-zero (presumably 0/1 -- it is AND-ed with a 0/1 bit).
                const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
                const auto& filterArray = filterDatum.array();
                MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
                const ui8* filterBitmap = filterArray->template GetValues(1);
                auto sum = typedState->Sum_;
                if (IsNullable && nullCount != 0) {
                    ui64 count = 0;
                    auto nullBitmapPtr = array->template GetValues(0, 0);
                    for (int64_t i = 0; i < len; ++i) {
                        ui64 fullIndex = i + array->offset;
                        ui8 notNullAndFiltered = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) & filterBitmap[i];
                        sum += SelectArg(notNullAndFiltered, ptr[i], 0);
                        count += notNullAndFiltered;
                    }
                    if constexpr (IsNullable) {
                        typedState->IsValid_ |= count ? 1 : 0;
                    }
                } else {
                    for (int64_t i = 0; i < len; ++i) {
                        sum += SelectArg(filterBitmap[i], ptr[i], 0);
                    }
                    // NOTE(review): this marks the state valid even if the filter selected
                    // no rows; it relies on callers not passing *filtered == 0 -- confirm.
                    if constexpr (IsNullable) {
                        typedState->IsValid_ = 1;
                    }
                }
                typedState->Sum_ = sum;
            }
        }
    }

    // Returns the final sum, or an empty (NULL) value for nullable input with no values.
    NUdf::TUnboxedValue FinishOne(const void* state) final {
        auto typedState = MakeStateWrapper(state);
        if constexpr (IsNullable) {
            if (!typedState->IsValid_) {
                return NUdf::TUnboxedValuePod();
            }
        }
        return NUdf::TUnboxedValuePod(TSum(typedState->Sum_));
    }

private:
    const ui32 ArgColumn_;
};

// Adds a single row (`row` in an array datum, or the scalar value) to a SUM state.
// Shared by the combine-keys and finalize-keys SUM aggregators.
template
void PushValueToState(TSumState* typedState, const arrow::Datum& datum, ui64 row) {
    using TInScalar = typename TPrimitiveDataType::TScalarResult;
    if constexpr (IsScalar) {
        Y_ENSURE(datum.is_scalar());
        if constexpr (IsNullable) {
            if (datum.scalar()->is_valid) {
                typedState->Sum_ += Cast(datum.scalar_as().value);
                typedState->IsValid_ = 1;
            }
        } else {
            typedState->Sum_ += Cast(datum.scalar_as().value);
        }
    } else {
        const auto& array = datum.array();
        auto ptr = array->GetValues(1);
        if constexpr (IsNullable) {
            if (array->GetNullCount() == 0) {
                typedState->IsValid_ = 1;
                typedState->Sum_ += ptr[row];
            } else {
                // Branch-free: notNull is 0 or 1 and selects either ptr[row] or 0.
                auto nullBitmapPtr = array->GetValues(0, 0);
                ui64 fullIndex = row + array->offset;
                ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
                typedState->Sum_ += SelectArg(notNull, ptr[row], 0);
                typedState->IsValid_ |= notNull;
            }
        } else {
            typedState->Sum_ += ptr[row];
        }
    }
}

// SUM with grouping keys: one state per key, updated row by row.
template
class TSumBlockAggregator : public TCombineKeysTag::TBase {
public:
    using TBase = TCombineKeysTag::TBase;
    using TStateType = TSumState;

    TSumBlockAggregator(std::optional filterColumn, ui32 argColumn, TType* dataType, TComputationContext& ctx)
        : TBase(sizeof(TStateType), filterColumn, ctx)
        , ArgColumn_(argColumn)
        , DataType_(dataType)
    {
    }

    void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        TStateType st;
        WriteUnaligned(state, st);
        UpdateKey(state, batchNum, columns, row);
    }

    void DestroyState(void* state) noexcept final {
        static_assert(std::is_trivially_destructible::value);
        Y_UNUSED(state);
    }

    void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        Y_UNUSED(batchNum);
        auto typedState = MakeStateWrapper(state);
        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
        PushValueToState(typedState.Get(), datum, row);
    }

    std::unique_ptr MakeStateBuilder(ui64 size) final {
        return std::make_unique>(size, DataType_, Ctx_);
    }

private:
    const ui32 ArgColumn_;
    TType* const DataType_;
};

// SUM finalization over keyed partial sums: states are merged row by row and
// emitted through TSumColumnBuilder.
template
class TSumBlockAggregator : public TFinalizeKeysTag::TBase {
public:
    using TBase = TFinalizeKeysTag::TBase;
    using TStateType = TSumState;

    TSumBlockAggregator(std::optional filterColumn, ui32 argColumn, TType* dataType, TComputationContext& ctx)
        : TBase(sizeof(TStateType), filterColumn, ctx)
        , ArgColumn_(argColumn)
        , DataType_(dataType)
    {
    }

    void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        TStateType st;
        WriteUnaligned(state, st);
        UpdateState(state, batchNum, columns, row);
    }

    void DestroyState(void* state) noexcept final {
        static_assert(std::is_trivially_destructible::value);
        Y_UNUSED(state);
    }

    void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        Y_UNUSED(batchNum);
        auto typedState = MakeStateWrapper(state);
        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
        PushValueToState(typedState.Get(), datum, row);
    }

    std::unique_ptr MakeResultBuilder(ui64 size) final {
        return std::make_unique>(size, DataType_, Ctx_);
    }

private:
    const ui32 ArgColumn_;
    TType* const DataType_;
};

// AVG over all rows (no grouping keys). Produces the intermediate (sum, count) tuple,
// or NULL when no non-null value was seen.
template
class TAvgBlockAggregator : public TCombineAllTag::TBase {
public:
    using TBase = TCombineAllTag::TBase;
    using TInScalar = typename TPrimitiveDataType::TScalarResult;

    TAvgBlockAggregator(std::optional filterColumn, ui32 argColumn, TType* outputType, TComputationContext& ctx)
        : TBase(sizeof(TAvgState), filterColumn, ctx)
        , ArgColumn_(argColumn)
    {
        Y_UNUSED(outputType);
    }

    void InitState(void* state) final {
        TAvgState st;
        WriteUnaligned>(state, st);
    }

    void DestroyState(void* state) noexcept final {
        static_assert(std::is_trivially_destructible>::value);
        Y_UNUSED(state);
    }

    // Adds one batch to the state. `filtered`, when set, is used as the number of rows
    // selected by the filter column; otherwise all batchLength rows participate.
    void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional filtered) final {
        auto typedState = MakeStateWrapper>(state);
        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
        if (datum.is_scalar()) {
            if (datum.scalar()->is_valid) {
                // A scalar stands for the same value in every participating row. Sum and
                // count must use the same row count; previously Count_ always grew by
                // batchLength, so a filter inflated the count and skewed the average.
                const ui64 rowCount = filtered ? *filtered : batchLength;
                typedState->Sum_ += rowCount * Cast(datum.scalar_as().value);
                typedState->Count_ += rowCount;
            }
        } else {
            const auto& array = datum.array();
            auto ptr = array->GetValues(1);
            auto len = array->length;
            auto count = len - array->GetNullCount();
            if (!count) {
                return;
            }

            if (!filtered) {
                typedState->Count_ += count;
                auto sum = typedState->Sum_;
                if (array->GetNullCount() == 0) {
                    for (int64_t i = 0; i < len; ++i) {
                        sum += ptr[i];
                    }
                } else {
                    auto nullBitmapPtr = array->GetValues(0, 0);
                    for (int64_t i = 0; i < len; ++i) {
                        ui64 fullIndex = i + array->offset;
                        // notNull is the row's validity bit (0 or 1); SelectArg picks ptr[i]
                        // for valid rows and 0 for null rows.
                        ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
                        sum += SelectArg(notNull, ptr[i], 0);
                    }
                }
                typedState->Sum_ = sum;
            } else {
                const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
                const auto& filterArray = filterDatum.array();
                MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
                const ui8* filterBitmap = filterArray->template GetValues(1);
                auto sum = typedState->Sum_;
                ui64 count = typedState->Count_;
                if (array->GetNullCount() == 0) {
                    for (int64_t i = 0; i < len; ++i) {
                        // Named `selected` so it does not shadow the `filtered` parameter.
                        const ui8 selected = filterBitmap[i];
                        sum += SelectArg(selected, ptr[i], 0);
                        count += selected;
                    }
                } else {
                    auto nullBitmapPtr = array->GetValues(0, 0);
                    for (int64_t i = 0; i < len; ++i) {
                        ui64 fullIndex = i + array->offset;
                        ui8 notNullAndFiltered = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1) & filterBitmap[i];
                        sum += SelectArg(notNullAndFiltered, ptr[i], 0);
                        count += notNullAndFiltered;
                    }
                }
                typedState->Sum_ = sum;
                typedState->Count_ = count;
            }
        }
    }

    // Returns the (sum, count) pair as a 2-element array value, or NULL if Count_ == 0.
    NUdf::TUnboxedValue FinishOne(const void* state) final {
        auto typedState = MakeStateWrapper>(state);
        if (!typedState->Count_) {
            return NUdf::TUnboxedValuePod();
        }

        NUdf::TUnboxedValue* items;
        auto arr = Ctx_.HolderFactory.CreateDirectArrayHolder(2, items);
        items[0] = NUdf::TUnboxedValuePod(TOut(typedState->Sum_));
        items[1] = NUdf::TUnboxedValuePod(typedState->Count_);
        return arr;
    }

private:
    const ui32 ArgColumn_;
};

// AVG with grouping keys: one (sum, count) state per key, updated row by row, emitted
// as intermediate tuples through TAvgStateColumnBuilder.
template
class TAvgBlockAggregator : public TCombineKeysTag::TBase {
public:
    using TBase = TCombineKeysTag::TBase;
    using TInScalar = typename TPrimitiveDataType::TScalarResult;

    TAvgBlockAggregator(std::optional filterColumn, ui32 argColumn, TType* outputType, TComputationContext& ctx)
        : TBase(sizeof(TAvgState), filterColumn, ctx)
        , ArgColumn_(argColumn)
        , OutputType_(outputType)
    {
    }

    void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        TAvgState st;
        WriteUnaligned>(state, st);
        UpdateKey(state, batchNum, columns, row);
    }

    void DestroyState(void* state) noexcept final {
        static_assert(std::is_trivially_destructible>::value);
        Y_UNUSED(state);
    }

    void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        Y_UNUSED(batchNum);
        auto typedState = MakeStateWrapper>(state);
        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
        if (datum.is_scalar()) {
            if (datum.scalar()->is_valid) {
                typedState->Sum_ += Cast(datum.scalar_as().value);
                typedState->Count_ += 1;
            }
        } else {
            const auto& array = datum.array();
            auto ptr = array->GetValues(1);
            if (array->GetNullCount() == 0) {
                typedState->Sum_ += ptr[row];
                typedState->Count_ += 1;
            } else {
                auto nullBitmapPtr = array->GetValues(0, 0);
                ui64 fullIndex = row + array->offset;
                ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
                typedState->Sum_ += SelectArg(notNull, ptr[row], 0);
                typedState->Count_ += notNull;
            }
        }
    }

    std::unique_ptr MakeStateBuilder(ui64 size) final {
        return std::make_unique>(size, OutputType_, Ctx_);
    }

private:
    const ui32 ArgColumn_;
    TType* const OutputType_;
};

// AVG finalization: input rows are intermediate (sum, count) tuples (struct scalar or
// struct array); they are merged per key and emitted as Sum_ / Count_.
template
class TAvgBlockAggregatorOverState : public TFinalizeKeysTag::TBase {
public:
    using TBase = TFinalizeKeysTag::TBase;
    using TInScalar = typename TPrimitiveDataType::TScalarResult;

    TAvgBlockAggregatorOverState(ui32 argColumn, TComputationContext& ctx)
        : TBase(sizeof(TAvgState), {}, ctx)
        , ArgColumn_(argColumn)
    {
    }

    void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        TAvgState st;
        WriteUnaligned>(state, st);
        UpdateState(state, batchNum, columns, row);
    }

    void DestroyState(void* state) noexcept final {
        static_assert(std::is_trivially_destructible>::value);
        Y_UNUSED(state);
    }

    void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        Y_UNUSED(batchNum);
        auto typedState = MakeStateWrapper>(state);
        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
        if (datum.is_scalar()) {
            if (datum.scalar()->is_valid) {
                // Field 0 of the state tuple is the partial sum, field 1 the partial count.
                const auto& structScalar = arrow::internal::checked_cast(*datum.scalar());
                typedState->Sum_ += Cast(arrow::internal::checked_cast(*structScalar.value[0]).value);
                typedState->Count_ += arrow::internal::checked_cast(*structScalar.value[1]).value;
            }
        } else {
            const auto& array = datum.array();
            // NOTE(review): the child arrays are indexed by `row` without adding the parent
            // struct's offset (the validity bit below does add it) -- confirm that struct
            // arrays reaching this point are never sliced, or that children are pre-sliced.
            auto sumPtr = array->child_data[0]->GetValues(1);
            auto countPtr = array->child_data[1]->GetValues(1);
            if (array->GetNullCount() == 0) {
                typedState->Sum_ += sumPtr[row];
                typedState->Count_ += countPtr[row];
            } else {
                auto nullBitmapPtr = array->GetValues(0, 0);
                ui64 fullIndex = row + array->offset;
                // bit 1 -> mask 0xFF..FF, bit 0 -> mask 0x00..00
                auto bit = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
                ui64 mask = -ui64(bit);
                typedState->Sum_ += sumPtr[row] * bit;
                typedState->Count_ += mask & countPtr[row];
            }
        }
    }

    std::unique_ptr MakeResultBuilder(ui64 size) final {
        return std::make_unique>(size, Ctx_);
    }

private:
    const ui32 ArgColumn_;
};

// Prepared (type-resolved) SUM aggregator; Make() instantiates the stage-specific
// TSumBlockAggregator for a computation context.
template
class TPreparedSumBlockAggregator : public TTag::TPreparedAggregator {
public:
    using TBase = typename TTag::TPreparedAggregator;
    using TStateType = TSumState;

    TPreparedSumBlockAggregator(std::optional filterColumn, ui32 argColumn, TType* dataType)
        : TBase(sizeof(TStateType))
        , FilterColumn_(filterColumn)
        , ArgColumn_(argColumn)
        , DataType_(dataType)
    {}

    std::unique_ptr Make(TComputationContext& ctx) const final {
        return std::make_unique>(FilterColumn_, ArgColumn_, DataType_, ctx);
    }

private:
    const std::optional FilterColumn_;
    const ui32 ArgColumn_;
    TType* const DataType_;
};

// Chooses the SUM aggregator variant for the input's shape (scalar vs array) and
// nullability (optional vs non-optional).
template
std::unique_ptr PrepareSumFixed(TType* type, bool isOptional, bool isScalar, std::optional filterColumn, ui32 argColumn) {
    if (isScalar) {
        if (isOptional) {
            return std::make_unique>(filterColumn, argColumn, type);
        }

        return std::make_unique>(filterColumn, argColumn, type);
    }

    if (isOptional) {
        return std::make_unique>(filterColumn, argColumn, type);
    }

    return std::make_unique>(filterColumn, argColumn, type);
}

// Resolves the SUM result type from the argument's data type and dispatches on its
// data slot. Result types:
//   signed / unsigned integers -> a data type chosen per signedness (the exact type
//                                 argument is not visible in this copy);
//   Decimal(p, s)              -> Decimal(MaxPrecision, s);
//   Interval                   -> Decimal(MaxPrecision, 0);
//   floating point             -> the input type itself.
// The result is always wrapped in Optional. Throws for unsupported input types.
template
std::unique_ptr PrepareSum(TTupleType* tupleType, std::optional filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
    auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn));
    auto argType = blockType->GetItemType();
    bool isOptional;
    auto dataType = UnpackOptionalData(argType, isOptional);
    bool isScalar = blockType->GetShape() == TBlockType::EShape::Scalar;

    TType* sumRetType = nullptr;
    const auto& typeInfo = NYql::NUdf::GetDataTypeInfo(*dataType->GetDataSlot());
    if (typeInfo.Features & NYql::NUdf::EDataTypeFeatures::SignedIntegralType) {
        sumRetType = TDataType::Create(NUdf::TDataType::Id, env);
    } else if (typeInfo.Features & NYql::NUdf::EDataTypeFeatures::UnsignedIntegralType) {
        sumRetType = TDataType::Create(NUdf::TDataType::Id, env);
    } else if (*dataType->GetDataSlot() == NUdf::EDataSlot::Decimal) {
        auto decimalType = static_cast(dataType);
        auto [_, scale] = decimalType->GetParams();
        sumRetType = TDataDecimalType::Create(NYql::NDecimal::MaxPrecision, scale, env);
    } else if (*dataType->GetDataSlot() == NUdf::EDataSlot::Interval) {
        sumRetType = TDataDecimalType::Create(NYql::NDecimal::MaxPrecision, 0, env);
    } else {
        Y_ENSURE(typeInfo.Features & NYql::NUdf::EDataTypeFeatures::FloatType);
        sumRetType = dataType;
    }

    sumRetType = TOptionalType::Create(sumRetType, env);

    switch (*dataType->GetDataSlot()) {
    case NUdf::EDataSlot::Int8:
        return PrepareSumFixed(sumRetType, isOptional, isScalar, filterColumn, argColumn);
    case NUdf::EDataSlot::Uint8:
        return PrepareSumFixed(sumRetType, isOptional, isScalar, filterColumn, argColumn);
    case NUdf::EDataSlot::Int16:
        return PrepareSumFixed(sumRetType, isOptional, isScalar, filterColumn, argColumn);
    case NUdf::EDataSlot::Uint16:
        return PrepareSumFixed(sumRetType, isOptional, isScalar, filterColumn, argColumn);
    case NUdf::EDataSlot::Int32:
        return PrepareSumFixed(sumRetType, isOptional, isScalar, filterColumn, argColumn);
    case NUdf::EDataSlot::Uint32:
        return PrepareSumFixed(sumRetType, isOptional, isScalar, filterColumn, argColumn);
    case NUdf::EDataSlot::Int64:
        return PrepareSumFixed(sumRetType, isOptional, isScalar, filterColumn, argColumn);
    case NUdf::EDataSlot::Uint64:
        return PrepareSumFixed(sumRetType, isOptional, isScalar, filterColumn, argColumn);
    case NUdf::EDataSlot::Float:
        return PrepareSumFixed(sumRetType, isOptional, isScalar, filterColumn, argColumn);
    case NUdf::EDataSlot::Double:
        return PrepareSumFixed(sumRetType, isOptional, isScalar, filterColumn, argColumn);
    case NUdf::EDataSlot::Interval:
        return PrepareSumFixed(sumRetType, isOptional, isScalar, filterColumn, argColumn);
    case NUdf::EDataSlot::Decimal:
        return PrepareSumFixed(sumRetType, isOptional, isScalar, filterColumn, argColumn);
    default:
        throw yexception() << "Unsupported SUM input type";
    }
}

// Factory wiring SUM into the three block-aggregation stages. Only the first argument
// column is used; the combine-keys and finalize-keys stages take no filter column.
class TBlockSumFactory : public IBlockAggregatorFactory {
public:
    std::unique_ptr PrepareCombineAll(
        TTupleType* tupleType,
        std::optional filterColumn,
        const std::vector& argsColumns,
        const TTypeEnvironment& env) const final
    {
        return PrepareSum(tupleType, filterColumn, argsColumns[0], env);
    }

    std::unique_ptr PrepareCombineKeys(
        TTupleType* tupleType,
        const std::vector& argsColumns,
        const TTypeEnvironment& env) const final
    {
        return PrepareSum(tupleType, std::optional(), argsColumns[0], env);
    }

    std::unique_ptr PrepareFinalizeKeys(
        TTupleType* tupleType,
        const std::vector& argsColumns,
        const TTypeEnvironment& env,
        TType* returnType,
        ui32 hint) const final
    {
        Y_UNUSED(returnType);
        Y_UNUSED(hint);
        return PrepareSum(tupleType, std::optional(), argsColumns[0], env);
    }
};

// Prepared (type-resolved) AVG aggregator for the combine stages.
template
class TPreparedAvgBlockAggregator : public TTag::TPreparedAggregator {
public:
    using TBase = typename TTag::TPreparedAggregator;

    TPreparedAvgBlockAggregator(std::optional filterColumn, ui32 argColumn, TType* outputType)
        : TBase(sizeof(TAvgState))
        , FilterColumn_(filterColumn)
        , ArgColumn_(argColumn)
        , OutputType_(outputType)
    {}

    std::unique_ptr Make(TComputationContext& ctx) const final {
        return std::make_unique>(FilterColumn_, ArgColumn_, OutputType_, ctx);
    }

private:
    const std::optional FilterColumn_;
    const ui32 ArgColumn_;
    TType* const OutputType_;
};

// Prepared (type-resolved) AVG aggregator for the finalize-keys stage.
template
class TPreparedAvgBlockAggregatorOverState : public TFinalizeKeysTag::TPreparedAggregator {
public:
    using TBase = TFinalizeKeysTag::TPreparedAggregator;

    explicit TPreparedAvgBlockAggregatorOverState(ui32 argColumn)
        : TBase(sizeof(TAvgState))
        , ArgColumn_(argColumn)
    {}

    std::unique_ptr Make(TComputationContext& ctx) const final {
        return std::make_unique>(ArgColumn_, ctx);
    }

private:
    const ui32 ArgColumn_;
};

// Stage-specific AVG preparation; specialized per aggregation tag below.
template
std::unique_ptr PrepareAvg(TTupleType* tupleType, std::optional filterColumn, ui32 argColumn, const TTypeEnvironment& env);

// Prepares AVG over raw input values (the combine stages). The intermediate state type
// is Optional of a 2-tuple (sum, Uint64-named count):
//   numeric inputs (Bool shares the Uint8 path) -> sum typed as the `doubleType` above;
//   Interval                                    -> sum as Decimal(MaxPrecision, 0);
//   Decimal(p, s)                               -> sum as Decimal(p, s).
// Throws for unsupported input types.
template
std::unique_ptr PrepareAvgOverInput(TTupleType* tupleType, std::optional filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
    auto doubleType = TDataType::Create(NUdf::TDataType::Id, env);
    auto ui64Type = TDataType::Create(NUdf::TDataType::Id, env);
    TVector tupleElements = { doubleType, ui64Type };
    auto avgRetType = TOptionalType::Create(TTupleType::Create(2, tupleElements.data(), env), env);
    auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn))->GetItemType();
    bool isOptional;
    auto dataType = UnpackOptionalData(argType, isOptional);
    switch (*dataType->GetDataSlot()) {
    case NUdf::EDataSlot::Int8:
        return std::make_unique>(filterColumn, argColumn, avgRetType);
    case NUdf::EDataSlot::Uint8:
    case NUdf::EDataSlot::Bool:
        return std::make_unique>(filterColumn, argColumn, avgRetType);
    case NUdf::EDataSlot::Int16:
        return std::make_unique>(filterColumn, argColumn, avgRetType);
    case NUdf::EDataSlot::Uint16:
        return std::make_unique>(filterColumn, argColumn, avgRetType);
    case NUdf::EDataSlot::Int32:
        return std::make_unique>(filterColumn, argColumn, avgRetType);
    case NUdf::EDataSlot::Uint32:
        return std::make_unique>(filterColumn, argColumn, avgRetType);
    case NUdf::EDataSlot::Int64:
        return std::make_unique>(filterColumn, argColumn, avgRetType);
    case NUdf::EDataSlot::Uint64:
        return std::make_unique>(filterColumn, argColumn, avgRetType);
    case NUdf::EDataSlot::Float:
        return std::make_unique>(filterColumn, argColumn, avgRetType);
    case NUdf::EDataSlot::Double:
        return std::make_unique>(filterColumn, argColumn, avgRetType);
    case NUdf::EDataSlot::Interval: {
        auto decimalType = TDataDecimalType::Create(NYql::NDecimal::MaxPrecision, 0, env);
        TVector tupleDecimalElements = { decimalType, ui64Type };
        auto avgRetDecimalType = TOptionalType::Create(TTupleType::Create(2, tupleDecimalElements.data(), env), env);
        return std::make_unique>(filterColumn, argColumn, avgRetDecimalType);
    }
    case NUdf::EDataSlot::Decimal: {
        auto [precision, scale] = static_cast(dataType)->GetParams();
        auto decimalType = TDataDecimalType::Create(precision, scale, env);
        TVector tupleDecimalElements = { decimalType, ui64Type };
        auto avgRetDecimalType = TOptionalType::Create(TTupleType::Create(2, tupleDecimalElements.data(), env), env);
        return std::make_unique>(filterColumn, argColumn, avgRetDecimalType);
    }
    default:
        throw yexception() << "Unsupported AVG input type";
    }
}

template <>
std::unique_ptr PrepareAvg(TTupleType* tupleType, std::optional filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
    return PrepareAvgOverInput(tupleType, filterColumn, argColumn, env);
}

template <>
std::unique_ptr PrepareAvg(TTupleType* tupleType, std::optional filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
    return PrepareAvgOverInput(tupleType, filterColumn, argColumn, env);
}

// Finalize-stage AVG: the argument is the (optional) intermediate state tuple; the data
// slot of its first element (the sum) selects the Decimal or Double implementation.
template <>
std::unique_ptr PrepareAvg(TTupleType* tupleType, std::optional filterColumn, ui32 argColumn, const TTypeEnvironment& env) {
    Y_UNUSED(filterColumn);
    Y_UNUSED(env);
    auto argType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn))->GetItemType();
    bool isOptional;
    auto aggTupleType = UnpackOptional(argType, isOptional);
    MKQL_ENSURE(aggTupleType->IsTuple(), "Expected tuple or optional of tuple, actual: " << PrintNode(argType, true));
    auto dataType = UnpackOptionalData(AS_TYPE(TTupleType, aggTupleType)->GetElementType(0), isOptional);
    switch (*dataType->GetDataSlot()) {
    case NUdf::EDataSlot::Decimal:
        return std::make_unique>(argColumn);
    case NUdf::EDataSlot::Double:
        return std::make_unique>(argColumn);
    default:
        throw yexception() << "Unsupported Finalize input type";
    }
}

// Factory wiring AVG into the three block-aggregation stages. Only the first argument
// column is used; the combine-keys and finalize-keys stages take no filter column.
class TBlockAvgFactory : public IBlockAggregatorFactory {
public:
    std::unique_ptr PrepareCombineAll(
        TTupleType* tupleType,
        std::optional filterColumn,
        const std::vector& argsColumns,
        const TTypeEnvironment& env) const final
    {
        return PrepareAvg(tupleType, filterColumn, argsColumns[0], env);
    }

    std::unique_ptr PrepareCombineKeys(
        TTupleType* tupleType,
        const std::vector& argsColumns,
        const TTypeEnvironment& env) const final
    {
        return PrepareAvg(tupleType, std::optional(), argsColumns[0], env);
    }

    std::unique_ptr PrepareFinalizeKeys(
        TTupleType* tupleType,
        const std::vector& argsColumns,
        const TTypeEnvironment& env,
        TType* returnType,
        ui32 hint) const final
    {
        Y_UNUSED(returnType);
        Y_UNUSED(hint);
        return PrepareAvg(tupleType, std::optional(), argsColumns[0], env);
    }
};

}

// Public entry point: factory for the block SUM aggregate.
std::unique_ptr MakeBlockSumFactory() {
    return std::make_unique();
}

// Public entry point: factory for the block AVG aggregate.
std::unique_ptr MakeBlockAvgFactory() {
    return std::make_unique();
}

}
}