Browse Source

Move ReValidateKeys to TKeyValidator (#1217)

azevaykin 1 year ago
parent
commit
4c2eccd076

+ 9 - 10
ydb/core/tx/datashard/datashard__engine_host.cpp

@@ -585,16 +585,7 @@ public:
     ui64 GetTableSchemaVersion(const TTableId& tableId) const override {
         if (TSysTables::IsSystemTable(tableId))
             return 0;
-        const auto& userTables = Self->GetUserTables();
-        auto it = userTables.find(tableId.PathId.LocalPathId);
-        if (it == userTables.end()) {
-            Y_FAIL_S("DatshardEngineHost (tablet id: " << Self->TabletID()
-                     << " state: " << Self->GetState()
-                     << ") unables to find given table with id: " << tableId);
-            return 0;
-        } else {
-            return it->second->GetTableSchemaVersion();
-        }
+        return GetKeyValidator().GetTableSchemaVersion(tableId);
     }
 
     ui64 GetWriteTxId(const TTableId& tableId) const override {
@@ -997,6 +988,13 @@ private:
         return static_cast<const TDataShardSysTables *>(Self->GetDataShardSysTables())->Get(tableId);
     }
 
+    TKeyValidator& GetKeyValidator() {
+        return EngineBay.GetKeyValidator();
+    }
+    const TKeyValidator& GetKeyValidator() const {
+        return EngineBay.GetKeyValidator();
+    }
+
     TDataShard* Self;
     TEngineBay& EngineBay;
     NTable::TDatabase& DB;
@@ -1023,6 +1021,7 @@ private:
 TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActorContext& ctx,
                        std::pair<ui64, ui64> stepTxId)
     : StepTxId(stepTxId)
+    , KeyValidator(*self, txc.DB)
     , LockTxId(0)
     , LockNodeId(0)
 {

+ 5 - 12
ydb/core/tx/datashard/datashard_write_operation.cpp

@@ -192,18 +192,11 @@ void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NPro
 
 ui32 TValidatedWriteTx::ExtractKeys(bool allowErrors)
 {
-    using EResult = NMiniKQL::IEngineFlat::EResult;
+    SetTxKeys(RecordOperation().GetColumnIds());
+
+    bool isValid = ReValidateKeys();
+    Y_ABORT_UNLESS(allowErrors || isValid, "Validation errors: %s", ErrStr.data());
 
-    EResult result = EngineBay.Validate();
-    if (allowErrors) {
-        if (result != EResult::Ok) {
-            ErrStr = EngineBay.GetEngine()->GetErrors();
-            ErrCode = ConvertErrCode(result);
-            return 0;
-        }
-    } else {
-        Y_ABORT_UNLESS(result == EResult::Ok, "Engine errors: %s", EngineBay.GetEngine()->GetErrors().data());
-    }
     return KeysCount();
 }
 
@@ -212,7 +205,7 @@ bool TValidatedWriteTx::ReValidateKeys()
     using EResult = NMiniKQL::IEngineFlat::EResult;
 
 
-    auto [result, error] = EngineBay.GetKqpComputeCtx().ValidateKeys(EngineBay.TxInfo());
+    auto [result, error] = GetKeyValidator().ValidateKeys();
     if (result != EResult::Ok) {
         ErrStr = std::move(error);
         ErrCode = ConvertErrCode(result);

+ 1 - 1
ydb/core/tx/datashard/datashard_write_operation.h

@@ -148,7 +148,7 @@ public:
     }
 
     bool ParseRecord(const TDataShard::TTableInfos& tableInfos);
-    void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds);  
+    void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds);
 
     ui32 ExtractKeys(bool allowErrors);
     bool ReValidateKeys();

+ 65 - 2
ydb/core/tx/datashard/key_validator.cpp

@@ -1,15 +1,24 @@
 #include "key_validator.h"
-#include "ydb/core/base/appdata_fwd.h"
+#include "datashard_impl.h"
+#include "range_ops.h"
 
 
 #include <ydb/library/actors/core/actor.h>
 #include <ydb/library/actors/core/log.h>
 #include <ydb/library/services/services.pb.h>
-#include <ydb/core/tx/datashard/range_ops.h>
+
+
 
 using namespace NKikimr;
 using namespace NKikimr::NDataShard;
 
