Browse Source

use ss tablet id for schemeshard and sender for tx_proxy (#8684)

ivanmorozov333 6 months ago
parent
commit
53df48f10a

+ 5 - 0
ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp

@@ -48,4 +48,9 @@ bool TLongTxTransactionOperator::DoParse(TColumnShard& /*owner*/, const TString&
     return true;
 }
 
+void TLongTxTransactionOperator::DoSendReply(TColumnShard& owner, const TActorContext& ctx) {
+    const auto& txInfo = GetTxInfo();
+    ctx.Send(txInfo.Source, BuildProposeResultEvent(owner).release());
+}
+
 }

+ 2 - 0
ydb/core/tx/columnshard/transactions/operators/long_tx_write.h

@@ -24,6 +24,8 @@ namespace NKikimr::NColumnShard {
         virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override {
 
         }
+        virtual void DoSendReply(TColumnShard& owner, const TActorContext& ctx) override;
+
         virtual void DoFinishProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) override {
         }
         virtual void DoFinishProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override {

+ 19 - 4
ydb/core/tx/columnshard/transactions/operators/propose_tx.cpp

@@ -3,12 +3,25 @@
 namespace NKikimr::NColumnShard {
 
 void IProposeTxOperator::DoSendReply(TColumnShard& owner, const TActorContext& ctx) {
+    if (owner.CurrentSchemeShardId) {
+        AFL_VERIFY(owner.CurrentSchemeShardId);
+        ctx.Send(MakePipePerNodeCacheID(false),
+            new TEvPipeCache::TEvForward(BuildProposeResultEvent(owner).release(), (ui64)owner.CurrentSchemeShardId, true));
+    } else {
+        AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "scheme_shard_tablet_not_initialized")("source", GetTxInfo().Source);
+        ctx.Send(GetTxInfo().Source, BuildProposeResultEvent(owner).release());
+    }
+}
+
+std::unique_ptr<NKikimr::TEvColumnShard::TEvProposeTransactionResult> IProposeTxOperator::BuildProposeResultEvent(const TColumnShard& owner) const {
     const auto& txInfo = GetTxInfo();
-    std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> evResult = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(
-        owner.TabletID(), txInfo.TxKind, txInfo.TxId, GetProposeStartInfoVerified().GetStatus(), GetProposeStartInfoVerified().GetStatusMessage());
+    std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> evResult =
+        std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(owner.TabletID(), txInfo.TxKind, txInfo.TxId,
+            GetProposeStartInfoVerified().GetStatus(), GetProposeStartInfoVerified().GetStatusMessage());
     if (IsFail()) {
         owner.Counters.GetTabletCounters()->IncCounter(COUNTER_PREPARE_ERROR);
-        AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", GetProposeStartInfoVerified().GetStatusMessage())("tablet_id", owner.TabletID())("tx_id", txInfo.TxId);
+        AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", GetProposeStartInfoVerified().GetStatusMessage())("tablet_id", owner.TabletID())(
+            "tx_id", txInfo.TxId);
     } else {
         evResult->Record.SetMinStep(txInfo.MinStep);
         evResult->Record.SetMaxStep(txInfo.MaxStep);
@@ -16,8 +29,10 @@ void IProposeTxOperator::DoSendReply(TColumnShard& owner, const TActorContext& c
             evResult->Record.MutableDomainCoordinators()->CopyFrom(owner.ProcessingParams->GetCoordinators());
         }
         owner.Counters.GetTabletCounters()->IncCounter(COUNTER_PREPARE_SUCCESS);
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("message", GetProposeStartInfoVerified().GetStatusMessage())("tablet_id", owner.TabletID())(
+            "tx_id", txInfo.TxId);
     }
-    ctx.Send(txInfo.Source, evResult.release());
+    return evResult;
 }
 
 }

+ 1 - 0
ydb/core/tx/columnshard/transactions/operators/propose_tx.h

