Browse Source

KIKIMR-19211: special wave-optimizer (level in future)

ivanmorozov 1 year ago
parent
commit
4b6ad89d8b

+ 15 - 0
.mapping.json

@@ -5366,6 +5366,21 @@
   "ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.linux-x86_64.txt":"",
   "ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.txt":"",
   "ydb/core/tx/columnshard/engines/storage/optimizer/CMakeLists.windows-x86_64.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-aarch64.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.linux-x86_64.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/abstract/CMakeLists.windows-x86_64.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-aarch64.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.linux-x86_64.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/intervals/CMakeLists.windows-x86_64.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-aarch64.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.linux-x86_64.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.txt":"",
+  "ydb/core/tx/columnshard/engines/storage/optimizer/levels/CMakeLists.windows-x86_64.txt":"",
   "ydb/core/tx/columnshard/engines/ut/CMakeLists.darwin-x86_64.txt":"",
   "ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-aarch64.txt":"",
   "ydb/core/tx/columnshard/engines/ut/CMakeLists.linux-x86_64.txt":"",

+ 7 - 7
ydb/core/formats/arrow/replace_key.h

@@ -53,7 +53,7 @@ public:
         : Columns(columns)
         , Position(position)
     {
-        Y_VERIFY_DEBUG(Size() > 0 && Position < (ui64)Column(0).length());
+        Y_VERIFY(Size() > 0 && Position < (ui64)Column(0).length());
     }
 
     template<typename T = TArrayVecPtr> requires IsOwning
