Browse Source

improve chunks splitter performance (#7665)

ivanmorozov333 7 months ago
parent
commit
555ebf399c
1 changed file with 20 additions and 8 deletions
  1. 20 8
      ydb/core/formats/arrow/arrow_helpers.cpp

+ 20 - 8
ydb/core/formats/arrow/arrow_helpers.cpp

@@ -934,25 +934,37 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SliceToRecordBatches(const std:
     }
     std::sort(positions.begin(), positions.end());
     positions.erase(std::unique(positions.begin(), positions.end()), positions.end());
-
+    AFL_VERIFY(positions.size() > 1)("size", positions.size())("positions", JoinSeq(",", positions));
     std::vector<std::vector<std::shared_ptr<arrow::Array>>> slicedData;
     slicedData.resize(positions.size() - 1);
-    {
-        for (auto&& i : t->columns()) {
-            for (ui32 idx = 0; idx + 1 < positions.size(); ++idx) {
-                auto slice = i->Slice(positions[idx], positions[idx + 1] - positions[idx]);
-                AFL_VERIFY(slice->num_chunks() == 1);
-                slicedData[idx].emplace_back(slice->chunks().front());
+    for (auto&& i : t->columns()) {
+        ui32 currentPosition = 0;
+        auto it = i->chunks().begin();
+        ui32 length = (*it)->length();
+        for (ui32 idx = 0; idx + 1 < positions.size(); ++idx) {
+            auto chunk = (*it)->Slice(positions[idx] - currentPosition, positions[idx + 1] - positions[idx]);
+            AFL_VERIFY_DEBUG(chunk->length() == positions[idx + 1] - positions[idx])("length", chunk->length())(
+                                              "delta", positions[idx + 1] - positions[idx]);
+            AFL_VERIFY_DEBUG(chunk->length())("delta", positions[idx + 1] - positions[idx]);
+            if (positions[idx + 1] - currentPosition == length) {
+                if (++it != i->chunks().end()) {
+                    length = (*it)->length();
+                }
+                currentPosition = positions[idx + 1];
             }
+            slicedData[idx].emplace_back(chunk);
         }
     }
     std::vector<std::shared_ptr<arrow::RecordBatch>> result;
     ui32 count = 0;
     for (auto&& i : slicedData) {
+        AFL_VERIFY_DEBUG(i.size());
+        AFL_VERIFY_DEBUG(i.front()->length());
         result.emplace_back(arrow::RecordBatch::Make(t->schema(), i.front()->length(), i));
         count += result.back()->num_rows();
     }
-    AFL_VERIFY(count == t->num_rows())("count", count)("t", t->num_rows());
+    AFL_VERIFY(count == t->num_rows())("count", count)("t", t->num_rows())("sd_size", slicedData.size())("columns", t->num_columns())(
+                            "schema", t->schema()->ToString());
     return result;
 }