Browse Source

Return seqno & row index for duplicate rows (#7660)

Mikhail Surin 7 months ago
parent
commit
cc68f9c904
1 changed files with 14 additions and 5 deletions
  1. 14 5
      ydb/core/kqp/runtime/kqp_read_actor.cpp

+ 14 - 5
ydb/core/kqp/runtime/kqp_read_actor.cpp

@@ -54,11 +54,13 @@ public:
         size_t ProcessedRows = 0;
         size_t PackedRows = 0;
         ui64 ReadId;
+        ui64 SeqNo;
 
-        TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult, ui64 readId)
+        TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult, ui64 readId, ui64 seqNo)
             : ShardId(shardId)
             , ReadResult(std::move(readResult))
             , ReadId(readId)
+            , SeqNo(seqNo)
         {
         }
     };
@@ -983,6 +985,7 @@ public:
         CA_LOG_D("Taken " << Locks.size() << " locks");
         Reads[id].SerializedContinuationToken = record.GetContinuationToken();
 
+        ui64 seqNo = ev->Get()->Record.GetSeqNo();
         Reads[id].RegisterMessage(*ev->Get());
 
 
@@ -992,7 +995,7 @@ public:
             << " finished = " << ev->Get()->Record.GetFinished());
         CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get()) << " continuation token " << DebugPrintContionuationToken(record.GetContinuationToken()));
 
-        Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release()), id});
+        Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release()), id, seqNo});
         NotifyCA();
     }
 
@@ -1057,7 +1060,7 @@ public:
     }
 
     NMiniKQL::TBytesStatistics PackArrow(TResult& handle, i64& freeSpace) {
-        auto& [shardId, result, batch, _, packed, readId] = handle;
+        auto& [shardId, result, batch, _, packed, readId, seqNo] = handle;
         NMiniKQL::TBytesStatistics stats;
         bool hasResultColumns = false;
         if (result->Get()->GetRowsCount() == 0) {
@@ -1144,7 +1147,7 @@ public:
     }
 
     NMiniKQL::TBytesStatistics PackCells(TResult& handle, i64& freeSpace) {
-        auto& [shardId, result, batch, processedRows, packed, readId] = handle;
+        auto& [shardId, result, batch, processedRows, packed, readId, seqNo] = handle;
         NMiniKQL::TBytesStatistics stats;
         batch->reserve(batch->size());
         CA_LOG_D(TStringBuilder() << "enter pack cells method "
@@ -1205,13 +1208,17 @@ public:
                         << " current is " << handle.ShardId
                         << " previous readId is " << ptr->ReadId
                         << " current is " << handle.ReadId
+                        << " previous seqNo is " << ptr->SeqNo
+                        << " current is " << handle.SeqNo
+                        << " previous row number is " << ptr->RowIndex
+                        << " current is " << rowIndex 
                         << " key is " << rowRepr;
                     CA_LOG_E(rowMessage);
                     Counters->RowsDuplicationsFound->Inc();
                     RuntimeError(rowMessage, NYql::NDqProto::StatusIds::INTERNAL_ERROR, {});
                     return stats;
                 }
-                DuplicateCheckStats[result] = {.ReadId = readId , .ShardId = handle.ShardId};
+                DuplicateCheckStats[result] = {.ReadId = readId , .ShardId = handle.ShardId, .SeqNo = seqNo, .RowIndex = rowIndex };
             }
 
             stats.DataBytes += rowSize;
@@ -1547,6 +1554,8 @@ private:
     struct TDuplicationStats {
         ui64 ReadId;
         ui64 ShardId;
+        ui64 SeqNo;
+        ui64 RowIndex;
     };
     THashMap<TString, TDuplicationStats> DuplicateCheckStats;
     TVector<TResultColumn> DuplicateCheckExtraColumns;