Browse Source

Resharper transactions are hanging (#7676)

Alek5andr-Kotov 7 months ago
parent
commit
d9b7f3727e

+ 91 - 43
ydb/core/persqueue/pq_impl.cpp

@@ -1340,6 +1340,7 @@ void TPersQueue::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
         EndWriteTabletState(resp, ctx);
         break;
     case WRITE_TX_COOKIE:
+        PQ_LOG_D("Handle TEvKeyValue::TEvResponse (WRITE_TX_COOKIE)");
         EndWriteTxs(resp, ctx);
         break;
     default:
@@ -3459,7 +3460,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorCo
     tx->OnReadSetAck(event);
     tx->UnbindMsgsFromPipe(event.GetTabletConsumer());
 
-    if (tx->State == NKikimrPQ::TTransaction::EXECUTED) {
+    if (tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS) {
         CheckTxState(ctx, *tx);
 
         TryWriteTxs(ctx);
@@ -3803,20 +3804,34 @@ void TPersQueue::ProcessWriteTxs(const TActorContext& ctx,
 void TPersQueue::ProcessDeleteTxs(const TActorContext& ctx,
                                   NKikimrClient::TKeyValueRequest& request)
 {
-    Y_ABORT_UNLESS(!WriteTxsInProgress);
+    Y_ABORT_UNLESS(!WriteTxsInProgress, // the abort message now carries the tablet id
+                   "PQ %" PRIu64,
+                   TabletID());
 
     for (ui64 txId : DeleteTxs) {
-        auto tx = GetTransaction(ctx, txId);
-        Y_ABORT_UNLESS(tx);
+        PQ_LOG_D("delete key for TxId " << txId);
+        AddCmdDeleteTx(request, txId); // built from txId alone, before the transaction is looked up
 
-        tx->AddCmdDelete(request);
-
-        ChangedTxs.insert(tx->TxId);
+        auto tx = GetTransaction(ctx, txId);
+        if (tx) { // a failed lookup is no longer fatal; only a found transaction is marked as changed
+            ChangedTxs.insert(txId);
+        }
     }
 
     DeleteTxs.clear();
 }
 
+void TPersQueue::AddCmdDeleteTx(NKikimrClient::TKeyValueRequest& request, // request that receives one more CmdDeleteRange
+                                ui64 txId) // id passed to GetTxKey() to build the key
+{
+    TString key = GetTxKey(txId);
+    auto range = request.AddCmdDeleteRange()->MutableRange();
+    range->SetFrom(key); // From and To are the same key and both bounds are inclusive,
+    range->SetIncludeFrom(true); // so the range names exactly that one key
+    range->SetTo(key);
+    range->SetIncludeTo(true);
+}
+
 void TPersQueue::ProcessConfigTx(const TActorContext& ctx,
                                  TEvKeyValue::TEvRequest* request)
 {
@@ -3938,6 +3953,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx,
 void TPersQueue::SendEvReadSetAckToSenders(const TActorContext& ctx,
                                            TDistributedTransaction& tx)
 {
+    PQ_LOG_D("TPersQueue::SendEvReadSetAckToSenders");
     for (auto& [target, event] : tx.ReadSetAcks) {
         PQ_LOG_D("Send TEvTxProcessing::TEvReadSetAck " << event->ToString());
         ctx.Send(target, event.release());
@@ -4307,35 +4323,45 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
                 Y_ABORT_UNLESS(false);
             }
 
+            WriteTx(tx, NKikimrPQ::TTransaction::EXECUTED);
+
             tx.State = NKikimrPQ::TTransaction::EXECUTED;
             PQ_LOG_D("TxId " << tx.TxId <<
                      ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
-        } else {
-            break;
         }
 
-        [[fallthrough]];
+        break;
 
     case NKikimrPQ::TTransaction::EXECUTED:
-        PQ_LOG_D("HaveAllRecipientsReceive " << tx.HaveAllRecipientsReceive());
-        if (tx.HaveAllRecipientsReceive()) {
-            if (tx.WriteId.Defined()) {
-                BeginDeleteTx(tx);
-            } else {
-                DeleteTx(tx);
-            }
+        SendEvReadSetAckToSenders(ctx, tx);
+
+        tx.State = NKikimrPQ::TTransaction::WAIT_RS_ACKS;
+        PQ_LOG_D("TxId " << tx.TxId <<
+                 ", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
+
+        [[fallthrough]];
+
+    case NKikimrPQ::TTransaction::WAIT_RS_ACKS:
+        PQ_LOG_D("HaveAllRecipientsReceive " << tx.HaveAllRecipientsReceive() <<
+                 ", WriteIdIsDisabled " << WriteIdIsDisabled(tx.WriteId));
+        if (tx.HaveAllRecipientsReceive() && WriteIdIsDisabled(tx.WriteId)) {
+            DeleteTx(tx);
+            // implicitly switch to the state DELETING
         }
 
         break;
 
     case NKikimrPQ::TTransaction::DELETING:
         // The PQ tablet has persisted its state. Now she can delete the transaction and take the next one.
-        SendEvReadSetAckToSenders(ctx, tx);
         if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) {
             TxQueue.pop();
             TryStartTransaction(ctx);
         }
+
+        DeleteWriteId(tx.WriteId);
+        PQ_LOG_D("delete TxId " << tx.TxId);
         Txs.erase(tx.TxId);
+
         // If this was the last transaction, then you need to send responses to messages about changes
         // in the status of the PQ tablet (if they came)
         TryReturnTabletStateAll(ctx);
@@ -4343,6 +4369,41 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
     }
 }
 
+bool TPersQueue::WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const // returns true when nothing visible here keeps the WriteId in use
+{
+    if (!writeId.Defined()) { // an undefined WriteId counts as disabled
+        return true;
+    }
+
+    Y_ABORT_UNLESS(TxWrites.contains(*writeId), // a defined WriteId must have an entry in TxWrites, otherwise abort
+                   "PQ %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
+                   TabletID(), writeId->NodeId, writeId->KeyId);
+    const TTxWriteInfo& writeInfo = TxWrites.at(*writeId);
+
+    bool disabled = // true only if the status is not STATUS_SUBSCRIBED and Partitions is empty
+        (writeInfo.LongTxSubscriptionStatus != NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) &&
+        writeInfo.Partitions.empty()
+        ;
+
+    PQ_LOG_D("WriteId " << *writeId << " is " << (disabled ? "disabled" : "enabled"));
+
+    return disabled;
+}
+
+void TPersQueue::DeleteWriteId(const TMaybe<TWriteId>& writeId) // erases the TxWrites entry of a defined WriteId
+{
+    if (!writeId.Defined()) { // nothing to erase for an undefined WriteId
+        return;
+    }
+
+    Y_ABORT_UNLESS(TxWrites.contains(*writeId), // aborts if the WriteId has no entry in TxWrites
+                   "PQ %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
+                   TabletID(), writeId->NodeId, writeId->KeyId);
+
+    PQ_LOG_D("delete WriteId " << *writeId);
+    TxWrites.erase(*writeId);
+}
+
 void TPersQueue::WriteTx(TDistributedTransaction& tx, NKikimrPQ::TTransaction::EState state)
 {
     WriteTxs[tx.TxId] = state;
@@ -4375,7 +4436,9 @@ void TPersQueue::CheckChangedTxStates(const TActorContext& ctx)
 {
     for (ui64 txId : ChangedTxs) {
         auto tx = GetTransaction(ctx, txId);
-        Y_ABORT_UNLESS(tx);
+        Y_ABORT_UNLESS(tx,
+                       "PQ %" PRIu64 ", TxId %" PRIu64,
+                       TabletID(), txId);
 
         CheckTxState(ctx, *tx);
     }
@@ -4698,7 +4761,9 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
     PQ_LOG_D("delete write info for WriteId " << writeId << " and TxId " << txId);
 
     auto* tx = GetTransaction(ctx, txId);
-    if (!tx || (tx->State == NKikimrPQ::TTransaction::EXECUTED)) {
+    if (!tx ||
+        (tx->State == NKikimrPQ::TTransaction::EXECUTED) ||
+        (tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS)) {
         BeginDeletePartitions(writeInfo);
     }
 }
@@ -4754,11 +4819,11 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
         UnsubscribeWriteId(writeId, ctx);
         if (writeInfo.TxId.Defined()) {
             if (auto tx = GetTransaction(ctx, *writeInfo.TxId); tx) {
-                DeleteTx(*tx);
+                if (tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS) {
+                    CheckTxState(ctx, *tx);
+                }
             }
         }
-        PQ_LOG_D("delete WriteId " << writeId);
-        TxWrites.erase(writeId);
     }
     TxWritesChanged = true;
 
@@ -4767,6 +4832,9 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
 
 void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext&)
 {
+    PQ_LOG_D("Handle TEvPQ::TEvTransactionCompleted" <<
+             " WriteId " << ev->Get()->WriteId);
+
     auto* event = ev->Get();
     if (!event->WriteId.Defined()) {
         return;
@@ -4782,26 +4850,6 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo
     BeginDeletePartitions(writeInfo);
 }
 
-void TPersQueue::BeginDeleteTx(const TDistributedTransaction& tx)
-{
-    Y_ABORT_UNLESS(tx.WriteId.Defined());
-    const TWriteId& writeId = *tx.WriteId;
-    PQ_LOG_D("begin delete write info for WriteId " << writeId);
-    if (!TxWrites.contains(writeId)) {
-        // the transaction has already been completed
-        PQ_LOG_D("unknown WriteId " << writeId);
-        return;
-    }
-
-    TTxWriteInfo& writeInfo = TxWrites.at(writeId);
-    if (writeInfo.LongTxSubscriptionStatus == NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) {
-        PQ_LOG_D("wait for WriteId subscription status");
-        return;
-    }
-
-    BeginDeletePartitions(writeInfo);
-}
-
 void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo)
 {
     if (writeInfo.Deleting) {

+ 6 - 1
ydb/core/persqueue/pq_impl.h

@@ -499,7 +499,6 @@ private:
     void Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorContext& ctx);
     void Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext& ctx);
 
-    void BeginDeleteTx(const TDistributedTransaction& tx);
     void BeginDeletePartitions(TTxWriteInfo& writeInfo);
 
     bool CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
@@ -524,6 +523,12 @@ private:
 
     void SendTransactionsReadRequest(const TString& fromKey, bool includeFrom,
                                      const TActorContext& ctx);
+
+    void AddCmdDeleteTx(NKikimrClient::TKeyValueRequest& request,
+                        ui64 txId);
+
+    bool WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const;
+    void DeleteWriteId(const TMaybe<TWriteId>& writeId);
 };
 
 

+ 1 - 12
ydb/core/persqueue/transaction.cpp

@@ -323,6 +323,7 @@ bool TDistributedTransaction::HaveParticipantsDecision() const
 
 bool TDistributedTransaction::HaveAllRecipientsReceive() const
 {
+    PQ_LOG_D("PredicateAcks: " << PredicateAcksCount << "/" << PredicateRecipients.size()); // logs the two values compared below
     return PredicateRecipients.size() == PredicateAcksCount;
 }
 
@@ -389,18 +390,6 @@ void TDistributedTransaction::AddCmdWriteConfigTx(NKikimrPQ::TTransaction& tx)
     *tx.MutableBootstrapConfig() = BootstrapConfig;
 }
 
-void TDistributedTransaction::AddCmdDelete(NKikimrClient::TKeyValueRequest& request)
-{
-    TString key = GetKey();
-    auto range = request.AddCmdDeleteRange()->MutableRange();
-    range->SetFrom(key);
-    range->SetIncludeFrom(true);
-    range->SetTo(key);
-    range->SetIncludeTo(true);
-
-    PQ_LOG_D("add CmdDeleteRange for key " << key);
-}
-
 void TDistributedTransaction::SetDecision(NKikimrTx::TReadSetData::EDecision& var, NKikimrTx::TReadSetData::EDecision value)
 {
     if ((var == NKikimrTx::TReadSetData::DECISION_UNKNOWN) || (value == NKikimrTx::TReadSetData::DECISION_ABORT)) {

+ 0 - 1
ydb/core/persqueue/transaction.h

@@ -75,7 +75,6 @@ struct TDistributedTransaction {
     bool HaveAllRecipientsReceive() const;
 
     void AddCmdWrite(NKikimrClient::TKeyValueRequest& request, EState state);
-    void AddCmdDelete(NKikimrClient::TKeyValueRequest& request);
 
     static void SetDecision(NKikimrTx::TReadSetData::EDecision& var, NKikimrTx::TReadSetData::EDecision value);
 

+ 15 - 12
ydb/core/persqueue/ut/pqtablet_ut.cpp

@@ -417,33 +417,36 @@ void TPQTabletFixture::WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadS
         UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
     }
 
+    auto readSet = std::move(*tablet.ReadSet);
+    tablet.ReadSet = Nothing();
+
     if (matcher.Step.Defined()) {
-        UNIT_ASSERT(tablet.ReadSet->HasStep());
-        UNIT_ASSERT_VALUES_EQUAL(*matcher.Step, tablet.ReadSet->GetStep());
+        UNIT_ASSERT(readSet.HasStep());
+        UNIT_ASSERT_VALUES_EQUAL(*matcher.Step, readSet.GetStep());
     }
     if (matcher.TxId.Defined()) {
-        UNIT_ASSERT(tablet.ReadSet->HasTxId());
-        UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, tablet.ReadSet->GetTxId());
+        UNIT_ASSERT(readSet.HasTxId());
+        UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, readSet.GetTxId());
     }
     if (matcher.Source.Defined()) {
-        UNIT_ASSERT(tablet.ReadSet->HasTabletSource());
-        UNIT_ASSERT_VALUES_EQUAL(*matcher.Source, tablet.ReadSet->GetTabletSource());
+        UNIT_ASSERT(readSet.HasTabletSource());
+        UNIT_ASSERT_VALUES_EQUAL(*matcher.Source, readSet.GetTabletSource());
     }
     if (matcher.Target.Defined()) {
-        UNIT_ASSERT(tablet.ReadSet->HasTabletDest());
-        UNIT_ASSERT_VALUES_EQUAL(*matcher.Target, tablet.ReadSet->GetTabletDest());
+        UNIT_ASSERT(readSet.HasTabletDest());
+        UNIT_ASSERT_VALUES_EQUAL(*matcher.Target, readSet.GetTabletDest());
     }
     if (matcher.Decision.Defined()) {
-        UNIT_ASSERT(tablet.ReadSet->HasReadSet());
+        UNIT_ASSERT(readSet.HasReadSet());
 
         NKikimrTx::TReadSetData data;
-        Y_ABORT_UNLESS(data.ParseFromString(tablet.ReadSet->GetReadSet()));
+        Y_ABORT_UNLESS(data.ParseFromString(readSet.GetReadSet()));
 
         UNIT_ASSERT_EQUAL(*matcher.Decision, data.GetDecision());
     }
     if (matcher.Producer.Defined()) {
-        UNIT_ASSERT(tablet.ReadSet->HasTabletProducer());
-        UNIT_ASSERT_VALUES_EQUAL(*matcher.Producer, tablet.ReadSet->GetTabletProducer());
+        UNIT_ASSERT(readSet.HasTabletProducer());
+        UNIT_ASSERT_VALUES_EQUAL(*matcher.Producer, readSet.GetTabletProducer());
     }
 }
 

+ 2 - 1
ydb/core/protos/pqconfig.proto

@@ -1097,7 +1097,8 @@ message TTransaction {
         CALCULATED = 6;
         WAIT_RS = 7;      // persist
         EXECUTING = 8;
-        EXECUTED = 9;
+        EXECUTED = 9;     // persist
+        WAIT_RS_ACKS = 11;
         DELETING = 10;
     };
 

+ 34 - 0
ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

@@ -1846,6 +1846,40 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_26, TFixture)
     UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3);
 }
 
+Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture) // reads topic_A and topic_B inside one tx and forwards both messages to topic_C
+{
+    CreateTopic("topic_A", TEST_CONSUMER);
+    CreateTopic("topic_B", TEST_CONSUMER);
+    CreateTopic("topic_C", TEST_CONSUMER);
+
+    for (size_t i = 0; i < 2; ++i) { // the whole scenario runs twice against the same three topics
+        WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", nullptr, 0); // nullptr: no transaction pointer is passed
+        WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", nullptr, 0);
+
+        NTable::TSession tableSession = CreateTableSession();
+        NTable::TTransaction tx = BeginTx(tableSession);
+
+        auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0); // read with &tx
+        UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
+        WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0); // forward the message with the same &tx
+        WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID);
+
+        messages = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0);
+        UNIT_ASSERT_VALUES_EQUAL(messages.size(), 1);
+        WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, messages[0], &tx, 0);
+        WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID);
+
+        CommitTx(tx, EStatus::SUCCESS); // the commit is expected to succeed
+
+        messages = ReadFromTopic("topic_C", TEST_CONSUMER, TDuration::Seconds(2), nullptr, 0); // read back without a tx pointer
+        UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2); // both forwarded messages are expected in topic_C
+
+        DumpPQTabletKeys("topic_A");
+        DumpPQTabletKeys("topic_B");
+        DumpPQTabletKeys("topic_C");
+    }
+}
+
 }
 
 }