Browse Source

Correct cleanup policy. Split writeIds list (#1205)

* Correct cleanup policy. Split writeIds list
nsofya 1 year ago
parent
commit
7844a7d461

+ 1 - 9
ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp

@@ -61,15 +61,7 @@ void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet<TWriteId>& writeIds
 }
 
 THashSet<TWriteId> TInsertTable::OldWritesToAbort(const TInstant& now) const {
-    // TODO: This protection does not save us from real flooder activity.
-    // This cleanup is for seldom aborts caused by rare reasons. So there's a temporary simple O(N) here
-    // keeping in mind we need a smarter cleanup logic here not a better algo.
-    if (LastCleanup > now - CleanDelay) {
-        return {};
-    }
-    LastCleanup = now;
-
-    return Summary.GetDeprecatedInsertions(now - WaitCommitDelay);
+    return Summary.GetExpiredInsertions(now - WaitCommitDelay, CleanupPackageSize);
 }
 
 THashSet<TWriteId> TInsertTable::DropPath(IDbWrapper& dbTable, ui64 pathId) {

+ 1 - 4
ydb/core/tx/columnshard/engines/insert_table/insert_table.h

@@ -64,7 +64,7 @@ private:
     bool Loaded = false;
 public:
     static constexpr const TDuration WaitCommitDelay = TDuration::Minutes(10);
-    static constexpr const TDuration CleanDelay = TDuration::Minutes(10);
+    static constexpr ui64 CleanupPackageSize = 10000;
 
     bool Insert(IDbWrapper& dbTable, TInsertedData&& data);
     TInsertionSummary::TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId,
@@ -76,9 +76,6 @@ public:
     void EraseAborted(IDbWrapper& dbTable, const TInsertedData& key, const std::shared_ptr<IBlobsDeclareRemovingAction>& blobsAction);
     std::vector<TCommittedBlob> Read(ui64 pathId, const TSnapshot& snapshot, const std::shared_ptr<arrow::Schema>& pkSchema) const;
     bool Load(IDbWrapper& dbTable, const TInstant loadTime);
-private:
-
-    mutable TInstant LastCleanup;
 };
 
 }

+ 2 - 3
ydb/core/tx/columnshard/engines/insert_table/meta.h

@@ -23,9 +23,8 @@ public:
     TInsertedDataMeta(const NKikimrTxColumnShard::TLogicalMetadata& proto)
         : OriginalProto(proto)
     {
-        if (proto.HasDirtyWriteTimeSeconds()) {
-            DirtyWriteTime = TInstant::Seconds(proto.GetDirtyWriteTimeSeconds());
-        }
+        AFL_VERIFY(proto.HasDirtyWriteTimeSeconds())("data", proto.DebugString());
+        DirtyWriteTime = TInstant::Seconds(proto.GetDirtyWriteTimeSeconds());
         NumRows = proto.GetNumRows();
         RawBytes = proto.GetRawBytes();
     }

+ 11 - 2
ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp

@@ -111,13 +111,22 @@ THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetInsertedByPathId(const
     return result;
 }
 
-THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetDeprecatedInsertions(const TInstant timeBorder) const {
+THashSet<NKikimr::NOlap::TWriteId> TInsertionSummary::GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const {
+    if (timeBorder < MinInsertedTs) {
+        return {};
+    }
+
     THashSet<TWriteId> toAbort;
+    TInstant newMin = TInstant::Max();
     for (auto& [writeId, data] : Inserted) {
-        if (data.GetMeta().GetDirtyWriteTime() && data.GetMeta().GetDirtyWriteTime() < timeBorder) {
+        const TInstant dataInsertTs = data.GetMeta().GetDirtyWriteTime();
+        if (dataInsertTs < timeBorder && toAbort.size() < limit) {
             toAbort.insert(writeId);
+        } else {
+            newMin = Min(newMin, dataInsertTs);
         }
     }
+    MinInsertedTs = (toAbort.size() == Inserted.size()) ? TInstant::Zero() : newMin;
     return toAbort;
 }
 

+ 2 - 1
ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h

@@ -21,6 +21,7 @@ private:
 
     THashMap<TWriteId, TInsertedData> Inserted;
     THashMap<TWriteId, TInsertedData> Aborted;
+    mutable TInstant MinInsertedTs = TInstant::Zero();
 
     std::map<TPathInfoIndexPriority, std::set<const TPathInfo*>> Priorities;
     THashMap<ui64, TPathInfo> PathInfo;
@@ -35,7 +36,7 @@ private:
 public:
     THashSet<TWriteId> GetInsertedByPathId(const ui64 pathId) const;
 
-    THashSet<TWriteId> GetDeprecatedInsertions(const TInstant timeBorder) const;
+    THashSet<TWriteId> GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const;
 
     const THashMap<TWriteId, TInsertedData>& GetInserted() const {
         return Inserted;