Просмотр исходного кода

Initial implementation of block SOME

impl

impl
vvvv 1 год назад
Родитель
Commit
f8d946ba74

+ 5 - 1
ydb/library/yql/core/type_ann/type_ann_list.cpp

@@ -5379,7 +5379,7 @@ namespace {
         ui32 expectedArgs;
         if (name == "count_all") {
             expectedArgs = overState ? 2 : 1;
-        } else if (name == "count" || name == "sum" || name == "avg" || name == "min" || name == "max") {
+        } else if (name == "count" || name == "sum" || name == "avg" || name == "min" || name == "max" || name == "some") {
             expectedArgs = 2;
         } else {
             ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
@@ -5454,6 +5454,10 @@ namespace {
                 }
             }
 
+            input->SetTypeAnn(retType);
+        } else if (name == "some") {
+            auto itemType = input->Child(1)->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+            const TTypeAnnotationNode* retType = itemType;
             input->SetTypeAnn(retType);
         } else {
             ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),

+ 1 - 0
ydb/library/yql/minikql/comp_nodes/CMakeLists.darwin-x86_64.txt

@@ -41,6 +41,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp

+ 1 - 0
ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-aarch64.txt

@@ -42,6 +42,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp

+ 1 - 0
ydb/library/yql/minikql/comp_nodes/CMakeLists.linux-x86_64.txt

@@ -42,6 +42,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp

+ 1 - 0
ydb/library/yql/minikql/comp_nodes/CMakeLists.windows-x86_64.txt

