// Block (vectorized) implementations of two aggregations, each provided for
// the combine-all, combine-keys and finalize-keys stages:
//   * "count all" - counts rows and never inspects the argument column in the
//     combine stages;
//   * "count"     - counts only rows whose argument value is non-null.
// The finalize stage of both sums previously computed partial counts.
//
// NOTE(review): in this copy of the file every template argument list and the
// targets of the last two #include directives appear to have been stripped
// (e.g. `static_cast(state)`, `std::optional filterColumn`,
// `std::make_unique>(...)`). The code cannot compile as shown; restore the
// missing `<...>` text from version control before building. The comments
// below describe only the visible logic.
#include "mkql_block_agg_count.h"
// NOTE(review): include target missing in this copy.
#include
// NOTE(review): include target missing in this copy.
#include

namespace NKikimr {
namespace NMiniKQL {

namespace {

// Aggregation state shared by every aggregator in this file: a running
// 64-bit counter. It is trivially destructible, which each DestroyState()
// override checks with a static_assert instead of running a destructor.
struct TState {
    ui64 Count_ = 0;
};

// Column builder that turns a sequence of TState objects into a block of
// uint64 counts: Add() appends one state's Count_, Build() finalizes the
// underlying array and wraps it with HolderFactory.CreateArrowBlock().
class TColumnBuilder : public IAggColumnBuilder {
public:
    // `size` is forwarded to the array builder (presumably the expected
    // number of rows - TODO confirm against TFixedSizeArrayBuilder).
    TColumnBuilder(ui64 size, TComputationContext& ctx)
        : Builder_(TTypeInfoHelper(), arrow::uint64(), ctx.ArrowMemoryPool, size)
        , Ctx_(ctx)
    {
    }

    void Add(const void* state) final {
        auto typedState = static_cast(state);
        Builder_.Add(TBlockItem(typedState->Count_));
    }

    NUdf::TUnboxedValue Build() final {
        return Ctx_.HolderFactory.CreateArrowBlock(Builder_.Build(true));
    }

private:
    NYql::NUdf::TFixedSizeArrayBuilder Builder_;
    TComputationContext& Ctx_;
};

// Primary templates are only declared; the stage-specific behaviour lives in
// the explicit specializations below.
template class TCountAllAggregator;

template class TCountAggregator;

// Count-all, combine-all stage (TCombineAllTag::TBase): adds the number of
// rows of each batch without reading any column.
template <>
class TCountAllAggregator : public TCombineAllTag::TBase {
public:
    using TBase = TCombineAllTag::TBase;

    // argColumn is accepted so that TPreparedCountAll::Make() can construct
    // every specialization the same way; it is not used here.
    TCountAllAggregator(std::optional filterColumn, ui32 argColumn, TComputationContext& ctx)
        : TBase(sizeof(TState), filterColumn, ctx)
    {
        Y_UNUSED(argColumn);
    }

    void InitState(void* state) final {
        new(state) TState();
    }

    // Nothing to release: TState is trivially destructible.
    void DestroyState(void* state) noexcept final {
        static_assert(std::is_trivially_destructible::value);
        Y_UNUSED(state);
    }

    // Adds `*filtered` when a filtered row count is supplied, otherwise the
    // full batchLength.
    void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional filtered) final {
        auto typedState = static_cast(state);
        Y_UNUSED(columns);
        if (filtered) {
            typedState->Count_ += *filtered;
        } else {
            typedState->Count_ += batchLength;
        }
    }

    // Returns the accumulated count as an unboxed value.
    NUdf::TUnboxedValue FinishOne(const void* state) final {
        auto typedState = static_cast(state);
        return NUdf::TUnboxedValuePod(typedState->Count_);
    }
};

// Count-all, combine-keys stage (TCombineKeysTag::TBase): every row seen for
// a key adds one to that key's state.
template <>
class TCountAllAggregator : public TCombineKeysTag::TBase {
public:
    using TBase = TCombineKeysTag::TBase;

    // argColumn is unused, as in the combine-all specialization.
    TCountAllAggregator(std::optional filterColumn, ui32 argColumn, TComputationContext& ctx)
        : TBase(sizeof(TState), filterColumn, ctx)
    {
        Y_UNUSED(argColumn);
    }

    // Creates a fresh state and immediately counts the row that created it.
    void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        new(state) TState();
        UpdateKey(state, batchNum, columns, row);
    }

    void DestroyState(void* state) noexcept final {
        static_assert(std::is_trivially_destructible::value);
        Y_UNUSED(state);
    }

