Browse Source

KIKIMR-19377: validate NOT NULL constraint on reads

KIKIMR-19377: validate NOT NULL constraint on datashard reads

datashard common ut
qrort 1 year ago
parent
commit
aa31162bb2

+ 10 - 0
.mapping.json

@@ -4748,6 +4748,11 @@
   "ydb/core/kqp/ut/cost/CMakeLists.linux-x86_64.txt":"",
   "ydb/core/kqp/ut/cost/CMakeLists.txt":"",
   "ydb/core/kqp/ut/cost/CMakeLists.windows-x86_64.txt":"",
+  "ydb/core/kqp/ut/data/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/core/kqp/ut/data/CMakeLists.linux-aarch64.txt":"",
+  "ydb/core/kqp/ut/data/CMakeLists.linux-x86_64.txt":"",
+  "ydb/core/kqp/ut/data/CMakeLists.txt":"",
+  "ydb/core/kqp/ut/data/CMakeLists.windows-x86_64.txt":"",
   "ydb/core/kqp/ut/effects/CMakeLists.darwin-x86_64.txt":"",
   "ydb/core/kqp/ut/effects/CMakeLists.linux-aarch64.txt":"",
   "ydb/core/kqp/ut/effects/CMakeLists.linux-x86_64.txt":"",
@@ -5563,6 +5568,11 @@
   "ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux-x86_64.txt":"",
   "ydb/core/tx/datashard/ut_change_exchange/CMakeLists.txt":"",
   "ydb/core/tx/datashard/ut_change_exchange/CMakeLists.windows-x86_64.txt":"",
+  "ydb/core/tx/datashard/ut_common/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/core/tx/datashard/ut_common/CMakeLists.linux-aarch64.txt":"",
+  "ydb/core/tx/datashard/ut_common/CMakeLists.linux-x86_64.txt":"",
+  "ydb/core/tx/datashard/ut_common/CMakeLists.txt":"",
+  "ydb/core/tx/datashard/ut_common/CMakeLists.windows-x86_64.txt":"",
   "ydb/core/tx/datashard/ut_compaction/CMakeLists.darwin-x86_64.txt":"",
   "ydb/core/tx/datashard/ut_compaction/CMakeLists.linux-aarch64.txt":"",
   "ydb/core/tx/datashard/ut_compaction/CMakeLists.linux-x86_64.txt":"",

+ 2 - 0
ydb/core/kqp/common/kqp_resolve.h

@@ -53,6 +53,7 @@ public:
                 column.Type = NScheme::TTypeInfo(phyColumn.GetTypeId(),
                     NPg::TypeDescFromPgTypeName(phyColumn.GetPgTypeName()));
             }
+            column.NotNull = phyColumn.GetNotNull();
 
             Columns.emplace(phyColumn.GetId().GetName(), std::move(column));
             if (!phyColumn.GetDefaultFromSequence().empty()) {
@@ -85,6 +86,7 @@ public:
             TKqpTableKeys::TColumn column;
             column.Id = systemColumn->ColumnId;
             column.Type = NScheme::TTypeInfo(systemColumn->TypeId);
+            column.NotNull = false;
             Columns.emplace(columnName, std::move(column));
         }
 

+ 1 - 0
ydb/core/kqp/executer_actor/kqp_executer_impl.h

@@ -935,6 +935,7 @@ protected:
                 protoColumn->SetId(column.Id);
                 auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.Type, column.TypeMod);
                 protoColumn->SetType(columnType.TypeId);
+                protoColumn->SetNotNull(column.NotNull);
                 if (columnType.TypeInfo) {
                     *protoColumn->MutableTypeInfo() = *columnType.TypeInfo;
                 }

+ 2 - 0
ydb/core/kqp/executer_actor/kqp_tasks_graph.h

@@ -181,6 +181,7 @@ public:
         NScheme::TTypeInfo Type;
         TString TypeMod;
         TString Name;