@@ -41,6 +41,7 @@ target_sources(yql-minikql-comp_nodes PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_count.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_agg_sum.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_builder.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/mkql_block_coalesce.cpp

+ 2 - 0
ydb/library/yql/minikql/comp_nodes/mkql_block_agg_factory.cpp

@@ -2,6 +2,7 @@
 #include "mkql_block_agg_count.h"
 #include "mkql_block_agg_sum.h"
 #include "mkql_block_agg_minmax.h"
+#include "mkql_block_agg_some.h"
 
 namespace NKikimr {
 namespace NMiniKQL {
@@ -17,6 +18,7 @@ struct TAggregatorFactories {
         Factories["avg"] = MakeBlockAvgFactory();
         Factories["min"] = MakeBlockMinFactory();
         Factories["max"] = MakeBlockMaxFactory();
+        Factories["some"] = MakeBlockSomeFactory();
     }
 };
 

+ 24 - 24
ydb/library/yql/minikql/comp_nodes/mkql_block_agg_minmax.cpp

@@ -106,13 +106,13 @@ constexpr TIn InitialStateValue() {
 
 template <typename TIn, bool IsMin>
 struct TState<true, TIn, IsMin> {
-    TIn Value_ = InitialStateValue<TIn, IsMin>();
-    ui8 IsValid_ = 0;
+    TIn Value = InitialStateValue<TIn, IsMin>();
+    ui8 IsValid = 0;
 };
 
 template <typename TIn, bool IsMin>
 struct TState<false, TIn, IsMin> {
-    TIn Value_ = InitialStateValue<TIn, IsMin>();
+    TIn Value = InitialStateValue<TIn, IsMin>();
 };
 
 using TGenericState = NUdf::TUnboxedValuePod;
@@ -131,12 +131,12 @@ public:
     void Add(const void* state) final {
         auto typedState = static_cast<const TStateType*>(state);
         if constexpr (IsNullable) {
-            if (!typedState->IsValid_) {
+            if (!typedState->IsValid) {
                 Builder_.Add(TBlockItem());
                 return;
             }
         }
-        Builder_.Add(TBlockItem(typedState->Value_));
+        Builder_.Add(TBlockItem(typedState->Value));
     }
 
     NUdf::TUnboxedValue Build() final {
@@ -630,11 +630,11 @@ public:
             Y_ENSURE(datum.is_scalar());
             if constexpr (IsNullable) {
                 if (datum.scalar()->is_valid) {
-                    typedState->Value_ = datum.scalar_as<TInScalar>().value;
-                    typedState->IsValid_ = 1;
+                    typedState->Value = datum.scalar_as<TInScalar>().value;
+                    typedState->IsValid = 1;
                 }
             } else {
-                typedState->Value_ = datum.scalar_as<TInScalar>().value;
+                typedState->Value = datum.scalar_as<TInScalar>().value;
             }
         } else {
             const auto& array = datum.array();
@@ -647,9 +647,9 @@ public:
             }
 
             if (!filtered) {
-                TIn value = typedState->Value_;
+                TIn value = typedState->Value;
                 if constexpr (IsNullable) {
-                    typedState->IsValid_ = 1;
+                    typedState->IsValid = 1;
                 }
 
                 if (IsNullable && nullCount != 0) {
@@ -665,14 +665,14 @@ public:
                     }
                 }
 
-                typedState->Value_ = value;
+                typedState->Value = value;
             } else {
                 const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
                 const auto& filterArray = filterDatum.array();
                 MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
                 const ui8* filterBitmap = filterArray->template GetValues<uint8_t>(1);
 
-                TIn value = typedState->Value_;
+                TIn value = typedState->Value;
                 ui64 validCount = 0;
                 if (IsNullable && nullCount != 0) {
                     auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
@@ -691,9 +691,9 @@ public:
                 }
 
                 if constexpr (IsNullable) {
-                    typedState->IsValid_ |= validCount ? 1 : 0;
+                    typedState->IsValid |= validCount ? 1 : 0;
                 }
-                typedState->Value_ = value;
+                typedState->Value = value;
             }
         }
     }
@@ -701,12 +701,12 @@ public:
     NUdf::TUnboxedValue FinishOne(const void* state) final {
         auto typedState = static_cast<const TStateType*>(state);
         if constexpr (IsNullable) {
-            if (!typedState->IsValid_) {
+            if (!typedState->IsValid) {
                 return NUdf::TUnboxedValuePod();
             }
         }
 
-        return NUdf::TUnboxedValuePod(typedState->Value_);
+        return NUdf::TUnboxedValuePod(typedState->Value);
     }
 
 private:
@@ -720,28 +720,28 @@ static void PushValueToState(TState<IsNullable, TIn, IsMin>* typedState, const a
         Y_ENSURE(datum.is_scalar());
         if constexpr (IsNullable) {
             if (datum.scalar()->is_valid) {
-                typedState->Value_ = datum.scalar_as<TInScalar>().value;
-                typedState->IsValid_ = 1;
+                typedState->Value = datum.scalar_as<TInScalar>().value;
+                typedState->IsValid = 1;
             }
         } else {
-            typedState->Value_ = datum.scalar_as<TInScalar>().value;
+            typedState->Value = datum.scalar_as<TInScalar>().value;
         }
     } else {
         const auto &array = datum.array();
         auto ptr = array->GetValues<TIn>(1);
         if constexpr (IsNullable) {
             if (array->GetNullCount() == 0) {
-                typedState->IsValid_ = 1;
-                typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]);
+                typedState->IsValid = 1;
+                typedState->Value = UpdateMinMax<IsMin>(typedState->Value, ptr[row]);
             } else {
                 auto nullBitmapPtr = array->GetValues<uint8_t>(0, 0);
                 ui64 fullIndex = row + array->offset;
                 ui8 notNull = (nullBitmapPtr[fullIndex >> 3] >> (fullIndex & 0x07)) & 1;
-                typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, SelectArg(notNull, ptr[row], typedState->Value_));
-                typedState->IsValid_ |= notNull;
+                typedState->Value = UpdateMinMax<IsMin>(typedState->Value, SelectArg(notNull, ptr[row], typedState->Value));
+                typedState->IsValid |= notNull;
             }
         } else {
-            typedState->Value_ = UpdateMinMax<IsMin>(typedState->Value_, ptr[row]);
+            typedState->Value = UpdateMinMax<IsMin>(typedState->Value, ptr[row]);
         }
     }
 }

+ 285 - 0
ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.cpp