    void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        Y_UNUSED(batchNum);
        Y_UNUSED(columns);
        Y_UNUSED(row);
        auto typedState = static_cast(state);
        typedState->Count_ += 1;
    }

    // Builder for the per-key states. The constructor arguments (size, Ctx_)
    // match TColumnBuilder; the type name itself is missing in this copy.
    // Ctx_ is not declared in this class (presumably a TBase member).
    std::unique_ptr MakeStateBuilder(ui64 size) final {
        return std::make_unique(size, Ctx_);
    }
};

// Count-all, finalize-keys stage (TFinalizeKeysTag::TBase): sums the partial
// counts stored in column ArgColumn_.
template <>
class TCountAllAggregator : public TFinalizeKeysTag::TBase {
public:
    using TBase = TFinalizeKeysTag::TBase;

    TCountAllAggregator(std::optional filterColumn, ui32 argColumn, TComputationContext& ctx)
        : TBase(sizeof(TState), filterColumn, ctx)
        , ArgColumn_(argColumn)
    {
    }

    // Creates a fresh state seeded with the partial count found at `row`.
    void LoadState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        new(state) TState();
        UpdateState(state, batchNum, columns, row);
    }

    void DestroyState(void* state) noexcept final {
        static_assert(std::is_trivially_destructible::value);
        Y_UNUSED(state);
    }

    // Adds the partial count at `row`. Nulls are rejected with MKQL_ENSURE:
    // an invalid scalar, or an array datum containing any null.
    void UpdateState(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        Y_UNUSED(batchNum);
        auto typedState = static_cast(state);
        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
        if (datum.is_scalar()) {
            MKQL_ENSURE(datum.scalar()->is_valid, "Expected not null");
            typedState->Count_ += datum.scalar_as().value;
        } else {
            const auto& array = datum.array();
            // Values are read from buffer 1. Unlike the validity-bitmap reads
            // elsewhere in this file, no manual array->offset is added to `row`
            // here (presumably GetValues applies the offset - confirm).
            auto ptr = array->GetValues(1);
            MKQL_ENSURE(array->GetNullCount() == 0, "Expected not null");
            typedState->Count_ += ptr[row];
        }
    }

    // Builder for the final results; same (size, Ctx_) arguments as
    // TColumnBuilder's constructor.
    std::unique_ptr MakeResultBuilder(ui64 size) final {
        return std::make_unique(size, Ctx_);
    }

private:
    // Index of the column holding partial counts.
    const ui32 ArgColumn_;
};

// Count, combine-all stage (TCombineAllTag::TBase): counts non-null values
// of column ArgColumn_, honoring the optional filter.
template <>
class TCountAggregator : public TCombineAllTag::TBase {
public:
    using TBase = TCombineAllTag::TBase;

    TCountAggregator(std::optional filterColumn, ui32 argColumn, TComputationContext& ctx)
        : TBase(sizeof(TState), filterColumn, ctx)
        , ArgColumn_(argColumn)
    {
    }

    void InitState(void* state) final {
        new(state) TState();
    }

    void DestroyState(void* state) noexcept final {
        static_assert(std::is_trivially_destructible::value);
        Y_UNUSED(state);
    }

    // `filtered`, when engaged, is used as the number of rows selected by the
    // filter; otherwise all batchLength rows are considered.
    void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional filtered) final {
        auto typedState = static_cast(state);
        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
        if (datum.is_scalar()) {
            // A valid scalar counts once per considered row; a null scalar adds nothing.
            if (datum.scalar()->is_valid) {
                typedState->Count_ += filtered ? *filtered : batchLength;
            }
        } else {
            const auto& array = datum.array();
            if (!filtered) {
                typedState->Count_ += array->length - array->GetNullCount();
            } else if (array->GetNullCount() == array->length) {
                // all nulls
                return;
            } else if (array->GetNullCount() == 0) {
                // no nulls
                typedState->Count_ += *filtered;
            } else {
                // Nulls mixed with a filter: count rows that are both non-null
                // and selected. FilterColumn_ is not declared in this class
                // (presumably a TBase member); this branch relies on it being
                // engaged whenever `filtered` is.
                const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
                // intersect masks from nulls and filter column
                const auto& filterArray = filterDatum.array();
                MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
                // Validity bitmap fetched with an explicit zero offset, so the
                // loop below adds array->offset to each index itself.
                auto nullBitmapPtr = array->GetValues(0, 0);
                // The filter is read one byte per row and combined with `&`;
                // assumes every byte is 0 or 1 - TODO confirm.
                const ui8* filterBitmap = filterArray->GetValues(1);
                // NOTE(review): this local shadows the `state` parameter. Legal
                // in a nested block, but a distinct name would read better.
                auto state = typedState->Count_;
                // NOTE(review): `i` is ui32 while array->length is wider; if a
                // block could exceed UINT32_MAX rows this loop would wrap.
                for (ui32 i = 0; i < array->length; ++i) {
                    ui64 fullIndex = i + array->offset;
                    // Bit (fullIndex % 8) of byte (fullIndex / 8), least
                    // significant bit first; 1 means the value is present.
                    auto bit1 = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1);
                    auto bit2 = filterBitmap[i];
                    state += bit1 & bit2;
                }
                typedState->Count_ = state;
            }
        }
    }

    // Returns the accumulated count as an unboxed value.
    NUdf::TUnboxedValue FinishOne(const void* state) final {
        auto typedState = static_cast(state);
        return NUdf::TUnboxedValuePod(typedState->Count_);
    }

