Просмотр исходного кода

Add incremental restore type in ChangeExchange (#8986)

Innokentii Mokin 6 месяцев назад
Родитель
Коммит
68b5a8de3c

+ 1 - 0
ydb/core/change_exchange/change_record.h

@@ -31,6 +31,7 @@ public:
         AsyncIndex,
         CdcDataChange,
         CdcHeartbeat,
+        IncrementalRestore,
     };
 
 public:

+ 1 - 0
ydb/core/protos/change_exchange.proto

@@ -47,6 +47,7 @@ message TChangeRecord {
         TDataChange AsyncIndex = 7;
         TDataChange CdcDataChange = 8;
         TCdcHeartbeat CdcHeartbeat = 9;
+        TDataChange IncrementalRestore = 10;
     }
 }
 

+ 5 - 0
ydb/core/tx/datashard/change_record.cpp

@@ -18,6 +18,10 @@ void TChangeRecord::Serialize(NKikimrChangeExchange::TChangeRecord& record) cons
             Y_ABORT_UNLESS(record.MutableAsyncIndex()->ParseFromArray(Body.data(), Body.size()));
             break;
         }
+        case EKind::IncrementalRestore: {
+            Y_ABORT_UNLESS(record.MutableIncrementalRestore()->ParseFromArray(Body.data(), Body.size()));
+            break;
+        }
         case EKind::CdcDataChange: {
             Y_ABORT_UNLESS(record.MutableCdcDataChange()->ParseFromArray(Body.data(), Body.size()));
             break;
@@ -41,6 +45,7 @@ TConstArrayRef<TCell> TChangeRecord::GetKey() const {
 
     switch (Kind) {
         case EKind::AsyncIndex:
+        case EKind::IncrementalRestore:
         case EKind::CdcDataChange: {
             const auto parsed = ParseBody(Body);
 

+ 1 - 0
ydb/core/tx/datashard/change_record_cdc_serializer.cpp

@@ -61,6 +61,7 @@ public:
         case TChangeRecord::EKind::CdcHeartbeat:
             return SerializeHeartbeat(cmd, record);
         case TChangeRecord::EKind::AsyncIndex:
+        case TChangeRecord::EKind::IncrementalRestore:
             Y_ABORT("Unexpected");
         }
     }

+ 38 - 8
ydb/core/tx/datashard/datashard_change_receiving.cpp

@@ -166,11 +166,20 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase<TDataShard> {
         }
     }
 
+    static bool UseStepTxId(const NKikimrChangeExchange::TChangeRecord& record) {
+        return record.GetKindCase() == NKikimrChangeExchange::TChangeRecord::kAsyncIndex;
+    }
+
+    static bool ValidChangeKind(const NKikimrChangeExchange::TChangeRecord& record) {
+        return record.GetKindCase() == NKikimrChangeExchange::TChangeRecord::kAsyncIndex
+            || record.GetKindCase() == NKikimrChangeExchange::TChangeRecord::kIncrementalRestore;
+    }
+
     bool ProcessRecord(const NKikimrChangeExchange::TChangeRecord& record, TTransactionContext& txc, const TActorContext& ctx) {
         Key.clear();
         Value.clear();
 
-        if (record.GetKindCase() != NKikimrChangeExchange::TChangeRecord::kAsyncIndex) {
+        if (!ValidChangeKind(record)) {
             AddRecordStatus(ctx, record.GetOrder(), NKikimrChangeExchange::TEvStatus::STATUS_REJECT,
                 NKikimrChangeExchange::TEvStatus::REASON_UNEXPECTED_KIND,
                 TStringBuilder() << "Unexpected kind: " << static_cast<ui32>(record.GetKindCase()));
@@ -186,9 +195,10 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase<TDataShard> {
             return false;
         }
 
+        const TTableId tableId(Self->GetPathOwnerId(), record.GetLocalPathId());
         const auto& tableInfo = *it->second;
-        const auto& asyncIndex = record.GetAsyncIndex();
-        const auto& serializedKey = asyncIndex.GetKey();
+        const auto& change = record.HasAsyncIndex() ? record.GetAsyncIndex() : record.GetIncrementalRestore();
+        const auto& serializedKey = change.GetKey();
 
         if (serializedKey.TagsSize() != tableInfo.KeyColumnIds.size()) {
             AddRecordStatus(ctx, record.GetOrder(), NKikimrChangeExchange::TEvStatus::STATUS_REJECT,
@@ -248,11 +258,11 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase<TDataShard> {
             return false;
         }
 
-        const NTable::ERowOp rop = GetRowOperation(asyncIndex);
+        const NTable::ERowOp rop = GetRowOperation(change);
         switch (rop) {
             case NTable::ERowOp::Upsert:
             case NTable::ERowOp::Reset: {
-                const auto& serializedValue = GetValue(asyncIndex);
+                const auto& serializedValue = GetValue(change);
 
                 if (!TSerializedCellVec::TryParse(serializedValue.GetData(), ValueCells)) {
                     AddRecordStatus(ctx, record.GetOrder(), NKikimrChangeExchange::TEvStatus::STATUS_REJECT,
@@ -305,7 +315,20 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase<TDataShard> {
                 return false;
         }
 
-        txc.DB.Update(tableInfo.LocalTid, rop, Key, Value, TRowVersion(record.GetStep(), record.GetTxId()));
+        if (!UseStepTxId(record) && !MvccReadWriteVersion) {
+            auto [readVersion, writeVersion] = Self->GetReadWriteVersions();
+            Y_DEBUG_ABORT_UNLESS(readVersion == writeVersion);
+            MvccReadWriteVersion = writeVersion;
+            Pipeline.AddCommittingOp(*MvccReadWriteVersion);
+        }
+
+        if (UseStepTxId(record)) {
+            txc.DB.Update(tableInfo.LocalTid, rop, Key, Value, TRowVersion(record.GetStep(), record.GetTxId()));
+        } else {
+            Self->SysLocksTable().BreakLocks(tableId, KeyCells.GetCells()); // probably redundant, we expect target table to be locked until complete restore
+            txc.DB.Update(tableInfo.LocalTid, rop, Key, Value, *MvccReadWriteVersion);
+        }
+
         Self->GetConflictsCache().GetTableCache(tableInfo.LocalTid).RemoveUncommittedWrites(KeyCells.GetCells(), txc.DB);
         tableInfo.Stats.UpdateTime = TAppData::TimeProvider->Now();
         AddRecordStatus(ctx, record.GetOrder(), NKikimrChangeExchange::TEvStatus::STATUS_OK);
@@ -314,8 +337,9 @@ class TDataShard::TTxApplyChangeRecords: public TTransactionBase<TDataShard> {
     }
 
 public:
-    explicit TTxApplyChangeRecords(TDataShard* self, TEvChangeExchange::TEvApplyRecords::TPtr ev)
+    explicit TTxApplyChangeRecords(TDataShard* self, TPipeline& pipeline, TEvChangeExchange::TEvApplyRecords::TPtr ev)
         : TTransactionBase(self)
+        , Pipeline(pipeline)
         , Ev(std::move(ev))
         , Status(new TEvChangeExchange::TEvStatus)
     {
@@ -392,6 +416,10 @@ public:
     void Complete(const TActorContext& ctx) override {
         Y_ABORT_UNLESS(Status);
 
+        if (MvccReadWriteVersion) {
+            Pipeline.RemoveCommittingOp(*MvccReadWriteVersion);
+        }
+
         if (Status->Record.GetStatus() == NKikimrChangeExchange::TEvStatus::STATUS_OK) {
             Self->IncCounter(COUNTER_CHANGE_EXCHANGE_SUCCESSFUL_APPLY);
         } else {
@@ -402,8 +430,10 @@ public:
     }
 
 private:
+    TPipeline& Pipeline;
     TEvChangeExchange::TEvApplyRecords::TPtr Ev;
     THolder<TEvChangeExchange::TEvStatus> Status;
+    std::optional<TRowVersion> MvccReadWriteVersion;
 
     TSerializedCellVec KeyCells;
     TSerializedCellVec ValueCells;
@@ -446,7 +476,7 @@ void TDataShard::Handle(TEvChangeExchange::TEvApplyRecords::TPtr& ev, const TAct
         << ": origin# " << ev->Get()->Record.GetOrigin()
         << ", generation# " << ev->Get()->Record.GetGeneration()
         << ", at tablet# " << TabletID());
-    Execute(new TTxApplyChangeRecords(this, ev), ctx);
+    Execute(new TTxApplyChangeRecords(this, Pipeline, ev), ctx);
 }
 
 }