Browse Source

ut_read_iterator + EvWrite (#742)

ut_read_iterator + EvWrite
azevaykin 1 year ago
parent
commit
75470f7094

+ 67 - 25
ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

@@ -10,6 +10,9 @@
 #include <ydb/core/tx/tx_proxy/proxy.h>
 #include <ydb/core/tx/tx_proxy/read_table.h>
 
+#include <ydb/core/tx/data_events/events.h>
+#include <ydb/core/tx/data_events/payload_helper.h>
+
 #include <ydb/public/sdk/cpp/client/ydb_result/result.h>
 
 #include <algorithm>
@@ -25,46 +28,52 @@ namespace {
 
 using TCellVec = std::vector<TCell>;
 
-void CreateTable(Tests::TServer::TPtr server,
+TVector<TShardedTableOptions::TColumn> GetColumns() {
+    TVector<TShardedTableOptions::TColumn> columns = {
+        {"key1", "Uint32", true, false},
+        {"key2", "Uint32", true, false},
+        {"key3", "Uint32", true, false},
+        {"value", "Uint32", false, false}};
+
+    return columns;
+}
+
+TVector<TShardedTableOptions::TColumn> GetMoviesColumns() {
+    TVector<TShardedTableOptions::TColumn> columns = {
+        {"id", "Uint32", true, false},
+        {"title", "String", false, false},
+        {"rating", "Uint32", false, false}};
+
+    return columns;
+}
+
+std::tuple<TVector<ui64>, ui64> CreateTable(Tests::TServer::TPtr server,
                  TActorId sender,
                  const TString &root,
                  const TString &name,
                  bool withFollower = false,
                  ui64 shardCount = 1)
 {
-    TVector<TShardedTableOptions::TColumn> columns = {
-        {"key1", "Uint32", true, false},
-        {"key2", "Uint32", true, false},
-        {"key3", "Uint32", true, false},
-        {"value", "Uint32", false, false}
-    };
-
     auto opts = TShardedTableOptions()
         .Shards(shardCount)
-        .Columns(columns);
+        .Columns(GetColumns());
 
     if (withFollower)
         opts.Followers(1);
 
-    CreateShardedTable(server, sender, root, name, opts);
+    return CreateShardedTable(server, sender, root, name, opts);
 }
 
-void CreateMoviesTable(Tests::TServer::TPtr server,
+std::tuple<TVector<ui64>, ui64> CreateMoviesTable(Tests::TServer::TPtr server,
                        TActorId sender,
                        const TString &root,
                        const TString &name)
 {
-    TVector<TShardedTableOptions::TColumn> columns = {
-        {"id", "Uint32", true, false},
-        {"title", "String", false, false},
-        {"rating", "Uint32", false, false}
-    };
-
     auto opts = TShardedTableOptions()
         .Shards(1)
-        .Columns(columns);
+        .Columns(GetMoviesColumns());
 
-    CreateShardedTable(server, sender, root, name, opts);
+    return CreateShardedTable(server, sender, root, name, opts);
 }
 
 struct TRowWriter : public NArrow::IRowWriter {
@@ -308,11 +317,14 @@ void AddRangeQuery(
 struct TTableInfo {
     TString Name;
 
+    ui64 TableId;
     ui64 TabletId;
     ui64 OwnerId;
     NKikimrTxDataShard::TEvGetInfoResponse::TUserTable UserTable;
 
     TActorId ClientId;
+
+    TVector<TShardedTableOptions::TColumn> Columns;
 };
 
 struct TTestHelper {
@@ -345,7 +357,7 @@ struct TTestHelper {
         {
             auto& table1 = Tables["table-1"];
             table1.Name = "table-1";
-            CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount);
+            auto [shards, tableId] = CreateTable(Server, Sender, "/Root", "table-1", WithFollower, ShardCount);
             ExecSQL(Server, Sender, R"(
                 UPSERT INTO `/Root/table-1`
                 (key1, key2, key3, value)
@@ -360,7 +372,7 @@ struct TTestHelper {
                 (11, 11, 11, 1111);
             )");
 
-            auto shards = GetTableShards(Server, Sender, "/Root/table-1");
+            table1.TableId = tableId;
             table1.TabletId = shards.at(0);
 
             auto [tables, ownerId] = GetTables(Server, table1.TabletId);
@@ -368,12 +380,14 @@ struct TTestHelper {
             table1.UserTable = tables["table-1"];
 
             table1.ClientId = runtime.ConnectToPipe(table1.TabletId, Sender, 0, GetTestPipeConfig());
+
+            table1.Columns = GetColumns();
         }
 
         {
             auto& table2 = Tables["movies"];
             table2.Name = "movies";
-            CreateMoviesTable(Server, Sender, "/Root", "movies");
+            auto [shards, tableId] = CreateMoviesTable(Server, Sender, "/Root", "movies");
             ExecSQL(Server, Sender, R"(
                 UPSERT INTO `/Root/movies`
                 (id, title, rating)
@@ -383,7 +397,7 @@ struct TTestHelper {
                 (3, "Hard die", 8);
             )");
 
-            auto shards = GetTableShards(Server, Sender, "/Root/movies");
+            table2.TableId = tableId;
             table2.TabletId = shards.at(0);
 
             auto [tables, ownerId] = GetTables(Server, table2.TabletId);
@@ -391,14 +405,16 @@ struct TTestHelper {
             table2.UserTable = tables["movies"];
 
             table2.ClientId = runtime.ConnectToPipe(table2.TabletId, Sender, 0, GetTestPipeConfig());
+
+            table2.Columns = GetMoviesColumns();
         }
 
         {
             auto& table3 = Tables["table-1-many"];
             table3.Name = "table-1-many";
-            CreateTable(Server, Sender, "/Root", "table-1-many", WithFollower, ShardCount);
+            auto [shards, tableId] = CreateTable(Server, Sender, "/Root", "table-1-many", WithFollower, ShardCount);
 
-            auto shards = GetTableShards(Server, Sender, "/Root/table-1-many");
+            table3.TableId = tableId;
             table3.TabletId = shards.at(0);
 
             auto [tables, ownerId] = GetTables(Server, table3.TabletId);
@@ -406,6 +422,8 @@ struct TTestHelper {
             table3.UserTable = tables["table-1-many"];
 
             table3.ClientId = runtime.ConnectToPipe(table3.TabletId, Sender, 0, GetTestPipeConfig());
+
+            table3.Columns = GetColumns();
         }
     }
 
@@ -717,6 +735,30 @@ struct TTestHelper {
         UNIT_ASSERT_VALUES_EQUAL(rowsRead, Min(rowCount, limit));
     }
 
+    NKikimrDataEvents::TEvWriteResult WriteRow(const TString& tableName, ui64 txId, const TVector<ui32>& values, NKikimrDataEvents::TEvWrite::ETxMode txMode = NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE) {
+        const auto& table = Tables[tableName];
+
+        auto opts = TShardedTableOptions().Columns(table.Columns);
+        size_t columnCount = table.Columns.size();
+
+        std::vector<ui32> columnIds(columnCount);
+        std::iota(columnIds.begin(), columnIds.end(), 1);
+
+        Y_ABORT_UNLESS(values.size() == columnCount);
+
+        TVector<TCell> cells;
+        for (ui32 col = 0; col < columnCount; ++col)
+            cells.emplace_back(TCell((const char*)&values[col], sizeof(ui32)));
+
+        TSerializedCellMatrix matrix(cells, 1, columnCount);
+
+        auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, txMode);
+        ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
+        evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, table.TableId, 1, columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
+
+        return Write(*Server->GetRuntime(), Sender, table.TabletId, std::move(evWrite));
+    }
+
     struct THangedReturn {
         ui64 LastPlanStep = 0;
         TVector<THolder<IEventHandle>> ReadSets;

+ 4 - 7
ydb/core/tx/datashard/datashard_ut_write.cpp

@@ -37,7 +37,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
 
         const ui32 rowCount = 3;
         ui64 txId = 100;
-        Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+        Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
 
         auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
 
@@ -55,7 +55,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
 
         const ui32 rowCount = 3;
         ui64 txId = 100;
-        Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
+        Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
 
         auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
 
@@ -77,11 +77,8 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
         ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload(matrix.ReleaseBuffer());
         evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1, {1}, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
 
-        runtime.SendToPipe(shards[0], sender, evWrite.release(), 0, GetPipeConfigWithRetries());
-        auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
+        const auto& record = Write(runtime, sender, shards[0], std::move(evWrite), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
 
-        const auto& record = ev->Get()->Record;
-        UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
         UNIT_ASSERT_VALUES_EQUAL(record.GetIssues().size(), 1);
         UNIT_ASSERT(record.GetIssues(0).message().Contains("Operation [0:100] writes key of 1049601 bytes which exceeds limit 1049600 bytes"));
     }
@@ -94,7 +91,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
 
         const ui32 rowCount = 3;
         ui64 txId = 100;
-        Write(runtime, shards[0], tableId, opts.Columns_, rowCount, sender, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
+        Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
 
         auto table1state = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
 

+ 11 - 5
ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp

@@ -1843,13 +1843,13 @@ std::unique_ptr<NEvents::TDataEvents::TEvWrite> MakeWriteRequest(ui64 txId, NKik
     return evWrite;
 }
 
-NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
+NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
 {
-    auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount);
+    auto txMode = request->Record.GetTxMode();
     runtime.SendToPipe(shardId, sender, request.release(), 0, GetPipeConfigWithRetries());
 
     auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(sender);
-    auto status = ev->Get()->Record.GetStatus();
+    auto resultRecord = ev->Get()->Record;
     
     if (expectedStatus == NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED) {
         switch (txMode) {
@@ -1866,9 +1866,15 @@ NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId
                 break;
         }
     }
-    UNIT_ASSERT_C(status == expectedStatus, "Status: " << ev->Get()->Record.GetStatus() << " Issues: " << ev->Get()->Record.GetIssues());
+    UNIT_ASSERT_C(resultRecord.GetStatus() == expectedStatus, "Status: " << resultRecord.GetStatus() << " Issues: " << resultRecord.GetIssues());
 
-    return ev->Get()->Record;
+    return resultRecord;
+}
+
+NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus)
+{
+    auto request = MakeWriteRequest(txId, txMode, tableId, columns, rowCount);
+    return Write(runtime, sender, shardId, std::move(request), expectedStatus);
 }
 
 void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values)

+ 2 - 1
ydb/core/tx/datashard/ut_common/datashard_ut_common.h

@@ -710,7 +710,8 @@ void ExecSQL(Tests::TServer::TPtr server,
              bool dml = true,
              Ydb::StatusIds::StatusCode code = Ydb::StatusIds::SUCCESS);
 
-NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, TActorId sender, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
+NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, std::unique_ptr<NEvents::TDataEvents::TEvWrite>&& request, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
+NKikimrDataEvents::TEvWriteResult Write(TTestActorRuntime& runtime, TActorId sender, ui64 shardId, ui64 tableId, const TVector<TShardedTableOptions::TColumn>& columns, ui32 rowCount, ui64 txId, NKikimrDataEvents::TEvWrite::ETxMode txMode, NKikimrDataEvents::TEvWriteResult::EStatus expectedStatus = NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED);
 
 void UploadRows(TTestActorRuntime& runtime, const TString& tablePath, const TVector<std::pair<TString, Ydb::Type_PrimitiveTypeId>>& types, const TVector<TCell>& keys, const TVector<TCell>& values);