private:
    // Index of the argument column whose non-null values are counted.
    const ui32 ArgColumn_;
};

// Count, combine-keys stage (TCombineKeysTag::TBase): adds one per row whose
// ArgColumn_ value is non-null.
template <>
class TCountAggregator : public TCombineKeysTag::TBase {
public:
    using TBase = TCombineKeysTag::TBase;

    TCountAggregator(std::optional filterColumn, ui32 argColumn, TComputationContext& ctx)
        : TBase(sizeof(TState), filterColumn, ctx)
        , ArgColumn_(argColumn)
    {
    }

    // Creates a fresh state and immediately accounts for the row that created it.
    void InitKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        new(state) TState();
        UpdateKey(state, batchNum, columns, row);
    }

    void DestroyState(void* state) noexcept final {
        static_assert(std::is_trivially_destructible::value);
        Y_UNUSED(state);
    }

    void UpdateKey(void* state, ui64 batchNum, const NUdf::TUnboxedValue* columns, ui64 row) final {
        Y_UNUSED(batchNum);
        auto typedState = static_cast(state);
        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
        if (datum.is_scalar()) {
            if (datum.scalar()->is_valid) {
                typedState->Count_ += 1;
            }
        } else {
            const auto& array = datum.array();
            if (array->GetNullCount() == 0) {
                typedState->Count_ += 1;
            } else {
                // Test the validity bit of `row`; the bitmap is fetched with a
                // zero offset, so array->offset is added manually.
                auto nullBitmapPtr = array->GetValues(0, 0);
                auto fullIndex = row + array->offset;
                auto bit = ((nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1);
                typedState->Count_ += bit;
            }
        }
    }

    // Builder for the per-key states; same (size, Ctx_) arguments as
    // TColumnBuilder's constructor.
    std::unique_ptr MakeStateBuilder(ui64 size) final {
        return std::make_unique(size, Ctx_);
    }

private:
    // Index of the argument column whose non-null values are counted.
    const ui32 ArgColumn_;
};

// Count, finalize stage: summing partial counts is identical to count-all,
// so this delegates entirely to a TCountAllAggregator specialization
// (presumably the finalize-keys one; the template argument is missing here).
template <>
class TCountAggregator : public TCountAllAggregator {
public:
    using TBase = TCountAllAggregator;

    TCountAggregator(std::optional filterColumn, ui32 argColumn, TComputationContext& ctx)
        : TBase(filterColumn, argColumn, ctx)
    {}
};

// Prepared (context-independent) count-all aggregator. It reports
// sizeof(TState) to its base, remembers the filter and argument columns, and
// creates a fresh aggregator for each computation context in Make().
template class TPreparedCountAll : public TTag::TPreparedAggregator {
public:
    using TBase = typename TTag::TPreparedAggregator;

    TPreparedCountAll(std::optional filterColumn, ui32 argColumn)
        : TBase(sizeof(TState))
        , FilterColumn_(filterColumn)
        , ArgColumn_(argColumn)
    {}

    std::unique_ptr Make(TComputationContext& ctx) const final {
        return std::make_unique>(FilterColumn_, ArgColumn_, ctx);
    }

private:
    const std::optional FilterColumn_;
    const ui32 ArgColumn_;
};

