azevaykin 1 год назад
Родитель
Сommit
bf2e0234ce

+ 1 - 1
ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

@@ -3523,7 +3523,7 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
         Cerr << "========= Write many rows" << Endl;
         for (ui64 i = 0; i < writeCount; ++i) {
             ui64 seed = 1000000 + i * rowCount * columns.size();
-            auto writeRequest = MakeWriteRequest(++helper.TxId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, tableId, columns, rowCount, seed);
+            auto writeRequest = MakeWriteRequest(++helper.TxId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columns, rowCount, seed);
             writeRequest->Record.SetLockTxId(lockTxId);
             writeRequest->Record.SetLockNodeId(nodeId);
 

+ 1 - 1
ydb/core/tx/datashard/datashard_ut_stats.cpp

@@ -77,7 +77,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
             UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetChannels()[0].GetIndexSize(), 54u);
         }
 
-        Write(runtime, sender, shard1, tableId1, TShardedTableOptions().Columns_, 1, 100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+        Upsert(runtime, sender, shard1, tableId1, TShardedTableOptions().Columns_, 1, 100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
 
         {
             Cerr << "... waiting for stats after write" << Endl;

+ 1 - 1
ydb/core/tx/datashard/datashard_ut_trace.cpp

@@ -447,7 +447,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) {
         NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId(15, 4095);
         const ui32 rowCount = 3;
         ui64 txId = 100;
-        auto request = MakeWriteRequest(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, tableId, opts.Columns_, rowCount);
+        auto request = MakeWriteRequest(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, opts.Columns_, rowCount);
         runtime.SendToPipe(shards[0], sender, request.release(), 0, GetPipeConfigWithRetries(), TActorId(), 0, std::move(traceId));
 
         auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);

+ 159 - 19
ydb/core/tx/datashard/datashard_ut_write.cpp

@@ -32,7 +32,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
         return {runtime, server, sender};
     }
 
-    Y_UNIT_TEST_TWIN(UpsertImmediate, EvWrite) {
+    Y_UNIT_TEST_TWIN(ExecSQLUpsertImmediate, EvWrite) {
         auto [runtime, server, sender] = TestCreateServer();
 
         auto opts = TShardedTableOptions();
@@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
         }
     }
 
-    Y_UNIT_TEST_QUAD(UpsertPrepared, EvWrite, Volatile) {
+    Y_UNIT_TEST_QUAD(ExecSQLUpsertPrepared, EvWrite, Volatile) {
         auto [runtime, server, sender] = TestCreateServer();
 
         runtime.GetAppData().FeatureFlags.SetEnableDataShardVolatileTransactions(Volatile);
@@ -83,7 +83,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
         }
     }
 
-    Y_UNIT_TEST(WriteImmediate) {
+    Y_UNIT_TEST(UpsertImmediate) {
         auto [runtime, server, sender] = TestCreateServer();
 
         auto opts = TShardedTableOptions().Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}});
@@ -95,7 +95,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
 
         Cout << "========= Send immediate write =========\n";
         {
-            const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+            const auto writeResult = Upsert(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
             
             UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
             UNIT_ASSERT_VALUES_EQUAL(writeResult.GetStep(), 0);
@@ -114,7 +114,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
         }
     }
 
-    Y_UNIT_TEST(WriteImmediateManyColumns) {
+    Y_UNIT_TEST(UpsertImmediateManyColumns) {
         auto [runtime, server, sender] = TestCreateServer();
 
         auto opts = TShardedTableOptions().Columns({{"key64", "Uint64", true, false}, {"key32", "Uint32", true, false},
@@ -125,9 +125,9 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
 
         ui64 txId = 100;
 
-        Cout << "========= Send immediate write =========\n";
+        Cout << "========= Send immediate upsert =========\n";
         {
-            Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+            Upsert(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
         }
 
         Cout << "========= Read table =========\n";
@@ -137,16 +137,28 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
                                                  "key64 = 5, key32 = 6, value64 = 7, value32 = 8, valueUtf8 = String_9\n"
                                                  "key64 = 10, key32 = 11, value64 = 12, value32 = 13, valueUtf8 = String_14\n");
         }
+
+        Cout << "========= Send immediate delete =========\n";
+        {
+            Delete(runtime, sender, shard, tableId, opts.Columns_, 1, ++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+        }
+
+        Cout << "========= Read table with 1th row deleted =========\n";
+        {
+            auto tableState = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
+            UNIT_ASSERT_VALUES_EQUAL(tableState, "key64 = 5, key32 = 6, value64 = 7, value32 = 8, valueUtf8 = String_9\n"
+                                                 "key64 = 10, key32 = 11, value64 = 12, value32 = 13, valueUtf8 = String_14\n");
+        }
     }
 
-    Y_UNIT_TEST(WriteImmediateHugeKey) {
+    Y_UNIT_TEST(WriteImmediateBadRequest) {
         auto [runtime, server, sender] = TestCreateServer();
 
         auto opts = TShardedTableOptions().Columns({{"key", "Utf8", true, false}});
         auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
         const ui64 shard = shards[0];
 
-        Cout << "========= Send immediate write =========\n";
+        Cout << "========= Send immediate write with huge key=========\n";
         {
             TString hugeStringValue(NLimits::MaxWriteKeySize + 1, 'X');
             TSerializedCellMatrix matrix({TCell(hugeStringValue.c_str(), hugeStringValue.size())}, 1, 1);
@@ -159,9 +171,71 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
             UNIT_ASSERT_VALUES_EQUAL(writeResult.GetIssues().size(), 1);
             UNIT_ASSERT(writeResult.GetIssues(0).message().Contains("Row key size of 1049601 bytes is larger than the allowed threshold 1049600"));
         }
+
+        Cout << "========= Send immediate write with OPERATION_UNSPECIFIED =========\n";
+        {
+            TString stringValue('X');
+            TSerializedCellMatrix matrix({TCell(stringValue.c_str(), stringValue.size())}, 1, 1);
+
+            auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(100, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+            ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
+            auto operation = evWrite->Record.AddOperations();
+            operation->SetType(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UNSPECIFIED);
+            operation->SetPayloadFormat(NKikimrDataEvents::FORMAT_CELLVEC);
+            operation->SetPayloadIndex(payloadIndex);
+            operation->MutableTableId()->SetOwnerId(tableId.PathId.OwnerId);
+            operation->MutableTableId()->SetTableId(tableId.PathId.LocalPathId);
+            operation->MutableTableId()->SetSchemaVersion(tableId.SchemaVersion);
+            operation->MutableColumnIds()->Add(1);
+
+            const auto writeResult = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetIssues().size(), 1);
+            UNIT_ASSERT(writeResult.GetIssues(0).message().Contains("OPERATION_UNSPECIFIED operation is not supported now"));
+        }
     }
 
-    Y_UNIT_TEST_TWIN(WritePrepared, Volatile) {
+    Y_UNIT_TEST(DeleteImmediate) {
+        auto [runtime, server, sender] = TestCreateServer();
+
+        auto opts = TShardedTableOptions().Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}});
+        auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
+        const ui64 shard = shards[0];
+
+        ui64 txId = 100;
+
+        Cout << "========= Send immediate upsert =========\n";
+        {
+            Upsert(runtime, sender, shard, tableId, opts.Columns_, 3, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+        }
+
+        Cout << "========= Read table =========\n";
+        {
+            auto tableState = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
+            UNIT_ASSERT_VALUES_EQUAL(tableState, expectedTableState);
+        }
+
+        Cout << "========= Send immediate delete =========\n";
+        {
+            const auto writeResult = Delete(runtime, sender, shard, tableId, opts.Columns_, 1, ++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetStep(), 0);
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrderId(), txId);
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetTxId(), txId);
+
+            const auto& tableAccessStats = writeResult.GetTxStats().GetTableAccessStats(0);
+            UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetTableInfo().GetName(), "/Root/table-1");
+            UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetEraseRow().GetCount(), 1);
+        }
+
+        Cout << "========= Read table with 1th row deleted =========\n";
+        {
+            auto tableState = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
+            UNIT_ASSERT_VALUES_EQUAL(tableState, "key = 2, value = 3\nkey = 4, value = 5\n");
+        }
+    }    
+
+    Y_UNIT_TEST_TWIN(UpsertPrepared, Volatile) {
         auto [runtime, server, sender] = TestCreateServer();
 
         TShardedTableOptions opts;
@@ -176,7 +250,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
 
         Cout << "========= Send prepare =========\n";
         {
-            const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, 
+            const auto writeResult = Upsert(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, 
                 Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);
 
             UNIT_ASSERT_GT(writeResult.GetMinStep(), 0);
@@ -218,7 +292,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
         }
     }
 