+        bool NotNull;
     };
 
     struct TColumnWrite {
@@ -270,6 +271,7 @@ TVector<TTaskMeta::TColumn> BuildKqpColumns(const Proto& op, TIntrusiveConstPtr<
         c.Type = tableColumn.Type;
         c.TypeMod = tableColumn.TypeMod;
         c.Name = column.GetName();
+        c.NotNull = tableColumn.NotNull;
 
         columns.emplace_back(std::move(c));
     }

+ 1 - 1
ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

@@ -176,7 +176,7 @@ void FillTable(const TKikimrTableMetadata& tableMeta, THashSet<TStringBuf>&& col
         } else if (column->IsDefaultFromLiteral()) {
             phyColumn.MutableDefaultFromLiteral()->CopyFrom(column->DefaultFromLiteral);
         }
-
+        phyColumn.SetNotNull(column->NotNull);
         if (column->TypeInfo.GetTypeId() == NScheme::NTypeIds::Pg) {
             phyColumn.SetPgTypeName(NPg::PgTypeNameFromTypeDesc(column->TypeInfo.GetTypeDesc()));
         }

+ 34 - 0
ydb/core/kqp/runtime/kqp_read_actor.cpp

@@ -920,6 +920,19 @@ public:
         return TStringBuilder() << "first request = " << token.GetFirstUnprocessedQuery() << " lastkey = " << lastKey;
     }
 
+    void ReportNullValue(const THolder<TEventHandle<TEvDataShard::TEvReadResult>>& result, size_t columnIndex) {
+        CA_LOG_D(TStringBuilder() << "validation failed, "
+            << " seqno = " << result->Get()->Record.GetSeqNo()
+            << " finished = " << result->Get()->Record.GetFinished());
+        NYql::TIssue issue;
+        issue.SetCode(NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION, NYql::TSeverityIds::S_FATAL);
+        issue.SetMessage(TStringBuilder()
+            << "Read from column index " << columnIndex << ": got NULL from NOT NULL column");
+        NYql::TIssues issues;
+        issues.AddIssue(std::move(issue));
+        Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::INTERNAL_ERROR));
+    }
+
     void HandleRead(TEvDataShard::TEvReadResult::TPtr ev) {
         const auto& record = ev->Get()->Record;
         auto id = record.GetReadId();
@@ -1008,6 +1021,7 @@ public:
             << " seqno = " << ev->Get()->Record.GetSeqNo()
             << " finished = " << ev->Get()->Record.GetFinished());
         CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get()) << " continuation token " << DebugPrintContionuationToken(record.GetContinuationToken()));
+
         Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
         NotifyCA();
     }
@@ -1106,6 +1120,20 @@ public:
                     stats.AddStatistics(
                         NMiniKQL::WriteColumnValuesFromArrow(editAccessors, *result->Get()->GetArrowBatch(), columnIndex, resultColumnIndex, column.TypeInfo)
                     );
+                    if (column.NotNull) {
+                        std::shared_ptr<arrow::Array> columnSharedPtr = result->Get()->GetArrowBatch()->column(columnIndex);       
+                        bool gotNullValue = false;
+                        for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
+                            if (columnSharedPtr->IsNull(rowIndex)) {
+                                gotNullValue = true;
+                                break;
+                            }
+                        }
+                        if (gotNullValue) {
+                            ReportNullValue(result, columnIndex);
+                            return stats;
+                        }
+                    }
                     columnIndex += 1;
                 }
             }
@@ -1164,6 +1192,10 @@ public:
                 if (column.IsSystem) {
                     NMiniKQL::FillSystemColumn(rowItems[resultColumnIndex], shardId, column.Tag, column.TypeInfo);
                 } else {
+                    if (column.NotNull && row[columnIndex].IsNull()) {
+                        ReportNullValue(result, columnIndex);
+                        return stats;
+                    }
                     rowItems[resultColumnIndex] = NMiniKQL::GetCellValue(row[columnIndex], column.TypeInfo);
                     columnIndex += 1;
                 }
@@ -1390,6 +1422,7 @@ private:
             column.Tag = srcColumn.GetId();
             column.TypeInfo = MakeTypeInfo(srcColumn);
             column.IsSystem = IsSystemColumn(column.Tag);
+            column.NotNull = srcColumn.GetNotNull();
             ResultColumns.push_back(column);
         }
     }
@@ -1399,6 +1432,7 @@ private:
         bool IsSystem = false;
         ui32 Tag = 0;
         NScheme::TTypeInfo TypeInfo;
+        bool NotNull;
     };
 
     const NKikimrTxDataShard::TKqpReadRangesSourceSettings* Settings;

+ 1 - 0
ydb/core/kqp/ut/CMakeLists.txt

@@ -9,6 +9,7 @@
 add_subdirectory(arrow)
 add_subdirectory(common)
 add_subdirectory(cost)
+add_subdirectory(data)
 add_subdirectory(effects)
 add_subdirectory(federated_query)
 add_subdirectory(idx_test)

