Просмотр исходного кода

KIKIMR-19092: simple compaction and optimizer for intervals correction

ivanmorozov 1 год назад
Родитель
Commit
eac8ca1f55

+ 2 - 0
ydb/core/tx/columnshard/columnshard_impl.h

@@ -31,6 +31,7 @@ class TCompactColumnEngineChanges;
 class TInGranuleCompactColumnEngineChanges;
 class TSplitCompactColumnEngineChanges;
 class TInsertColumnEngineChanges;
+class TGeneralCompactColumnEngineChanges;
 }
 
 namespace NKikimr::NColumnShard {
@@ -114,6 +115,7 @@ class TColumnShard
     friend class NOlap::TSplitCompactColumnEngineChanges;
     friend class NOlap::TInsertColumnEngineChanges;
     friend class NOlap::TColumnEngineChanges;
+    friend class NOlap::TGeneralCompactColumnEngineChanges;
 
     friend class TTxController;
 

+ 1 - 0
ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt

@@ -30,4 +30,5 @@ target_sources(columnshard-engines-changes PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
 )

+ 1 - 0
ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt

@@ -31,4 +31,5 @@ target_sources(columnshard-engines-changes PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
 )

+ 1 - 0
ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt

@@ -31,4 +31,5 @@ target_sources(columnshard-engines-changes PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
 )

+ 1 - 0
ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt

@@ -30,4 +30,5 @@ target_sources(columnshard-engines-changes PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/cleanup.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/mark_granules.cpp
   ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
 )

+ 51 - 0
ydb/core/tx/columnshard/engines/changes/general_compaction.cpp

@@ -0,0 +1,51 @@
+#include "general_compaction.h"
+#include <ydb/core/tx/columnshard/columnshard_impl.h>
+#include <ydb/core/tx/columnshard/engines/storage/granule.h>
+
+namespace NKikimr::NOlap {
+
+TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { // Merges every switched portion into one sorted batch and stores the result in AppendedPortions; always returns Success.
+    const ui64 pathId = GranuleMeta->GetPathId();
+    std::vector<TPortionInfoWithBlobs> portions = TPortionInfoWithBlobs::RestorePortions(SwitchedPortions, Blobs); // Portions rebuilt from SwitchedPortions together with Blobs.
+    std::optional<TSnapshot> maxSnapshot; // Will hold the largest GetMinSnapshot() value seen across SwitchedPortions.
+    for (auto&& i : SwitchedPortions) {
+        if (!maxSnapshot || *maxSnapshot < i.GetMinSnapshot()) {
+            maxSnapshot = i.GetMinSnapshot();
+        }
+    }
+    Y_VERIFY(maxSnapshot); // maxSnapshot is only assigned inside the loop, so this check fails when SwitchedPortions is empty.
+
+    std::vector<std::shared_ptr<arrow::RecordBatch>> batches; // One batch per restored portion.
+    auto resultSchema = context.SchemaVersions.GetLastSchema(); // Target schema used for every batch and for the merge below.
+    for (auto&& i : portions) {
+        auto dataSchema = context.SchemaVersions.GetSchema(i.GetPortionInfo().GetMinSnapshot()); // Schema looked up by the portion's own min snapshot.
+        batches.emplace_back(i.GetBatch(*dataSchema, *resultSchema));
+        Y_VERIFY(NArrow::IsSorted(batches.back(), resultSchema->GetIndexInfo().GetReplaceKey())); // Every input batch must already be sorted by the replace key.
+    }
+
+    auto merged = NArrow::MergeSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription(), Max<size_t>()); // NOTE(review): Max<size_t>() is presumably a per-batch row limit, i.e. "no splitting" — confirm against MergeSortedBatches.
+    Y_VERIFY(merged.size() == 1); // Exactly one merged batch is expected.
+    auto batchResult = merged.front();
+    AppendedPortions = MakeAppendedPortions(pathId, batchResult, GranuleMeta->GetGranuleId(), *maxSnapshot, GranuleMeta.get(), context); // Result portions are built with the maximum snapshot computed above.
+
+    return TConclusionStatus::Success();
+}
+
+void TGeneralCompactColumnEngineChanges::DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) { // Runs the base-class handler, then bumps the shard counters for this compaction.
+    TBase::DoWriteIndexComplete(self, context);
+    self.IncCounter(context.FinishedSuccessfully ? NColumnShard::COUNTER_SPLIT_COMPACTION_SUCCESS : NColumnShard::COUNTER_SPLIT_COMPACTION_FAIL); // NOTE(review): general compaction reports into the SPLIT_COMPACTION counters — confirm this reuse is intended.
+    self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BLOBS_WRITTEN, context.BlobsWritten); // Adds context.BlobsWritten.
+    self.IncCounter(NColumnShard::COUNTER_SPLIT_COMPACTION_BYTES_WRITTEN, context.BytesWritten); // Adds context.BytesWritten.
+}
+
+void TGeneralCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { // Runs the base-class start hook, then reports granule statistics to the shard's CSCounters.
+    TBase::DoStart(self);
+    auto& g = *GranuleMeta;
+    self.CSCounters.OnSplitCompactionInfo(g.GetAdditiveSummary().GetOther().GetPortionsSize(), g.GetAdditiveSummary().GetOther().GetPortionsCount()); // Passes the size and count of the "other" part of the granule's additive summary.
+}
+
+NColumnShard::ECumulativeCounters TGeneralCompactColumnEngineChanges::GetCounterIndex(const bool isSuccess) const { // Picks which cumulative counter represents the outcome of this change.
+    return isSuccess ? NColumnShard::COUNTER_COMPACTION_SUCCESS : NColumnShard::COUNTER_COMPACTION_FAIL; // Success -> COUNTER_COMPACTION_SUCCESS, otherwise COUNTER_COMPACTION_FAIL.
+}
+
+}

+ 28 - 0
ydb/core/tx/columnshard/engines/changes/general_compaction.h

@@ -0,0 +1,28 @@
+#pragma once
+#include "compaction.h"
+
+namespace NKikimr::NOlap {
+
+class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges { // Compaction change set whose TypeString() is "GENERAL_COMPACTION"; overrides hooks of TCompactColumnEngineChanges.
+private:
+    using TBase = TCompactColumnEngineChanges;
+    virtual void DoWriteIndexComplete(NColumnShard::TColumnShard& self, TWriteIndexCompleteContext& context) override; // Defined in general_compaction.cpp.
+protected:
+    virtual TConclusionStatus DoConstructBlobs(TConstructionContext& context) noexcept override; // Defined in general_compaction.cpp.
+    virtual TPortionMeta::EProduced GetResultProducedClass() const override {
+        return TPortionMeta::EProduced::SPLIT_COMPACTED; // NOTE(review): result portions are tagged SPLIT_COMPACTED although IsSplit() below returns false — confirm intended.
+    }
+    virtual void DoStart(NColumnShard::TColumnShard& self) override; // Defined in general_compaction.cpp.
+    virtual NColumnShard::ECumulativeCounters GetCounterIndex(const bool isSuccess) const override; // Defined in general_compaction.cpp.
+public:
+    virtual bool IsSplit() const override {
+        return false; // This change type does not report itself as a split.
+    }
+    using TBase::TBase; // Inherit the base-class constructors.
+
+    virtual TString TypeString() const override {
+        return "GENERAL_COMPACTION";
+    }
+};
+
+}

+ 1 - 0
ydb/core/tx/columnshard/engines/changes/ya.make

@@ -9,6 +9,7 @@ SRCS(
     cleanup.cpp
     mark_granules.cpp
     with_appended.cpp
+    general_compaction.cpp
 )
 
 PEERDIR(

+ 2 - 0
ydb/core/tx/columnshard/engines/storage/CMakeLists.darwin-x86_64.txt

@@ -6,6 +6,7 @@
 # original buildsystem will not be accepted.
 
 
+add_subdirectory(optimizer)
 get_built_tool_path(
   TOOL_enum_parser_bin
   TOOL_enum_parser_dependency
@@ -19,6 +20,7 @@ target_link_libraries(columnshard-engines-storage PUBLIC
   yutil
   libs-apache-arrow
   ydb-core-protos
+  engines-storage-optimizer
   core-formats-arrow
   tools-enum_parser-enum_serialization_runtime
 )

+ 2 - 0
ydb/core/tx/columnshard/engines/storage/CMakeLists.linux-aarch64.txt

@@ -6,6 +6,7 @@
 # original buildsystem will not be accepted.
 
 
+add_subdirectory(optimizer)
 get_built_tool_path(
   TOOL_enum_parser_bin
   TOOL_enum_parser_dependency
@@ -20,6 +21,7 @@ target_link_libraries(columnshard-engines-storage PUBLIC
   yutil
   libs-apache-arrow
   ydb-core-protos
+  engines-storage-optimizer
   core-formats-arrow
   tools-enum_parser-enum_serialization_runtime
 )

Некоторые файлы не были показаны из-за большого количества измененных файлов