-    Y_UNIT_TEST_TWIN(WritePreparedManyTables, Volatile) {
+    Y_UNIT_TEST_TWIN(UpsertPreparedManyTables, Volatile) {
         auto [runtime, server, sender] = TestCreateServer();
 
         TShardedTableOptions opts;
@@ -236,7 +310,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
 
         Cerr << "===== Write prepared to table 1" << Endl;
         {
-            const auto writeResult = Write(runtime, sender, tabletId1, tableId1, opts.Columns_, rowCount, txId, 
+            const auto writeResult = Upsert(runtime, sender, tabletId1, tableId1, opts.Columns_, rowCount, txId, 
                 Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);
 
             minStep1 = writeResult.GetMinStep();
@@ -245,7 +319,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
 
         Cerr << "===== Write prepared to table 2" << Endl;
         {
-            const auto writeResult = Write(runtime, sender, tabletId2, tableId2, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
+            const auto writeResult = Upsert(runtime, sender, tabletId2, tableId2, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
 
             minStep2 = writeResult.GetMinStep();
             maxStep2 = writeResult.GetMaxStep();
@@ -283,7 +357,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
     }
 
     
-    Y_UNIT_TEST_TWIN(WritePreparedNoTxCache, Volatile) {
+    Y_UNIT_TEST_TWIN(UpsertPreparedNoTxCache, Volatile) {
         auto [runtime, server, sender] = TestCreateServer();
 
         TShardedTableOptions opts;
@@ -297,7 +371,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
 
         Cout << "========= Send prepare =========\n";
         {
-            const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, 
+            const auto writeResult = Upsert(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, 
                 Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);
 
             minStep = writeResult.GetMinStep();
@@ -322,6 +396,72 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
 
     }
 
+    Y_UNIT_TEST_TWIN(DeletePrepared, Volatile) {
+        auto [runtime, server, sender] = TestCreateServer();
+
+        auto opts = TShardedTableOptions().Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}});
+        const TString tableName = "table-1";
+        const auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", tableName, opts);
+        const ui64 shard = shards[0];
+        const ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
+
+        ui64 txId = 100;
+        ui64 minStep, maxStep;
+
+        Cout << "========= Send immediate upsert =========\n";
+        {
+            Upsert(runtime, sender, shard, tableId, opts.Columns_, 3, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+        }
+
+        Cout << "========= Read table =========\n";
+        {
+            auto tableState = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
+            UNIT_ASSERT_VALUES_EQUAL(tableState, expectedTableState);
+        }
+
+        Cout << "========= Send delete prepare =========\n";
+        {
+            const auto writeResult = Delete(runtime, sender, shard, tableId, opts.Columns_, 1, ++txId, Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);
+
+            UNIT_ASSERT_GT(writeResult.GetMinStep(), 0);
+            UNIT_ASSERT_GT(writeResult.GetMaxStep(), writeResult.GetMinStep());
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetTxId(), txId);
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetDomainCoordinators().size(), 1);
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetDomainCoordinators(0), coordinator);
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetTabletInfo().GetTabletId(), shard);
+
+            minStep = writeResult.GetMinStep();
+            maxStep = writeResult.GetMaxStep();
+        }
+
+        Cout << "========= Send propose to coordinator =========\n";
+        {
+            SendProposeToCoordinator(server, shards, minStep, maxStep, txId);
+        }
+
+        Cout << "========= Wait for completed transaction =========\n";
+        {
+            auto writeResult = WaitForWriteCompleted(runtime, sender);
+
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
+            UNIT_ASSERT_GE(writeResult.GetStep(), minStep);
+            UNIT_ASSERT_LE(writeResult.GetStep(), maxStep);
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrderId(), txId);
+            UNIT_ASSERT_VALUES_EQUAL(writeResult.GetTxId(), txId);
+
+            const auto& tableAccessStats = writeResult.GetTxStats().GetTableAccessStats(0);
+            UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetTableInfo().GetName(), "/Root/" + tableName);
+            UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetEraseRow().GetCount(), 1);
+        }
+
+        Cout << "========= Read table =========\n";
+        {
+            auto tableState = TReadTableState(server, MakeReadTableSettings("/Root/" + tableName)).All();
+            UNIT_ASSERT_VALUES_EQUAL(tableState, "key = 2, value = 3\nkey = 4, value = 5\n");
+        }
+    }
+
     Y_UNIT_TEST(RejectOnChangeQueueOverflow) {
         TPortManager pm;
         TServerSettings serverSettings(pm.GetPort(2134));
@@ -352,21 +492,21 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
 
         Cout << "========= Send immediate write, expecting success =========\n";
         {
-            Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, ++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+            Upsert(runtime, sender, shard, tableId, opts.Columns_, rowCount, ++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
         }
 
         UNIT_ASSERT_VALUES_EQUAL(blockedEnqueueRecords.size(), rowCount);
 
         Cout << "========= Send immediate write, expecting overloaded =========\n";
         {
-            Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, ++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
+            Upsert(runtime, sender, shard, tableId, opts.Columns_, rowCount, ++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
         }
 
         Cout << "========= Send immediate write + OverloadSubscribe, expecting overloaded =========\n";
         {
             ui64 secNo = 55;
 
-            auto request = MakeWriteRequest(++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, tableId, opts.Columns_, rowCount);
+            auto request = MakeWriteRequest(++txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, opts.Columns_, rowCount);
             request->Record.SetOverloadSubscribe(secNo);
            
             auto writeResult = Write(runtime, sender, shard, std::move(request), NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);

+ 18 - 4
ydb/core/tx/datashard/datashard_write_operation.cpp

@@ -63,7 +63,7 @@ TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, ui64 globalTxId, TInstant
 
     if (record.operations().size() != 0) {
         Y_ABORT_UNLESS(record.operations().size() == 1, "Only one operation is supported now");
-        Y_ABORT_UNLESS(record.operations(0).GetType() == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, "Only UPSERT operation is supported now");
+
         const NKikimrDataEvents::TEvWrite::TOperation& recordOperation = record.operations(0);
 
         ColumnIds = {recordOperation.GetColumnIds().begin(), recordOperation.GetColumnIds().end()};
@@ -84,6 +84,19 @@ TValidatedWriteTx::~TValidatedWriteTx() {
 }
 
 bool TValidatedWriteTx::ParseOperation(const NEvents::TDataEvents::TEvWrite& ev, const NKikimrDataEvents::TEvWrite::TOperation& recordOperation, const TUserTable::TTableInfos& tableInfos) {
+    
+    OperationType = recordOperation.GetType();
+    switch (OperationType) {
+        case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT:
+        case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_DELETE:
+            break;
+        default: {
+            ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
+            ErrStr = TStringBuilder() << OperationType << " operation is not supported now";
+            return false;
+        }
+    }
+
     const NKikimrDataEvents::TTableId& tableIdRecord = recordOperation.GetTableId();
 
     auto tableInfoPtr = tableInfos.FindPtr(tableIdRecord.GetTableId());
@@ -442,7 +455,7 @@ void TWriteOperation::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBase&
     LocksCache().Locks.clear();
     ArtifactFlags = 0;
 
-    LOG_D("tx " << GetTxId() << " released its data");
+    LOG_D("tx " << GetTxId() << " at " << TabletId << " released its data");
 }
 
 void TWriteOperation::DbStoreLocksAccessLog(NTable::TDatabase& txcDb)
@@ -463,7 +476,7 @@ void TWriteOperation::DbStoreLocksAccessLog(NTable::TDatabase& txcDb)
     TStringBuf vecData(vecDataStart, vecDataSize);
     db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update(NIceDb::TUpdate<Schema::TxArtifacts::Locks>(vecData));
 
-    LOG_T("Storing " << vec.size() << " locks for txid=" << GetTxId() << " in " << TabletId);
+    LOG_T("Storing " << vec.size() << " locks for txid=" << GetTxId() << " at " << TabletId);
 }
 
 void TWriteOperation::DbStoreArtifactFlags(NTable::TDatabase& txcDb)
@@ -473,7 +486,7 @@ void TWriteOperation::DbStoreArtifactFlags(NTable::TDatabase& txcDb)
     NIceDb::TNiceDb db(txcDb);
     db.Table<Schema::TxArtifacts>().Key(GetTxId()).Update<Schema::TxArtifacts::Flags>(ArtifactFlags);
 
-    LOG_T("Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " in " << TabletId);
+    LOG_T("Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() << " at " << TabletId);
 }
 
 ui64 TWriteOperation::GetMemoryConsumption() const {
@@ -646,6 +659,7 @@ void TWriteOperation::UntrackMemory() const {
 void TWriteOperation::SetError(const NKikimrDataEvents::TEvWriteResult::EStatus& status, const TString& errorMsg) {
     SetAbortedFlag();
     WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletId, GetTxId(), status, errorMsg);
+    LOG_I("Write transaction " << GetTxId() << " at " << TabletId << " has an error: " << errorMsg);
 }
 
 void TWriteOperation::SetWriteResult(std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>&& writeResult) {

+ 1 - 0
ydb/core/tx/datashard/datashard_write_operation.h

@@ -117,6 +117,7 @@ private:
     YDB_READONLY_DEF(ui64, GlobalTxId);
     YDB_READONLY_DEF(TTableId, TableId);
     YDB_READONLY_DEF(std::optional<NKikimrDataEvents::TKqpLocks>, KqpLocks);
+    YDB_READONLY_DEF(NKikimrDataEvents::TEvWrite::TOperation::EOperationType, OperationType);
     YDB_READONLY_DEF(std::vector<ui32>, ColumnIds);
     YDB_READONLY_DEF(TSerializedCellMatrix, Matrix);
     YDB_READONLY_DEF(TInstant, ReceivedAt);

+ 43 - 24
ydb/core/tx/datashard/execute_write_unit.cpp

@@ -77,29 +77,22 @@ public:
 
         const ui64 tableId = writeTx->GetTableId().PathId.LocalPathId;
         const TTableId fullTableId(DataShard.GetPathOwnerId(), tableId);
-        const ui64 localTableId = DataShard.GetLocalTableId(fullTableId);
-        if (localTableId == 0) {
-            writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, TStringBuilder() << "Unknown table id " << tableId);
-            return;
-        }
-        const ui64 shadowTableId = DataShard.GetShadowTableId(fullTableId);
-        const TUserTable& TableInfo_ = *DataShard.GetUserTables().at(tableId);
-        Y_ABORT_UNLESS(TableInfo_.LocalTid == localTableId);
-        Y_ABORT_UNLESS(TableInfo_.ShadowTid == shadowTableId);
+        const TUserTable& userTable = *DataShard.GetUserTables().at(tableId);
 
         const NTable::TScheme& scheme = txc.DB.GetScheme();
-        const NTable::TScheme::TTableInfo* tableInfo = scheme.GetTableInfo(localTableId);
+        const NTable::TScheme::TTableInfo* tableInfo = scheme.GetTableInfo(userTable.LocalTid);
 
         TSmallVec<TRawTypeValue> key;
         TSmallVec<NTable::TUpdateOp> ops;
 
         const TSerializedCellMatrix& matrix = writeTx->GetMatrix();
+        const auto operationType = writeTx->GetOperationType();
 
         for (ui32 rowIdx = 0; rowIdx < matrix.GetRowCount(); ++rowIdx)
         {
             key.clear();
-            key.reserve(TableInfo_.KeyColumnIds.size());
-            for (ui16 keyColIdx = 0; keyColIdx < TableInfo_.KeyColumnIds.size(); ++keyColIdx) {
+            key.reserve(userTable.KeyColumnIds.size());
+            for (ui16 keyColIdx = 0; keyColIdx < userTable.KeyColumnIds.size(); ++keyColIdx) {
                 const TCell& cell = matrix.GetCell(rowIdx, keyColIdx);
                 ui32 keyCol = tableInfo->KeyColumns[keyColIdx];
                 if (cell.IsNull()) {
@@ -110,23 +103,49 @@ public:
                 }
             }
 
-            ops.clear();
-            Y_ABORT_UNLESS(matrix.GetColCount() >= TableInfo_.KeyColumnIds.size());
-            ops.reserve(matrix.GetColCount() - TableInfo_.KeyColumnIds.size());
+            switch (operationType) {
+                case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT:
+                {
+                    ops.clear();
+                    Y_ABORT_UNLESS(matrix.GetColCount() >= userTable.KeyColumnIds.size());
+                    ops.reserve(matrix.GetColCount() - userTable.KeyColumnIds.size());
 
-            for (ui16 valueColIdx = TableInfo_.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) {
-                ui32 columnTag = writeTx->GetColumnIds()[valueColIdx];
-                const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
+                    for (ui16 valueColIdx = userTable.KeyColumnIds.size(); valueColIdx < matrix.GetColCount(); ++valueColIdx) {
+                        ui32 columnTag = writeTx->GetColumnIds()[valueColIdx];
+                        const TCell& cell = matrix.GetCell(rowIdx, valueColIdx);
 
-                NScheme::TTypeInfo vtypeInfo = scheme.GetColumnInfo(tableInfo, columnTag)->PType;
-                ops.emplace_back(columnTag, NTable::ECellOp::Set, cell.IsNull() ? TRawTypeValue() : TRawTypeValue(cell.Data(), cell.Size(), vtypeInfo));
-            }
+                        NScheme::TTypeInfo vtypeInfo = scheme.GetColumnInfo(tableInfo, columnTag)->PType;
+                        ops.emplace_back(columnTag, NTable::ECellOp::Set, cell.IsNull() ? TRawTypeValue() : TRawTypeValue(cell.Data(), cell.Size(), vtypeInfo));
+                    }
 
-            userDb.UpdateRow(fullTableId, key, ops);
+                    userDb.UpdateRow(fullTableId, key, ops);
+                    break;
+                }
+                case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_DELETE:
+                {
+                    userDb.EraseRow(fullTableId, key);
+                    break;
+                }
+                default:
+                    // Checked before in TWriteOperation
+                    Y_FAIL_S(operationType << " operation is not supported now");
+            }
         }
 
-        DataShard.IncCounter(COUNTER_WRITE_ROWS, matrix.GetRowCount());
-        DataShard.IncCounter(COUNTER_WRITE_BYTES, matrix.GetBuffer().size());
+        switch (operationType) {
+            case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT: {
+                DataShard.IncCounter(COUNTER_WRITE_ROWS, matrix.GetRowCount());
+                DataShard.IncCounter(COUNTER_WRITE_BYTES, matrix.GetBuffer().size());
+                break;
+            }
+            case NKikimrDataEvents::TEvWrite::TOperation::OPERATION_DELETE: {
+                DataShard.IncCounter(COUNTER_ERASE_ROWS, matrix.GetRowCount());
+                break;
+            }
+            default:
+                // Checked before in TWriteOperation
+                Y_FAIL_S(operationType << " operation is not supported now");
+        }
 
         LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Executed write operation for " << *writeOp << " at " << DataShard.TabletID() << ", row count=" << matrix.GetRowCount());
     }

+ 21 - 9
ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

@@ -1834,15 +1834,23 @@ void ExecSQL(Tests::TServer::TPtr server,
     UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code);
 }
 
-std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 seed) {
-    std::vector<ui32> columnIds(columns.size());
-    std::iota(columnIds.begin(), columnIds.end(), 1);
+std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 seed) {
+    std::vector<ui32> columnIds;
+    for (ui32 col = 0; col < columns.size(); ++col) {
+        const auto& column = columns[col];
+        if (column.IsKey || operationType != NKikimrDataEvents::TEvWrite::TOperation::OPERATION_DELETE)
+            columnIds.push_back(col + 1);
+    }
 
     TVector<TString> stringValues;
     TVector<TCell> cells;
+
     for (ui32 row = 0; row < rowCount; ++row) {
         for (ui32 col = 0; col < columns.size(); ++col) {
-            const TString& columnType = columns[col].Type;
+            const auto& column = columns[col];
+            if (!column.IsKey && operationType == NKikimrDataEvents::TEvWrite::TOperation::OPERATION_DELETE)
+                continue;
+            const TString& columnType = column.Type;
             ui64 value = row * columns.size() + col + seed;
             if (columnType == "Uint64") {
                 cells.emplace_back(TCell((const char*)&value, sizeof(ui64)));
@@ -1858,14 +1866,14 @@ std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKik
         }
     }
 
-    TSerializedCellMatrix matrix(cells, rowCount, columns.size());
+    TSerializedCellMatrix matrix(cells, rowCount, columnIds.size());
     TString blobData = matrix.ReleaseBuffer();
 
     UNIT_ASSERT(blobData.size() < 8_MB);
 
     auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
     ui64 payloadIndex = NKikimr::NEvWrite::TPayloadWriter<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(std::move(blobData));
-    evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
+    evWrite->AddOperation(operationType, tableId, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
 
     return evWrite;
 }
@@ -1898,13 +1906,17 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sen
     return resultRecord;
 }
 
-NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
+NKikimrDataEvents::TEvWriteResult Upsert(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
 {
-    auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount);
+    auto request = MakeWriteRequest(txId, txMode, NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, columns, rowCount);
     return Write(runtime, sender, shardId, std::move(request), expectedStatus);
 }
 
-
+NKikimrDataEvents::TEvWriteResult Delete(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
+{
+    auto request = MakeWriteRequest(txId, txMode, NKikimrDataEvents::TEvWrite::TOperation::OPERATION_DELETE, tableId, columns, rowCount);
+    return Write(runtime, sender, shardId, std::move(request), expectedStatus);
+}
 
 TTestActorRuntimeBase::TEventObserverHolderPair ReplaceEvProposeTransactionWithEvWrite(TTestActorRuntime& runtime, TEvWriteRows& rows) {
     if (rows.empty())

+ 3 - 2
ydb/core/tx/datashard/ut_common/datashard_ut_common.h

@@ -711,9 +711,10 @@ void ExecSQL(Tests::TServer::TPtr server,
              bool dml = true,
              Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS);
 
-std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 seed = 0);
+std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWrite_TOperation::EOperationType operationType, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 seed = 0);
 NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
-NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
+NKikimrDataEvents::TEvWriteResult Upsert(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
+NKikimrDataEvents::TEvWriteResult Delete(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, const TTableId& tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
 NKikimrDataEvents::TEvWriteResult WaitForWriteCompleted(TTestActorRuntime& runtime, TActorId sender, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
 
 struct TEvWriteRow {