Browse Source

YQL-16443 sparse list for MATCH_RECOGNIZE on streams

zverevgeny 1 year ago
parent
commit
1176941d34

+ 16 - 10
ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp

@@ -1,3 +1,4 @@
+#include "mkql_match_recognize_list.h"
 #include "mkql_match_recognize_matched_vars.h"
 #include "mkql_match_recognize_measure_arg.h"
 #include <ydb/library/yql/core/sql_types/match_recognize.h>
@@ -41,6 +42,9 @@ struct TMatchRecognizeProcessorParameters {
 };
 
 class TBackTrackingMatchRecognize: public IProcessMatchRecognize {
+    using TPartitionList = TSimpleList;
+    using TRange = TPartitionList::TRange;
+    using TMatchedVars = TMatchedVars<TRange>;
 public:
     TBackTrackingMatchRecognize(
         NUdf::TUnboxedValue&& partitionKey,
@@ -57,7 +61,7 @@ public:
 
     bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override {
         Y_UNUSED(ctx);
-        Rows.push_back(std::move(row));
+        Rows.Append(std::move(row));
         return false;
     }
     NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override {
@@ -89,14 +93,14 @@ public:
     bool ProcessEndOfData(TComputationContext& ctx) override {
         //Assume, that data moved to IComputationExternalNode node, will not be modified or released
         //till the end of the current function
-        auto rowsSize = Rows.size();
-        Parameters.InputDataArg->SetValue(ctx, ctx.HolderFactory.VectorAsVectorHolder(std::move(Rows)));
+        auto rowsSize = Rows.Size();
+        Parameters.InputDataArg->SetValue(ctx, ctx.HolderFactory.Create<TListValue<TPartitionList>>(Rows));
         for (size_t i = 0; i != rowsSize; ++i) {
             Parameters.CurrentRowIndexArg->SetValue(ctx, NUdf::TUnboxedValuePod(static_cast<ui64>(i)));
             for (size_t v = 0; v != Parameters.Defines.size(); ++v) {
                 const auto &d = Parameters.Defines[v]->GetValue(ctx);
                 if (d && d.GetOptionalValue().Get<bool>()) {
-                    Extend(CurMatchedVars[v], i);
+                    Extend(CurMatchedVars[v], TRange{i});
                 }
             }
             //for the sake of dummy usage assume non-overlapped matches at every 5th row of any partition
@@ -113,13 +117,16 @@ private:
     const NUdf::TUnboxedValue PartitionKey;
     const TMatchRecognizeProcessorParameters& Parameters;
     const TContainerCacheOnContext& Cache;
-    TUnboxedValueVector Rows;
+    TSimpleList Rows;
     TMatchedVars CurMatchedVars;
     std::deque<TMatchedVars> Matches;
     ui64 MatchNumber;
 };
 
 class TStreamingMatchRecognize: public IProcessMatchRecognize {
+    using TPartitionList = TSparseList;
+    using TRange = TPartitionList::TRange;
+    using TMatchedVars = TMatchedVars<TRange>;
 public:
     TStreamingMatchRecognize(
             NUdf::TUnboxedValue&& partitionKey,
@@ -131,20 +138,19 @@ public:
         , Cache(cache)
         , MatchedVars(parameters.Defines.size())
         , HasMatch(false)
-        , RowCount(0)
     {
     }
 
     bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override{
         Y_UNUSED(row);
-        Parameters.CurrentRowIndexArg->SetValue(ctx, NUdf::TUnboxedValuePod(RowCount));
+        Parameters.CurrentRowIndexArg->SetValue(ctx, NUdf::TUnboxedValuePod(Rows.Size()));
+        auto r = Rows.Append(std::move(row));
         for (size_t i = 0; i != Parameters.Defines.size(); ++i) {
             const auto& d = Parameters.Defines[i]->GetValue(ctx);
             if (d && d.GetOptionalValue().Get<bool>()) {
-                Extend(MatchedVars[i], RowCount);
+                Extend(MatchedVars[i], r);
             }
         }
-        ++RowCount;
         return HasMatch;
     }
     NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override {
@@ -177,7 +183,7 @@ private:
     const TContainerCacheOnContext& Cache;
     TMatchedVars MatchedVars;
     bool HasMatch;
-    size_t RowCount;
+    TSparseList Rows;
 };
 
 class TStateForNonInterleavedPartitions

+ 335 - 0
ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_list.h

@@ -0,0 +1,335 @@
+#pragma once
+#include <ydb/library/yql/minikql/defs.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/public/udf/udf_value.h>
+#include <unordered_map>
+
+namespace NKikimr::NMiniKQL::NMatchRecognize {
+
+class TSimpleList {
+public:
+    ///Range that includes starting and ending points
+    ///Can not be empty
+    class TRange {
+    public:
+        TRange()
+            : FromIndex(-1)
+            , ToIndex(-1)
+        {
+        }
+
+        explicit TRange(ui64 index)
+                : FromIndex(index)
+                , ToIndex(index)
+        {
+        }
+
+        TRange(ui64 from, ui64 to)
+                : FromIndex(from)
+                , ToIndex(to)
+        {
+            MKQL_ENSURE(FromIndex <= ToIndex, "Internal logic error");
+        }
+
+        bool IsValid() const {
+            return true;
+        }
+
+        size_t From() const {
+            MKQL_ENSURE(IsValid(), "Internal logic error");
+            return FromIndex;
+        }
+
+        size_t To() const {
+            MKQL_ENSURE(IsValid(), "Internal logic error");
+            return ToIndex;
+        }
+
+        size_t Size() const {
+            MKQL_ENSURE(IsValid(), "Internal logic error");
+            return ToIndex - FromIndex + 1;
+        }
+
+        void Extend() {
+            MKQL_ENSURE(IsValid(), "Internal logic error");
+            ++ToIndex;
+        }
+
+    private:
+        ui64 FromIndex;
+        ui64 ToIndex;
+    };
+
+    TRange Append(NUdf::TUnboxedValue&& value) {
+        TRange result(Rows.size());
+        Rows.push_back(std::move(value));
+        return result;
+    }
+
+    size_t Size() const {
+        return Rows.size();
+    }
+
+    bool Empty() const {
+        return Size() == 0;
+    }
+
+    NUdf::TUnboxedValue Get(size_t i) const {
+        return Rows.at(i);
+    }
+private:
+    TUnboxedValueVector Rows;
+};
+
+///Stores only locked items
+///Locks are holds by TRange
+///When all locks on an item are released, the item is removed from the list
+class TSparseList {
+    struct TItem {
+        NUdf::TUnboxedValue Value;
+        size_t LockCount = 0;
+    };
+
+    class TContainer: public TSimpleRefCount<TContainer> {
+    public:
+        using TPtr = TIntrusivePtr<TContainer>;
+
+        void Add(size_t index, NUdf::TUnboxedValue&& value) {
+            const auto& [iter, newOne] = Storage.emplace(index, TItem{std::move(value), 1});
+            MKQL_ENSURE(newOne, "Internal logic error");
+        }
+
+        size_t Size() const {
+            return Storage.size();
+        }
+
+        NUdf::TUnboxedValue Get(size_t i) const {
+            if (const auto it = Storage.find(i); it != Storage.cend()) {
+                return it->second.Value;
+            } else {
+                return NUdf::TUnboxedValue{};
+            }
+        }
+
+        void LockRange(size_t from, size_t to) {
+            for (auto i = from; i <= to; ++i) {
+                const auto it = Storage.find(i);
+                MKQL_ENSURE(it != Storage.cend(), "Internal logic error");
+                ++it->second.LockCount;
+            }
+        }
+
+        void UnlockRange(size_t from, size_t to) {
+            for (auto i = from; i <= to; ++i) {
+                const auto it = Storage.find(i);
+                MKQL_ENSURE(it != Storage.cend(), "Internal logic error");
+                auto lockCount = --it->second.LockCount;
+                if (0 == lockCount) {
+                    Storage.erase(it);
+                }
+            }
+        }
+
+    private:
+        //TODO consider to replace hash table with contiguous chunks
+        using TAllocator = TMKQLAllocator<std::pair<const size_t, TItem>, EMemorySubPool::Temporary>;
+        std::unordered_map<
+            size_t,
+            TItem,
+            std::hash<size_t>,
+            std::equal_to<size_t>,
+            TAllocator> Storage;
+    };
+    using TContainerPtr = TContainer::TPtr;
+
+public:
+    ///Range that includes starting and ending points
+    ///Holds a lock on items in the list
+    ///Can not be empty, but can be in invalid state, with no container set
+    class TRange{
+        friend class TSparseList;
+    public:
+        TRange()
+            : Container()
+            , FromIndex(-1)
+            , ToIndex(-1)
+        {
+        }
+
+        TRange(const TRange& other)
+            : Container(other.Container)
+            , FromIndex(other.FromIndex)
+            , ToIndex(other.ToIndex)
+        {
+            LockRange(FromIndex, ToIndex);
+        }
+
+        TRange(TRange&& other)
+            : Container(other.Container)
+            , FromIndex(other.FromIndex)
+            , ToIndex(other.ToIndex)
+        {
+            other.Reset();
+        }
+
+        ~TRange() {
+            Release();
+        }
+
+        TRange& operator=(const TRange& other) {
+            if (&other == this) {
+                return *this;
+            }
+            //TODO(zverevgeny): optimize for overlapped source and destination
+            Release();
+            Container = other.Container;
+            FromIndex = other.FromIndex;
+            ToIndex = other.ToIndex;
+            LockRange(FromIndex, ToIndex);
+            return *this;
+        }
+
+        TRange& operator=(TRange&& other) {
+            if (&other == this) {
+                return *this;
+            }
+            Release();
+            Container = other.Container;
+            FromIndex = other.FromIndex;
+            ToIndex = other.ToIndex;
+            other.Reset();
+            return *this;
+        }
+
+        bool IsValid() const {
+            return static_cast<bool>(Container);
+        }
+
+        size_t From() const {
+            MKQL_ENSURE(IsValid(), "Internal logic error");
+            return FromIndex;
+        }
+
+        size_t To() const {
+            MKQL_ENSURE(IsValid(), "Internal logic error");
+            return ToIndex;
+        }
+
+        size_t Size() const {
+            MKQL_ENSURE(IsValid(), "Internal logic error");
+            return ToIndex - FromIndex + 1;
+        }
+
+        void Extend() {
+            MKQL_ENSURE(IsValid(), "Internal logic error");
+            ++ToIndex;
+            LockRange(ToIndex, ToIndex);
+        }
+
+        void Release() {
+            UnlockRange(FromIndex, ToIndex);
+            Container.Reset();
+            FromIndex = -1;
+            ToIndex = -1;
+        }
+
+    private:
+        TRange(TContainerPtr container, size_t index)
+            : Container(container)
+            , FromIndex(index)
+            , ToIndex(index)
+        {}
+
+        void LockRange(size_t from, size_t to) {
+            if (Container) {
+                Container->LockRange(from, to);
+            }
+        }
+
+        void UnlockRange(size_t from, size_t to) {
+            if (Container) {
+                Container->UnlockRange(from, to);
+            }
+        }
+
+        void Reset() {
+            Container.Reset();
+            FromIndex = -1;
+            ToIndex = -1;
+        }
+
+        TContainerPtr Container;
+        size_t FromIndex;
+        size_t ToIndex;
+    };
+
+public:
+    TRange Append(NUdf::TUnboxedValue&& value) {
+        const auto index = ListSize++;
+        Container->Add(index, std::move(value));
+        return TRange(Container, index);
+    }
+
+    NUdf::TUnboxedValue Get(size_t i) const {
+        return Container->Get(i);
+    }
+
+    ///Return total size of sparse list including absent values
+    size_t Size() const {
+        return ListSize;
+    }
+
+    ///Return number of present values in sparse list
+    size_t Filled() const {
+        return Container->Size();
+    }
+
+    bool Empty() const {
+        return Size() == 0;
+    }
+
+private:
+    TContainerPtr Container = MakeIntrusive<TContainer>();
+    size_t ListSize = 0; //impl: max index ever stored + 1
+};
+
+template<typename L>
+class TListValue: public TComputationValue<TListValue<L>> {
+public:
+    TListValue(TMemoryUsageInfo* memUsage, const L& list)
+        : TComputationValue<TListValue<L>>(memUsage)
+        , List(list)
+    {
+    }
+
+    //TODO https://st.yandex-team.ru/YQL-16508
+    //NUdf::TUnboxedValue GetListIterator() const override;
+
+    bool HasFastListLength() const override {
+        return !List.Empty();
+    }
+
+    ui64 GetListLength() const override {
+        return List.Size();
+    }
+
+    bool HasListItems() const override {
+        return !List.Empty();
+    }
+
+    NUdf::IBoxedValuePtr ToIndexDictImpl(const NUdf::IValueBuilder& builder) const override {
+        Y_UNUSED(builder);
+        return const_cast<TListValue*>(this);
+    }
+
+    NUdf::TUnboxedValue Lookup(const NUdf::TUnboxedValuePod& key) const override {
+        return List.Get(key.Get<ui64>());
+    }
+
+private:
+    L List;
+};
+
+}//namespace NKikimr::NMiniKQL::NMatchRecognize
+

+ 37 - 77
ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_matched_vars.h

@@ -1,62 +1,38 @@
 #pragma once
+#include "mkql_match_recognize_list.h"
 #include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
 #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
-namespace NKikimr::NMiniKQL::NMatchRecognize {
-
-///Range that includes starting and ending points
-///Can not be empty
-class TMatchedRange {
-public:
-    TMatchedRange(ui64 index)
-        : FromIndex(index)
-        , ToIndex(index)
-    {}
-
-    TMatchedRange(ui64 from, ui64 to)
-        : FromIndex(from)
-        , ToIndex(to)
-    {}
-
-    size_t From() const {
-        return FromIndex;
-    }
-
-    size_t To() const {
-        return ToIndex;
-    }
 
-    void Extend() {
-        ++ToIndex;
-    }
-
-private:
-    ui64 FromIndex;
-    ui64 ToIndex;
-};
+namespace NKikimr::NMiniKQL::NMatchRecognize {
 
-using TMatchedVar = std::vector<TMatchedRange>;
 
-inline void Extend(TMatchedVar& var, size_t index) {
+template<class R>
+using TMatchedVar = std::vector<R>;
+template<class R>
+void Extend(TMatchedVar<R>& var, const R& r) {
     if (var.empty()) {
-        var.emplace_back(index);
+        var.emplace_back(r);
     } else {
-        MKQL_ENSURE(index > var.back().To(), "Internal logic error");
-        if (var.back().To() + 1 == index) {
+        MKQL_ENSURE(r.From() > var.back().To(), "Internal logic error");
+        if (var.back().To() + 1 == r.From()) {
             var.back().Extend();
         } else {
-            var.emplace_back(index);
+            var.emplace_back(r);
         }
     }
 }
 
-using TMatchedVars = std::vector<TMatchedVar>;
+template<class R>
+using TMatchedVars = std::vector<TMatchedVar<R>>;
 
-inline NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TMatchedRange& range) {
+template<class R>
+NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const R& range) {
     std::array<NUdf::TUnboxedValue, 2> array = {NUdf::TUnboxedValuePod{range.From()}, NUdf::TUnboxedValuePod{range.To()}};
     return holderFactory.RangeAsArray(cbegin(array), cend(array));
 }
 
-inline NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TMatchedVar& var) {
+template<class R>
+NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TMatchedVar<R>& var) {
     TUnboxedValueVector data;
     data.reserve(var.size());
     for (const auto& r: var) {
@@ -65,7 +41,8 @@ inline NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TM
     return holderFactory.VectorAsVectorHolder(std::move(data));
 }
 
-inline NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TMatchedVars& vars) {
+template<class R>
+inline NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TMatchedVars<R>& vars) {
     NUdf::TUnboxedValue* ptr;
     auto result = holderFactory.CreateDirectArrayHolder(vars.size(), ptr);
     for (const auto& v: vars) {
@@ -76,35 +53,14 @@ inline NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TM
 
 ///Optimized reference based implementation to be used as an argument
 ///for lambdas which produce strict result(do not require lazy access to its arguments)
-class TMatchedVarsValue : public TComputationValue<TMatchedVarsValue> {
-    class TRangeValue: public TComputationValue<TRangeValue> {
-    public:
-        TRangeValue(TMemoryUsageInfo* memInfo, const TMatchedRange& r)
-                : TComputationValue<TRangeValue>(memInfo)
-                , Range(r)
-        {
-        }
-
-        NUdf::TUnboxedValue* GetElements() const override {
-            return nullptr;
-        }
-        NUdf::TUnboxedValue GetElement(ui32 index) const override {
-            MKQL_ENSURE(index < 2, "Index out of range");
-            switch(index) {
-                case 0: return NUdf::TUnboxedValuePod(Range.From());
-                case 1: return NUdf::TUnboxedValuePod(Range.To());
-            }
-            return NUdf::TUnboxedValuePod();
-        }
-    private:
-        const TMatchedRange& Range;
-    };
-
+template<class R>
+class TMatchedVarsValue : public TComputationValue<TMatchedVarsValue<R>> {
     class TRangeList: public TComputationValue<TRangeList> {
         class TIterator : public TComputationValue<TIterator> {
         public:
-            TIterator(TMemoryUsageInfo *memInfo, const std::vector<TMatchedRange>& ranges)
+            TIterator(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, const std::vector<R>& ranges)
                     : TComputationValue<TIterator>(memInfo)
+                    , HolderFactory(holderFactory)
                     , Ranges(ranges)
                     , Index(0)
             {}
@@ -114,17 +70,18 @@ class TMatchedVarsValue : public TComputationValue<TMatchedVarsValue> {
                 if (Ranges.size() == Index){
                     return false;
                 }
-                value = NUdf::TUnboxedValuePod(new TRangeValue(GetMemInfo(), Ranges[Index++]));
+                value = ToValue(HolderFactory, Ranges[Index++]);
                 return true;
             }
-
-            const std::vector<TMatchedRange>& Ranges;
+            const THolderFactory& HolderFactory;
+            const std::vector<R>& Ranges;
             size_t Index;
         };
 
     public:
-        TRangeList(TMemoryUsageInfo* memInfo, const TMatchedVar& v)
+        TRangeList(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, const TMatchedVar<R>& v)
             : TComputationValue<TRangeList>(memInfo)
+            , HolderFactory(holderFactory)
             , Var(v)
         {
         }
@@ -142,23 +99,26 @@ class TMatchedVarsValue : public TComputationValue<TMatchedVarsValue> {
         }
 
         NUdf::TUnboxedValue GetListIterator() const override {
-            return NUdf::TUnboxedValuePod(new TIterator(GetMemInfo(), Var));
+            return HolderFactory.Create<TIterator>(HolderFactory, Var);
         }
     private:
-        const TMatchedVar& Var;
+        const THolderFactory& HolderFactory;
+        const TMatchedVar<R>& Var;
     };
 public:
-    TMatchedVarsValue(TMemoryUsageInfo* memInfo, const std::vector<TMatchedVar>& vars)
-            : TComputationValue<TMatchedVarsValue>(memInfo)
-            , Vars(vars)
+    TMatchedVarsValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, const std::vector<TMatchedVar<R>>& vars)
+        : TComputationValue<TMatchedVarsValue>(memInfo)
+        , HolderFactory(holderFactory)
+        , Vars(vars)
     {
     }
 
     NUdf::TUnboxedValue GetElement(ui32 index) const override {
-        return NUdf::TUnboxedValuePod(new TRangeList(GetMemInfo(), Vars[index]));
+        return HolderFactory.Create<TRangeList>(HolderFactory, Vars[index]);
     }
 private:
-    const std::vector<TMatchedVar>& Vars;
+    const THolderFactory& HolderFactory;
+    const std::vector<TMatchedVar<R>>& Vars;
 };
 
 }//namespace NKikimr::NMiniKQL::NMatchRecognize

+ 1 - 0
ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt

@@ -61,6 +61,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_list_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp

+ 1 - 0
ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt

@@ -64,6 +64,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_list_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp

+ 1 - 0
ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt

@@ -65,6 +65,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_list_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp

+ 1 - 0
ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt

@@ -54,6 +54,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_list_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp

+ 136 - 0
ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_list_ut.cpp

@@ -0,0 +1,136 @@
+#include "../mkql_match_recognize_list.h"
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/computation/mkql_value_builder.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NMiniKQL::NMatchRecognize {
+
+template<class L>
+void CommonForSimpleAndSparse(const THolderFactory& holderFactory) {
+    using TList = L;
+    using TRange = typename L::TRange;
+    TList list;
+    TRange r;
+    for (ui64 i = 0; i != 10; ++i) {
+        r = list.Append(NUdf::TUnboxedValuePod{i});
+        UNIT_ASSERT_VALUES_EQUAL(1, r.Size());
+        NUdf::TUnboxedValue v = list.Get(i);
+        UNIT_ASSERT_VALUES_EQUAL(i, v.Get<ui64>());
+    }
+    UNIT_ASSERT_VALUES_EQUAL(10, list.Size());
+    {
+        auto r2 = list.Append(NUdf::TUnboxedValuePod{10});
+        Y_UNUSED(r2);
+        r.Extend();
+    }
+    UNIT_ASSERT_VALUES_EQUAL(11, list.Size());
+    {
+        const NUdf::TUnboxedValue& v = list.Get(10);
+        UNIT_ASSERT_VALUES_EQUAL(10, v.Get<ui64>());
+    }
+    //Test access via value
+    const NUdf::TUnboxedValue& listValue = holderFactory.Create<TListValue<L>>(list);
+    UNIT_ASSERT(listValue);
+    UNIT_ASSERT(listValue.HasValue());
+    UNIT_ASSERT(listValue.HasListItems());
+    UNIT_ASSERT(listValue.HasFastListLength());
+    UNIT_ASSERT_VALUES_EQUAL(11, listValue.GetListLength());
+    TDefaultValueBuilder valueBuilder(holderFactory);
+    auto listValueAsDict = NUdf::TBoxedValueAccessor::ToIndexDictImpl(*listValue.AsBoxed(), TDefaultValueBuilder(holderFactory));
+    {
+        const NUdf::TUnboxedValue &v = NUdf::TBoxedValueAccessor::Lookup(*listValueAsDict, NUdf::TUnboxedValuePod{9});
+        UNIT_ASSERT_VALUES_EQUAL(9, v.Get<ui64>());
+    }
+    {
+        const NUdf::TUnboxedValue &v = NUdf::TBoxedValueAccessor::Lookup(*listValueAsDict, NUdf::TUnboxedValuePod{10});
+        UNIT_ASSERT_VALUES_EQUAL(10, v.Get<ui64>());
+    }
+}
+
+Y_UNIT_TEST_SUITE(MatchRecognizeList) {
+    TMemoryUsageInfo memUsage("MatchRecognizeListTest");
+    Y_UNIT_TEST(SimpleListCommon) {
+        TScopedAlloc alloc(__LOCATION__);
+        THolderFactory holderFactory(alloc.Ref(), memUsage);
+        CommonForSimpleAndSparse<TSimpleList>(holderFactory);
+    }
+    Y_UNIT_TEST(SparseListCommon) {
+        TScopedAlloc alloc(__LOCATION__);
+        THolderFactory holderFactory(alloc.Ref(), memUsage);
+        CommonForSimpleAndSparse<TSparseList>(holderFactory);
+    }
+    Y_UNIT_TEST(SimpleListSpecific) {
+        TScopedAlloc alloc(__LOCATION__);
+        THolderFactory holderFactory(alloc.Ref(), memUsage);
+        TSimpleList list;
+        for (ui64 i = 0; i != 10; ++i) {
+            list.Append(NUdf::TUnboxedValuePod{i});
+        }
+        //All added items are accessible regardless of held ranges(locks)
+        for (ui64 i = 0; i != 10; ++i) {
+            NUdf::TUnboxedValue v = list.Get(i);
+            UNIT_ASSERT_VALUES_EQUAL(i, v.Get<ui64>());
+        }
+    }
+    Y_UNIT_TEST(SparseListSpecific) {
+        TScopedAlloc alloc(__LOCATION__);
+        THolderFactory holderFactory(alloc.Ref(), memUsage);
+        TSparseList list;
+        //Add 10 items
+        for (ui64 i = 0; i != 10; ++i) {
+            list.Append(NUdf::TUnboxedValuePod{i});
+        }
+        //Check no one is stored
+        UNIT_ASSERT_VALUES_EQUAL(0, list.Filled());
+        for (ui64 i = 0; i != 10; ++i) {
+            NUdf::TUnboxedValue v = list.Get(i);
+            UNIT_ASSERT(!v);
+        }
+        //Add another 10 items and lock the last item added at every iteration
+        TSparseList::TRange r;
+        for (ui64 i = 10; i != 20; ++i) {
+            r = list.Append(NUdf::TUnboxedValuePod{i});
+        }
+        //Check that only the last is stored
+        UNIT_ASSERT_VALUES_EQUAL(1, list.Filled());
+        for (ui64 i = 0; i != 19; ++i) {
+            NUdf::TUnboxedValue v = list.Get(i);
+            UNIT_ASSERT(!v);
+        }
+        {
+            NUdf::TUnboxedValue v = list.Get(19);
+            UNIT_ASSERT_VALUES_EQUAL(19, v.Get<ui64>());
+        }
+
+        //Test copy and assignment for locks
+        TSparseList::TRange copiedRange{r};
+        TSparseList::TRange assignedRange{r};
+        assignedRange = copiedRange;
+        UNIT_ASSERT_VALUES_EQUAL(1, list.Filled());
+        {
+            NUdf::TUnboxedValue v = list.Get(19);
+            UNIT_ASSERT_VALUES_EQUAL(19, v.Get<ui64>());
+        }
+        r.Release();
+        UNIT_ASSERT_VALUES_EQUAL(1, list.Filled());
+        {
+            NUdf::TUnboxedValue v = list.Get(19);
+            UNIT_ASSERT_VALUES_EQUAL(19, v.Get<ui64>());
+        }
+        UNIT_ASSERT_VALUES_EQUAL(1, list.Filled());
+        copiedRange.Release();
+        UNIT_ASSERT_VALUES_EQUAL(1, list.Filled());
+        {
+            NUdf::TUnboxedValue v = list.Get(19);
+            UNIT_ASSERT_VALUES_EQUAL(19, v.Get<ui64>());
+        }
+        assignedRange.Release();
+        UNIT_ASSERT_VALUES_EQUAL(0, list.Filled());
+        {
+            NUdf::TUnboxedValue v = list.Get(19);
+            UNIT_ASSERT(!v);
+        }
+    }
+}
+
+}//namespace NKikimr::NMiniKQL::TMatchRecognize

+ 44 - 21
ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp

@@ -1,60 +1,71 @@
 #include "../mkql_match_recognize_matched_vars.h"
+#include "../mkql_match_recognize_list.h"
 #include <library/cpp/testing/unittest/registar.h>
 
 namespace NKikimr::NMiniKQL::NMatchRecognize {
 
 Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarExtend) {
+    using TRange = TSimpleList::TRange;
+    using TMatchedVar = TMatchedVar<TRange>;
+    using TMatchedVars = TMatchedVars<TRange>;
+
     Y_UNIT_TEST(MatchedRangeSingleton) {
-        TMatchedRange r{10};
+        TRange r{10};
         UNIT_ASSERT_VALUES_EQUAL(10, r.From());
         UNIT_ASSERT_VALUES_EQUAL(10, r.To());
         r.Extend();
         UNIT_ASSERT_VALUES_EQUAL(10, r.From());
         UNIT_ASSERT_VALUES_EQUAL(11, r.To());
     }
+
     Y_UNIT_TEST(MatchedRange) {
-        TMatchedRange r{10, 20};
+        TRange r{10, 20};
         UNIT_ASSERT_VALUES_EQUAL(10, r.From());
         UNIT_ASSERT_VALUES_EQUAL(20, r.To());
         r.Extend();
         UNIT_ASSERT_VALUES_EQUAL(10, r.From());
         UNIT_ASSERT_VALUES_EQUAL(21, r.To());
     }
+
     Y_UNIT_TEST(MatchedVarEmpty) {
         TMatchedVar v{};
-        Extend(v, 10);
+        Extend(v, TRange{10});
         UNIT_ASSERT_VALUES_EQUAL(1, v.size());
         UNIT_ASSERT_VALUES_EQUAL(10, v[0].From());
         UNIT_ASSERT_VALUES_EQUAL(10, v[0].To());
     }
+
     Y_UNIT_TEST(MatchedVarExtendSingletonContiguous) {
-        TMatchedVar v{TMatchedRange{10}};
-        Extend(v, 11);
+        TMatchedVar v{TRange{10}};
+        Extend(v, TRange{11});
         UNIT_ASSERT_VALUES_EQUAL(1, v.size());
         UNIT_ASSERT_VALUES_EQUAL(10, v[0].From());
         UNIT_ASSERT_VALUES_EQUAL(11, v[0].To());
     }
+
     Y_UNIT_TEST(MatchedVarExtendSingletonWithGap) {
-        TMatchedVar v{TMatchedRange{10}};
-        Extend(v, 20);
+        TMatchedVar v{TRange{10}};
+        Extend(v, TRange{20});
         UNIT_ASSERT_VALUES_EQUAL(2, v.size());
         UNIT_ASSERT_VALUES_EQUAL(10, v[0].From());
         UNIT_ASSERT_VALUES_EQUAL(10, v[0].To());
         UNIT_ASSERT_VALUES_EQUAL(20, v[1].From());
         UNIT_ASSERT_VALUES_EQUAL(20, v[1].To());
     }
+
     Y_UNIT_TEST(MatchedVarExtendContiguous) {
-        TMatchedVar v{TMatchedRange{10, 20}, TMatchedRange{30, 40}};
-        Extend(v, 41);
+        TMatchedVar v{TRange{10, 20}, TRange{30, 40}};
+        Extend(v, TRange{41});
         UNIT_ASSERT_VALUES_EQUAL(2, v.size());
         UNIT_ASSERT_VALUES_EQUAL(10, v[0].From());
         UNIT_ASSERT_VALUES_EQUAL(20, v[0].To());
         UNIT_ASSERT_VALUES_EQUAL(30, v[1].From());
         UNIT_ASSERT_VALUES_EQUAL(41, v[1].To());
     }
+
     Y_UNIT_TEST(MatchedVarExtendWithGap) {
-        TMatchedVar v{TMatchedRange{10, 20}, TMatchedRange{30, 40}};
-        Extend(v, 50);
+        TMatchedVar v{TRange{10, 20}, TRange{30, 40}};
+        Extend(v, TRange{50});
         UNIT_ASSERT_VALUES_EQUAL(3, v.size());
         UNIT_ASSERT_VALUES_EQUAL(10, v[0].From());
         UNIT_ASSERT_VALUES_EQUAL(20, v[0].To());
@@ -66,12 +77,16 @@ Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarExtend) {
 }
 
 Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarsToValue) {
+    using TRange = TSimpleList::TRange;
+    using TMatchedVar = TMatchedVar<TRange>;
+    using TMatchedVars = TMatchedVars<TRange>;
     TMemoryUsageInfo memUsage("MatchedVars");
+
     Y_UNIT_TEST(MatchedRange) {
         TScopedAlloc alloc(__LOCATION__);
         THolderFactory holderFactory(alloc.Ref(), memUsage);
         {
-            TMatchedRange r{10, 20};
+            TRange r{10, 20};
             const auto value = ToValue(holderFactory, r);
             const auto elems = value.GetElements();
             UNIT_ASSERT(elems);
@@ -84,7 +99,7 @@ Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarsToValue) {
         TScopedAlloc alloc(__LOCATION__);
         THolderFactory holderFactory(alloc.Ref(), memUsage);
         {
-            const auto value = ToValue(holderFactory, std::vector<TMatchedRange>{});
+            const auto value = ToValue(holderFactory, TMatchedVar{});
             UNIT_ASSERT(value);
             UNIT_ASSERT(!value.HasListItems());
             UNIT_ASSERT(value.HasFastListLength());
@@ -101,9 +116,9 @@ Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarsToValue) {
         TScopedAlloc alloc(__LOCATION__);
         THolderFactory holderFactory(alloc.Ref(), memUsage);
         {
-            const auto value = ToValue(holderFactory, {
-                TMatchedRange{10, 30},
-                TMatchedRange{40, 45},
+            const auto value = ToValue(holderFactory, TMatchedVar{
+                TRange{10, 30},
+                TRange{40, 45},
 
             });
             UNIT_ASSERT(value);
@@ -132,10 +147,10 @@ Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarsToValue) {
         TScopedAlloc alloc(__LOCATION__);
         THolderFactory holderFactory(alloc.Ref(), memUsage);
         {
-            const auto value = ToValue(holderFactory, {
+            const auto value = ToValue(holderFactory, TMatchedVars {
                     {},
-                    {TMatchedRange{20, 25}},
-                    {TMatchedRange{10, 30}, TMatchedRange{40, 45}},
+                    {TRange{20, 25}},
+                    {TRange{10, 30}, TRange{40, 45}},
             });
             UNIT_ASSERT(value);
             const auto varElems = value.GetElements();
@@ -166,22 +181,30 @@ Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarsToValue) {
 }
 
 Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarsToValueByRef) {
+    using TRange = TSimpleList::TRange;
+    using TMatchedVar = TMatchedVar<TRange>;
+    using TMatchedVars = TMatchedVars<TRange>;
     TMemoryUsageInfo memUsage("MatchedVarsByRef");
+
     Y_UNIT_TEST(MatchedVarsEmpty) {
         TScopedAlloc alloc(__LOCATION__);
+        THolderFactory holderFactory(alloc.Ref(), memUsage);
         {
             TMatchedVars vars{};
-            NUdf::TUnboxedValue value(NUdf::TUnboxedValuePod(new TMatchedVarsValue(&memUsage, vars)));
+            NUdf::TUnboxedValue value = holderFactory.Create<TMatchedVarsValue<TRange>>(holderFactory, vars);
             UNIT_ASSERT(value.HasValue());
         }
     }
+
     Y_UNIT_TEST(MatchedVars) {
         TScopedAlloc alloc(__LOCATION__);
+        THolderFactory holderFactory(alloc.Ref(), memUsage);
         {
             TMatchedVar A{{1, 4}, {7, 9}, {100, 200}};
             TMatchedVar B{{1, 6}};
             TMatchedVars vars{A, B};
-            NUdf::TUnboxedValue value(NUdf::TUnboxedValuePod(new TMatchedVarsValue(&memUsage, vars)));
+            NUdf::TUnboxedValue value = holderFactory.Create<TMatchedVarsValue<TRange>>(holderFactory, vars);
+            Y_UNUSED(value);
             UNIT_ASSERT(value.HasValue());
             auto a = value.GetElement(0);
             UNIT_ASSERT(a.HasValue());

+ 1 - 0
ydb/library/yql/minikql/comp_nodes/ut/ya.make

@@ -41,6 +41,7 @@ SRCS(
     mkql_grace_join_ut.cpp
     mkql_map_join_ut.cpp
     mkql_match_recognize_matched_vars_ut.cpp
+    mkql_match_recognize_list_ut.cpp
     mkql_safe_circular_buffer_ut.cpp
     mkql_sort_ut.cpp
     mkql_switch_ut.cpp