Browse Source

KIKIMR-15075: hotfix for unlimited eviction read in ColumnShard

ref:3e5d7dd77957bdc76c5d4108fcd1788cbf51ace0
Artem Zuikov 2 years ago
parent
commit
83725a03a8

+ 7 - 7
ROADMAP.md

@@ -1,14 +1,14 @@
 # YDB Roadmap
 
 ## Query Processor
-1. [ ] Support for **Snapshot Readonly** transactions mode
+1. [ ] Support for **Snapshot Read-only** transactions mode
 1. [ ] **Better resource management** for KQP Resource Manager (share information about nodes resources, avoid OOMs)
 1. [ ] Switch to **New Engine** for OLTP queries
 1. ✅ Support **`not null` for table columns**
 1. [ ] **Aggregates and predicates push down to column-oriented tables**
 1. [ ] **Optimize data formats** for data transition between query phases
-1. [ ] **Index Rename/Rebuild**
-1. [ ] **KQP Session Actor** as a replacement for KQP Worker Actor (optimize to reduce CPU usage)
+1. [ ] **Index Rebuild**
+1. [ ] Optimize **KQP Session Actor** (optimize to reduce CPU usage)
 1. [ ] **PostgreSQL compatibility**
     * [ ] Support PostgreSQL datatypes **serialization/deserialization** in YDB Public API
     * [ ] PostgreSQL compatible **query execution** (TPC-C, TPC-H queries should work)
@@ -17,13 +17,13 @@
 1. [ ] Support **constraints in query optimizer**
 1. [ ] **Query Processor 3.0** (a set of tasks to be more like traditional database in case of query execution functionality)
     * [ ] Support for **Streaming Lookup Join** via MVCC snapshots (avoid distributed transactions, scalability is better)
-    * [ ] **Universal API call for DML, DDL with unlimited results size** (aka StreamExecuteQuery, which allows to execute each query)
+    * [ ] **Universal API call for DML, DDL, ScanQuery** (aka StreamExecuteQuery, which allows executing any query)
     * [ ] Support for **secondary indexes in ScanQuery**
-    * [ ] **Transaction can see its own updates** (updates made during transaction execution are not buffered in RAM anymore, but rather are written to disk and available to read by this transaction)
-1. [ ] **Computation graphs caching (compute/datashard programs)** (optimize CPU usage)
+    * [ ] **Transaction can see its updates** (updates made during transaction execution are not buffered in RAM anymore, but rather are written to disk and available to read by this transaction)
+1. [ ] **Query graphs caching at DataShards** (optimize CPU usage)
 1. [ ] **RPC Deadline & Cancellation propagation** (smooth timeout management)
 1. [ ] **DDL for column-oriented tables**
-1. [ ] **Select indexes automatically**
+1. [ ] **Select indexes automatically**
 
 ## Database Core (Tablets, etc)
 1. [ ] Get **YDB topics** (aka pers queue, streams) ready for production

+ 8 - 2
ydb/core/tx/columnshard/columnshard__write_index.cpp

@@ -44,6 +44,9 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
     auto changes = Ev->Get()->IndexChanges;
     Y_VERIFY(changes);
 
+    LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString()
+        << ") changes: " << *changes << " at tablet " << Self->TabletID());
+
     bool ok = false;
     if (Ev->Get()->PutStatus == NKikimrProto::OK) {
         NOlap::TSnapshot snapshot = changes->ApplySnapshot;
@@ -55,8 +58,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
         NOlap::TDbWrapper dbWrap(txc.DB, &dsGroupSelector);
         ok = Self->PrimaryIndex->ApplyChanges(dbWrap, changes, snapshot); // update changes + apply
         if (ok) {
-            LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString()
-                << ") apply changes: " << *changes << " at tablet " << Self->TabletID());
+            LOG_S_DEBUG("TTxWriteIndex (" << changes->TypeString() << ") apply at tablet " << Self->TabletID());
 
             TBlobManagerDb blobManagerDb(txc.DB);
             for (const auto& cmtd : changes->DataToIndex) {
@@ -213,6 +215,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
     }
 
     if (BlobsToExport.size()) {
+        size_t numBlobs = BlobsToExport.size();
         for (auto& [blobId, tierName] : BlobsToExport) {
             ExportTierBlobs[tierName].emplace(blobId, TString{});
         }
@@ -221,6 +224,9 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
         ExportNo = Self->LastExportNo + 1;
         Self->LastExportNo += ExportTierBlobs.size();
 
+        LOG_S_DEBUG("TTxWriteIndex init export " << ExportNo << " of " << numBlobs << " blobs in "
+            << ExportTierBlobs.size() << " tiers at tablet " << Self->TabletID());
+
         NIceDb::TNiceDb db(txc.DB);
         Schema::SaveSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo);
     }

+ 3 - 0
ydb/core/tx/columnshard/columnshard_impl.cpp

@@ -738,6 +738,9 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
         LOG_S_NOTICE("Cannot prepare TTL at tablet " << TabletID());
         return {};
     }
+    if (indexChanges->NeedRepeat) {
+        Ttl.Repeat();
+    }
 
     bool needWrites = !indexChanges->PortionsToEvict.empty();
 

+ 9 - 2
ydb/core/tx/columnshard/columnshard_ttl.h