+ 6 - 4
ydb/core/kqp/ut/arrow/kqp_types_arrow_ut.cpp

@@ -48,6 +48,7 @@ void InsertAllColumnsAndCheckSelectAll(TKikimrRunner* runner) {
             YsonValue Yson,
             JsonDocumentValue JsonDocument,
             DyNumberValue DyNumber,
+            Int32NotNullValue Int32 NOT NULL,
             PRIMARY KEY (Key)
         );
     )").GetValueSync();
@@ -55,18 +56,18 @@ void InsertAllColumnsAndCheckSelectAll(TKikimrRunner* runner) {
 
     auto insertResult = session.ExecuteDataQuery(R"(
         --!syntax_v1
-        INSERT INTO `/Root/Tmp` (Key, BoolValue, Int32Value, Uint32Value, Int64Value, Uint64Value, FloatValue, DoubleValue, StringValue, Utf8Value, DateValue, DatetimeValue, TimestampValue, IntervalValue, DecimalValue, JsonValue, YsonValue, JsonDocumentValue, DyNumberValue) VALUES
-        (42, true, -1, 1, -2, 2, CAST(3.0 AS Float), 4.0, "five", Utf8("six"), Date("2007-07-07"), Datetime("2008-08-08T08:08:08Z"), Timestamp("2009-09-09T09:09:09.09Z"), Interval("P10D"), CAST("11.11" AS Decimal(22, 9)), "[12]", "[13]", JsonDocument("[14]"), DyNumber("15.15"));
+        INSERT INTO `/Root/Tmp` (Key, BoolValue, Int32Value, Uint32Value, Int64Value, Uint64Value, FloatValue, DoubleValue, StringValue, Utf8Value, DateValue, DatetimeValue, TimestampValue, IntervalValue, DecimalValue, JsonValue, YsonValue, JsonDocumentValue, DyNumberValue, Int32NotNullValue) VALUES
+        (42, true, -1, 1, -2, 2, CAST(3.0 AS Float), 4.0, "five", Utf8("six"), Date("2007-07-07"), Datetime("2008-08-08T08:08:08Z"), Timestamp("2009-09-09T09:09:09.09Z"), Interval("P10D"), CAST("11.11" AS Decimal(22, 9)), "[12]", "[13]", JsonDocument("[14]"), DyNumber("15.15"), 123);
     )", TTxControl::BeginTx().CommitTx()).GetValueSync();
     UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());
 
-    auto it = db.StreamExecuteScanQuery("SELECT Key, BoolValue, Int32Value, Uint32Value, Int64Value, Uint64Value, FloatValue, DoubleValue, StringValue, Utf8Value, DateValue, DatetimeValue, TimestampValue, IntervalValue, DecimalValue, JsonValue, YsonValue, JsonDocumentValue, DyNumberValue FROM `/Root/Tmp`").GetValueSync();
+    auto it = db.StreamExecuteScanQuery("SELECT Key, BoolValue, Int32Value, Uint32Value, Int64Value, Uint64Value, FloatValue, DoubleValue, StringValue, Utf8Value, DateValue, DatetimeValue, TimestampValue, IntervalValue, DecimalValue, JsonValue, YsonValue, JsonDocumentValue, DyNumberValue, Int32NotNullValue FROM `/Root/Tmp`").GetValueSync();
     UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
     auto streamPart = it.ReadNext().GetValueSync();
     UNIT_ASSERT_C(streamPart.IsSuccess(), streamPart.GetIssues().ToString());
     auto resultSet = streamPart.ExtractResultSet();
     auto columns = resultSet.GetColumnsMeta();
-    UNIT_ASSERT_C(columns.size() == 19, "Wrong columns count");
+    UNIT_ASSERT_C(columns.size() == 20, "Wrong columns count");
     NYdb::TResultSetParser parser(resultSet);
     UNIT_ASSERT_C(parser.TryNextRow(), "Row is missing");
     UNIT_ASSERT(*parser.ColumnParser(0).GetOptionalUint64().Get() == 42);
@@ -89,6 +90,7 @@ void InsertAllColumnsAndCheckSelectAll(TKikimrRunner* runner) {
     UNIT_ASSERT(*parser.ColumnParser(16).GetOptionalYson().Get() == TString("[13]"));
     UNIT_ASSERT(*parser.ColumnParser(17).GetOptionalJsonDocument().Get() == TString("[14]"));
     UNIT_ASSERT(*parser.ColumnParser(18).GetOptionalDyNumber().Get() == TString(".1515e2"));
+    UNIT_ASSERT(parser.ColumnParser(19).GetInt32() == 123);
 }
 
 }

