Fix unexpected read iterator stream reset (#7697)

Aleksei Borzenkov 7 months ago

+ 70 - 13

@@ -5,6 +5,7 @@
 #include "datashard_locks_db.h"
 #include "probes.h"
+#include <ydb/core/base/counters.h>
 #include <ydb/core/formats/arrow/arrow_batch_builder.h>
 #include <ydb/library/actors/core/monotonic_provider.h>
@@ -315,6 +316,8 @@ public:
         , Self(self)
         , TableId(state.PathId.OwnerId, state.PathId.LocalPathId, state.SchemaVersion)
         , FirstUnprocessedQuery(State.FirstUnprocessedQuery)
+        , LastProcessedKey(State.LastProcessedKey)
+        , LastProcessedKeyErasedOrMissing(State.LastProcessedKeyErasedOrMissing)
         EndTime = StartTime;
@@ -329,10 +332,10 @@ public:
         bool toInclusive;
         TSerializedCellVec keyFromCells;
         TSerializedCellVec keyToCells;
-        if (Y_UNLIKELY(FirstUnprocessedQuery == State.FirstUnprocessedQuery && State.LastProcessedKey)) {
+        if (LastProcessedKey) {
             if (!State.Reverse) {
-                keyFromCells = TSerializedCellVec(State.LastProcessedKey);
-                fromInclusive = State.LastProcessedKeyErasedOrMissing;
+                keyFromCells = TSerializedCellVec(LastProcessedKey);
+                fromInclusive = LastProcessedKeyErasedOrMissing;
                 keyToCells = range.To;
                 toInclusive = range.ToInclusive;
@@ -341,8 +344,8 @@ public:
                 keyFromCells = range.From;
                 fromInclusive = range.FromInclusive;
-                keyToCells = TSerializedCellVec(State.LastProcessedKey);
-                toInclusive = State.LastProcessedKeyErasedOrMissing;
+                keyToCells = TSerializedCellVec(LastProcessedKey);
+                toInclusive = LastProcessedKeyErasedOrMissing;
         } else {
             keyFromCells = range.From;
@@ -505,6 +508,7 @@ public:
         while (FirstUnprocessedQuery < State.Request->Ranges.size()) {
             if (ReachedTotalRowsLimit()) {
                 FirstUnprocessedQuery = -1;
+                LastProcessedKey.clear();
                 return true;
@@ -531,6 +535,7 @@ public:
+            LastProcessedKey.clear();
         return true;
@@ -542,6 +547,7 @@ public:
         while (FirstUnprocessedQuery < State.Request->Keys.size()) {
             if (ReachedTotalRowsLimit()) {
                 FirstUnprocessedQuery = -1;
+                LastProcessedKey.clear();
                 return true;
@@ -567,6 +573,7 @@ public:
+            LastProcessedKey.clear();
         return true;
@@ -732,6 +739,28 @@ public:
     void UpdateState(TReadIteratorState& state, bool sentResult) {
+        if (state.FirstUnprocessedQuery == FirstUnprocessedQuery &&
+            state.LastProcessedKey && !LastProcessedKey)
+        {
+            LOG_CRIT_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
+                "DataShard " << Self->TabletID() << " detected unexpected reset of LastProcessedKey:"
+                << " ReadId# " << State.ReadId
+                << " LastSeqNo# " << State.SeqNo
+                << " LastQuery# " << State.FirstUnprocessedQuery
+                << " RowsRead# " << RowsRead
+                << " RowsProcessed# " << RowsProcessed
+                << " RowsSinceLastCheck# " << RowsSinceLastCheck
+                << " BytesInResult# " << BytesInResult
+                << " DeletedRowSkips# " << DeletedRowSkips
+                << " InvisibleRowSkips# " << InvisibleRowSkips
+                << " Quota.Rows# " << State.Quota.Rows
+                << " Quota.Bytes# " << State.Quota.Bytes
+                << " State.TotalRows# " << State.TotalRows
+                << " State.TotalRowsLimit# " << State.TotalRowsLimit
+                << " State.MaxRowsInResult# " << State.MaxRowsInResult);
+            Self->IncCounterReadIteratorLastKeyReset();
+        }
         state.TotalRows += RowsRead;
         state.FirstUnprocessedQuery = FirstUnprocessedQuery;
         state.LastProcessedKey = LastProcessedKey;
@@ -1683,6 +1712,7 @@ public:
         if (Reader->HasUnreadQueries()) {
             Reader->UpdateState(state, ResultSent);
             if (!state.IsExhausted()) {
+                state.ReadContinuePending = true;
                     new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId));
@@ -2333,6 +2363,15 @@ public:
         auto& state = *it->second;
+        if (state.IsExhausted()) {
+            // iterator quota reduced and exhausted while ReadContinue was inflight
+            LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId
+                << ", quota exhausted while rescheduling");
+            state.ReadContinuePending = false;
+            Result.reset();
+            return true;
+        }
         LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " ReadContinue for iterator# " << ReadId
             << ", firstUnprocessedQuery# " << state.FirstUnprocessedQuery);
@@ -2446,6 +2485,7 @@ public:
         if (Reader->Read(txc, ctx)) {
             // Retry later when dependencies are resolved
             if (!Reader->GetVolatileReadDependencies().empty()) {
+                state.ReadContinuePending = true;
@@ -2532,6 +2572,8 @@ public:
         auto& state = *it->second;
+        state.ReadContinuePending = false;
         if (!Result) {
             LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " read iterator# " << ReadId
                 << " TTxReadContinue::Execute() finished without Result, aborting");
@@ -2579,14 +2621,14 @@ public:
         if (Reader->HasUnreadQueries()) {
-            Y_ASSERT(it->second);
-            auto& state = *it->second;
+            bool wasExhausted = state.IsExhausted();
             Reader->UpdateState(state, useful);
             if (!state.IsExhausted()) {
+                state.ReadContinuePending = true;
                     new TEvDataShard::TEvReadContinue(ReadId.Sender, ReadId.ReadId));
-            } else {
+            } else if (!wasExhausted) {
                 LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
                     << " read iterator# " << ReadId << " exhausted");
@@ -2859,14 +2901,19 @@ void TDataShard::Handle(TEvDataShard::TEvReadAck::TPtr& ev, const TActorContext&
     bool wasExhausted = state.IsExhausted();
-        record.GetMaxRows(),
-        record.GetMaxBytes());
+        record.HasMaxRows() ? record.GetMaxRows() : Max<ui64>(),
+        record.HasMaxBytes() ? record.GetMaxBytes() : Max<ui64>());
     if (wasExhausted && !state.IsExhausted()) {
-        ctx.Send(
-            SelfId(),
-            new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId()));
+        if (!state.ReadContinuePending) {
+            state.ReadContinuePending = true;
+            ctx.Send(
+                SelfId(),
+                new TEvDataShard::TEvReadContinue(ev->Sender, record.GetReadId()));
+        }
+    } else if (!wasExhausted && state.IsExhausted()) {
     LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, TabletID() << " ReadAck for read iterator# " << readId
@@ -2995,6 +3042,16 @@ void TDataShard::UnsubscribeReadIteratorSessions(const TActorContext& ctx) {
+void TDataShard::IncCounterReadIteratorLastKeyReset() {
+    if (!CounterReadIteratorLastKeyReset) {
+        CounterReadIteratorLastKeyReset = GetServiceCounters(AppData()->Counters, "tablets")
+            ->GetSubgroup("type", "DataShard")
+            ->GetSubgroup("category", "app")
+            ->GetCounter("DataShard/ReadIteratorLastKeyReset", true);
+    }
+    ++*CounterReadIteratorLastKeyReset;
 } // NKikimr::NDataShard

+ 4 - 0

@@ -3322,6 +3322,10 @@ protected:
     bool AllowCancelROwithReadsets() const;
     void ResolveTablePath(const TActorContext &ctx);
+    NMonitoring::TDynamicCounters::TCounterPtr CounterReadIteratorLastKeyReset;
+    void IncCounterReadIteratorLastKeyReset();
 NKikimrTxDataShard::TError::EKind ConvertErrCode(NMiniKQL::IEngineFlat::EResult code);

+ 134 - 0

@@ -4627,6 +4627,140 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
             "result2: " << result2);
+    template<class TEvType> // Test helper: intercepts TEvType events seen by the runtime and queues them until released
+    class TBlockEvents : public std::deque<typename TEvType::TPtr> { // the deque base holds the captured events, oldest first
+    public:
+        TBlockEvents(TTestActorRuntime& runtime, std::function<bool(typename TEvType::TPtr&)> condition = {}) // condition: optional filter; when set, only events it returns true for are captured
+            : Runtime(runtime)
+            , Condition(std::move(condition))
+            , Holder(Runtime.AddObserver<TEvType>( // register an observer that routes every TEvType event into Process()
+                [this](typename TEvType::TPtr& ev) { // NOTE(review): captures `this`, so the object presumably must not be moved while the observer is installed - confirm
+                    this->Process(ev);
+                }))
+        {}
+        TBlockEvents& Unblock(size_t count = -1) { // re-sends up to `count` queued events from the front; the default -1 converts to the largest size_t, i.e. "all"; returns *this for chaining
+            while (!this->empty() && count > 0) {
+                auto& ev = this->front();
+                IEventHandle* ptr = ev.Get();
+                UnblockedOnce.insert(ptr); // remember the handle so Process() lets this re-sent event through once instead of capturing it again
+                Runtime.Send(ev.Release(), 0, /* viaActorSystem */ true); // ownership of the handle is released to the runtime
+                this->pop_front();
+                --count;
+            }
+            return *this;
+        }
+        void Stop() { // forgets pending pass-through handles and removes the observer; events still queued in the deque are left untouched
+            UnblockedOnce.clear();
+            Holder.Remove();
+        }
+    private:
+        void Process(typename TEvType::TPtr& ev) { // observer callback: decides whether to let `ev` pass or to capture it
+            IEventHandle* ptr = ev.Get();
+            auto it = UnblockedOnce.find(ptr);
+            if (it != UnblockedOnce.end()) { // this event was re-sent by Unblock(): consume the one-shot marker and do not capture
+                UnblockedOnce.erase(it);
+                return;
+            }
+            if (Condition && !Condition(ev)) { // a filter is set and rejected the event: leave it alone
+                return;
+            }
+            this->emplace_back(std::move(ev)); // capture: the handle is moved into the queue; presumably the runtime then drops the emptied event - confirm against AddObserver semantics
+        }
+    private:
+        TTestActorRuntime& Runtime; // runtime used both to observe and to re-send events
+        std::function<bool(typename TEvType::TPtr&)> Condition; // optional capture filter; empty means capture everything
+        TTestActorRuntime::TEventObserverHolder Holder; // handle for the registered observer; Stop() calls Remove() on it
+        THashSet<IEventHandle*> UnblockedOnce; // raw handles of events re-sent by Unblock() that Process() has not yet seen again
+    };
+    Y_UNIT_TEST(Bug_7674_IteratorDuplicateRows) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false);
+        TServer::TPtr server = new TServer(serverSettings);
+        auto& runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        InitRoot(server, sender);
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+        CreateShardedTable(server, sender, "/Root", "table-1", 1);
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10), (2, 20), (3, 30), (4, 40), (5, 50);");
+        ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (6, 60), (7, 70), (8, 80), (9, 90), (10, 100);");
+        runtime.SimulateSleep(TDuration::Seconds(1));
+        auto forceSmallChunks = runtime.AddObserver<TEvDataShard::TEvRead>(
+            [&](TEvDataShard::TEvRead::TPtr& ev) {
+                auto* msg = ev->Get();
+                // Force chunks of at most 3 rows
+                msg->Record.SetMaxRowsInResult(3);
+            });
+        TBlockEvents<TEvDataShard::TEvReadAck> blockedAcks(runtime);
+        TBlockEvents<TEvDataShard::TEvReadResult> blockedResults(runtime);
+        TBlockEvents<TEvDataShard::TEvReadContinue> blockedContinue(runtime);
+        auto waitFor = [&](const TString& description, const auto& condition, size_t count = 1) {
+            while (!condition()) {
+                UNIT_ASSERT_C(count > 0, "... failed to wait for " << description);
+                Cerr << "... waiting for " << description << Endl;
+                TDispatchOptions options;
+                options.CustomFinalCondition = [&]() {
+                    return condition();
+                };
+                runtime.DispatchEvents(options);
+                --count;
+            }
+        };
+        auto readFuture = KqpSimpleSend(runtime, "SELECT key, value FROM `/Root/table-1` ORDER BY key LIMIT 7");
+        waitFor("first TEvReadContinue", [&]{ return blockedContinue.size() >= 1; });
+        waitFor("first TEvReadResult", [&]{ return blockedResults.size() >= 1; });
+        blockedContinue.Unblock(1);
+        waitFor("second TEvReadContinue", [&]{ return blockedContinue.size() >= 1; });
+        waitFor("second TEvReadResult", [&]{ return blockedResults.size() >= 2; });
+        // We need both results to arrive without pauses
+        blockedResults.Unblock();
+        waitFor("both TEvReadAcks", [&]{ return blockedAcks.size() >= 2; });
+        // Unblock the first TEvReadAck and then pending TEvReadContinue
+        blockedAcks.Unblock(1);
+        blockedContinue.Unblock(1);
+        // Give it some time to trigger the bug
+        runtime.SimulateSleep(TDuration::MilliSeconds(1));
+        // Stop blocking everything
+        blockedAcks.Unblock().Stop();
+        blockedResults.Unblock().Stop();
+        blockedContinue.Unblock().Stop();
+            FormatResult(AwaitResponse(runtime, std::move(readFuture))),
+            "{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
+            "{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
+            "{ items { uint32_value: 3 } items { uint32_value: 30 } }, "
+            "{ items { uint32_value: 4 } items { uint32_value: 40 } }, "
+            "{ items { uint32_value: 5 } items { uint32_value: 50 } }, "
+            "{ items { uint32_value: 6 } items { uint32_value: 60 } }, "
+            "{ items { uint32_value: 7 } items { uint32_value: 70 } }");
+    }
 } // namespace NKikimr

+ 1 - 0

@@ -205,6 +205,7 @@ public:
     TActorId SessionId;
     TMonotonic StartTs;
     bool IsFinished = false;
+    bool ReadContinuePending = false;
     // note that we send SeqNo's starting from 1
     ui64 SeqNo = 0;