Browse Source

the `TEvTxCalcPredicate` message for the completed transaction (#8809)

Alek5andr-Kotov 6 months ago
parent
commit
79a6b7ae19

+ 2 - 2
ydb/core/persqueue/events/internal.h

@@ -822,7 +822,7 @@ struct TEvPQ {
     };
 
     struct TEvTxCalcPredicateResult : public TEventLocal<TEvTxCalcPredicateResult, EvTxCalcPredicateResult> {
-        TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, bool predicate) :
+        TEvTxCalcPredicateResult(ui64 step, ui64 txId, const NPQ::TPartitionId& partition, TMaybe<bool> predicate) :
             Step(step),
             TxId(txId),
             Partition(partition),
@@ -833,7 +833,7 @@ struct TEvPQ {
         ui64 Step;
         ui64 TxId;
         NPQ::TPartitionId Partition;
-        bool Predicate = false;
+        TMaybe<bool> Predicate;
     };
 
     struct TEvProposePartitionConfig : public TEventLocal<TEvProposePartitionConfig, EvProposePartitionConfig> {

+ 15 - 0
ydb/core/persqueue/partition.cpp

@@ -966,6 +966,21 @@ void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const
 
 void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
 {
+    PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
+             " Step " << ev->Get()->Step <<
+             ", TxId " << ev->Get()->TxId);
+
+    if (PlanStep.Defined() && TxId.Defined()) {
+        if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
+            Send(Tablet,
+                 MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Get()->Step,
+                                                             ev->Get()->TxId,
+                                                             Partition,
+                                                             Nothing()).Release());
+            return;
+        }
+    }
+
     PushBackDistrTx(ev->Release());
 
     ProcessTxsAndUserActs(ctx);

+ 7 - 11
ydb/core/persqueue/pq_impl.cpp

@@ -3504,7 +3504,7 @@ void TPersQueue::Handle(TEvPQ::TEvTxCalcPredicateResult::TPtr& ev, const TActorC
              " Step " << event.Step <<
              ", TxId " << event.TxId <<
              ", Partition " << event.Partition <<
-             ", Predicate " << (event.Predicate ? "true" : "false"));
+             ", Predicate " << event.Predicate);
 
     auto tx = GetTransaction(ctx, event.TxId);
     if (!tx) {
@@ -4210,9 +4210,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
 
         tx.WriteInProgress = false;
 
-        //
-        // запланированные события будут отправлены в EndWriteTxs
-        //
+        // scheduled events will be sent to EndWriteTxs
 
         tx.State = NKikimrPQ::TTransaction::PREPARED;
         PQ_LOG_D("TxId " << tx.TxId <<
@@ -4240,9 +4238,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
 
         tx.WriteInProgress = false;
 
-        //
-        // запланированные события будут отправлены в EndWriteTxs
-        //
+        // scheduled events will be sent to EndWriteTxs
 
         tx.State = NKikimrPQ::TTransaction::PLANNED;
         PQ_LOG_D("TxId " << tx.TxId <<
@@ -4272,6 +4268,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
             switch (tx.Kind) {
             case NKikimrPQ::TTransaction::KIND_DATA:
             case NKikimrPQ::TTransaction::KIND_CONFIG:
+                WriteTx(tx, NKikimrPQ::TTransaction::CALCULATED);
+
                 tx.State = NKikimrPQ::TTransaction::CALCULATED;
                 PQ_LOG_D("TxId " << tx.TxId <<
                          ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
@@ -4281,14 +4279,12 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
             case NKikimrPQ::TTransaction::KIND_UNKNOWN:
                 Y_ABORT_UNLESS(false);
             }
-        } else {
-            break;
         }
 
-        [[fallthrough]];
+        break;
 
     case NKikimrPQ::TTransaction::CALCULATED:
-        Y_ABORT_UNLESS(!tx.WriteInProgress,
+        Y_ABORT_UNLESS(tx.WriteInProgress,
                        "PQ %" PRIu64 ", TxId %" PRIu64,
                        TabletID(), tx.TxId);
 

+ 11 - 4
ydb/core/persqueue/transaction.cpp

@@ -215,8 +215,13 @@ void TDistributedTransaction::OnTxCalcPredicateResult(const TEvPQ::TEvTxCalcPred
 {
     PQ_LOG_D("Handle TEvTxCalcPredicateResult");
 
-    OnPartitionResult(event,
-                      event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT);
+    TMaybe<EDecision> decision;
+
+    if (event.Predicate.Defined()) {
+        decision = *event.Predicate ? NKikimrTx::TReadSetData::DECISION_COMMIT : NKikimrTx::TReadSetData::DECISION_ABORT;
+    }
+
+    OnPartitionResult(event, decision);
 }
 
 void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvProposePartitionConfigResult& event)
@@ -228,14 +233,16 @@ void TDistributedTransaction::OnProposePartitionConfigResult(const TEvPQ::TEvPro
 }
 
 template<class E>
-void TDistributedTransaction::OnPartitionResult(const E& event, EDecision decision)
+void TDistributedTransaction::OnPartitionResult(const E& event, TMaybe<EDecision> decision)
 {
     Y_ABORT_UNLESS(Step == event.Step);
     Y_ABORT_UNLESS(TxId == event.TxId);
 
     Y_ABORT_UNLESS(Partitions.contains(event.Partition.OriginalPartitionId));
 
-    SetDecision(SelfDecision, decision);
+    if (decision.Defined()) {
+        SetDecision(SelfDecision, *decision);
+    }
 
     ++PartitionRepliesCount;
 

+ 1 - 1
ydb/core/persqueue/transaction.h

@@ -90,7 +90,7 @@ struct TDistributedTransaction {
     void InitPartitions();
 
     template<class E>
-    void OnPartitionResult(const E& event, EDecision decision);
+    void OnPartitionResult(const E& event, TMaybe<EDecision> decision);
 
     TString LogPrefix() const;