// Prepared (context-independent) count aggregator; mirrors TPreparedCountAll.
template class TPreparedCount : public TTag::TPreparedAggregator {
public:
    using TBase = typename TTag::TPreparedAggregator;

    TPreparedCount(std::optional filterColumn, ui32 argColumn)
        : TBase(sizeof(TState))
        , FilterColumn_(filterColumn)
        , ArgColumn_(argColumn)
    {}

    std::unique_ptr Make(TComputationContext& ctx) const final {
        return std::make_unique>(FilterColumn_, ArgColumn_, ctx);
    }

private:
    const std::optional FilterColumn_;
    const ui32 ArgColumn_;
};

// Builds a prepared count-all aggregator for the given filter/argument columns.
template std::unique_ptr PrepareCountAll(std::optional filterColumn, ui32 argColumn) {
    return std::make_unique>(filterColumn, argColumn);
}

// Builds a prepared count aggregator for the given filter/argument columns.
template std::unique_ptr PrepareCount(std::optional filterColumn, ui32 argColumn) {
    return std::make_unique>(filterColumn, argColumn);
}

// Factory for the count-all aggregation. The combine stages never read the
// argument column, so 0 is passed as a placeholder. Only the finalize stage
// reads partial counts from argsColumns[0].
class TBlockCountAllFactory : public IBlockAggregatorFactory {
public:
    std::unique_ptr PrepareCombineAll(
        TTupleType* tupleType,
        std::optional filterColumn,
        const std::vector& argsColumns,
        const TTypeEnvironment& env) const final
    {
        Y_UNUSED(tupleType);
        Y_UNUSED(argsColumns);
        Y_UNUSED(env);
        return PrepareCountAll(filterColumn, 0);
    }

    // No filter column is passed to the combine-keys stage.
    std::unique_ptr PrepareCombineKeys(
        TTupleType* tupleType,
        const std::vector& argsColumns,
        const TTypeEnvironment& env) const final
    {
        Y_UNUSED(tupleType);
        Y_UNUSED(argsColumns);
        Y_UNUSED(env);
        return PrepareCountAll(std::optional(), 0);
    }

    // NOTE(review): argsColumns is marked Y_UNUSED yet indexed below, and
    // nothing checks that it is non-empty before reading element 0.
    std::unique_ptr PrepareFinalizeKeys(
        TTupleType* tupleType,
        const std::vector& argsColumns,
        const TTypeEnvironment& env,
        TType* returnType,
        ui32 hint) const final
    {
        Y_UNUSED(tupleType);
        Y_UNUSED(argsColumns);
        Y_UNUSED(env);
        Y_UNUSED(returnType);
        Y_UNUSED(hint);
        return PrepareCountAll(std::optional(), argsColumns[0]);
    }
};

// Factory for the count aggregation. Every stage reads its argument from
// argsColumns[0].
// NOTE(review): nothing checks that argsColumns is non-empty, and the
// combine-keys and finalize-keys overloads mark argsColumns Y_UNUSED even
// though they index it.
class TBlockCountFactory : public IBlockAggregatorFactory {
public:
    std::unique_ptr PrepareCombineAll(
        TTupleType* tupleType,
        std::optional filterColumn,
        const std::vector& argsColumns,
        const TTypeEnvironment& env) const final
    {
        Y_UNUSED(tupleType);
        Y_UNUSED(env);
        return PrepareCount(filterColumn, argsColumns[0]);
    }

    std::unique_ptr PrepareCombineKeys(
        TTupleType* tupleType,
        const std::vector& argsColumns,
        const TTypeEnvironment& env) const final
    {
        Y_UNUSED(tupleType);
        Y_UNUSED(argsColumns);
        Y_UNUSED(env);
        return PrepareCount(std::optional(), argsColumns[0]);
    }

    std::unique_ptr PrepareFinalizeKeys(
        TTupleType* tupleType,
        const std::vector& argsColumns,
        const TTypeEnvironment& env,
        TType* returnType,
        ui32 hint) const final
    {
        Y_UNUSED(tupleType);
        Y_UNUSED(argsColumns);
        Y_UNUSED(env);
        Y_UNUSED(returnType);
        Y_UNUSED(hint);
        return PrepareCount(std::optional(), argsColumns[0]);
    }
};

} // namespace

// Public entry points. Each returns a newly allocated factory. The class
// names passed to make_unique are missing in this copy; presumably
// TBlockCountAllFactory and TBlockCountFactory respectively.
std::unique_ptr MakeBlockCountAllFactory() {
    return std::make_unique();
}

std::unique_ptr MakeBlockCountFactory() {
    return std::make_unique();
}

} // namespace NMiniKQL
} // namespace NKikimr