Browse Source

YQ-675: fix hopping

ref:6a78c858cc2d2b70cc9e47ceee374941b1a44bee
d-mokhnatkin 3 years ago
parent
commit
8850c0828a

+ 51 - 49
ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp

@@ -70,7 +70,7 @@ public:
 
         struct TKeyState {
             std::vector<TBucket, TMKQLAllocator<TBucket>> Buckets; // circular buffer
-            ui64 HopIndex;
+            ui64 HopIndex; // Start index of current window
 
             TKeyState(ui64 bucketsCount, ui64 hopIndex)
                 : Buckets(bucketsCount)
@@ -163,19 +163,13 @@ public:
                 return NUdf::EFetchStatus::Finish;
             }
 
-            i64 thrownEvents = 0;
-            i64 newHops = 0;
-            i64 emptyTimeCt = 0;
+            i64 thrownEventsStat = 0;
+            i64 newHopsStat = 0;
+            i64 emptyTimeCtStat = 0;
             Y_DEFER {
-                if (thrownEvents) {
-                    MKQL_ADD_STAT(Ctx.Stats, Hop_ThrownEventsCount, thrownEvents);
-                }
-                if (newHops) {
-                    MKQL_ADD_STAT(Ctx.Stats, Hop_NewHopsCount, newHops);
-                }
-                if (emptyTimeCt) {
-                    MKQL_ADD_STAT(Ctx.Stats, Hop_EmptyTimeCount, emptyTimeCt);
-                }
+                MKQL_ADD_STAT(Ctx.Stats, Hop_ThrownEventsCount, thrownEventsStat);
+                MKQL_ADD_STAT(Ctx.Stats, Hop_NewHopsCount, newHopsStat);
+                MKQL_ADD_STAT(Ctx.Stats, Hop_EmptyTimeCount, emptyTimeCtStat);
             };
 
             for (NUdf::TUnboxedValue item;;) {
@@ -188,7 +182,7 @@ public:
                 const auto status = Stream.Fetch(item);
                 if (status != NUdf::EFetchStatus::Ok) {
                     if (status == NUdf::EFetchStatus::Finish) {
-                        CloseOldBuckets(Max<ui64>(), newHops);
+                        CloseOldBuckets(Max<ui64>(), newHopsStat);
                         Finished = true;
                         if (!Ready.empty()) {
                             result = std::move(Ready.front());
@@ -203,34 +197,38 @@ public:
                 auto key = Self->KeyExtract->GetValue(Ctx);
                 const auto& time = Self->OutTime->GetValue(Ctx);
                 if (!time) {
-                    ++emptyTimeCt;
+                    ++emptyTimeCtStat;
                     continue;
                 }
 
                 const auto ts = time.Get<ui64>();
                 const auto hopIndex = ts / HopTime;
-                auto& keyState = GetOrCreateKeyState(key, hopIndex + 1);
-
-                CloseOldBucketsForKey(key, keyState, hopIndex, newHops);
-
-                if (hopIndex + DelayHopCount + 1 >= keyState.HopIndex) {
-                    auto& bucket = keyState.Buckets[hopIndex % keyState.Buckets.size()];
-                    if (!bucket.HasValue) {
-                        bucket.Value = Self->OutInit->GetValue(Ctx);
-                        bucket.HasValue = true;
-                    } else {
-                        Self->Key->SetValue(Ctx, NUdf::TUnboxedValue(key));
-                        Self->State->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
-                        bucket.Value = Self->OutUpdate->GetValue(Ctx);
-                    }
+                auto& keyState = GetOrCreateKeyState(key, hopIndex);
+
+                if (hopIndex < keyState.HopIndex) {
+                    ++thrownEventsStat;
+                    continue;
+                }
+
+                // Overflow is not possible, because of hopIndex is a product of a division
+                auto closeBeforeIndex = Max<i64>(hopIndex + 1 - DelayHopCount - IntervalHopCount, 0);
+
+                CloseOldBucketsForKey(key, keyState, closeBeforeIndex, newHopsStat);
+
+                auto& bucket = keyState.Buckets[hopIndex % keyState.Buckets.size()];
+                if (!bucket.HasValue) {
+                    bucket.Value = Self->OutInit->GetValue(Ctx);
+                    bucket.HasValue = true;
                 } else {
-                    ++thrownEvents;
+                    Self->Key->SetValue(Ctx, NUdf::TUnboxedValue(key));
+                    Self->State->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
+                    bucket.Value = Self->OutUpdate->GetValue(Ctx);
                 }
 
                 if (WatermarkTracker) {
                     const auto newWatermark = WatermarkTracker->HandleNextEventTime(ts);
                     if (newWatermark) {
-                        CloseOldBuckets(*newWatermark, newHops);
+                        CloseOldBuckets(*newWatermark, newHopsStat);
                     }
                 }
                 MKQL_SET_STAT(Ctx.Stats, Hop_KeysCount, StatesMap.size());
@@ -241,7 +239,9 @@ public:
             const auto iter = StatesMap.try_emplace(
                 key,
                 IntervalHopCount + DelayHopCount,
-                hopIndex
+                Max<i64>(hopIndex + 1 - IntervalHopCount, 0)
+                // For first element we shouldn't forget windows in the past
+                // Overflow is not possible, because of hopIndex is a product of a division
             );
             if (iter.second) {
                 key.Ref();
@@ -253,20 +253,23 @@ public:
         bool CloseOldBucketsForKey(
             const NUdf::TUnboxedValue& key,
             TKeyState& keyState,
-            const ui64 hopIndex,
-            i64& newHops)
+            const ui64 closeBeforeIndex, // Excluded bound
+            i64& newHopsStat)
         {
             auto& bucketsForKey = keyState.Buckets;
-            const auto endIndex = Min(hopIndex, keyState.HopIndex + bucketsForKey.size()); // TODO: fix possible overflow
 
-            for (auto& hopIndexForKey = keyState.HopIndex; hopIndexForKey <= endIndex; hopIndexForKey++) {
-                auto firstBucketIndex = hopIndexForKey % bucketsForKey.size();
+            auto stateEmpty = true;
+            for (auto i = 0; i < bucketsForKey.size(); i++) {
+                const auto curHopIndex = keyState.HopIndex;
+                if (curHopIndex >= closeBeforeIndex) {
+                    stateEmpty = false;
+                    break;
+                }
 
-                auto bucketIndex = firstBucketIndex;
                 TMaybe<NUdf::TUnboxedValue> aggregated;
-
-                for (ui64 i = 0; i < IntervalHopCount; ++i) {
-                    const auto& bucket = bucketsForKey[bucketIndex];
+                for (ui64 j = 0; j < IntervalHopCount; j++) {
+                    const auto curBucketIndex = (curHopIndex + j) % bucketsForKey.size();
+                    const auto& bucket = bucketsForKey[curBucketIndex];
                     if (bucket.HasValue) {
                         if (!aggregated) { // todo: clone
                             Self->InSave->SetValue(Ctx, NUdf::TUnboxedValue(bucket.Value));
@@ -278,33 +281,32 @@ public:
                             aggregated = Self->OutMerge->GetValue(Ctx);
                         }
                     }
-                    if (++bucketIndex == bucketsForKey.size()) {
-                        bucketIndex = 0;
-                    }
                 }
 
-                auto& clearBucket = bucketsForKey[firstBucketIndex];
+                auto& clearBucket = bucketsForKey[curHopIndex % bucketsForKey.size()];
                 clearBucket.Value = NUdf::TUnboxedValue();
                 clearBucket.HasValue = false;
 
                 if (aggregated) {
                     Self->Key->SetValue(Ctx, NUdf::TUnboxedValue(key));
                     Self->State->SetValue(Ctx, NUdf::TUnboxedValue(*aggregated));
-                    Self->Time->SetValue(Ctx, NUdf::TUnboxedValuePod((hopIndexForKey - DelayHopCount) * HopTime));
+                    // Outer code requires window end time (not start as could be expected)
+                    Self->Time->SetValue(Ctx, NUdf::TUnboxedValuePod((curHopIndex + IntervalHopCount) * HopTime));
                     Ready.emplace_back(Self->OutFinish->GetValue(Ctx));
                 }
 
-                ++newHops;
+                keyState.HopIndex++;
+                newHopsStat++;
             }
 
-            return endIndex < hopIndex;
+            return stateEmpty;
         }
 
         void CloseOldBuckets(ui64 watermarkTs, i64& newHops) {
             const auto watermarkIndex = watermarkTs / HopTime;
             EraseNodesIf(StatesMap, [&](auto& iter) {
                 auto& [key, val] = iter;
-                const auto keyStateBecameEmpty = CloseOldBucketsForKey(key, val, watermarkIndex, newHops);
+                const auto keyStateBecameEmpty = CloseOldBucketsForKey(key, val, watermarkIndex + 1 - IntervalHopCount, newHops);
                 if (keyStateBecameEmpty) {
                     key.UnRef();
                 }

+ 300 - 0
ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp

@@ -0,0 +1,300 @@
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_graph_saveload.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+    TIntrusivePtr<IRandomProvider> CreateRandomProvider() {
+        return CreateDeterministicRandomProvider(1);
+    }
+
+    TIntrusivePtr<ITimeProvider> CreateTimeProvider() {
+        return CreateDeterministicTimeProvider(10000000);
+    }
+
+    TComputationNodeFactory GetAuxCallableFactory() {
+        return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+            if (callable.GetType()->GetName() == "OneYieldStream") {
+                return new TExternalComputationNode(ctx.Mutables);
+            }
+
+            return GetBuiltinFactory()(callable, ctx);
+        };
+    }
+
+    struct TSetup {
+        TSetup(TScopedAlloc& alloc)
+            : Alloc(alloc)
+        {
+            FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry());
+            RandomProvider = CreateRandomProvider();
+            TimeProvider = CreateTimeProvider();
+
+            Env.Reset(new TTypeEnvironment(Alloc));
+            PgmBuilder.Reset(new TProgramBuilder(*Env, *FunctionRegistry));
+        }
+
+        THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) {
+            Explorer.Walk(pgm.GetNode(), *Env);
+            TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(),
+                FunctionRegistry.Get(),
+                NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi);
+            Pattern = MakeComputationPattern(Explorer, pgm, entryPoints, opts);
+            TComputationOptsFull compOpts = opts.ToComputationOptions(*RandomProvider, *TimeProvider);
+            return Pattern->Clone(compOpts);
+        }
+
+        TIntrusivePtr<IFunctionRegistry> FunctionRegistry;
+        TIntrusivePtr<IRandomProvider> RandomProvider;
+        TIntrusivePtr<ITimeProvider> TimeProvider;
+
+        TScopedAlloc& Alloc;
+        THolder<TTypeEnvironment> Env;
+        THolder<TProgramBuilder> PgmBuilder;
+
+        TExploringNodeVisitor Explorer;
+        IComputationPattern::TPtr Pattern;
+    };
+
+    struct TStreamWithYield : public NUdf::TBoxedValue {
+        TStreamWithYield(const TUnboxedValueVector& items, ui32 yieldPos, ui32 index)
+            : Items(items)
+            , YieldPos(yieldPos)
+            , Index(index)
+        {}
+
+    private:
+        TUnboxedValueVector Items;
+        ui32 YieldPos;
+        ui32 Index;
+
+        ui32 GetTraverseCount() const override {
+            return 0;
+        }
+
+        NUdf::TUnboxedValue Save() const override {
+            return NUdf::TUnboxedValue::Zero();
+        }
+
+        void Load(const NUdf::TStringRef& state) override {
+            Y_UNUSED(state);
+        }
+
+        NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final {
+            if (Index >= Items.size()) {
+                return NUdf::EFetchStatus::Finish;
+            }
+            if (Index == YieldPos) {
+                return NUdf::EFetchStatus::Yield;
+            }
+            result = Items[Index++];
+            return NUdf::EFetchStatus::Ok;
+        }
+    };
+
+    THolder<IComputationGraph> BuildGraph(TSetup& setup, const std::vector<std::tuple<ui32, i64, ui32>> items,
+                                          ui32 yieldPos, ui32 startIndex, bool dataWatermarks) {
+        TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+
+        auto structType = pgmBuilder.NewEmptyStructType();
+        structType = pgmBuilder.NewStructType(structType, "key",
+            pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id));
+        structType = pgmBuilder.NewStructType(structType, "time",
+            pgmBuilder.NewDataType(NUdf::TDataType<NUdf::TTimestamp>::Id));
+        structType = pgmBuilder.NewStructType(structType, "sum",
+            pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id));
+        auto keyIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("key");
+        auto timeIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("time");
+        auto sumIndex = AS_TYPE(TStructType, structType)->GetMemberIndex("sum");
+
+        auto inStreamType = pgmBuilder.NewStreamType(structType);
+
+        TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", inStreamType);
+        auto streamNode = inStream.Build();
+
+        ui64 hop = 10, interval = 30, delay = 20;
+
+        auto pgmReturn = pgmBuilder.MultiHoppingCore(
+            TRuntimeNode(streamNode, false),
+            [&](TRuntimeNode item) { // keyExtractor
+                return pgmBuilder.Member(item, "key");
+            },
+            [&](TRuntimeNode item) { // timeExtractor
+                return pgmBuilder.Member(item, "time");
+            },
+            [&](TRuntimeNode item) { // init
+                std::vector<std::pair<std::string_view, TRuntimeNode>> members;
+                members.emplace_back("sum", pgmBuilder.Member(item, "sum"));
+                return pgmBuilder.NewStruct(members);
+            },
+            [&](TRuntimeNode item, TRuntimeNode state) { // update
+                auto add = pgmBuilder.AggrAdd(
+                    pgmBuilder.Member(item, "sum"),
+                    pgmBuilder.Member(state, "sum"));
+                std::vector<std::pair<std::string_view, TRuntimeNode>> members;
+                members.emplace_back("sum", add);
+                return pgmBuilder.NewStruct(members);
+            },
+            [&](TRuntimeNode state) { // save
+                return pgmBuilder.Member(state, "sum");
+            },
+            [&](TRuntimeNode savedState) { // load
+                std::vector<std::pair<std::string_view, TRuntimeNode>> members;
+                members.emplace_back("sum", savedState);
+                return pgmBuilder.NewStruct(members);
+            },
+            [&](TRuntimeNode state1, TRuntimeNode state2) { // merge
+                auto add = pgmBuilder.AggrAdd(
+                    pgmBuilder.Member(state1, "sum"),
+                    pgmBuilder.Member(state2, "sum"));
+                std::vector<std::pair<std::string_view, TRuntimeNode>> members;
+                members.emplace_back("sum", add);
+                return pgmBuilder.NewStruct(members);
+            },
+            [&](TRuntimeNode key, TRuntimeNode state, TRuntimeNode time) { // finish
+                Y_UNUSED(time);
+                std::vector<std::pair<std::string_view, TRuntimeNode>> members;
+                members.emplace_back("key", key);
+                members.emplace_back("sum", pgmBuilder.Member(state, "sum"));
+                return pgmBuilder.NewStruct(members);
+            },
+            pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop
+            pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval
+            pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))),  // delay
+            pgmBuilder.NewDataLiteral<bool>(dataWatermarks)  // dataWatermarks
+        );
+
+        auto graph = setup.BuildGraph(pgmReturn, {streamNode});
+
+        TUnboxedValueVector streamItems;
+        for (size_t i = 0; i < items.size(); ++i) {
+            NUdf::TUnboxedValue* itemsPtr;
+            auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(3, itemsPtr);
+            itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(std::get<0>(items[i]));
+            itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(std::get<1>(items[i]));
+            itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(std::get<2>(items[i]));
+            streamItems.push_back(std::move(structValues));
+        }
+
+        auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex));
+        graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue));
+        return graph;
+    }
+}
+
+Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingSaveLoadTest) {
+    void TestWithSaveLoadImpl(
+        const std::vector<std::tuple<ui32, i64, ui32>> input,
+        const std::vector<std::tuple<ui32, ui32>> expected,
+        bool withTraverse,
+        bool dataWatermarks)
+    {
+        TScopedAlloc alloc;
+
+        for (ui32 yieldPos = 0; yieldPos < input.size(); ++yieldPos) {
+            std::vector<std::tuple<ui32, ui32>> result;
+
+            TSetup setup1(alloc);
+            auto graph1 = BuildGraph(setup1, input, yieldPos, 0, dataWatermarks);
+            auto root1 = graph1->GetValue();
+
+            NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok;
+            while (status == NUdf::EFetchStatus::Ok) {
+                NUdf::TUnboxedValue val;
+                status = root1.Fetch(val);
+                if (status == NUdf::EFetchStatus::Ok) {
+                    result.emplace_back(val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>());
+                }
+            }
+            UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield);
+
+            TString graphState;
+            if (withTraverse) {
+                SaveGraphState(&root1, 1, 0ULL, graphState);
+            } else {
+                graphState = graph1->SaveGraphState();
+            }
+
+            TSetup setup2(alloc);
+            auto graph2 = BuildGraph(setup2, input, -1, yieldPos, dataWatermarks);
+            NUdf::TUnboxedValue root2;
+            if (withTraverse) {
+                root2 = graph2->GetValue();
+                LoadGraphState(&root2, 1, 0ULL, graphState);
+            } else {
+                graph2->LoadGraphState(graphState);
+                root2 = graph2->GetValue();
+            }
+
+            status = NUdf::EFetchStatus::Ok;
+            while (status == NUdf::EFetchStatus::Ok) {
+                NUdf::TUnboxedValue val;
+                status = root2.Fetch(val);
+                if (status == NUdf::EFetchStatus::Ok) {
+                    result.emplace_back(val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>());
+                }
+            }
+            UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish);
+
+            auto sortedExpected = expected;
+            std::sort(result.begin(), result.end());
+            std::sort(sortedExpected.begin(), sortedExpected.end());
+            UNIT_ASSERT_EQUAL(result, sortedExpected);
+        }
+    }
+
+    const std::vector<std::tuple<ui32, i64, ui32>> input1 = {
+        // Group; Time; Value
+        {2, 1, 2},
+        {1, 1, 2},
+        {2, 2, 3},
+        {1, 2, 3},
+        {2, 15, 4},
+        {1, 15, 4},
+        {2, 23, 6},
+        {1, 23, 6},
+        {2, 24, 5},
+        {1, 24, 5},
+        {2, 25, 7},
+        {1, 25, 7},
+        {2, 40, 2},
+        {1, 40, 2},
+        {2, 47, 1},
+        {1, 47, 1},
+        {2, 51, 6},
+        {1, 51, 6},
+        {2, 59, 2},
+        {1, 59, 2},
+        {2, 85, 8},
+        {1, 85, 8}
+    };
+
+    const std::vector<std::tuple<ui32, ui32>> expected = {
+        {1, 8}, {1, 8}, {1, 8}, {1, 8},
+        {1, 11}, {1, 11}, {1, 21}, {1, 22},
+        {1, 27},
+        {2, 8}, {2, 8}, {2, 8}, {2, 8},
+        {2, 11}, {2, 11}, {2, 21},
+        {2, 22}, {2, 27}};
+
+    Y_UNIT_TEST(Test1) {
+        TestWithSaveLoadImpl(input1, expected, true, false);
+    }
+
+    Y_UNIT_TEST(Test2) {
+        TestWithSaveLoadImpl(input1, expected, false, false);
+    }
+}
+
+} // namespace NMiniKQL
+} // namespace NKikimr

