Browse Source

KIKIMR-19216: new granules conflict resolving temporary. improve cleanup setup method

ivanmorozov 1 year ago
parent
commit
991934594f

+ 9 - 9
ydb/core/tx/columnshard/engines/changes/cleanup.cpp

@@ -9,7 +9,7 @@ void TCleanupColumnEngineChanges::DoDebugString(TStringOutput& out) const {
     if (ui32 dropped = PortionsToDrop.size()) {
         out << "drop " << dropped << " portions";
         for (auto& portionInfo : PortionsToDrop) {
-            out << portionInfo->DebugString();
+            out << portionInfo.DebugString();
         }
     }
 }
@@ -18,23 +18,23 @@ void TCleanupColumnEngineChanges::DoWriteIndex(NColumnShard::TColumnShard& self,
     self.IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
     THashSet<TUnifiedBlobId> blobIds;
     for (auto&& p : PortionsToDrop) {
-        auto removing = BlobsAction.GetRemoving(*p);
-        for (auto&& r : p->Records) {
+        auto removing = BlobsAction.GetRemoving(p);
+        for (auto&& r : p.Records) {
             removing->DeclareRemove(r.BlobRange.BlobId);
         }
-        self.IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, p->RawBytesSum());
+        self.IncCounter(NColumnShard::COUNTER_RAW_BYTES_ERASED, p.RawBytesSum());
     }
 }
 
 bool TCleanupColumnEngineChanges::DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) {
     THashSet<TUnifiedBlobId> blobIds;
     for (auto& portionInfo : PortionsToDrop) {
-        if (!self.ErasePortion(*portionInfo)) {
-            AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo->DebugString());
-            return false;
+        if (!self.ErasePortion(portionInfo)) {
+            AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo.DebugString());
+            continue;
         }
-        for (auto& record : portionInfo->Records) {
-            self.ColumnsTable->Erase(context.DB, *portionInfo, record);
+        for (auto& record : portionInfo.Records) {
+            self.ColumnsTable->Erase(context.DB, portionInfo, record);
         }
     }
 

+ 2 - 2
ydb/core/tx/columnshard/engines/changes/cleanup.h

@@ -34,12 +34,12 @@ public:
     virtual THashSet<TPortionAddress> GetTouchedPortions() const override {
         THashSet<TPortionAddress> result;
         for (const auto& portionInfo : PortionsToDrop) {
-            result.emplace(portionInfo->GetAddress());
+            result.emplace(portionInfo.GetAddress());
         }
         return result;
     }
 
-    std::vector<std::shared_ptr<TPortionInfo>> PortionsToDrop;
+    std::vector<TPortionInfo> PortionsToDrop;
     bool NeedRepeat = false;
 
     virtual ui32 GetWritePortionsCount() const override {

+ 12 - 4
ydb/core/tx/columnshard/engines/changes/with_appended.cpp

@@ -56,19 +56,27 @@ void TChangesWithAppend::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIn
 
 bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) {
     // Save new granules
+    std::map<ui64, ui64> remapGranules;
     for (auto& [granule, p] : NewGranules) {
         ui64 pathId = p.first;
         TMark mark = p.second;
         TGranuleRecord rec(pathId, granule, context.Snapshot, mark.GetBorder());
-        self.SetGranule(rec);
-        self.GranulesTable->Write(context.DB, rec);
+        auto oldGranuleId = self.NewGranule(rec);
+        if (!oldGranuleId) {
+            self.GranulesTable->Write(context.DB, rec);
+        } else {
+            remapGranules.emplace(rec.Granule, *oldGranuleId);
+        }
     }
     // Save new portions (their column records)
 
     for (auto& portionInfoWithBlobs : AppendedPortions) {
         auto& portionInfo = portionInfoWithBlobs.GetPortionInfo();
         Y_VERIFY(!portionInfo.Empty());
-
+        auto it = remapGranules.find(portionInfo.GetGranule());
+        if (it != remapGranules.end()) {
+            portionInfo.SetGranule(it->second);
+        }
         self.UpsertPortion(portionInfo);
         for (auto& record : portionInfo.Records) {
             self.ColumnsTable->Write(context.DB, portionInfo, record);
@@ -96,7 +104,7 @@ bool TChangesWithAppend::DoApplyChanges(TColumnEngineForLogs& self, TApplyChange
     }
 
     for (auto& portionInfo : PortionsToRemove) {
-        self.CleanupPortions.insert(portionInfo.GetAddress());
+        self.CleanupPortions[portionInfo.GetRemoveSnapshot()].emplace_back(portionInfo);
     }
 
     return true;

+ 30 - 34
ydb/core/tx/columnshard/engines/column_engine_logs.cpp

@@ -142,7 +142,8 @@ void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndex
 }
 
 bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs, const THashSet<ui64>& pathsToDrop) {
-    ClearIndex();
+    Y_VERIFY(!Loaded);
+    Loaded = true;
     {
         auto guard = GranulesStorage->StartPackModification();
         if (!LoadGranules(db)) {
@@ -165,7 +166,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBl
         for (const auto& [_, portionInfo] : spg->GetPortions()) {
             UpdatePortionStats(*portionInfo, EStatsUpdateType::ADD);
             if (portionInfo->CheckForCleanup()) {
-                CleanupPortions.emplace(portionInfo->GetAddress());
+                CleanupPortions[portionInfo->GetRemoveSnapshot()].emplace_back(*portionInfo);
             }
         }
     }
@@ -309,7 +310,7 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
             Y_VERIFY(spg);
             for (auto& [portion, info] : spg->GetPortions()) {
                 affectedRecords += info->NumChunks();
-                changes->PortionsToDrop.push_back(info);
+                changes->PortionsToDrop.push_back(*info);
                 dropPortions.insert(portion);
             }
 
@@ -332,36 +333,20 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
         return changes;
     }
 
-    // Add stale portions of alive paths
-    THashSet<ui64> cleanGranules;
-    std::shared_ptr<TGranuleMeta> granuleMeta;
-    for (auto it = CleanupPortions.begin(); it != CleanupPortions.end();) {
-        if (!granuleMeta || granuleMeta->GetGranuleId() != it->GetGranuleId()) {
-            auto itGranule = Granules.find(it->GetGranuleId());
-            if (itGranule == Granules.end()) {
-                it = CleanupPortions.erase(it);
-                continue;
-            }
-            granuleMeta = itGranule->second;
+    while (CleanupPortions.size() && affectedRecords <= maxRecords) {
+        auto it = CleanupPortions.begin();
+        if (it->first >= snapshot) {
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanupStop")("snapshot", snapshot.DebugString())("current_snapshot", it->first.DebugString());
+            break;
         }
-        Y_VERIFY(granuleMeta);
-        auto portionInfo = granuleMeta->GetPortionPtr(it->GetPortionId());
-        if (!portionInfo) {
-            it = CleanupPortions.erase(it);
-        } else if (portionInfo->CheckForCleanup(snapshot)) {
-            affectedRecords += portionInfo->NumChunks();
-            changes->PortionsToDrop.push_back(portionInfo);
-            it = CleanupPortions.erase(it);
-            if (affectedRecords > maxRecords) {
-                changes->NeedRepeat = true;
-                break;
-            }
-        } else {
-            AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup/Check")("snapshot", snapshot.DebugString())("portions_snapshot", portionInfo->GetRemoveSnapshot().DebugString());
-            Y_VERIFY(portionInfo->CheckForCleanup());
-            ++it;
+        for (auto&& i : it->second) {
+            Y_VERIFY(i.CheckForCleanup(snapshot));
+            affectedRecords += i.NumChunks();
+            changes->PortionsToDrop.push_back(i);
         }
+        CleanupPortions.erase(it);
     }
+    changes->NeedRepeat = affectedRecords > maxRecords;
     AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size());
 
     if (changes->PortionsToDrop.empty()) {
@@ -587,12 +572,23 @@ void TColumnEngineForLogs::SetGranule(const TGranuleRecord& rec) {
     AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_granule")("granule", rec.DebugString());
     const TMark mark(rec.Mark);
 
-    // There should be only one granule with (PathId, Mark).
     AFL_VERIFY(PathGranules[rec.PathId].emplace(mark, rec.Granule).second)("event", "marker_duplication")("granule_id", rec.Granule)("old_granule_id", PathGranules[rec.PathId][mark]);
-
-    // Allocate granule info and ensure that there is no granule with same id inserted before.
     AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters())).second)("event", "granule_duplication")
-        ("granule_id", rec.Granule)("old_granule", Granules[rec.Granule]->DebugString());
+        ("rec_path_id", rec.PathId)("granule_id", rec.Granule)("old_granule", Granules[rec.Granule]->DebugString());
+}
+
+std::optional<ui64> TColumnEngineForLogs::NewGranule(const TGranuleRecord& rec) {
+    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "upsert_granule")("granule", rec.DebugString());
+    const TMark mark(rec.Mark);
+
+    auto insertInfo = PathGranules[rec.PathId].emplace(mark, rec.Granule);
+    if (insertInfo.second) {
+        AFL_VERIFY(Granules.emplace(rec.Granule, std::make_shared<TGranuleMeta>(rec, GranulesStorage, SignalCounters.RegisterGranuleDataCounters())).second)("event", "granule_duplication")
+            ("granule_id", rec.Granule)("old_granule", Granules[rec.Granule]->DebugString());
+        return {};
+    } else {
+        return insertInfo.first->second;
+    }
 }
 
 void TColumnEngineForLogs::EraseGranule(ui64 pathId, ui64 granule, const TMark& mark) {

+ 3 - 15
ydb/core/tx/columnshard/engines/column_engine_logs.h

@@ -220,13 +220,13 @@ private:
     /// Set of empty granules.
     /// Just for providing count of empty granules.
     THashSet<ui64> EmptyGranules;
-    TSet<TPortionAddress> CleanupPortions;
+    std::map<TSnapshot, std::vector<TPortionInfo>> CleanupPortions;
     TColumnEngineStats Counters;
     ui64 LastPortion;
     ui64 LastGranule;
     TSnapshot LastSnapshot = TSnapshot::Zero();
     mutable std::optional<TMark> CachedDefaultMark;
-
+    bool Loaded = false;
 private:
     const std::shared_ptr<arrow::Schema>& MarkSchema() const noexcept {
         return VersionedIndex.GetIndexKey();
@@ -243,19 +243,6 @@ private:
         return VersionedIndex.GetLastSchema()->GetIndexInfo().IsCompositeIndexKey();
     }
 
-    void ClearIndex() {
-        Granules.clear();
-        PathGranules.clear();
-        PathStats.clear();
-        EmptyGranules.clear();
-        CleanupPortions.clear();
-        Counters.Clear();
-
-        LastPortion = 0;
-        LastGranule = 0;
-        LastSnapshot = TSnapshot::Zero();
-    }
-
     bool LoadGranules(IDbWrapper& db);
     bool LoadColumns(IDbWrapper& db, THashSet<TUnifiedBlobId>& lostBlobs);
     bool LoadCounters(IDbWrapper& db);
@@ -264,6 +251,7 @@ private:
 
     /// Insert granule or check if same granule was already inserted.
     void SetGranule(const TGranuleRecord& rec);
+    std::optional<ui64> NewGranule(const TGranuleRecord& rec);
     void UpsertPortion(const TPortionInfo& portionInfo, const TPortionInfo* exInfo = nullptr);
     bool ErasePortion(const TPortionInfo& portionInfo, bool updateStats = true);
     void UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType = EStatsUpdateType::DEFAULT,

+ 71 - 64
ydb/core/tx/columnshard/engines/ut_logs_engine.cpp

@@ -678,88 +678,95 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
 
     Y_UNIT_TEST(IndexTtl) {
         TTestDbWrapper db;
-        TIndexInfo tableInfo = NColumnShard::BuildTableInfo(testColumns, testKey);;
+        TIndexInfo tableInfo = NColumnShard::BuildTableInfo(testColumns, testKey);
 
         ui64 pathId = 1;
         ui32 step = 1000;
 
         // insert
         ui64 planStep = 1;
-
-        TSnapshot indexSnapshot(1, 1);
-        TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
-        engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
         THashSet<TUnifiedBlobId> lostBlobs;
-        engine.Load(db, lostBlobs);
-
-        THashMap<TBlobRange, TString> blobs;
-        ui64 numRows = 1000;
-        ui64 rowPos = 0;
-        for (ui64 txId = 1; txId <= 20; ++txId, rowPos += numRows) {
-            TString testBlob = MakeTestBlob(rowPos, rowPos + numRows);
-            auto blobRange = MakeBlobRange(++step, testBlob.size());
-            blobs[blobRange] = testBlob;
-
-            // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
-            std::vector<TInsertedData> dataToIndex;
-            dataToIndex.push_back(
-                TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {}));
+        TSnapshot indexSnapshot(1, 1);
+        {
+            TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
+            engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
+            engine.Load(db, lostBlobs);
+
+            THashMap<TBlobRange, TString> blobs;
+            ui64 numRows = 1000;
+            ui64 rowPos = 0;
+            for (ui64 txId = 1; txId <= 20; ++txId, rowPos += numRows) {
+                TString testBlob = MakeTestBlob(rowPos, rowPos + numRows);
+                auto blobRange = MakeBlobRange(++step, testBlob.size());
+                blobs[blobRange] = testBlob;
+
+                // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
+                std::vector<TInsertedData> dataToIndex;
+                dataToIndex.push_back(
+                    TInsertedData(txId, pathId, "", blobRange.BlobId, {}, indexSnapshot, {}));
+
+                bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
+                UNIT_ASSERT(ok);
+            }
 
-            bool ok = Insert(engine, db, TSnapshot(planStep, txId), std::move(dataToIndex), blobs, step);
-            UNIT_ASSERT(ok);
-        }
+            // compact
+            planStep = 2;
 
-        // compact
-        planStep = 2;
+            //        bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4});
+            //        UNIT_ASSERT(ok);
 
-//        bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4});
-//        UNIT_ASSERT(ok);
+                    // read
+            planStep = 3;
 
-        // read
-        planStep = 3;
+            const TIndexInfo& indexInfo = engine.GetVersionedIndex().GetLastSchema()->GetIndexInfo();
+            THashSet<ui32> oneColumnId = {indexInfo.GetColumnId(testColumns[0].first)};
 
-        const TIndexInfo& indexInfo = engine.GetVersionedIndex().GetLastSchema()->GetIndexInfo();
-        THashSet<ui32> oneColumnId = { indexInfo.GetColumnId(testColumns[0].first) };
-
-        { // full scan
-            ui64 txId = 1;
-            auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
-            UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
-        }
+            { // full scan
+                ui64 txId = 1;
+                auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
+                UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
+            }
 
-        // Cleanup
-        Cleanup(engine, db, TSnapshot(planStep, 1), 0);
+            // Cleanup
+            Cleanup(engine, db, TSnapshot(planStep, 1), 0);
 
-        { // full scan
-            ui64 txId = 1;
-            auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
-            UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
-        }
+            { // full scan
+                ui64 txId = 1;
+                auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
+                UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
+            }
 
-        // TTL
-        std::shared_ptr<arrow::DataType> ttlColType = arrow::timestamp(arrow::TimeUnit::MICRO);
-        THashMap<ui64, NOlap::TTiering> pathTtls;
-        NOlap::TTiering tiering;
-        tiering.Ttl = NOlap::TTierInfo::MakeTtl(TDuration::MicroSeconds(TInstant::Now().MicroSeconds() - 10000), "timestamp");
-        pathTtls.emplace(pathId, std::move(tiering));
-        Ttl(engine, db, pathTtls, 10);
+            // TTL
+            std::shared_ptr<arrow::DataType> ttlColType = arrow::timestamp(arrow::TimeUnit::MICRO);
+            THashMap<ui64, NOlap::TTiering> pathTtls;
+            NOlap::TTiering tiering;
+            tiering.Ttl = NOlap::TTierInfo::MakeTtl(TDuration::MicroSeconds(TInstant::Now().MicroSeconds() - 10000), "timestamp");
+            pathTtls.emplace(pathId, std::move(tiering));
+            Ttl(engine, db, pathTtls, 10);
 
-        // read + load + read
+            // read + load + read
 
-        { // full scan
-            ui64 txId = 1;
-            auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
-            UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
+            { // full scan
+                ui64 txId = 1;
+                auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
+                UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
+            }
         }
-
-        // load
-        engine.Load(db, lostBlobs);
-        UNIT_ASSERT_VALUES_EQUAL(engine.GetTotalStats().EmptyGranules, 0);
-
-        { // full scan
-            ui64 txId = 1;
-            auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
-            UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
+        {
+            // load
+            TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
+            engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
+            engine.Load(db, lostBlobs);
+            UNIT_ASSERT_VALUES_EQUAL(engine.GetTotalStats().EmptyGranules, 0);
+
+            const TIndexInfo& indexInfo = engine.GetVersionedIndex().GetLastSchema()->GetIndexInfo();
+            THashSet<ui32> oneColumnId = {indexInfo.GetColumnId(testColumns[0].first)};
+
+            { // full scan
+                ui64 txId = 1;
+                auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
+                UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
+            }
         }
     }
 }

+ 2 - 2
ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp

@@ -2871,8 +2871,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
                     ++cleanupsHappened;
                     Cerr << "Cleanup old portions:";
                     for (const auto& portion : cleanup->PortionsToDrop) {
-                        Cerr << " " << portion->GetPortion();
-                        deletedPortions.insert(portion->GetPortion());
+                        Cerr << " " << portion.GetPortion();
+                        deletedPortions.insert(portion.GetPortion());
                     }
                     Cerr << Endl;
                 }