@@ -0,0 +1,285 @@
+#include "mkql_block_agg_some.h"
+
+#include <ydb/library/yql/minikql/mkql_node_builder.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+
+#include <ydb/library/yql/minikql/comp_nodes/mkql_block_builder.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_block_reader.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+using TGenericState = NUdf::TUnboxedValuePod;
+
+// Stores the value found in `datum` into `*typedState`, provided that value is present.
+// A scalar datum is read as a whole; for an array datum only the element at index `row` is read.
+// An invalid scalar or an empty array item leaves `*typedState` untouched.
+void PushValueToState(TGenericState* typedState, const arrow::Datum& datum, ui64 row, IBlockReader& reader,
+    IBlockItemConverter& converter, TComputationContext& ctx)
+{
+    if (!datum.is_scalar()) {
+        TBlockItem item = reader.GetItem(*datum.array(), row);
+        if (item) {
+            *typedState = converter.MakeValue(item, ctx.HolderFactory);
+        }
+        return;
+    }
+
+    const auto& scalar = datum.scalar();
+    if (!scalar->is_valid) {
+        return;
+    }
+
+    auto scalarItem = reader.GetScalarItem(*scalar);
+    *typedState = converter.MakeValue(scalarItem, ctx.HolderFactory);
+}
+
+// IAggColumnBuilder implementation that collects TGenericState values into a single array
+// through an IArrayBuilder and exposes the result as an Arrow block.
+class TGenericColumnBuilder : public IAggColumnBuilder {
+public:
+    // Creates the underlying array builder for `columnType`, handing it `size` and the
+    // Arrow memory pool taken from `ctx`.
+    TGenericColumnBuilder(ui64 size, TType* columnType, TComputationContext& ctx)
+        : Builder_(MakeArrayBuilder(TTypeInfoHelper(), columnType, ctx.ArrowMemoryPool, size))
+        , Ctx_(ctx)
+    {
+    }
+
+    // Appends the value stored in `state`; `state` is interpreted as a TGenericState.
+    void Add(const void* state) final {
+        const auto* value = static_cast<const TGenericState*>(state);
+        Builder_->Add(*value);
+    }
+
+    // Builds the accumulated array and wraps it into an Arrow block value.
+    NUdf::TUnboxedValue Build() final {
+        const auto& holderFactory = Ctx_.HolderFactory;
+        return holderFactory.CreateArrowBlock(Builder_->Build(true));
+    }
+
+private:
+    const std::unique_ptr<IArrayBuilder> Builder_;
+    TComputationContext& Ctx_;
+};
+
+// Primary template; it is only declared here. The explicit specializations for the
+// individual aggregation tags that follow in this file provide the implementations.
+template<typename TTag>
+class TSomeBlockGenericAggregator;
+
+// "some" aggregator for the combine-all mode. The state is a single TGenericState that
+// keeps the first present value found in the argument column; once the state is set,
+// later batches are ignored.
+template<>
+class TSomeBlockGenericAggregator<TCombineAllTag> : public TCombineAllTag::TBase {
+public:
+    using TBase = TCombineAllTag::TBase;
+
+    // `type` is the item type used to create the block reader and the item converter;
+    // `argColumn` is the index of the aggregated column inside the `columns` array.
+    TSomeBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+        : TBase(sizeof(TGenericState), filterColumn, ctx)
+        , ArgColumn_(argColumn)
+        , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
+        , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type))
+    {
+    }
+
+    // Constructs an empty state in the caller-provided storage.
+    void InitState(void* state) final {
+        new(state) TGenericState();
+    }
+
+    // Calls DeleteUnreferenced() on the stored value and then overwrites it with an empty state.
+    void DestroyState(void* state) noexcept final {
+        auto typedState = static_cast<TGenericState*>(state);
+        typedState->DeleteUnreferenced();
+        *typedState = TGenericState();
+    }
+
+    // Fills the state from the current batch unless it already holds a value.
+    // Scalar input: the scalar is taken when it is valid (the filter column is not consulted).
+    // Array input: the first present item is taken; when `filtered` is set, the item must
+    // additionally have a non-zero entry in the filter column.
+    void AddMany(void* state, const NUdf::TUnboxedValue* columns, ui64 batchLength, std::optional<ui64> filtered) final {
+        TGenericState& typedState = *static_cast<TGenericState*>(state);
+        if (typedState) {
+            return;
+        }
+
+        Y_UNUSED(batchLength);
+        const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+
+        if (datum.is_scalar()) {
+            if (datum.scalar()->is_valid) {
+                auto item = Reader_->GetScalarItem(*datum.scalar());
+                typedState = Converter_->MakeValue(item, Ctx_.HolderFactory);
+            }
+        } else {
+            const auto& array = datum.array();
+            // array->length is a signed 64-bit value; convert it once so that the loop
+            // index and the bound have the same (unsigned) type.
+            const auto len = static_cast<size_t>(array->length);
+
+            const ui8* filterBitmap = nullptr;
+            if (filtered) {
+                const auto& filterDatum = TArrowBlock::From(columns[*FilterColumn_]).GetDatum();
+                const auto& filterArray = filterDatum.array();
+                MKQL_ENSURE(filterArray->GetNullCount() == 0, "Expected non-nullable bool column");
+                // The filter is indexed per row below, i.e. one byte per row.
+                filterBitmap = filterArray->template GetValues<uint8_t>(1);
+            }
+
+            for (size_t i = 0; i < len; ++i) {
+                TBlockItem curr = Reader_->GetItem(*array, i);
+                if (curr && (!filterBitmap || filterBitmap[i])) {
+                    typedState = Converter_->MakeValue(curr, Ctx_.HolderFactory);
+                    break;
+                }
+            }
+        }
+    }
+
+    // Returns the stored value; an empty state is returned as is.
+    NUdf::TUnboxedValue FinishOne(const void *state) final {
+        auto typedState = *static_cast<const TGenericState *>(state);
+        return typedState;
+    }
+
+private:
+    const ui32 ArgColumn_;
+    const std::unique_ptr<IBlockReader> Reader_;
+    const std::unique_ptr<IBlockItemConverter> Converter_;
+};
+
+// "some" aggregator for the combine-keys mode: one TGenericState per key, filled from
+// the first row of that key whose argument value is present.
+template<>
+class TSomeBlockGenericAggregator<TCombineKeysTag> : public TCombineKeysTag::TBase {
+public:
+    using TBase = TCombineKeysTag::TBase;
+
+    TSomeBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+        : TBase(sizeof(TGenericState), filterColumn, ctx)
+        , ArgColumn_(argColumn)
+        , Type_(type)
+        , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
+        , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type))
+    {
+    }
+
+    // Constructs an empty state in place and immediately tries to fill it from `row`.
+    void InitKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+        new(state) TGenericState();
+        UpdateKey(state, columns, row);
+    }
+
+    // Calls DeleteUnreferenced() on the stored value and resets the state to empty.
+    void DestroyState(void* state) noexcept final {
+        auto& current = *static_cast<TGenericState*>(state);
+        current.DeleteUnreferenced();
+        current = TGenericState();
+    }
+
+    // Takes the value at `row` of the argument column, but only while the state is still empty.
+    void UpdateKey(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+        auto& current = *static_cast<TGenericState*>(state);
+        if (!current) {
+            const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+            PushValueToState(&current, datum, row, *Reader_, *Converter_, Ctx_);
+        }
+    }
+
+    // Creates a column builder that collects the per-key states.
+    std::unique_ptr<IAggColumnBuilder> MakeStateBuilder(ui64 size) final {
+        using TBuilder = TGenericColumnBuilder;
+        return std::make_unique<TBuilder>(size, Type_, Ctx_);
+    }
+
+private:
+    const ui32 ArgColumn_;
+    TType* const Type_;
+    const std::unique_ptr<IBlockReader> Reader_;
+    const std::unique_ptr<IBlockItemConverter> Converter_;
+};
+
+// "some" aggregator for the finalize-keys mode: one TGenericState per key, loaded from
+// the first row of that key whose argument value is present.
+template<>
+class TSomeBlockGenericAggregator<TFinalizeKeysTag> : public TFinalizeKeysTag::TBase {
+public:
+    using TBase = TFinalizeKeysTag::TBase;
+
+    TSomeBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn, TComputationContext& ctx)
+        : TBase(sizeof(TGenericState), filterColumn, ctx)
+        , ArgColumn_(argColumn)
+        , Type_(type)
+        , Reader_(MakeBlockReader(TTypeInfoHelper(), type))
+        , Converter_(MakeBlockItemConverter(TTypeInfoHelper(), type))
+    {
+    }
+
+    // Constructs an empty state in place and immediately tries to fill it from `row`.
+    void LoadState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+        new(state) TGenericState();
+        UpdateState(state, columns, row);
+    }
+
+    // Calls DeleteUnreferenced() on the stored value and resets the state to empty.
+    void DestroyState(void* state) noexcept final {
+        auto& current = *static_cast<TGenericState*>(state);
+        current.DeleteUnreferenced();
+        current = TGenericState();
+    }
+
+    // Takes the value at `row` of the argument column, but only while the state is still empty.
+    void UpdateState(void* state, const NUdf::TUnboxedValue* columns, ui64 row) final {
+        auto& current = *static_cast<TGenericState*>(state);
+        if (!current) {
+            const auto& datum = TArrowBlock::From(columns[ArgColumn_]).GetDatum();
+            PushValueToState(&current, datum, row, *Reader_, *Converter_, Ctx_);
+        }
+    }
+
+    // Creates a column builder that collects the per-key results.
+    std::unique_ptr<IAggColumnBuilder> MakeResultBuilder(ui64 size) final {
+        using TBuilder = TGenericColumnBuilder;
+        return std::make_unique<TBuilder>(size, Type_, Ctx_);
+    }
+
+private:
+    const ui32 ArgColumn_;
+    TType* const Type_;
+    const std::unique_ptr<IBlockReader> Reader_;
+    const std::unique_ptr<IBlockItemConverter> Converter_;
+};
+
+// Prepared form of the "some" aggregator for the mode selected by `TTag`: it keeps the
+// item type and column indices, and creates the actual aggregator once a
+// TComputationContext is supplied to Make().
+template <typename TTag>
+class TPreparedSomeBlockGenericAggregator : public TTag::TPreparedAggregator {
+public:
+    using TBase = typename TTag::TPreparedAggregator;
+
+    TPreparedSomeBlockGenericAggregator(TType* type, std::optional<ui32> filterColumn, ui32 argColumn)
+        : TBase(sizeof(TGenericState))
+        , Type_(type)
+        , FilterColumn_(filterColumn)
+        , ArgColumn_(argColumn)
+    {
+    }
+
+    // Instantiates the aggregator specialization for `TTag` bound to `ctx`.
+    std::unique_ptr<typename TTag::TAggregator> Make(TComputationContext& ctx) const final {
+        using TAggregatorImpl = TSomeBlockGenericAggregator<TTag>;
+        return std::make_unique<TAggregatorImpl>(Type_, FilterColumn_, ArgColumn_, ctx);
+    }
+
+private:
+    TType* const Type_;
+    const std::optional<ui32> FilterColumn_;
+    const ui32 ArgColumn_;
+};
+
+// Creates the prepared "some" aggregator for column `argColumn` of `tupleType`.
+// The column type is cast to TBlockType via AS_TYPE; the block's item type is what the
+// aggregator is created for. The block shape (scalar or array) is not needed here,
+// because the aggregators inspect the datum at run time.
+template <typename TTag>
+std::unique_ptr<typename TTag::TPreparedAggregator> PrepareSome(TTupleType* tupleType, std::optional<ui32> filterColumn, ui32 argColumn) {
+    auto blockType = AS_TYPE(TBlockType, tupleType->GetElementType(argColumn));
+    auto argType = blockType->GetItemType();
+
+    return std::make_unique<TPreparedSomeBlockGenericAggregator<TTag>>(argType, filterColumn, argColumn);
+}
+
+// Factory that produces prepared "some" aggregators for each aggregation mode.
+// In every mode only the first entry of `argsColumns` is used as the argument column.
+class TBlockSomeFactory : public IBlockAggregatorFactory {
+    std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineAll>> PrepareCombineAll(
+        TTupleType* tupleType,
+        std::optional<ui32> filterColumn,
+        const std::vector<ui32>& argsColumns,
+        const TTypeEnvironment& env) const override
+    {
+        Y_UNUSED(env);
+        const ui32 argColumn = argsColumns.front();
+        return PrepareSome<TCombineAllTag>(tupleType, filterColumn, argColumn);
+    }
+
+    std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorCombineKeys>> PrepareCombineKeys(
+        TTupleType* tupleType,
+        std::optional<ui32> filterColumn,
+        const std::vector<ui32>& argsColumns,
+        const TTypeEnvironment& env) const override
+    {
+        Y_UNUSED(env);
+        const ui32 argColumn = argsColumns.front();
+        return PrepareSome<TCombineKeysTag>(tupleType, filterColumn, argColumn);
+    }
+
+    // The finalize step has no filter column, so an empty optional is passed on.
+    std::unique_ptr<IPreparedBlockAggregator<IBlockAggregatorFinalizeKeys>> PrepareFinalizeKeys(
+        TTupleType* tupleType,
+        const std::vector<ui32>& argsColumns,
+        const TTypeEnvironment& env) const override
+    {
+        Y_UNUSED(env);
+        const ui32 argColumn = argsColumns.front();
+        return PrepareSome<TFinalizeKeysTag>(tupleType, std::nullopt, argColumn);
+    }
+};
+
+}
+
+// Returns a newly created factory for the block "some" aggregator.
+std::unique_ptr<IBlockAggregatorFactory> MakeBlockSomeFactory() {
+    std::unique_ptr<IBlockAggregatorFactory> factory = std::make_unique<TBlockSomeFactory>();
+    return factory;
+}
+
+}
+}

+ 10 - 0
ydb/library/yql/minikql/comp_nodes/mkql_block_agg_some.h

@@ -0,0 +1,10 @@
+#pragma once
+#include "mkql_block_agg_factory.h"
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+// Creates the aggregator factory for the block "some" aggregation.
+std::unique_ptr<IBlockAggregatorFactory> MakeBlockSomeFactory();
+
+}
+}

+ 2 - 0
ydb/library/yql/minikql/comp_nodes/ya.make

@@ -17,6 +17,8 @@ SRCS(
     mkql_block_agg_factory.h
     mkql_block_agg_minmax.cpp
     mkql_block_agg_minmax.h
+    mkql_block_agg_some.cpp
+    mkql_block_agg_some.h
     mkql_block_agg_sum.cpp
     mkql_block_agg_sum.h
     mkql_block_builder.cpp