Browse Source

KIKIMR-20778: Тест на лаг обновления данных через bulk_upsert (#1143)

Олег 1 year ago
parent
commit
9e04a3a768
1 changed files with 39 additions and 1 deletions
  1. 39 1
      ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

+ 39 - 1
ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

@@ -534,8 +534,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
     void WriteTestData(TKikimrRunner& kikimr, TString testTable, ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, bool withSomeNulls = false) {
         UNIT_ASSERT(testTable != "/Root/benchTable"); // TODO: check schema instead
         TLocalHelper lHelper(kikimr);
-        if (withSomeNulls)
+        if (withSomeNulls) {
             lHelper.WithSomeNulls();
+        }
         auto batch = lHelper.TestArrowBatch(pathIdBegin, tsBegin, rowCount);
         lHelper.SendDataViaActorSystem(testTable, batch);
     }
@@ -5292,6 +5293,43 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
         testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=2", "[[2;\"test_res_2\";#;[\"val1\"]]]");
     }
 
+    Y_UNIT_TEST(BulkUpsertUpdate) {
+        TKikimrSettings runnerSettings;
+        runnerSettings.WithSampleTables = false;
+        auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
+        TTestHelper testHelper(runnerSettings);
+
+        TVector<TTestHelper::TColumnSchema> schema = {
+            TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int64).SetNullable(false),
+            TTestHelper::TColumnSchema().SetName("value").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
+        };
+
+        TTestHelper::TColumnTable testTable;
+        testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({ "id" }).SetSharding({ "id" }).SetSchema(schema);
+        testHelper.CreateTable(testTable);
+        {
+            TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
+            tableInserter.AddRow().Add(1).Add(10);
+            testHelper.BulkUpsert(testTable, tableInserter);
+        }
+        while (csController->GetIndexations().Val() < 1) {
+            Cout << "Wait indexation..." << Endl;
+            Sleep(TDuration::Seconds(2));
+        }
+        testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` WHERE id = 1", "[[10]]");
+        {
+            TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
+            tableInserter.AddRow().Add(1).Add(110);
+            testHelper.BulkUpsert(testTable, tableInserter);
+        }
+        testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` WHERE id = 1", "[[110]]");
+        while (csController->GetIndexations().Val() < 2) {
+            Cout << "Wait indexation..." << Endl;
+            Sleep(TDuration::Seconds(2));
+        }
+        testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` WHERE id = 1", "[[110]]");
+    }
+
 }
 
 } // namespace NKqp