@@ -6,6 +6,7 @@ namespace NKikimr::NColumnShard {
 class TTtl {
 public:
     static constexpr const ui64 DEFAULT_TTL_TIMEOUT_SEC = 60 * 60;
+    static constexpr const ui64 DEFAULT_REPEAT_TTL_TIMEOUT_SEC = 10;
 
     struct TEviction {
         TString TierName;
@@ -80,7 +81,7 @@ public:
     }
 
     THashMap<ui64, NOlap::TTiersInfo> MakeIndexTtlMap(TInstant now, bool force = false) {
-        if ((now < LastRegularTtl + RegularTtlTimeout) && !force) {
+        if ((now < LastRegularTtl + TtlTimeout) && !force) {
             return {};
         }
 
@@ -93,12 +94,18 @@ public:
         return out;
     }
 
+    void Repeat() {
+        LastRegularTtl -= TtlTimeout;
+        LastRegularTtl += RepeatTtlTimeout;
+    }
+
     const THashSet<TString>& TtlColumns() const { return Columns; }
 
 private:
     THashMap<ui64, TDescription> PathTtls; // pathId -> ttl
     THashSet<TString> Columns;
-    TDuration RegularTtlTimeout{TDuration::Seconds(DEFAULT_TTL_TIMEOUT_SEC)};
+    TDuration TtlTimeout{TDuration::Seconds(DEFAULT_TTL_TIMEOUT_SEC)};
+    TDuration RepeatTtlTimeout{TDuration::Seconds(DEFAULT_REPEAT_TTL_TIMEOUT_SEC)};
     TInstant LastRegularTtl;
 
     NOlap::TTiersInfo Convert(const TDescription& descr, TInstant timePoint) const {

+ 5 - 1
ydb/core/tx/columnshard/engines/column_engine.h

@@ -16,6 +16,8 @@ struct TPredicate;
 struct TCompactionLimits {
     static constexpr const ui32 MIN_GOOD_BLOB_SIZE = 256 * 1024; // some BlobStorage constant
     static constexpr const ui32 MAX_BLOB_SIZE = 8 * 1024 * 1024; // some BlobStorage constant
+    static constexpr const ui64 DEFAULT_EVICTION_BYTES = 64 * 1024 * 1024;
+    static constexpr const ui64 MAX_BLOBS_TO_DELETE = 10000;
 
     ui32 GoodBlobSize{MIN_GOOD_BLOB_SIZE};
     ui32 GranuleBlobSplitSize{MAX_BLOB_SIZE};
@@ -131,6 +133,7 @@ public:
     TVector<TColumnRecord> EvictedRecords;
     TVector<std::pair<TPortionInfo, ui64>> PortionsToMove; // {portion, new granule}
     THashMap<TBlobRange, TString> Blobs;
+    bool NeedRepeat{false};
 
     bool IsInsert() const { return Type == INSERT; }
     bool IsCompaction() const { return Type == COMPACTION; }
@@ -363,7 +366,8 @@ public:
                                                                   const TSnapshot& outdatedSnapshot) = 0;
     virtual std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot,
                                                                THashSet<ui64>& pathsToDrop) = 0;
-    virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) = 0;
+    virtual std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls,
+                                                           ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) = 0;
     virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) = 0;
     virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
     //virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO

+ 16 - 3
ydb/core/tx/columnshard/engines/column_engine_logs.cpp

@@ -771,13 +771,18 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
     return changes;
 }
 
-std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) {
+std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls,
+                                                                     ui64 maxEvictBytes) {
     if (pathTtls.empty()) {
         return {};
     }
 
     TSnapshot fakeSnapshot = {1, 1}; // TODO: better snapshot
     auto changes = std::make_shared<TChanges>(TColumnEngineChanges::TTL, fakeSnapshot);
+    ui64 evicttionSize = 0;
+    bool allowEviction = true;
+    ui64 dropBlobs = 0;
+    bool allowDrop = true;
 
     for (auto& [pathId, ttl] : pathTtls) {
         if (!PathGranules.count(pathId)) {
@@ -800,19 +805,24 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
                     continue;
                 }
 
+                allowEviction = (evicttionSize <= maxEvictBytes);
+                allowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE);
+
                 if (auto max = info.MaxValue(ttlColumnId)) {
                     bool keep = false;
                     for (auto& border : ttl.TierBorders) {
                         if (NArrow::ScalarLess(*border.ToTimestamp(), *max)) {
                             keep = true;
-                            if (info.TierName != border.TierName) {
+                            if (allowEviction && info.TierName != border.TierName) {
+                                evicttionSize += info.BlobsSizes().first;
                                 changes->PortionsToEvict.emplace_back(info, border.TierName);
                             }
                             break;
                         }
                     }
-                    if (!keep) {
+                    if (!keep && allowDrop) {
                         Y_VERIFY(!NArrow::ScalarLess(*ttl.TierBorders.back().ToTimestamp(), *max));
+                        dropBlobs += info.NumRecords();
                         changes->PortionsToDrop.push_back(info);
                     }
                 }
@@ -825,6 +835,9 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartTtl(const THash
         return {};
     }
 
+    if (!allowEviction || !allowDrop) {
+        changes->NeedRepeat = true;
+    }
     return changes;
 }
 

+ 2 - 1
ydb/core/tx/columnshard/engines/column_engine_logs.h

@@ -141,7 +141,8 @@ public:
                                                           const TSnapshot& outdatedSnapshot) override;
     std::shared_ptr<TColumnEngineChanges> StartCleanup(const TSnapshot& snapshot,
                                                        THashSet<ui64>& pathsToDrop) override;
-    std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls) override;
+    std::shared_ptr<TColumnEngineChanges> StartTtl(const THashMap<ui64, TTiersInfo>& pathTtls,
+                                                   ui64 maxEvictBytes = TCompactionLimits::DEFAULT_EVICTION_BYTES) override;
     bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
                       const TSnapshot& snapshot) override;
     void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) override;