@@ -12,6 +12,7 @@ protected:
     virtual bool DoCheckTxInfoForReply(const TFullTxInfo& originalTxInfo) const override {
         return GetTxInfo() == originalTxInfo;
     }
+    std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> BuildProposeResultEvent(const TColumnShard& owner) const;
     virtual void DoSendReply(TColumnShard& owner, const TActorContext& ctx) override;
     virtual bool DoCheckAllowUpdate(const TFullTxInfo& currentTxInfo) const override {
         if (!currentTxInfo.SeqNo || !GetTxInfo().SeqNo) {

+ 11 - 16
ydb/core/tx/columnshard/transactions/operators/schema.cpp

@@ -40,7 +40,17 @@ public:
     }
 };
 
-NKikimr::NColumnShard::TTxController::TProposeResult TSchemaTransactionOperator::DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) {
+TTxController::TProposeResult TSchemaTransactionOperator::DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) {
+    auto seqNo = SeqNoFromProto(SchemaTxBody.GetSeqNo());
+    auto lastSeqNo = owner.LastSchemaSeqNo;
+
+    // Check if proposal is outdated
+    if (seqNo < lastSeqNo) {
+        auto errorMessage = TStringBuilder() << "Ignoring outdated schema tx proposal at tablet " << owner.TabletID() << " txId " << GetTxId()
+                                             << " ssId " << owner.CurrentSchemeShardId << " seqNo " << seqNo << " lastSeqNo " << lastSeqNo;
+        return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_CHANGED, errorMessage);
+    }
+
     switch (SchemaTxBody.TxBody_case()) {
         case NKikimrTxColumnShard::TSchemaTxBody::kInitShard:
         {
@@ -67,21 +77,6 @@ NKikimr::NColumnShard::TTxController::TProposeResult TSchemaTransactionOperator:
             break;
     }
 
-    auto seqNo = SeqNoFromProto(SchemaTxBody.GetSeqNo());
-    auto lastSeqNo = owner.LastSchemaSeqNo;
-
-    // Check if proposal is outdated
-    if (seqNo < lastSeqNo) {
-        auto errorMessage = TStringBuilder()
-            << "Ignoring outdated schema tx proposal at tablet "
-            << owner.TabletID()
-            << " txId " << GetTxId()
-            << " ssId " << owner.CurrentSchemeShardId
-            << " seqNo " << seqNo
-            << " lastSeqNo " << lastSeqNo;
-        return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_CHANGED, errorMessage);
-    }
-
     owner.UpdateSchemaSeqNo(seqNo, txc);
     return TProposeResult();
 }

+ 5 - 2
ydb/core/tx/columnshard/transactions/tx_controller.cpp

@@ -278,7 +278,10 @@ TDuration TTxController::GetTxCompleteLag(ui64 timecastStep) const {
 TTxController::EPlanResult TTxController::PlanTx(const ui64 planStep, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
     auto it = Operators.find(txId);
     if (it == Operators.end()) {
+        AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_plan_tx")("tx_id", txId);
         return EPlanResult::Skipped;
+    } else {
+        AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "plan_tx")("tx_id", txId)("plan_step", it->second->MutableTxInfo().PlanStep);
     }
     auto& txInfo = it->second->MutableTxInfo();
     if (txInfo.PlanStep == 0) {
@@ -304,8 +307,8 @@ void TTxController::OnTabletInit() {
 
 std::shared_ptr<TTxController::ITransactionOperator> TTxController::StartProposeOnExecute(
     const TTxController::TTxInfo& txInfo, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc) {
-    NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnExecute")(
-        "tx_info", txInfo.DebugString())("tx_info", txInfo.DebugString());
+    NActors::TLogContextGuard lGuard =
+        NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnExecute")("tx_info", txInfo.DebugString());
     AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
     std::shared_ptr<TTxController::ITransactionOperator> txOperator(
         TTxController::ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo));