Browse Source

KIKIMR-19802: split through another way with no useless conversion

ivanmorozov 1 year ago
parent
commit
d808487703

+ 5 - 0
.mapping.json

@@ -5353,6 +5353,11 @@
   "ydb/core/tx/columnshard/engines/changes/compaction/CMakeLists.linux-x86_64.txt":"",
   "ydb/core/tx/columnshard/engines/changes/compaction/CMakeLists.txt":"",
   "ydb/core/tx/columnshard/engines/changes/compaction/CMakeLists.windows-x86_64.txt":"",
+  "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.darwin-x86_64.txt":"",
+  "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-aarch64.txt":"",
+  "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.linux-x86_64.txt":"",
+  "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.txt":"",
+  "ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.windows-x86_64.txt":"",
   "ydb/core/tx/columnshard/engines/insert_table/CMakeLists.darwin-x86_64.txt":"",
   "ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-aarch64.txt":"",
   "ydb/core/tx/columnshard/engines/insert_table/CMakeLists.linux-x86_64.txt":"",

+ 9 - 5
ydb/core/formats/arrow/arrow_helpers.cpp

@@ -171,7 +171,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumnsValidate(const std::shared_ptr
 
     auto srcSchema = srcBatch->schema();
     for (auto& name : columnNames) {
-        int pos = srcSchema->GetFieldIndex(name);
+        const int pos = srcSchema->GetFieldIndex(name);
         AFL_VERIFY(pos >= 0)("field_name", name);
         fields.push_back(srcSchema->field(pos));
         columns.push_back(srcBatch->column(pos));
@@ -206,7 +206,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
             auto srcField = srcBatch->schema()->GetFieldByName(field->name());
             Y_ABORT_UNLESS(srcField);
             if (!field->Equals(srcField)) {
-                AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_parse_incoming_batch")("reason", "invalid_column_type")("column", field->name())
+                AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name())
                                 ("column_type", field->ToString(true))("incoming_type", srcField->ToString(true));
                 return nullptr;
             }
@@ -214,7 +214,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
 
         Y_ABORT_UNLESS(columns.back());
         if (!columns.back()->type()->Equals(field->type())) {
-            AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_parse_incoming_batch")("reason", "invalid_column_type")("column", field->name())
+            AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")("column", field->name())
                                 ("column_type", field->type()->ToString())("incoming_type", columns.back()->type()->ToString());
             return nullptr;
         }
@@ -696,7 +696,11 @@ int ScalarCompare(const std::shared_ptr<arrow::Scalar>& x, const std::shared_ptr
 std::shared_ptr<arrow::RecordBatch> SortBatch(const std::shared_ptr<arrow::RecordBatch>& batch,
                                               const std::shared_ptr<arrow::Schema>& sortingKey, const bool andUnique) {
     auto sortPermutation = MakeSortPermutation(batch, sortingKey, andUnique);
-    return Reorder(batch, sortPermutation, andUnique);
+    if (sortPermutation) { // the permutations.cpp hunk of this commit makes MakeSortPermutation return nullptr for an identity permutation that keeps every row
+        return Reorder(batch, sortPermutation, andUnique);
+    } else {
+        return batch; // nothing to reorder: hand back the input batch itself instead of building a copy
+    }
 }
 
 std::shared_ptr<arrow::Array> BoolVecToArray(const std::vector<bool>& vec) {
@@ -862,7 +866,7 @@ NJson::TJsonValue DebugJson(std::shared_ptr<arrow::Array> array, const ui32 head
         resultFull.InsertValue("tail", tail);
         auto& result = resultFull.InsertValue("data", NJson::JSON_ARRAY);
         for (int i = 0; i < column.length(); ++i) {
-            if (i >= (int)head && i + (int)tail <= column.length()) {
+            if (i >= (int)head && i + (int)tail < column.length()) {
                 continue;
             }
             if constexpr (arrow::has_string_view<typename TWrap::T>()) {

+ 10 - 0
ydb/core/formats/arrow/permutations.cpp

@@ -71,6 +71,8 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar
     TStatusValidator::Validate(builder.Reserve(points.size()));
 
     TRawReplaceKey* predKey = nullptr;
+    int predPosition = -1;
+    bool isTrivial = true;
     for (auto& point : points) {
         if (andUnique) {
             if (predKey) {
@@ -83,10 +85,18 @@ std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<ar
                 }
             }
         }
+        if (point.GetPosition() != predPosition + 1) {
+            isTrivial = false;
+        }
+        predPosition = point.GetPosition();
         TStatusValidator::Validate(builder.Append(point.GetPosition()));
         predKey = &point;
     }
 
+    if (isTrivial && builder.length() == (i64)points.size()) {
+        return nullptr;
+    }
+
     std::shared_ptr<arrow::UInt64Array> out;
     TStatusValidator::Validate(builder.Finish(&out));
     return out;

+ 6 - 3
ydb/core/formats/arrow/reader/read_filter_merger.cpp

@@ -15,7 +15,7 @@ NJson::TJsonValue TSortableBatchPosition::DebugJson() const {
     return result;
 }
 
-std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool greater) {
+std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool greater, const std::optional<ui32> includedStartPosition) {
     if (!batch || !batch->num_rows()) {
         return {};
     }
@@ -25,6 +25,9 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
     if (forFound.IsReverseSort()) {
         std::swap(posStart, posFinish);
     }
+    if (includedStartPosition) {
+        posStart = *includedStartPosition;
+    }
     TSortableBatchPosition position = forFound.BuildSame(batch, posStart);
     {
         position.InitPosition(posStart);
@@ -64,7 +67,7 @@ std::optional<TSortableBatchPosition::TFoundPosition> TSortableBatchPosition::Fi
 }
 
 TSortableBatchPosition::TFoundPosition TSortableBatchPosition::SkipToLower(const TSortableBatchPosition& forFound) {
-    auto pos = FindPosition(Batch, forFound, true);
+    auto pos = FindPosition(Batch, forFound, true, Position);
     AFL_VERIFY(pos)("batch", NArrow::DebugJson(Batch, 1, 1))("found", forFound.DebugJson());
     if (ReverseSort) {
         AFL_VERIFY(pos->GetPosition() <= Position)("pos", Position)("pos_skip", pos->GetPosition())("reverse", true);
@@ -75,7 +78,7 @@ TSortableBatchPosition::TFoundPosition TSortableBatchPosition::SkipToLower(const
     return *pos;
 }
 
-TSortableScanData::TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns) {
+TSortableScanData::TSortableScanData(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns) {
     for (auto&& i : columns) {
         auto c = batch->GetColumnByName(i);
         AFL_VERIFY(c)("column_name", i)("columns", JoinSeq(",", columns));

+ 78 - 6
ydb/core/formats/arrow/reader/read_filter_merger.h

@@ -3,8 +3,8 @@
 #include <ydb/core/formats/arrow/arrow_filter.h>
 #include <ydb/core/formats/arrow/arrow_helpers.h>
 #include <ydb/core/formats/arrow/switch/switch_type.h>
-#include <ydb/core/formats/arrow/replace_key.h>
 #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <library/cpp/actors/core/log.h>
 #include <util/generic/hash.h>
 #include <util/string/join.h>
 #include <set>
@@ -19,7 +19,7 @@ private:
     YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields);
 public:
     TSortableScanData() = default;
-    TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns);
+    TSortableScanData(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columns);
 
     bool IsSameSchema(const std::shared_ptr<arrow::Schema>& schema) const {
         if (Fields.size() != (size_t)schema->num_fields()) {
@@ -103,13 +103,85 @@ public:
         }
     };
 
-    static std::optional<TFoundPosition> FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool needGreater);
-    TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound);
+    template <class TContainer> // forward cursor over a container whose elements expose a `first` member (map-like)
+    class TAssociatedContainerIterator {
+    private:
+        typename TContainer::const_iterator Current;
+        typename TContainer::const_iterator End;
+    public:
+        TAssociatedContainerIterator(const TContainer& container) // stores iterators only, so `container` has to stay alive while this object is used
+            : Current(container.begin())
+            , End(container.end())
+        {
+        }
+
+        bool IsValid() const { // true while Current has not reached End
+            return Current != End;
+        }
+
+        void Next() { // advances by one element; no IsValid() check is made here
+            ++Current;
+        }
+
+        const auto& CurrentPosition() const { // `first` of the current element; only meaningful while IsValid()
+            return Current->first;
+        }
+    };
+
+    // Splits `batch` by the borders from `it` into (-inf, it1), [it1, it2), [it2, it3), ..., [itLast, +inf): returns (#borders + 1) entries with nullptr marking an empty slice; presumably `batch` must already be sorted by `columnNames` - confirm against callers.
+    template <class TBordersIterator>
+    static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBorders(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, TBordersIterator& it) {
+        std::vector<std::shared_ptr<arrow::RecordBatch>> result;
+        if (!batch || batch->num_rows() == 0) {
+            for (; it.IsValid(); it.Next()) { // the iterator must be advanced here: a bare `while (it.IsValid())` never terminates once any border exists
+                result.emplace_back(nullptr);
+            }
+            result.emplace_back(nullptr); // slot for the trailing [itLast, +inf) interval
+            return result;
+        }
+        TSortableBatchPosition pos(batch, 0, columnNames, {}, false); // NOTE(review): presumably a cursor at row 0 over `columnNames`, direct order - confirm against the constructor
+        bool batchFinished = false;
+        i64 recordsCountSplitted = 0;
+        for (; it.IsValid() && !batchFinished; it.Next()) {
+            const ui32 startPos = pos.GetPosition();
+            auto posFound = pos.SkipToLower(it.CurrentPosition()); // presumably also moves `pos` to the found row - the next iteration re-reads pos.GetPosition()
+            if (posFound.IsGreater() || posFound.IsEqual()) {
+                if (posFound.GetPosition() == startPos) {
+                    result.emplace_back(nullptr); // no rows between the previous border and this one
+                } else {
+                    result.emplace_back(batch->Slice(startPos, posFound.GetPosition() - startPos)); // rows [startPos, found)
+                    recordsCountSplitted += result.back()->num_rows();
+                }
+            } else {
+                result.emplace_back(batch->Slice(startPos, posFound.GetPosition() - startPos + 1)); // rows [startPos, found] inclusive
+                recordsCountSplitted += result.back()->num_rows();
+                batchFinished = true; // stop consuming borders; the remaining intervals are padded below
+            }
+        }
+        if (batchFinished) {
+            for (; it.IsValid(); it.Next()) { // one nullptr per border that was not reached
+                result.emplace_back(nullptr);
+            }
+            result.emplace_back(nullptr); // and one for the trailing [itLast, +inf) interval
+        } else {
+            AFL_VERIFY(!it.IsValid());
+            result.emplace_back(batch->Slice(pos.GetPosition())); // everything from the current position to the end of the batch
+            AFL_VERIFY(result.back()->num_rows());
+            recordsCountSplitted += result.back()->num_rows();
+        }
+        AFL_VERIFY(batch->num_rows() == recordsCountSplitted); // sanity check: slice row counts add up to the whole batch
+        return result;
+    }
 
-    NArrow::TReplaceKey BuildReplaceKey() const {
-        return NArrow::TReplaceKey::FromBatch(Batch, Position);
+    template <class TContainer> // convenience wrapper: the borders are the `first` members of the elements of `container`, visited in its iteration order
+    static std::vector<std::shared_ptr<arrow::RecordBatch>> SplitByBordersInAssociativeContainer(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<std::string>& columnNames, const TContainer& container) {
+        TAssociatedContainerIterator<TContainer> it(container);
+        return SplitByBorders(batch, columnNames, it); // the result layout is whatever SplitByBorders produces for these borders
     }
 
+    static std::optional<TFoundPosition> FindPosition(const std::shared_ptr<arrow::RecordBatch>& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional<ui32> includedStartPosition);
+    TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound);
+
     const TSortableScanData& GetData() const {
         return *Data;
     }

+ 2 - 0
ydb/core/tx/columnshard/engines/changes/CMakeLists.darwin-x86_64.txt

@@ -8,6 +8,7 @@
 
 add_subdirectory(abstract)
 add_subdirectory(compaction)
+add_subdirectory(counters)
 
 add_library(columnshard-engines-changes)
 target_link_libraries(columnshard-engines-changes PUBLIC
@@ -18,6 +19,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC
   columnshard-engines-insert_table
   engines-changes-abstract
   engines-changes-compaction
+  engines-changes-counters
   tx-columnshard-splitter
   ydb-core-tablet_flat
   core-tx-tiering

+ 2 - 0
ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-aarch64.txt

@@ -8,6 +8,7 @@
 
 add_subdirectory(abstract)
 add_subdirectory(compaction)
+add_subdirectory(counters)
 
 add_library(columnshard-engines-changes)
 target_link_libraries(columnshard-engines-changes PUBLIC
@@ -19,6 +20,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC
   columnshard-engines-insert_table
   engines-changes-abstract
   engines-changes-compaction
+  engines-changes-counters
   tx-columnshard-splitter
   ydb-core-tablet_flat
   core-tx-tiering

+ 2 - 0
ydb/core/tx/columnshard/engines/changes/CMakeLists.linux-x86_64.txt

@@ -8,6 +8,7 @@
 
 add_subdirectory(abstract)
 add_subdirectory(compaction)
+add_subdirectory(counters)
 
 add_library(columnshard-engines-changes)
 target_link_libraries(columnshard-engines-changes PUBLIC
@@ -19,6 +20,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC
   columnshard-engines-insert_table
   engines-changes-abstract
   engines-changes-compaction
+  engines-changes-counters
   tx-columnshard-splitter
   ydb-core-tablet_flat
   core-tx-tiering

+ 2 - 0
ydb/core/tx/columnshard/engines/changes/CMakeLists.windows-x86_64.txt

@@ -8,6 +8,7 @@
 
 add_subdirectory(abstract)
 add_subdirectory(compaction)
+add_subdirectory(counters)
 
 add_library(columnshard-engines-changes)
 target_link_libraries(columnshard-engines-changes PUBLIC
@@ -18,6 +19,7 @@ target_link_libraries(columnshard-engines-changes PUBLIC
   columnshard-engines-insert_table
   engines-changes-abstract
   engines-changes-compaction
+  engines-changes-counters
   tx-columnshard-splitter
   ydb-core-tablet_flat
   core-tx-tiering

+ 20 - 0
ydb/core/tx/columnshard/engines/changes/counters/CMakeLists.darwin-x86_64.txt

@@ -0,0 +1,20 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(engines-changes-counters)
+target_link_libraries(engines-changes-counters PUBLIC
+  contrib-libs-cxxsupp
+  yutil
+  ydb-core-protos
+  cpp-actors-core
+  ydb-core-tablet_flat
+)
+target_sources(engines-changes-counters PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/changes/counters/general.cpp
+)

Some files were not shown because too many files changed in this diff