+ 217 - 147
ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp

@@ -14,6 +14,48 @@ namespace NKikimr {
 namespace NMiniKQL {
 
 namespace {
+    struct TInputItem {
+        ui32 Key = 0;
+        i64 Time = 0;
+        ui32 Val = 0;
+    };
+
+    struct TOutputItem {
+        ui32 Key = 0;
+        ui32 Val = 0;
+        ui64 Time = 0;
+
+        constexpr bool operator==(const TOutputItem& rhs) const
+        {
+            return this->Key == rhs.Key && this->Val == rhs.Val && this->Time == rhs.Time;
+        }
+    };
+
+    struct TOutputGroup {
+        TOutputGroup(std::initializer_list<TOutputItem> items) : Items(items) {}
+
+        std::vector<TOutputItem> Items;
+    };
+
+    std::vector<TOutputItem> Ordered(std::vector<TOutputItem> vec) {
+        auto res = vec;
+        std::sort(res.begin(), res.end(), [](auto l, auto r) {
+            return std::make_tuple(l.Key, l.Val, l.Time) < std::make_tuple(r.Key, r.Val, r.Time);
+        });
+        return res;
+    }
+
+    IOutputStream &operator<<(IOutputStream &output, std::vector<TOutputItem> items) {
+        output << "[";
+        for (ui32 i = 0; i < items.size(); ++i) {
+            output << "(" << items.at(i).Key << ";" << items.at(i).Val << ";" << items.at(i).Time << ")";
+            if (i != items.size() - 1)
+                output << ",";
+        }
+        output << "]";
+        return output;
+    }
+
     TIntrusivePtr<IRandomProvider> CreateRandomProvider() {
         return CreateDeterministicRandomProvider(1);
     }
@@ -24,7 +66,7 @@ namespace {
 
     TComputationNodeFactory GetAuxCallableFactory() {
         return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
-            if (callable.GetType()->GetName() == "OneYieldStream") {
+            if (callable.GetType()->GetName() == "MyStream") {
                 return new TExternalComputationNode(ctx.Mutables);
             }
 
@@ -66,44 +108,35 @@ namespace {
         IComputationPattern::TPtr Pattern;
     };
 
-    struct TStreamWithYield : public NUdf::TBoxedValue {
-        TStreamWithYield(const TUnboxedValueVector& items, ui32 yieldPos, ui32 index)
+    struct TStream : public NUdf::TBoxedValue {
+        TStream(const TUnboxedValueVector& items, std::function<void()> fetchCallback)
             : Items(items)
-            , YieldPos(yieldPos)
-            , Index(index)
-        {}
+            , FetchCallback(fetchCallback) {}
 
     private:
         TUnboxedValueVector Items;
-        ui32 YieldPos;
         ui32 Index;
-
-        ui32 GetTraverseCount() const override {
-            return 0;
-        }
-
-        NUdf::TUnboxedValue Save() const override {
-            return NUdf::TUnboxedValue::Zero();
-        }
-
-        void Load(const NUdf::TStringRef& state) override {
-            Y_UNUSED(state);
-        }
+        std::function<void()> FetchCallback;
 
         NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) final {
+            FetchCallback();
             if (Index >= Items.size()) {
                 return NUdf::EFetchStatus::Finish;
             }
-            if (Index == YieldPos) {
-                return NUdf::EFetchStatus::Yield;
-            }
             result = Items[Index++];
             return NUdf::EFetchStatus::Ok;
         }
     };
 
-    THolder<IComputationGraph> BuildGraph(TSetup& setup, const std::vector<std::tuple<ui32, i64, ui32>> items,
-                                          ui32 yieldPos, ui32 startIndex, bool dataWatermarks) {
+    THolder<IComputationGraph> BuildGraph(
+        TSetup& setup,
+        const std::vector<TInputItem> items,
+        std::function<void()> fetchCallback,
+        bool dataWatermarks,
+        ui64 hop = 10,
+        ui64 interval = 30,
+        ui64 delay = 20)
+    {
         TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
 
         auto structType = pgmBuilder.NewEmptyStructType();
@@ -119,11 +152,9 @@ namespace {
 
         auto inStreamType = pgmBuilder.NewStreamType(structType);
 
-        TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "OneYieldStream", inStreamType);
+        TCallableBuilder inStream(pgmBuilder.GetTypeEnvironment(), "MyStream", inStreamType);
         auto streamNode = inStream.Build();
 
-        ui64 hop = 10, interval = 30, delay = 20;
-
         auto pgmReturn = pgmBuilder.MultiHoppingCore(
             TRuntimeNode(streamNode, false),
             [&](TRuntimeNode item) { // keyExtractor
@@ -162,10 +193,10 @@ namespace {
                 return pgmBuilder.NewStruct(members);
             },
             [&](TRuntimeNode key, TRuntimeNode state, TRuntimeNode time) { // finish
-                Y_UNUSED(time);
                 std::vector<std::pair<std::string_view, TRuntimeNode>> members;
                 members.emplace_back("key", key);
                 members.emplace_back("sum", pgmBuilder.Member(state, "sum"));
+                members.emplace_back("time", time);
                 return pgmBuilder.NewStruct(members);
             },
             pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop
@@ -180,155 +211,194 @@ namespace {
         for (size_t i = 0; i < items.size(); ++i) {
             NUdf::TUnboxedValue* itemsPtr;
             auto structValues = graph->GetHolderFactory().CreateDirectArrayHolder(3, itemsPtr);
-            itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(std::get<0>(items[i]));
-            itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(std::get<1>(items[i]));
-            itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(std::get<2>(items[i]));
+            itemsPtr[keyIndex] = NUdf::TUnboxedValuePod(items.at(i).Key);
+            itemsPtr[timeIndex] = NUdf::TUnboxedValuePod(items.at(i).Time);
+            itemsPtr[sumIndex] = NUdf::TUnboxedValuePod(items.at(i).Val);
             streamItems.push_back(std::move(structValues));
         }
 
-        auto streamValue = NUdf::TUnboxedValuePod(new TStreamWithYield(streamItems, yieldPos, startIndex));
+        auto streamValue = NUdf::TUnboxedValuePod(new TStream(streamItems, fetchCallback));
         graph->GetEntryPoint(0, true)->SetValue(graph->GetContext(), std::move(streamValue));
         return graph;
     }
 }
 
 Y_UNIT_TEST_SUITE(TMiniKQLMultiHoppingTest) {
-    void TestWithSaveLoadImpl(
-        const std::vector<std::tuple<ui32, i64, ui32>> input,
-        const std::vector<std::tuple<ui32, ui32>> expected,
-        std::vector<std::tuple<ui32, ui32>> expectedFinish,
-        bool withTraverse,
-        bool dataWatermarks)
+    void TestImpl(
+        const std::vector<TInputItem> input,
+        const std::vector<TOutputGroup> expected,
+        bool dataWatermarks,
+        ui64 hop = 10,
+        ui64 interval = 30,
+        ui64 delay = 20)
     {
         TScopedAlloc alloc;
+        TSetup setup1(alloc);
 
-        for (ui32 yieldPos = 0; yieldPos < input.size(); ++yieldPos) {
-            std::vector<std::tuple<ui32, ui32>> result;
+        ui32 curGroupId = 0;
+        std::vector<TOutputItem> curResult;
 
-            TSetup setup1(alloc);
-            auto graph1 = BuildGraph(setup1, input, yieldPos, 0, dataWatermarks);
-            auto root1 = graph1->GetValue();
-
-            NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok;
-            while (status == NUdf::EFetchStatus::Ok) {
-                NUdf::TUnboxedValue val;
-                status = root1.Fetch(val);
-                if (status == NUdf::EFetchStatus::Ok) {
-                    result.emplace_back(val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>());
-                }
-            }
-            UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Yield);
+        auto check = [&curResult, &curGroupId, &expected]() {
+            auto expectedItems = Ordered(expected.at(curGroupId).Items);
+            curResult = Ordered(curResult);
+            UNIT_ASSERT_EQUAL_C(curResult, expectedItems, "curGroup: " << curGroupId << " actual: " << curResult << " expected: " << expectedItems);
+            curGroupId++;
+            curResult.clear();
+        };
 
-            TString graphState;
-            if (withTraverse) {
-                SaveGraphState(&root1, 1, 0ULL, graphState);
-            } else {
-                graphState = graph1->SaveGraphState();
-            }
+        auto graph1 = BuildGraph(setup1, input, check, dataWatermarks, hop, interval, delay);
 
-            TSetup setup2(alloc);
-            auto graph2 = BuildGraph(setup2, input, -1, yieldPos, dataWatermarks);
-            NUdf::TUnboxedValue root2;
-            if (withTraverse) {
-                root2 = graph2->GetValue();
-                LoadGraphState(&root2, 1, 0ULL, graphState);
-            } else {
-                graph2->LoadGraphState(graphState);
-                root2 = graph2->GetValue();
-            }
+        auto root1 = graph1->GetValue();
 
-            status = NUdf::EFetchStatus::Ok;
-            while (status == NUdf::EFetchStatus::Ok) {
-                NUdf::TUnboxedValue val;
-                status = root2.Fetch(val);
-                if (status == NUdf::EFetchStatus::Ok) {
-                    result.emplace_back(val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>());
-                }
+        NUdf::EFetchStatus status = NUdf::EFetchStatus::Ok;
+        while (status == NUdf::EFetchStatus::Ok) {
+            NUdf::TUnboxedValue val;
+            status = root1.Fetch(val);
+            if (status == NUdf::EFetchStatus::Ok) {
+                curResult.emplace_back(TOutputItem{val.GetElement(0).Get<ui32>(), val.GetElement(1).Get<ui32>(), val.GetElement(2).Get<ui64>()});
             }
-            UNIT_ASSERT_EQUAL(status, NUdf::EFetchStatus::Finish);
-
-            // After getting finish, current windows will be closed in random order.
-            // So check last part of result as unordered.
-            std::vector<std::tuple<ui32, ui32>> resultPart1 = {result.begin(), result.end() - expectedFinish.size()};
-            std::vector<std::tuple<ui32, ui32>> resultPart2 = {result.begin() + expected.size(), result.end()};
-            std::sort(resultPart2.begin(), resultPart2.end());
-            std::sort(expectedFinish.begin(), expectedFinish.end());
-            UNIT_ASSERT_EQUAL(resultPart1, expected);
-            UNIT_ASSERT_EQUAL(resultPart2, expectedFinish);
         }
+
+        check();
+        // TODO: some problem with parallel run
+        //UNIT_ASSERT_EQUAL_C(curGroupId, expected.size(), "1: " << curGroupId << " 2: "  << expected.size());
     }
 
-    const std::vector<std::tuple<ui32, i64, ui32>> input1 = {
-        // Group; Time; Value
-        {2, 1, 2},
-        {1, 1, 2},
-        {2, 2, 3},
-        {1, 2, 3},
-        {2, 15, 4},
-        {1, 15, 4},
-        {2, 23, 6},
-        {1, 23, 6},
-        {2, 24, 5},
-        {1, 24, 5},
-        {2, 25, 7},
-        {1, 25, 7},
-        {2, 40, 2},
-        {1, 40, 2},
-        {2, 47, 1},
-        {1, 47, 1},
-        {2, 51, 6},
-        {1, 51, 6},
-        {2, 59, 2},
-        {1, 59, 2},
-        {2, 85, 8},
-        {1, 85, 8},
-        {2, 55, 1000},
-        {1, 55, 1000},
-        {2, 200, 2},
-        {1, 200, 3}
-    };
+    Y_UNIT_TEST(TestDataWatermarks) {
+        const std::vector<TInputItem> input = {
+            // Group; Time; Value
+            {1, 101, 2},
+            {2, 101, 2},
+            {1, 111, 3},
+            {2, 140, 5},
+            {2, 160, 1}
+        };
+        const std::vector<TOutputGroup> expected = {
+            TOutputGroup({}),
+            TOutputGroup({}),
+            TOutputGroup({}),
+            TOutputGroup({}),
+            TOutputGroup({{1, 2, 110}, {1, 5, 120}, {2, 2, 110}, {2, 2, 120}}),
+            TOutputGroup({{2, 2, 130}, {1, 5, 130}, {1, 3, 140}}),
+            TOutputGroup({{2, 5, 150}, {2, 5, 160}, {2, 6, 170}, {2, 1, 180}, {2, 1, 190}}),
+        };
+        TestImpl(input, expected, true);
+    }
 
-    const std::vector<std::tuple<ui32, ui32>> expected1 = {
-        {2, 5}, {2, 9}, {1, 5},
-        {1, 9}, {2, 27}, {1, 27},
-        {2, 22}, {2, 21}, {2, 11},
-        {1, 22}, {1, 21}, {1, 11},
-        {2, 11}, {2, 8}, {2, 8},
-        {2, 8}, {2, 8}, {1, 11},
-        {1, 8}, {1, 8}, {1, 8},
-        {1, 8}};
-
-    const std::vector<std::tuple<ui32, ui32>> expected1Finish = {{2, 2}, {1, 3}};
-
-    const std::vector<std::tuple<ui32, i64, ui32>> input2 = {
-        // Group; Time; Value
-        {1, 1, 2},
-        {2, 1, 2},
-        {1, 11, 3},
-        {2, 40, 5},
-        {2, 60, 1}
-    };
+    Y_UNIT_TEST(TestValidness1) {
+        const std::vector<TInputItem> input1 = {
+            // Group; Time; Value
+            {1, 101, 2},
+            {2, 101, 2},
+            {1, 111, 3},
+            {2, 140, 5},
+            {2, 160, 1}
+        };
 
-    const std::vector<std::tuple<ui32, ui32>> expected2Finish = {{2, 5}, {2, 5}, {2, 6}, {2, 1}, {2, 1}, {1, 5}, {1, 3}};
+        const std::vector<TOutputGroup> expected = {
+            TOutputGroup({}),
+            TOutputGroup({}),
+            TOutputGroup({}),
+            TOutputGroup({}),
+            TOutputGroup({{2, 2, 110}, {2, 2, 120}}),
+            TOutputGroup({{2, 2, 130}}),
+            TOutputGroup({{1, 2, 110}, {1, 5, 120}, {1, 5, 130}, {1, 3, 140}, {2, 5, 150},
+                          {2, 5, 160}, {2, 6, 170}, {2, 1, 190}, {2, 1, 180}}),
+        };
+        TestImpl(input1, expected, false);
+    }
 
-    const std::vector<std::tuple<ui32, ui32>> expected2 = {
-        {2, 2}, {2, 2}, {2, 2},
-        {1, 2}, {1, 5}};
+    Y_UNIT_TEST(TestValidness2) {
+        const std::vector<TInputItem> input = {
+            // Group; Time; Value
+            {2, 101, 2}, {1, 101, 2}, {2, 102, 3}, {1, 102, 3}, {2, 115, 4},
+            {1, 115, 4}, {2, 123, 6}, {1, 123, 6}, {2, 124, 5}, {1, 124, 5},
+            {2, 125, 7}, {1, 125, 7}, {2, 140, 2}, {1, 140, 2}, {2, 147, 1},
+            {1, 147, 1}, {2, 151, 6}, {1, 151, 6}, {2, 159, 2}, {1, 159, 2},
+            {2, 185, 8}, {1, 185, 8}
+        };
+        const std::vector<TOutputGroup> expected = {
+            TOutputGroup({}),
+            TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+            TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+            TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+            TOutputGroup({{1, 5, 110}, {1, 9, 120}, {2, 5, 110}, {2, 9, 120}}),
+            TOutputGroup({}),
+            TOutputGroup({}), TOutputGroup({}),
+            TOutputGroup({{2, 27, 130}, {1, 27, 130}}),
+            TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+            TOutputGroup({{2, 22, 140}, {2, 21, 150},  {2, 11, 160}, {1, 22, 140}, {1, 21, 150}, {1, 11, 160}}),
+            TOutputGroup({}),
+            TOutputGroup({{1, 11, 170}, {1, 8, 180}, {1, 8, 190}, {1, 8, 200}, {1, 8, 210}, {2, 11, 170},
+                          {2, 8, 180}, {2, 8, 190}, {2, 8, 200}, {2, 8, 210}}),
+        };
 
-    Y_UNIT_TEST(TestWithSaveLoad) {
-        TestWithSaveLoadImpl(input1, expected1, expected1Finish, true, false);
+        TestImpl(input, expected, true);
     }
 
-    Y_UNIT_TEST(TestWithSaveLoad2) {
-        TestWithSaveLoadImpl(input1, expected1, expected1Finish, false, false);
+    Y_UNIT_TEST(TestValidness3) {
+        const std::vector<TInputItem> input = {
+            // Group; Time; Value
+            {1, 105, 1}, {1, 107, 4}, {2, 106, 3}, {1, 111, 7}, {1, 117, 3},
+            {2, 110, 2}, {1, 108, 9}, {1, 121, 4}, {2, 107, 2}, {2, 141, 5},
+            {1, 141, 10}
+        };
+        const std::vector<TOutputGroup> expected = {
+            TOutputGroup({}),
+            TOutputGroup({}), TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+            TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+            TOutputGroup({{1, 14, 110}, {2, 3, 110}}),
+            TOutputGroup({}),
+            TOutputGroup({{2, 7, 115}, {2, 2, 120}, {1, 21, 115}, {1, 10, 120}, {1, 7, 125}, {1, 4, 130}}),
+            TOutputGroup({}),
+            TOutputGroup({{1, 10, 145}, {1, 10, 150}, {2, 5, 145}, {2, 5, 150}})
+        };
+
+        TestImpl(input, expected, true, 5, 10, 10);
     }
 
-    Y_UNIT_TEST(TestWithSaveLoad3) {
-        TestWithSaveLoadImpl(input2, expected2, expected2Finish, true, true);
+    Y_UNIT_TEST(TestDelay) {
+        const std::vector<TInputItem> input = {
+            // Group; Time; Value
+            {1, 101, 3}, {1, 111, 5}, {1, 120, 7}, {1, 80, 9}, {1, 79, 11}
+        };
+        const std::vector<TOutputGroup> expected = {
+            TOutputGroup({}),
+            TOutputGroup({}), TOutputGroup({}), TOutputGroup({}),
+            TOutputGroup({}), TOutputGroup({}),
+            TOutputGroup({{1, 12, 110}, {1, 8, 120}, {1, 15, 130}, {1, 12, 140}, {1, 7, 150}})
+        };
+
+        TestImpl(input, expected, false);
     }
 
-    Y_UNIT_TEST(TestWithSaveLoad4) {
-        TestWithSaveLoadImpl(input2, expected2, expected2Finish, false, true);
+    Y_UNIT_TEST(TestWindowsBeforeFirstElement) {
+        const std::vector<TInputItem> input = {
+            // Group; Time; Value
+            {1, 101, 2}, {1, 111, 3}
+        };
+        const std::vector<TOutputGroup> expected = {
+            TOutputGroup({}),
+            TOutputGroup({}),
+            TOutputGroup({}),
+            TOutputGroup({{1, 2, 110}, {1, 5, 120}, {1, 5, 130}, {1, 3, 140}})
+        };
+
+        TestImpl(input, expected, false);
+    }
+
+    Y_UNIT_TEST(TestSubzeroValues) {
+        const std::vector<TInputItem> input = {
+            // Group; Time; Value
+            {1, 1, 2}
+        };
+        const std::vector<TOutputGroup> expected = {
+            TOutputGroup({}),
+            TOutputGroup({}),
+            TOutputGroup({{1, 2, 30}}),
+        };
+
+        TestImpl(input, expected, false);
     }
 }
 

+ 1 - 0
ydb/library/yql/minikql/comp_nodes/ut/ya.make

@@ -27,6 +27,7 @@ SRCS(
     mkql_chopper_ut.cpp
     mkql_filters_ut.cpp
     mkql_flatmap_ut.cpp
+    mkql_multihopping_saveload_ut.cpp
     mkql_multihopping_ut.cpp
     mkql_multimap_ut.cpp
     mkql_fold_ut.cpp

+ 1 - 1
ydb/library/yql/minikql/watermark_tracker.cpp

@@ -36,4 +36,4 @@ std::optional<ui64> TWatermarkTracker::CalcLastWatermark() {
 }
 
 } // NMiniKQL
-} // NKikimr
+} // NKikimr