Просмотр исходного кода

Fix use-after-free in CommittingOps tracking (#8712)

Aleksei Borzenkov 6 месяцев назад

+ 11 - 7

@@ -2274,11 +2274,15 @@ void TPipeline::AddCommittingOp(const TOperation::TPtr& op) {
     if (!Self->IsMvccEnabled() || op->IsReadOnly())
+    Y_VERIFY_S(!op->GetCommittingOpsVersion(),
+        "Trying to AddCommittingOp " << *op << " more than once");
     TRowVersion version = Self->GetReadWriteVersions(op.Get()).WriteVersion;
     if (op->IsImmediate())
         CommittingOps.Add(op->GetTxId(), version);
+    op->SetCommittingOpsVersion(version);
 void TPipeline::RemoveCommittingOp(const TRowVersion& version) {
@@ -2288,13 +2292,13 @@ void TPipeline::RemoveCommittingOp(const TRowVersion& version) {
 void TPipeline::RemoveCommittingOp(const TOperation::TPtr& op) {
-    if (!Self->IsMvccEnabled() || op->IsReadOnly())
-        return;
-    if (op->IsImmediate())
-        CommittingOps.Remove(op->GetTxId());
-    else
-        CommittingOps.Remove(TRowVersion(op->GetStep(), op->GetTxId()));
+    if (const auto& version = op->GetCommittingOpsVersion()) {
+        if (op->IsImmediate())
+            CommittingOps.Remove(op->GetTxId(), *version);
+        else
+            CommittingOps.Remove(*version);
+        op->ResetCommittingOpsVersion();
+    }
 bool TPipeline::WaitCompletion(const TOperation::TPtr& op) const {

+ 23 - 7

@@ -424,11 +424,13 @@ private:
             ui64 Step;
             ui64 TxId;
             mutable ui32 Counter;
+            mutable ui32 TxCounter;
             TItem(const TRowVersion& from)
                 : Step(from.Step)
                 , TxId(from.TxId)
                 , Counter(1u)
+                , TxCounter(0u)
             friend constexpr bool operator<(const TItem& a, const TItem& b) {
@@ -442,6 +444,7 @@ private:
         using TItemsSet = TSet<TItem>;
         using TTxIdMap = THashMap<ui64, TItemsSet::iterator>;
         inline void Add(ui64 txId, TRowVersion version) {
             auto res = ItemsSet.emplace(version);
@@ -450,6 +453,7 @@ private:
             auto res2 = TxIdMap.emplace(txId, res.first);
             Y_VERIFY_S(res2.second, "Unexpected duplicate immediate tx " << txId
                     << " committing at " << version);
+            res.first->TxCounter += 1;
         inline void Add(TRowVersion version) {
@@ -458,17 +462,29 @@ private:
                 res.first->Counter += 1;
-        inline void Remove(ui64 txId) {
-            if (auto it = TxIdMap.find(txId); it != TxIdMap.end()) {
-                if (--it->second->Counter == 0)
-                    ItemsSet.erase(it->second);
-                TxIdMap.erase(it);
-            }
+        inline void Remove(ui64 txId, TRowVersion version) {
+            auto it = TxIdMap.find(txId);
+            Y_VERIFY_S(it != TxIdMap.end(), "Removing immediate tx " << txId << " " << version
+                    << " does not match a previous Add");
+            Y_VERIFY_S(TRowVersion(it->second->Step, it->second->TxId) == version, "Removing immediate tx " << txId << " " << version
+                    << " does not match a previous Add " << TRowVersion(it->second->Step, it->second->TxId));
+            Y_VERIFY_S(it->second->TxCounter > 0, "Removing immediate tx " << txId << " " << version
+                    << " with a mismatching TxCounter");
+            --it->second->TxCounter;
+            if (--it->second->Counter == 0)
+                ItemsSet.erase(it->second);
+            TxIdMap.erase(it);
         inline void Remove(TRowVersion version) {
-            if (auto it = ItemsSet.find(version); it != ItemsSet.end() && --it->Counter == 0)
+            auto it = ItemsSet.find(version);
+            Y_VERIFY_S(it != ItemsSet.end(), "Removing version " << version
+                    << " does not match a previous Add");
+            if (--it->Counter == 0) {
+                Y_VERIFY_S(it->TxCounter == 0, "Removing version " << version
+                    << " while TxCounter has active references, possible Add/Remove mismatch");
+            }
         inline bool HasOpsBelow(TRowVersion upperBound) const {

+ 343 - 0

@@ -2,6 +2,8 @@
 #include <ydb/core/tx/data_events/payload_helper.h>
 #include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
 #include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/testlib/actors/block_events.h>
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
 #include "datashard_ut_common_kqp.h"
 namespace NKikimr {
@@ -1242,5 +1244,346 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
+    Y_UNIT_TEST(ImmediateAndPlannedCommittedOpsRace) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            // It's easier to reproduce without volatile transactions, since
+            // then we can block pipeline by blocking readsets
+            .SetEnableDataShardVolatileTransactions(false);
+        auto [runtime, server, sender] = TestCreateServer(serverSettings);
+        TDisableDataShardLogBatching disableDataShardLogBatching;
+            KqpSchemeExec(runtime, R"(
+                CREATE TABLE `/Root/table` (key int, value int, PRIMARY KEY (key))
+                WITH (PARTITION_AT_KEYS = (10));
+            )"),
+            "SUCCESS");
+        const auto tableId = ResolveTableId(server, sender, "/Root/table");
+        const auto shards = GetTableShards(server, sender, "/Root/table");
+        UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+        TVector<TShardedTableOptions::TColumn> columns{
+            {"key", "Int32", true, false},
+            {"value", "Int32", false, false},
+        };
+        const ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
+        const ui64 lockTxId1 = 1234567890001;
+        const ui64 lockTxId2 = 1234567890002;
+        const ui64 lockTxId3 = 1234567890003;
+        const ui64 lockNodeId = runtime.GetNodeId(0);
+        NLongTxService::TLockHandle lockHandle1(lockTxId1, runtime.GetActorSystem(0));
+        NLongTxService::TLockHandle lockHandle2(lockTxId2, runtime.GetActorSystem(0));
+        NLongTxService::TLockHandle lockHandle3(lockTxId3, runtime.GetActorSystem(0));
+        auto shard1 = shards.at(0);
+        auto shard1actor = ResolveTablet(runtime, shard1);
+        auto shard2 = shards.at(1);
+        NKikimrDataEvents::TLock lock1shard1;
+        NKikimrDataEvents::TLock lock1shard2;
+        NKikimrDataEvents::TLock lock2;
+        // 1. Make a read (lock1 shard1)
+        auto read1sender = runtime.AllocateEdgeActor();
+        {
+            Cerr << "... making a read from " << shard1 << Endl;
+            auto req = std::make_unique<TEvDataShard::TEvRead>();
+            {
+                auto& record = req->Record;
+                record.SetReadId(1);
+                record.MutableTableId()->SetOwnerId(tableId.PathId.OwnerId);
+                record.MutableTableId()->SetTableId(tableId.PathId.LocalPathId);
+                record.AddColumns(1);
+                record.AddColumns(2);
+                record.SetLockTxId(lockTxId1);
+                record.SetLockNodeId(lockNodeId);
+                record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC);
+                i32 key = 1;
+                TVector<TCell> keys;
+                keys.push_back(TCell::Make(key));
+                req->Keys.push_back(TSerializedCellVec(TSerializedCellVec::Serialize(keys)));
+            }
+            ForwardToTablet(runtime, shard1, read1sender, req.release());
+            auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvReadResult>(read1sender);
+            auto* res = ev->Get();
+            UNIT_ASSERT_VALUES_EQUAL(res->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS);
+            UNIT_ASSERT_VALUES_EQUAL(res->Record.GetFinished(), true);
+            UNIT_ASSERT_VALUES_EQUAL(res->Record.GetTxLocks().size(), 1u);
+            lock1shard1 = res->Record.GetTxLocks().at(0);
+            UNIT_ASSERT_C(lock1shard1.GetCounter() < 1000, "Unexpected lock in the result: " << lock1shard1.ShortDebugString());
+        }
+        // 2. Make an uncommitted write (lock1 shard2)
+        {
+            Cerr << "... making an uncommmited write to " << shard2 << Endl;
+            auto req = MakeWriteRequestOneKeyValue(
+                std::nullopt,
+                NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE,
+                NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
+                tableId,
+                columns,
+                11, 1101);
+            req->SetLockId(lockTxId1, lockNodeId);
+            auto result = Write(runtime, sender, shard2, std::move(req));
+            UNIT_ASSERT_VALUES_EQUAL(result.GetTxLocks().size(), 1u);
+            lock1shard2 = result.GetTxLocks().at(0);
+            UNIT_ASSERT_C(lock1shard2.GetCounter() < 1000, "Unexpected lock in the result: " << lock1shard2.ShortDebugString());
+        }
+        // 3. Make an uncommitted write (lock2 shard1)
+        {
+            Cerr << "... making an uncommmited write to " << shard1 << Endl;
+            auto req = MakeWriteRequestOneKeyValue(
+                std::nullopt,
+                NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE,
+                NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
+                tableId,
+                columns,
+                2, 202);
+            req->SetLockId(lockTxId2, lockNodeId);
+            auto result = Write(runtime, sender, shard1, std::move(req));
+            UNIT_ASSERT_VALUES_EQUAL(result.GetTxLocks().size(), 1u);
+            lock2 = result.GetTxLocks().at(0);
+            UNIT_ASSERT_C(lock2.GetCounter() < 1000, "Unexpected lock in the result: " << lock2.ShortDebugString());
+        }
+        // 4. Break lock2 so later we could make an aborted distributed commit
+        {
+            Cerr << "... making an immediate write to " << shard1 << Endl;
+            auto req = MakeWriteRequestOneKeyValue(
+                std::nullopt,
+                NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE,
+                NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
+                tableId,
+                columns,
+                2, 203);
+            Write(runtime, sender, shard1, std::move(req));
+        }
+        // Start blocking readsets
+        TBlockEvents<TEvTxProcessing::TEvReadSet> blockedReadSets(runtime);
+        // Prepare an upsert (readsets flow between shards)
+        ui64 txId1 = 1234567890011;
+        auto tx1sender = runtime.AllocateEdgeActor();
+        {
+            auto req1 = MakeWriteRequestOneKeyValue(
+                txId1,
+                NKikimrDataEvents::TEvWrite::MODE_PREPARE,
+                NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
+                tableId,
+                columns,
+                3, 304);
+            req1->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
+            req1->Record.MutableLocks()->AddSendingShards(shard1);
+            req1->Record.MutableLocks()->AddSendingShards(shard2);
+            req1->Record.MutableLocks()->AddReceivingShards(shard1);
+            req1->Record.MutableLocks()->AddReceivingShards(shard2);
+            *req1->Record.MutableLocks()->AddLocks() = lock1shard1;
+            auto req2 = MakeWriteRequestOneKeyValue(
+                txId1,
+                NKikimrDataEvents::TEvWrite::MODE_PREPARE,
+                NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
+                tableId,
+                columns,
+                13, 1304);
+            req2->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
+            req2->Record.MutableLocks()->AddSendingShards(shard1);
+            req2->Record.MutableLocks()->AddSendingShards(shard2);
+            req2->Record.MutableLocks()->AddReceivingShards(shard1);
+            req2->Record.MutableLocks()->AddReceivingShards(shard2);
+            *req2->Record.MutableLocks()->AddLocks() = lock1shard2;
+            Cerr << "... preparing tx1 at " << shard1 << Endl;
+            auto res1 = Write(runtime, tx1sender, shard1, std::move(req1));
+            Cerr << "... preparing tx1 at " << shard2 << Endl;
+            auto res2 = Write(runtime, tx1sender, shard2, std::move(req2));
+            ui64 minStep = Max(res1.GetMinStep(), res2.GetMinStep());
+            ui64 maxStep = Min(res1.GetMaxStep(), res2.GetMaxStep());
+            Cerr << "... planning tx1 at " << coordinator << Endl;
+            SendProposeToCoordinator(
+                runtime, tx1sender, shards, {
+                    .TxId = txId1,
+                    .Coordinator = coordinator,
+                    .MinStep = minStep,
+                    .MaxStep = maxStep,
+                });
+        }
+        runtime.WaitFor("blocked readsets", [&]{ return blockedReadSets.size() >= 2; });
+        UNIT_ASSERT_VALUES_EQUAL(blockedReadSets.size(), 2u);
+        // Start blocking new plan steps
+        TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlanSteps(runtime);
+        // Prepare an upsert (readset flows from shard 1 to shard 2, already broken)
+        // Must not conflict with other transactions
+        ui64 txId2 = 1234567890012;
+        auto tx2sender = runtime.AllocateEdgeActor();
+        {
+            auto req1 = MakeWriteRequestOneKeyValue(
+                txId2,
+                NKikimrDataEvents::TEvWrite::MODE_PREPARE,
+                NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
+                tableId,
+                columns,
+                5, 505);
+            req1->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
+            req1->Record.MutableLocks()->AddSendingShards(shard1);
+            req1->Record.MutableLocks()->AddReceivingShards(shard2);
+            *req1->Record.MutableLocks()->AddLocks() = lock2;
+            auto req2 = MakeWriteRequestOneKeyValue(
+                txId2,
+                NKikimrDataEvents::TEvWrite::MODE_PREPARE,
+                NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
+                tableId,
+                columns,
+                15, 1505);
+            req2->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit);
+            req2->Record.MutableLocks()->AddSendingShards(shard1);
+            req2->Record.MutableLocks()->AddReceivingShards(shard2);
+            Cerr << "... preparing tx2 at " << shard1 << Endl;
+            auto res1 = Write(runtime, tx2sender, shard1, std::move(req1));
+            Cerr << "... preparing tx2 at " << shard2 << Endl;
+            auto res2 = Write(runtime, tx2sender, shard2, std::move(req2));
+            ui64 minStep = Max(res1.GetMinStep(), res2.GetMinStep());
+            ui64 maxStep = Min(res1.GetMaxStep(), res2.GetMaxStep());
+            Cerr << "... planning tx2 at " << coordinator << Endl;
+            SendProposeToCoordinator(
+                runtime, tx2sender, shards, {
+                    .TxId = txId2,
+                    .Coordinator = coordinator,
+                    .MinStep = minStep,
+                    .MaxStep = maxStep,
+                });
+        }
+        runtime.WaitFor("blocked plan steps", [&]{ return blockedPlanSteps.size() >= 2; });
+        UNIT_ASSERT_VALUES_EQUAL(blockedPlanSteps.size(), 2u);
+        // Block TEvPrivate::TEvProgressTransaction for shard1
+        TBlockEvents<IEventHandle> blockedProgress(runtime,
+            [&](const TAutoPtr<IEventHandle>& ev) {
+                return ev->GetRecipientRewrite() == shard1actor &&
+                    ev->GetTypeRewrite() == EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 0;
+            });
+        blockedPlanSteps.Unblock();
+        runtime.WaitFor("blocked progress", [&]{ return blockedProgress.size() >= 1; });
+        runtime.SimulateSleep(TDuration::MilliSeconds(1)); // let it commit
+        UNIT_ASSERT_VALUES_EQUAL(blockedProgress.size(), 1u);
+        // Make an unrelated immediate write, this will pin write (and future snapshot) version to tx2
+        {
+            Cerr << "... making an immediate write to " << shard1 << Endl;
+            auto req = MakeWriteRequestOneKeyValue(
+                std::nullopt,
+                NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE,
+                NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
+                tableId,
+                columns,
+                4, 406);
+            Write(runtime, sender, shard1, std::move(req));
+        }
+        // Block commit attempts at shard1
+        TBlockEvents<TEvBlobStorage::TEvPut> blockedCommits(runtime,
+            [&](const TEvBlobStorage::TEvPut::TPtr& ev) {
+                auto* msg = ev->Get();
+                return msg->Id.TabletID() == shard1 && msg->Id.Channel() == 0;
+            });
+        // Make an uncommitted write to a key overlapping with tx1
+        // Since tx1 has been validated, and reads are pinned at tx2, tx3 will
+        // be after tx1 and blocked by a read dependency. Since tx2 has not
+        // entered the pipeline yet, version will not be above tx2.
+        auto tx3sender = runtime.AllocateEdgeActor();
+        {
+            Cerr << "... starting uncommitted upsert at " << shard1 << Endl;
+            auto req = MakeWriteRequestOneKeyValue(
+                std::nullopt,
+                NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE,
+                NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT,
+                tableId,
+                columns,
+                3, 307);
+            req->SetLockId(lockTxId3, lockNodeId);
+            runtime.SendToPipe(shard1, tx3sender, req.release());
+        }
+        // Wait for some time and make sure there have been no unexpected
+        // commits; their absence indicates the upsert is blocked by tx1.
+        runtime.SimulateSleep(TDuration::MilliSeconds(1));
+        UNIT_ASSERT_VALUES_EQUAL_C(blockedCommits.size(), 0u,
+            "The uncommitted upsert didn't block. Something may have changed and the test needs to be revised.");
+        // Now, while blocking commits, unblock progress and let tx2 execute,
+        // which will abort due to broken locks.
+        blockedProgress.Unblock();
+        blockedProgress.Stop();
+        runtime.SimulateSleep(TDuration::MilliSeconds(1));
+        size_t commitsAfterTx2 = blockedCommits.size();
+        Cerr << "... observed " << commitsAfterTx2 << " commits after tx2 unblock" << Endl;
+        UNIT_ASSERT_C(commitsAfterTx2 >= 2,
+            "Expected tx2 to produce at least 2 commits (store out rs + abort tx)"
+            << ", observed " << commitsAfterTx2 << ". Something may have changed.");
+        // Now, while still blocking commits, unblock readsets.
+        // Everything will unblock and execute tx1, then tx3.
+        blockedReadSets.Unblock();
+        blockedReadSets.Stop();
+        runtime.SimulateSleep(TDuration::MilliSeconds(1));
+        size_t commitsAfterTx3 = blockedCommits.size() - commitsAfterTx2;
+        Cerr << "... observed " << commitsAfterTx3 << " more commits after readset unblock" << Endl;
+        UNIT_ASSERT_C(commitsAfterTx3 >= 2,
+            "Expected at least 2 commits after readset unblock (tx1, tx3), but only "
+            << commitsAfterTx3 << " have been observed.");
+        // Finally, stop blocking commits
+        // We expect completion handlers to run in tx3, tx1, tx2 order, triggering the bug
+        blockedCommits.Unblock();
+        blockedCommits.Stop();
+        runtime.SimulateSleep(TDuration::MilliSeconds(1));
+        // Check tx3 reply
+        {
+            auto ev = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(tx3sender);
+            UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
+        }
+        // Check tx1 reply
+        {
+            auto ev1 = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(tx1sender);
+            UNIT_ASSERT_VALUES_EQUAL(ev1->Get()->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
+            auto ev2 = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(tx1sender);
+            UNIT_ASSERT_VALUES_EQUAL(ev2->Get()->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED);
+        }
+        // Check tx2 reply
+        {
+            auto ev1 = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(tx2sender);
+            UNIT_ASSERT_VALUES_EQUAL(ev1->Get()->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN);
+            auto ev2 = runtime.GrabEdgeEventRethrow<NEvents::TDataEvents::TEvWriteResult>(tx2sender);
+            UNIT_ASSERT_VALUES_EQUAL(ev2->Get()->Record.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN);
+        }
+    }
 } // namespace NKikimr

+ 8 - 0

@@ -883,6 +883,12 @@ public:
     virtual void OnCleanup(TDataShard& self, std::vector<std::unique_ptr<IEventHandle>>& replies);
+    // CommittingOps bookkeeping
+    const std::optional<TRowVersion>& GetCommittingOpsVersion() const { return CommittingOpsVersion; }
+    void SetCommittingOpsVersion(const TRowVersion& version) { CommittingOpsVersion = version; }
+    void ResetCommittingOpsVersion() { CommittingOpsVersion.reset(); }
         : TOperation(TBasicOpInfo())
@@ -956,6 +962,8 @@ private:
     static NMiniKQL::IEngineFlat::TValidationInfo EmptyKeysInfo;
+    std::optional<TRowVersion> CommittingOpsVersion;
     std::optional<TRowVersion> MvccReadWriteVersion;

+ 3 - 0

@@ -2003,6 +2003,9 @@ void AddValueToCells(ui64 value, const TString& columnType, TVector<TCell>& cell
     } else if (columnType == "Uint32") {
         ui32 value32 = (ui32)value;
         cells.emplace_back(TCell((const char*)&value32, sizeof(ui32)));
+    } else if (columnType == "Int32") {
+        i32 value32 = (i32)value;
+        cells.push_back(TCell::Make(value32));
     } else if (columnType == "Utf8") {
         stringValues.emplace_back(Sprintf("String_%" PRIu64, value));
         cells.emplace_back(TCell(stringValues.back().c_str(), stringValues.back().size()));