+ 82 - 0
ydb/core/kqp/ut/data/CMakeLists.darwin-x86_64.txt

@@ -0,0 +1,82 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-kqp-ut-data)
+target_compile_options(ydb-core-kqp-ut-data PRIVATE
+  -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-kqp-ut-data PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/core/kqp
+)
+target_link_libraries(ydb-core-kqp-ut-data PUBLIC
+  contrib-libs-cxxsupp
+  yutil
+  library-cpp-cpuid_check
+  cpp-testing-unittest_main
+  ydb-core-kqp
+  kqp-ut-common
+  core-testlib-default
+  ydb-core-tx
+  tx-datashard-ut_common
+  cpp-client-ydb_types
+)
+target_link_options(ydb-core-kqp-ut-data PRIVATE
+  -Wl,-platform_version,macos,11.0,11.0
+  -fPIC
+  -fPIC
+  -framework
+  CoreFoundation
+)
+target_sources(ydb-core-kqp-ut-data PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/data/kqp_read_null_ut.cpp
+)
+set_property(
+  TARGET
+  ydb-core-kqp-ut-data
+  PROPERTY
+  SPLIT_FACTOR
+  10
+)
+add_yunittest(
+  NAME
+  ydb-core-kqp-ut-data
+  TEST_TARGET
+  ydb-core-kqp-ut-data
+  TEST_ARG
+  --print-before-suite
+  --print-before-test
+  --fork-tests
+  --print-times
+  --show-fails
+)
+set_yunittest_property(
+  TEST
+  ydb-core-kqp-ut-data
+  PROPERTY
+  LABELS
+  MEDIUM
+)
+set_yunittest_property(
+  TEST
+  ydb-core-kqp-ut-data
+  PROPERTY
+  PROCESSORS
+  1
+)
+set_yunittest_property(
+  TEST
+  ydb-core-kqp-ut-data
+  PROPERTY
+  TIMEOUT
+  600
+)
+target_allocator(ydb-core-kqp-ut-data
+  system_allocator
+)
+vcs_info(ydb-core-kqp-ut-data)

+ 85 - 0
ydb/core/kqp/ut/data/CMakeLists.linux-aarch64.txt

@@ -0,0 +1,85 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-kqp-ut-data)
+target_compile_options(ydb-core-kqp-ut-data PRIVATE
+  -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-kqp-ut-data PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/core/kqp
+)
+target_link_libraries(ydb-core-kqp-ut-data PUBLIC
+  contrib-libs-linux-headers
+  contrib-libs-cxxsupp
+  yutil
+  cpp-testing-unittest_main
+  ydb-core-kqp
+  kqp-ut-common
+  core-testlib-default
+  ydb-core-tx
+  tx-datashard-ut_common
+  cpp-client-ydb_types
+)
+target_link_options(ydb-core-kqp-ut-data PRIVATE
+  -ldl
+  -lrt
+  -Wl,--no-as-needed
+  -fPIC
+  -fPIC
+  -lpthread
+  -lrt
+  -ldl
+)
+target_sources(ydb-core-kqp-ut-data PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/data/kqp_read_null_ut.cpp
+)
+set_property(
+  TARGET
+  ydb-core-kqp-ut-data
+  PROPERTY
+  SPLIT_FACTOR
+  10
+)
+add_yunittest(
+  NAME
+  ydb-core-kqp-ut-data
+  TEST_TARGET
+  ydb-core-kqp-ut-data
+  TEST_ARG
+  --print-before-suite
+  --print-before-test
+  --fork-tests
+  --print-times
+  --show-fails
+)
+set_yunittest_property(
+  TEST
+  ydb-core-kqp-ut-data
+  PROPERTY
+  LABELS
+  MEDIUM
+)
+set_yunittest_property(
+  TEST
+  ydb-core-kqp-ut-data
+  PROPERTY
+  PROCESSORS
+  1
+)
+set_yunittest_property(
+  TEST
+  ydb-core-kqp-ut-data
+  PROPERTY
+  TIMEOUT
+  600
+)
+target_allocator(ydb-core-kqp-ut-data
+  cpp-malloc-jemalloc
+)
+vcs_info(ydb-core-kqp-ut-data)

Some files were not shown because too many files changed in this diff