+TKeyValidator::TKeyValidator(const TDataShard& self, const NTable::TDatabase& db) 
+    : Self(self)
+    , Db(db)
+{
+
+}
+
 void TKeyValidator::AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit, bool reverse)
 {
     TVector<TKeyDesc::TColumnOp> columnOps;
@@ -56,6 +65,60 @@ void TKeyValidator::AddWriteRange(const TTableId& tableId, const TTableRange& ra
     Info.SetLoaded();
 }
 
+bool TKeyValidator::IsValidKey(TKeyDesc& key) const {
+    ui64 localTableId = Self.GetLocalTableId(key.TableId);
+    return NMiniKQL::IsValidKey(Db.GetScheme(), localTableId, key);
+}
+
+ui64 TKeyValidator::GetTableSchemaVersion(const TTableId& tableId) const {
+    if (TSysTables::IsSystemTable(tableId))
+        return 0;
+
+    const auto& userTables = Self.GetUserTables();
+    auto it = userTables.find(tableId.PathId.LocalPathId);
+    if (it == userTables.end()) {
+        Y_FAIL_S("TKeyValidator (tablet id: " << Self.TabletID() << " state: " << Self.GetState() << ") unable to find given table with id: " << tableId);
+        return 0;
+    } else {
+        return it->second->GetTableSchemaVersion();
+    }
+}
+
+std::tuple<NMiniKQL::IEngineFlat::EResult, TString> TKeyValidator::ValidateKeys() const {
+    using EResult = NMiniKQL::IEngineFlat::EResult;
+
+    for (const auto& validKey : Info.Keys) {
+        TKeyDesc* key = validKey.Key.get();
+
+        bool valid = IsValidKey(*key);
+
+        if (valid) {
+            auto curSchemaVersion = GetTableSchemaVersion(key->TableId);
+            if (key->TableId.SchemaVersion && curSchemaVersion && curSchemaVersion != key->TableId.SchemaVersion) {
+                auto error = TStringBuilder()
+                             << "Schema version mismatch for table id: " << key->TableId
+                             << " key table version: " << key->TableId.SchemaVersion
+                             << " current table version: " << curSchemaVersion;
+                return {EResult::SchemeChanged, std::move(error)};
+            }
+        } else {
+            switch (key->Status) {
+                case TKeyDesc::EStatus::SnapshotNotExist:
+                    return {EResult::SnapshotNotExist, ""};
+                case TKeyDesc::EStatus::SnapshotNotReady:
+                    key->Status = TKeyDesc::EStatus::Ok;
+                    return {EResult::SnapshotNotReady, ""};
+                default:
+                    auto error = TStringBuilder()
+                                 << "Validate (" << __LINE__ << "): Key validation status: " << (ui32)key->Status;
+                    return {EResult::KeyError, std::move(error)};
+            }
+        }
+    }
+
+    return {EResult::Ok, ""};
+}
+
 NMiniKQL::IEngineFlat::TValidationInfo& TKeyValidator::GetInfo() {
     return Info;
 }

+ 12 - 0
ydb/core/tx/datashard/key_validator.h

@@ -1,5 +1,6 @@
 #pragma once
 
+#include <ydb/core/tablet_flat/flat_database.h>
 
 #include <ydb/core/engine/mkql_engine_flat.h>
 #include <ydb/core/scheme_types/scheme_type_registry.h>
@@ -7,8 +8,11 @@
 
 namespace NKikimr::NDataShard {
 
+class TDataShard;
+
 class TKeyValidator {
 public:
+    TKeyValidator(const TDataShard& self, const NTable::TDatabase& db);
 
     struct TColumnWriteMeta {
         NTable::TColumn Column;
@@ -17,10 +21,18 @@ public:
 
     void AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit = 0, bool reverse = false);
     void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, const TVector<TColumnWriteMeta>& columns, bool isPureEraseOp);
+    
+    bool IsValidKey(TKeyDesc& key) const;
+    std::tuple<NMiniKQL::IEngineFlat::EResult, TString> ValidateKeys() const;
+
+    ui64 GetTableSchemaVersion(const TTableId& tableId) const;
 
     NMiniKQL::IEngineFlat::TValidationInfo& GetInfo();
     const NMiniKQL::IEngineFlat::TValidationInfo& GetInfo() const;
 private:
+    const TDataShard& Self;
+    const NTable::TDatabase& Db;
+
     NMiniKQL::IEngineFlat::TValidationInfo Info;
 };
 

+ 16 - 0
ydb/core/tx/datashard/write_unit.cpp

@@ -123,6 +123,22 @@ public:
         TDataShardLocksDb locksDb(DataShard, txc);
         TSetupSysLocks guardLocks(op, DataShard, &locksDb);
 
+        const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();
+
+        if (op->IsImmediate() && !writeOp->ReValidateKeys()) {
+            // Immediate transactions may be reordered with schema changes and become invalid
+            Y_ABORT_UNLESS(!writeTx->Ready());
+            writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, writeTx->GetErrStr());
+            return EExecutionStatus::Executed;
+        }
+
+        if (writeTx->CheckCancelled()) {
+            writeOp->ReleaseTxData(txc, ctx);
+            writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED, "Tx was cancelled");
+            DataShard.IncCounter(COUNTER_WRITE_CANCELLED);
+            return EExecutionStatus::Executed;
+        }
+
         try {
             DoExecute(&DataShard, writeOp, txc, ctx);
         } catch (const TNeedGlobalTxId&) {