@@ -61,7 +61,7 @@ public:
         : Columns(std::make_shared<TArrayVec>(std::move(columns)))
         , Position(position)
     {
-        Y_VERIFY_DEBUG(Size() > 0 && Position < (ui64)Column(0).length());
+        Y_VERIFY(Size() > 0 && Position < (ui64)Column(0).length());
     }
 
     size_t Hash() const {
@@ -70,7 +70,7 @@ public:
 
     template<typename T>
     bool operator == (const TReplaceKeyTemplate<T>& key) const {
-        Y_VERIFY_DEBUG(Size() == key.Size());
+        Y_VERIFY(Size() == key.Size());
 
         for (int i = 0; i < Size(); ++i) {
             auto cmp = CompareColumnValue(i, key, i);
@@ -83,7 +83,7 @@ public:
 
     template<typename T>
     std::partial_ordering operator <=> (const TReplaceKeyTemplate<T>& key) const {
-        Y_VERIFY_DEBUG(Size() == key.Size());
+        Y_VERIFY(Size() == key.Size());
 
         for (int i = 0; i < Size(); ++i) {
             auto cmp = CompareColumnValue(i, key, i);
@@ -96,7 +96,7 @@ public:
 
     template<typename T>
     std::partial_ordering CompareNotNull(const TReplaceKeyTemplate<T>& key) const {
-        Y_VERIFY_DEBUG(Size() == key.Size());
+        Y_VERIFY(Size() == key.Size());
 
         for (int i = 0; i < Size(); ++i) {
             auto cmp = CompareColumnValueNotNull(i, key, i);
@@ -109,8 +109,8 @@ public:
 
     template<typename T>
     std::partial_ordering ComparePartNotNull(const TReplaceKeyTemplate<T>& key, int size) const {
-        Y_VERIFY_DEBUG(size <= key.Size());
-        Y_VERIFY_DEBUG(size <= Size());
+        Y_VERIFY(size <= key.Size());
+        Y_VERIFY(size <= Size());
 
         for (int i = 0; i < size; ++i) {
             auto cmp = CompareColumnValueNotNull(i, key, i);

+ 26 - 18
ydb/core/tx/columnshard/blob_manager.cpp

@@ -19,6 +19,13 @@ TLogoBlobID ParseLogoBlobId(TString blobId) {
 }
 
 struct TBlobBatch::TBatchInfo : TNonCopyable {
+private:
+    std::vector<TUnifiedBlobId> BlobIds;
+public:
+    const std::vector<TUnifiedBlobId>& GetBlobIds() const {
+        return BlobIds;
+    }
+
     TIntrusivePtr<TTabletStorageInfo> TabletInfo;
     TAllocatedGenStepConstPtr GenStepRef;
     const TBlobsManagerCounters Counters;
@@ -26,7 +33,6 @@ struct TBlobBatch::TBatchInfo : TNonCopyable {
     const ui32 Step;
     const ui32 Channel;
 
-    std::vector<ui32> BlobSizes;
     std::vector<bool> InFlight;
     i32 InFlightCount;
     ui64 TotalSizeBytes;
@@ -42,18 +48,15 @@ struct TBlobBatch::TBatchInfo : TNonCopyable {
         , TotalSizeBytes(0) {
     }
 
-    TUnifiedBlobId NextBlobId(ui32 blobSize) {
-        BlobSizes.push_back(blobSize);
+    TUnifiedBlobId NextBlobId(const ui32 blobSize) {
         InFlight.push_back(true);
         ++InFlightCount;
         TotalSizeBytes += blobSize;
-        return MakeBlobId(BlobSizes.size() - 1);
-    }
 
-    TUnifiedBlobId MakeBlobId(ui32 i) const {
-        Y_VERIFY(i < BlobSizes.size());
         const ui32 dsGroup = TabletInfo->GroupFor(Channel, Gen);
-        return TUnifiedBlobId(dsGroup, TLogoBlobID(TabletInfo->TabletID, Gen, Step, Channel, BlobSizes[i], i));
+        TUnifiedBlobId nextBlobId(dsGroup, TLogoBlobID(TabletInfo->TabletID, Gen, Step, Channel, blobSize, BlobIds.size()));
+        BlobIds.emplace_back(std::move(nextBlobId));
+        return BlobIds.back();
     }
 };
 
@@ -90,6 +93,7 @@ void TBlobBatch::OnBlobWriteResult(const TLogoBlobID& blobId, const NKikimrProto
     BatchInfo->Counters.OnPutResult(blobId.BlobSize());
     Y_VERIFY(status == NKikimrProto::OK, "The caller must handle unsuccessful status");
     Y_VERIFY(BatchInfo);
+    Y_VERIFY(blobId.Cookie() < BatchInfo->InFlight.size());
     Y_VERIFY(BatchInfo->InFlight[blobId.Cookie()], "Blob %s is already acked!", blobId.ToString().c_str());
 
     BatchInfo->InFlight[blobId.Cookie()] = false;
@@ -104,7 +108,7 @@ bool TBlobBatch::AllBlobWritesCompleted() const {
 
 ui64 TBlobBatch::GetBlobCount() const {
     if (BatchInfo) {
-        return BatchInfo->BlobSizes.size();
+        return BatchInfo->GetBlobIds().size();
     }
     return 0;
 }
@@ -217,7 +221,7 @@ TGenStep TBlobManager::FindNewGCBarrier() {
 
 std::shared_ptr<NOlap::NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTask(const TString& storageId, const std::shared_ptr<TBlobManager>& manager) {
     if (BlobsToKeep.empty() && BlobsToDelete.empty() && LastCollectedGenStep == TGenStep{CurrentGen, CurrentStep}) {
-        ACFL_DEBUG("event", "TBlobManager::NeedStorageGC skip");
+        ACFL_DEBUG("event", "TBlobManager::BuildGCTask skip")("current_gen", CurrentGen)("current_step", CurrentStep);
         return nullptr;
     }
 
@@ -231,7 +235,7 @@ std::shared_ptr<NOlap::NBlobOperations::NBlobStorage::TGCTask> TBlobManager::Bui
 
     NOlap::NBlobOperations::NBlobStorage::TGCTask::TGCListsByGroup perGroupGCListsInFlight;
 
-    // Clear all possibly not keeped trash in channel's groups: create an event for each group
+    // Clear all possibly not kept trash in channel's groups: create an event for each group
     if (FirstGC) {
         FirstGC = false;
 
@@ -256,6 +260,7 @@ std::shared_ptr<NOlap::NBlobOperations::NBlobStorage::TGCTask> TBlobManager::Bui
             }
             ui32 blobGroup = TabletInfo->GroupFor(keepBlobIt->Channel(), keepBlobIt->Generation());
             perGroupGCListsInFlight[blobGroup].KeepList.insert(*keepBlobIt);
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc", *keepBlobIt);
         }
         BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt);
         BlobsManagerCounters.OnBlobsKeep(BlobsToKeep);
@@ -267,10 +272,12 @@ std::shared_ptr<NOlap::NBlobOperations::NBlobStorage::TGCTask> TBlobManager::Bui
             if (genStep > newCollectGenStep) {
                 break;
             }
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", *blobIt);
             ui32 blobGroup = TabletInfo->GroupFor(blobIt->Channel(), blobIt->Generation());
             NOlap::NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[blobGroup];
             bool skipDontKeep = false;
             if (gl.KeepList.erase(*blobIt)) {
+                AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc_remove", *blobIt);
                 // Skipped blobs still need to be deleted from BlobsToKeep table
                 keepsToErase.emplace_back(TUnifiedBlobId(blobGroup, *blobIt));
 
@@ -317,21 +324,21 @@ void TBlobManager::DoSaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) {
 
     LOG_S_DEBUG("BlobManager at tablet " << TabletInfo->TabletID
         << " Save Batch GenStep: " << blobBatch.BatchInfo->Gen << ":" << blobBatch.BatchInfo->Step
-        << " Blob count: " << blobBatch.BatchInfo->BlobSizes.size());
+        << " Blob count: " << blobBatch.BatchInfo->GetBlobIds().size());
 
     // Add this batch to KeepQueue
     TGenStep edgeGenStep = EdgeGenStep();
-    for (ui32 i = 0; i < blobBatch.BatchInfo->BlobSizes.size(); ++i) {
-        const TUnifiedBlobId blobId = blobBatch.BatchInfo->MakeBlobId(i);
+    for (auto&& blobId: blobBatch.BatchInfo->GetBlobIds()) {
         Y_VERIFY_DEBUG(blobId.IsDsBlob(), "Not a DS blob id: %s", blobId.ToStringNew().c_str());
 
-        auto logoblobId = blobId.GetLogoBlobId();
-        TGenStep genStep{logoblobId.Generation(), logoblobId.Step()};
+        auto logoBlobId = blobId.GetLogoBlobId();
+        TGenStep genStep{logoBlobId.Generation(), logoBlobId.Step()};
 
         AFL_VERIFY(genStep > edgeGenStep)("gen_step", genStep)("edge_gen_step", edgeGenStep)("blob_id", blobId.ToStringNew());
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep", logoBlobId.ToString());
 
-        BlobsManagerCounters.OnKeepMarker(logoblobId.BlobSize());
-        BlobsToKeep.insert(std::move(logoblobId));
+        BlobsManagerCounters.OnKeepMarker(logoBlobId.BlobSize());
+        BlobsToKeep.insert(std::move(logoBlobId));
         db.AddBlobToKeep(blobId);
     }
     BlobsManagerCounters.OnBlobsKeep(BlobsToKeep);
@@ -344,6 +351,7 @@ void TBlobManager::DeleteBlob(const TUnifiedBlobId& blobId, IBlobManagerDb& db)
 
     // Persist deletion intent
     db.AddBlobToDelete(blobId);
+    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete", blobId);
 
     // Check if the deletion needs to be delayed until the blob is no longer
     // used by in-flight requests

+ 4 - 2
ydb/core/tx/columnshard/blobs_action/counters/write.cpp

@@ -11,11 +11,13 @@ TWriteCounters::TWriteCounters(const TConsumerCounters& owner)
 
     RepliesCount = TBase::GetDeriviative("Replies/Count");
     ReplyBytes = TBase::GetDeriviative("Replies/Bytes");
-    ReplyDuration = TBase::GetHistogram("Replies/Duration", NMonitoring::ExponentialHistogram(15, 2, 1000));
+    ReplyDurationBySize = TBase::GetHistogram("Replies/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1));
+    ReplyDurationByCount = TBase::GetHistogram("Replies/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 1));
 
     FailsCount = TBase::GetDeriviative("Fails/Count");
     FailBytes = TBase::GetDeriviative("Fails/Bytes");
-    FailDuration = TBase::GetHistogram("Fails/Duration", NMonitoring::ExponentialHistogram(15, 2, 1000));
+    FailDurationBySize = TBase::GetHistogram("Fails/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 2));
+    FailDurationByCount = TBase::GetHistogram("Fails/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 2));
 }
 
 }

+ 8 - 4
ydb/core/tx/columnshard/blobs_action/counters/write.h

@@ -14,11 +14,13 @@ private:
 
     NMonitoring::TDynamicCounters::TCounterPtr RepliesCount;
     NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes;
-    NMonitoring::THistogramPtr ReplyDuration;
+    NMonitoring::THistogramPtr ReplyDurationByCount;
+    NMonitoring::THistogramPtr ReplyDurationBySize;
 
     NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
     NMonitoring::TDynamicCounters::TCounterPtr FailBytes;
-    NMonitoring::THistogramPtr FailDuration;
+    NMonitoring::THistogramPtr FailDurationByCount;
+    NMonitoring::THistogramPtr FailDurationBySize;
 public:
     TWriteCounters(const TConsumerCounters& owner);
 
@@ -30,13 +32,15 @@ public:
     void OnReply(const ui64 bytes, const TDuration d) const {
         RepliesCount->Add(1);
         ReplyBytes->Add(bytes);
-        ReplyDuration->Collect(d.MilliSeconds());
+        ReplyDurationByCount->Collect((i64)d.MilliSeconds());
+        ReplyDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes);
     }
 
     void OnFail(const ui64 bytes, const TDuration d) const {
         FailsCount->Add(1);
         FailBytes->Add(bytes);
-        FailDuration->Collect(d.MilliSeconds());
+        FailDurationByCount->Collect((i64)d.MilliSeconds());
+        FailDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes);
     }
 };
 

+ 2 - 2
ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp

@@ -5,12 +5,12 @@
 namespace NKikimr::NColumnShard {
 
 bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) {
-    TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID()));
+    auto changes = Ev->Get()->IndexChanges;
+    TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("external_task_id", changes->GetTaskIdentifier());
     Y_VERIFY(Self->InsertTable);
     Y_VERIFY(Self->TablesManager.HasPrimaryIndex());
     txc.DB.NoMoreReadsForTx();
 
-    auto changes = Ev->Get()->IndexChanges;
     ACFL_DEBUG("event", "TTxWriteIndex::Execute")("change_type", changes->TypeString())("details", *changes);
     if (Ev->Get()->GetPutStatus() == NKikimrProto::OK) {
         NOlap::TSnapshot snapshot(Self->LastPlannedStep, Self->LastPlannedTxId);

+ 3 - 3
ydb/core/tx/columnshard/engines/changes/compaction.cpp

@@ -75,7 +75,7 @@ void TCompactColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, T
     NeedGranuleStatusProvide = false;
 }
 
-TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext)
+TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext)
     : TBase(limits.GetSplitSettings(), saverContext, StaticTypeName())
     , Limits(limits)
     , GranuleMeta(granule)
@@ -83,10 +83,10 @@ TCompactColumnEngineChanges::TCompactColumnEngineChanges(const TCompactionLimits
     Y_VERIFY(GranuleMeta);
 
     SwitchedPortions.reserve(portions.size());
-    for (const auto& [_, portionInfo] : portions) {
+    for (const auto& portionInfo : portions) {
         Y_VERIFY(portionInfo->IsActive());
         SwitchedPortions.emplace_back(*portionInfo);
-        PortionsToRemove.emplace_back(*portionInfo);
+        AFL_VERIFY(PortionsToRemove.emplace(portionInfo->GetAddress(), *portionInfo).second);
         Y_VERIFY(portionInfo->GetGranule() == GranuleMeta->GetGranuleId());
     }
     Y_VERIFY(SwitchedPortions.size());

+ 1 - 1
ydb/core/tx/columnshard/engines/changes/compaction.h

@@ -32,7 +32,7 @@ public:
 
     virtual THashSet<TPortionAddress> GetTouchedPortions() const override;
 
-    TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::map<ui64, std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext);
+    TCompactColumnEngineChanges(const TCompactionLimits& limits, std::shared_ptr<TGranuleMeta> granule, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const TSaverContext& saverContext);
     ~TCompactColumnEngineChanges();
 
     static TString StaticTypeName() {

+ 83 - 66
ydb/core/tx/columnshard/engines/changes/general_compaction.cpp

@@ -37,7 +37,7 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
         pkFieldNamesSet.emplace(i);
     }
 
-    std::shared_ptr<arrow::RecordBatch> batchResult;
+    std::vector<std::shared_ptr<arrow::RecordBatch>> batchResults;
     {
         arrow::FieldVector indexFields;
         indexFields.emplace_back(portionIdField);
@@ -62,51 +62,53 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
             Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, resultSchema->GetIndexInfo().GetReplaceKey()));
             mergeStream.AddPoolSource({}, batch, nullptr);
         }
-
-        NIndexedReader::TRecordBatchBuilder indexesBuilder(indexFields);
-        mergeStream.DrainAll(indexesBuilder);
-        batchResult = indexesBuilder.Finalize();
+        batchResults = mergeStream.DrainAllParts(CheckPoints, indexFields, true);
     }
-    
-    auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName);
-    auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName);
-    auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
-    auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
-    Y_VERIFY(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx);
-    Y_VERIFY(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
-    Y_VERIFY(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
-    Y_VERIFY(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
-    Y_VERIFY(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);
-    const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
-    const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);
-
-    const ui32 portionRecordsCountLimit = batchResult->num_rows() / (batchResult->num_rows() / 10000 + 1) + 1;
+    Y_VERIFY(batchResults.size());
 
     TSerializationStats stats;
     for (auto&& i : SwitchedPortions) {
         stats.Merge(i.GetSerializationStat(*resultSchema));
     }
 
-    std::map<std::string, std::vector<TColumnPortionResult>> columnChunks;
-
+    std::vector<std::map<std::string, std::vector<TColumnPortionResult>>> chunkGroups;
+    chunkGroups.resize(batchResults.size());
     for (auto&& f : resultSchema->GetSchema()->fields()) {
         const ui32 columnId = resultSchema->GetColumnId(f->name());
         auto columnInfo = stats.GetColumnInfo(columnId);
         Y_VERIFY(columnInfo);
-        TColumnMergeContext context(resultSchema, portionRecordsCountLimit, 50 * 1024 * 1024, f, *columnInfo, SaverContext);
-        TMergedColumn mColumn(context);
-        {
-//            auto c = batchResult->GetColumnByName(f->name());
-//            AFL_VERIFY(!c);
+
+        std::vector<TPortionColumnCursor> cursors;
+        auto loader = resultSchema->GetColumnLoader(f->name());
+        for (auto&& p : portions) {
+            std::vector<const TColumnRecord*> records;
+            std::vector<IPortionColumnChunk::TPtr> chunks;
+            p.ExtractColumnChunks(columnId, records, chunks);
+            cursors.emplace_back(TPortionColumnCursor(chunks, records, loader));
+        }
+
+        ui32 batchesRecordsCount = 0;
+        ui32 columnRecordsCount = 0;
+        std::map<std::string, std::vector<TColumnPortionResult>> columnChunks;
+        ui32 batchIdx = 0;
+        for (auto&& batchResult : batchResults) {
+            const ui32 portionRecordsCountLimit = batchResult->num_rows() / (batchResult->num_rows() / 10000 + 1) + 1;
+            TColumnMergeContext context(resultSchema, portionRecordsCountLimit, 50 * 1024 * 1024, f, *columnInfo, SaverContext);
+            TMergedColumn mColumn(context);
+
+            auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName);
+            auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName);
+            auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
+            auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
+            Y_VERIFY(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx);
+            Y_VERIFY(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
+            Y_VERIFY(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
+            Y_VERIFY(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
+            Y_VERIFY(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);
+            const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
+            const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);
+
             AFL_VERIFY(batchResult->num_rows() == pIdxArray.length());
-            std::vector<TPortionColumnCursor> cursors;
-            auto loader = resultSchema->GetColumnLoader(f->name());
-            for (auto&& p : portions) {
-                std::vector<const TColumnRecord*> records;
-                std::vector<IPortionColumnChunk::TPtr> chunks;
-                p.ExtractColumnChunks(columnId, records, chunks);
-                cursors.emplace_back(TPortionColumnCursor(chunks, records, loader));
-            }
             std::optional<ui16> predPortionIdx;
             for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) {
                 const ui16 portionIdx = pIdxArray.Value(idx);
@@ -121,46 +123,54 @@ TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstruc
                 }
                 predPortionIdx = portionIdx;
             }
+            chunkGroups[batchIdx][f->name()] = mColumn.BuildResult();
+            batchesRecordsCount += batchResult->num_rows();
+            columnRecordsCount += mColumn.GetRecordsCount();
+            ++batchIdx;
         }
-        AFL_VERIFY(mColumn.GetRecordsCount() == batchResult->num_rows())("f_name", f->name())("mCount", mColumn.GetRecordsCount())("bCount", batchResult->num_rows());
-        columnChunks[f->name()] = mColumn.BuildResult();
-    }
-
-    Y_VERIFY(columnChunks.size());
+        AFL_VERIFY(columnRecordsCount == batchesRecordsCount)("f_name", f->name())("mCount", columnRecordsCount)("bCount", batchesRecordsCount);
 
-    for (auto&& i : columnChunks) {
-        if (i.second.size() != columnChunks.begin()->second.size()) {
-            for (ui32 p = 0; p < std::min<ui32>(columnChunks.begin()->second.size(), i.second.size()); ++p) {
-                AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("p_first", columnChunks.begin()->second[p].DebugString())("p", i.second[p].DebugString());
+    }
+    ui32 batchIdx = 0;
+    for (auto&& columnChunks : chunkGroups) {
+        auto batchResult = batchResults[batchIdx];
+        ++batchIdx;
+        Y_VERIFY(columnChunks.size());
+
+        for (auto&& i : columnChunks) {
+            if (i.second.size() != columnChunks.begin()->second.size()) {
+                for (ui32 p = 0; p < std::min<ui32>(columnChunks.begin()->second.size(), i.second.size()); ++p) {
+                    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("p_first", columnChunks.begin()->second[p].DebugString())("p", i.second[p].DebugString());
+                }
             }
+            AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())("current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
         }
-        AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())("current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
-    }
 
-    std::vector<TGeneralSerializedSlice> batchSlices;
-    std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultSchema, SaverContext, std::move(stats)));
+        std::vector<TGeneralSerializedSlice> batchSlices;
+        std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultSchema, SaverContext, std::move(stats)));
 
-    for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) {
-        std::map<ui32, std::vector<IPortionColumnChunk::TPtr>> portionColumns;
-        for (auto&& p : columnChunks) {
-            portionColumns.emplace(resultSchema->GetColumnId(p.first), p.second[i].GetChunks());
+        for (ui32 i = 0; i < columnChunks.begin()->second.size(); ++i) {
+            std::map<ui32, std::vector<IPortionColumnChunk::TPtr>> portionColumns;
+            for (auto&& p : columnChunks) {
+                portionColumns.emplace(resultSchema->GetColumnId(p.first), p.second[i].GetChunks());
+            }
+            batchSlices.emplace_back(portionColumns, schemaDetails, context.Counters.SplitterCounters, GetSplitSettings());
         }
-        batchSlices.emplace_back(portionColumns, schemaDetails, context.Counters.SplitterCounters, GetSplitSettings());
-    }
 
-    TSimilarSlicer slicer(4 * 1024 * 1024);
-    auto packs = slicer.Split(batchSlices);
-
-    ui32 recordIdx = 0;
-    for (auto&& i : packs) {
-        TGeneralSerializedSlice slice(std::move(i));
-        auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
-        std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs();
-        AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetGranuleId(), *maxSnapshot, SaverContext.GetStorageOperator()));
-        NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
-        NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
-        AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName());
-        recordIdx += slice.GetRecordsCount();
+        TSimilarSlicer slicer(4 * 1024 * 1024);
+        auto packs = slicer.Split(batchSlices);
+
+        ui32 recordIdx = 0;
+        for (auto&& i : packs) {
+            TGeneralSerializedSlice slice(std::move(i));
+            auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
+            std::vector<std::vector<IPortionColumnChunk::TPtr>> chunksByBlobs = slice.GroupChunksByBlobs();
+            AppendedPortions.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, GranuleMeta->GetGranuleId(), *maxSnapshot, SaverContext.GetStorageOperator()));
+            NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
+            NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
+            AppendedPortions.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName());
+            recordIdx += slice.GetRecordsCount();
+        }
     }
     if (IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) {
         TStringBuilder sbSwitched;
@@ -198,4 +208,11 @@ NColumnShard::ECumulativeCounters TGeneralCompactColumnEngineChanges::GetCounter
     return isSuccess ? NColumnShard::COUNTER_COMPACTION_SUCCESS : NColumnShard::COUNTER_COMPACTION_FAIL;
 }
 
+void TGeneralCompactColumnEngineChanges::AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position) {
+    if (CheckPoints.size()) {
+        AFL_VERIFY(CheckPoints.back().Compare(position) == std::partial_ordering::less);
+    }
+    CheckPoints.emplace_back(position);
+}
+
 }

+ 4 - 0
ydb/core/tx/columnshard/engines/changes/general_compaction.h

@@ -1,5 +1,6 @@
 #pragma once
 #include "compaction.h"
+#include <ydb/core/formats/arrow/reader/read_filter_merger.h>
 
 namespace NKikimr::NOlap::NCompaction {
 
@@ -7,6 +8,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
 private:
     using TBase = TCompactColumnEngineChanges;
     virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override;
+    std::vector<NIndexedReader::TSortableBatchPosition> CheckPoints;
 protected:
     virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override;
     virtual TPortionMeta::EProduced GetResultProducedClass() const override {
@@ -17,6 +19,8 @@ protected:
 public:
     using TBase::TBase;
 
+    void AddCheckPoint(const NIndexedReader::TSortableBatchPosition& position);
+
     virtual TString TypeString() const override {
         return StaticTypeName();
     }

Some files were not shown because too many files changed in this diff