Browse Source

KIKIMR-19093: validations and debug info

ivanmorozov 1 year ago
parent
commit
3a393c8b2b

+ 56 - 1
ydb/core/formats/arrow/arrow_helpers.cpp

@@ -474,7 +474,7 @@ std::vector<std::unique_ptr<arrow::ArrayBuilder>> MakeBuilders(const std::shared
         if (sizeByColumn.size()) {
             auto it = sizeByColumn.find(field->name());
             if (it != sizeByColumn.end()) {
-                Y_VERIFY(NArrow::ReserveData(*builder, it->second));
+                AFL_VERIFY(NArrow::ReserveData(*builder, it->second))("size", it->second)("field", field->name());
             }
         }
 
@@ -767,6 +767,61 @@ std::shared_ptr<arrow::RecordBatch> BuildSingleRecordBatch(const std::shared_ptr
     return arrow::RecordBatch::Make(schema, 1, arrays);
 }
 
+// Builds a JSON array with one entry per column of the record batch `array`,
+// each entry describing the value stored at row `position` of that column
+// (delegates to the per-array DebugJson overload).
+// A null batch yields JSON null, matching the per-array overloads, instead of
+// dereferencing a null pointer.
+NJson::TJsonValue DebugJson(std::shared_ptr<arrow::RecordBatch> array, const ui32 position) {
+    if (!array) {
+        return NJson::JSON_NULL;
+    }
+    NJson::TJsonValue result = NJson::JSON_ARRAY;
+    for (const auto& column : array->columns()) {
+        result.AppendValue(DebugJson(column, position));
+    }
+    return result;
+}
+
+// Renders the value stored at row `position` of `array` as text.
+// A null `array` yields "_NO_DATA"; `position` must lie inside the array (verified).
+// String-like types are copied out via GetString(), types with a C representation
+// are streamed via Value(); any other type contributes nothing to the result.
+// NOTE(review): the slot is read without consulting IsNull(), so a null slot is
+// printed as whatever its payload holds - confirm this is acceptable for debug output.
+TString DebugString(std::shared_ptr<arrow::Array> array, const ui32 position) {
+    if (!array) {
+        return "_NO_DATA";
+    }
+    Y_VERIFY(position < array->length());
+    TStringBuilder text;
+    const auto appendValue = [&](const auto& typeTag) {
+        using TTag = std::decay_t<decltype(typeTag)>;
+        using TArrowType = typename TTag::T;
+        using TTypedArray = typename arrow::TypeTraits<TArrowType>::ArrayType;
+
+        const auto& typedColumn = static_cast<const TTypedArray&>(*array);
+        if constexpr (arrow::has_string_view<TArrowType>()) {
+            const auto view = typedColumn.GetString(position);
+            text << TString(view.data(), view.size());
+        }
+        if constexpr (arrow::has_c_type<TArrowType>()) {
+            text << typedColumn.Value(position);
+        }
+        return true;
+    };
+    SwitchType(array->type_id(), appendValue);
+    return text;
+}
+
+// Describes the value stored at row `position` of `array` as a JSON map.
+// The map gets a "type" key (typeid name of the concrete arrow array class) and,
+// for string-like types or types with a C representation, a "value" key.
+// A null `array` yields JSON null; `position` must lie inside the array (verified).
+// NOTE(review): the slot is read without consulting IsNull(), so a null slot is
+// reported with whatever its payload holds - confirm this is acceptable for debug output.
+NJson::TJsonValue DebugJson(std::shared_ptr<arrow::Array> array, const ui32 position) {
+    if (!array) {
+        return NJson::JSON_NULL;
+    }
+    Y_VERIFY(position < array->length());
+    NJson::TJsonValue description = NJson::JSON_MAP;
+    const auto describeValue = [&](const auto& typeTag) {
+        using TTag = std::decay_t<decltype(typeTag)>;
+        using TArrowType = typename TTag::T;
+        using TTypedArray = typename arrow::TypeTraits<TArrowType>::ArrayType;
+
+        const auto& typedColumn = static_cast<const TTypedArray&>(*array);
+        description.InsertValue("type", typeid(TTypedArray).name());
+        if constexpr (arrow::has_string_view<TArrowType>()) {
+            const auto view = typedColumn.GetString(position);
+            description.InsertValue("value", TString(view.data(), view.size()));
+        }
+        if constexpr (arrow::has_c_type<TArrowType>()) {
+            description.InsertValue("value", typedColumn.Value(position));
+        }
+        return true;
+    };
+    SwitchType(array->type_id(), describeValue);
+    return description;
+}
+
 NJson::TJsonValue DebugJson(std::shared_ptr<arrow::Array> array, const ui32 head, const ui32 tail) {
     if (!array) {
         return NJson::JSON_NULL;

+ 6 - 2
ydb/core/formats/arrow/arrow_helpers.h

@@ -125,7 +125,11 @@ inline bool HasNulls(const std::shared_ptr<arrow::Array>& column) {
 bool ArrayScalarsEqual(const std::shared_ptr<arrow::Array>& lhs, const std::shared_ptr<arrow::Array>& rhs);
 std::shared_ptr<arrow::Array> BoolVecToArray(const std::vector<bool>& vec);
 
-NJson::TJsonValue DebugJson(std::shared_ptr<arrow::Array> array, const ui32 head = 5, const ui32 tail = 5);
-NJson::TJsonValue DebugJson(std::shared_ptr<arrow::RecordBatch> batch, const ui32 head = 5, const ui32 tail = 5);
+NJson::TJsonValue DebugJson(std::shared_ptr<arrow::Array> array, const ui32 head, const ui32 tail); // head/tail no longer have defaults: callers must pass both explicitly
+NJson::TJsonValue DebugJson(std::shared_ptr<arrow::RecordBatch> batch, const ui32 head, const ui32 tail); // record-batch counterpart of the overload above
+
+NJson::TJsonValue DebugJson(std::shared_ptr<arrow::Array> array, const ui32 position); // JSON map describing the single row `position` of `array`; JSON null for a null array
+TString DebugString(std::shared_ptr<arrow::Array> array, const ui32 position); // text form of the single row `position` of `array`; "_NO_DATA" for a null array
+NJson::TJsonValue DebugJson(std::shared_ptr<arrow::RecordBatch> array, const ui32 position); // `array` is a record batch here: JSON array with one per-column entry for row `position`
 
 }

+ 73 - 1
ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp

@@ -1,4 +1,5 @@
 #include "read_filter_merger.h"
+#include <library/cpp/actors/core/log.h>
 
 namespace NKikimr::NOlap::NIndexedReader {
 
@@ -26,11 +27,82 @@ NJson::TJsonValue TSortableBatchPosition::DebugJson() const {
     for (ui32 i = 0; i < Columns.size(); ++i) {
         auto& jsonColumn = result["columns"].AppendValue(NJson::JSON_MAP);
         jsonColumn["name"] = Fields[i]->name();
-        jsonColumn["info"] = NArrow::DebugJson(Columns[i]);
+        jsonColumn["value"] = NArrow::DebugString(Columns[i], Position);
     }
     return result;
 }
 
+// Binary-searches `batch` for the border row defined by `forFound`.
+//
+// `greater` selects the side of the border: true returns a row at/after `forFound`,
+// false returns a row at/before it. `include` tells whether a row that compares
+// equivalent to `forFound` may itself be returned; when it may not, the neighbouring
+// row on the requested side is returned instead.
+// Returns an empty optional for a null or empty batch and when no row qualifies.
+//
+// NOTE(review): assumes the rows of `batch` are ordered consistently with Compare() - confirm.
+// NOTE(review): when a row equivalent to `forFound` is hit, the search stops at that row,
+// so with repeated keys the result is not necessarily the first/last equivalent row -
+// presumably keys are unique within `batch`; verify against callers.
+std::optional<ui64> TSortableBatchPosition::FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool greater, const bool include) {
+    if (!batch || !batch->num_rows()) {
+        return {};
+    }
+
+    // Only the row count is needed by the helper, so capture it instead of copying the shared_ptr.
+    const i64 rowsCount = batch->num_rows();
+    // Resolves the result for a row that compares equivalent to `forFound`.
+    const auto checkEqualBorder = [rowsCount, greater, include](const i64 position) -> std::optional<i64> {
+        if (include) {
+            return position;
+        }
+        if (greater) {
+            // Exclusive lower border: step to the next row if there is one.
+            if (rowsCount > position + 1) {
+                return position + 1;
+            }
+            return {};
+        }
+        // Exclusive upper border: step to the previous row if there is one.
+        if (position) {
+            return position - 1;
+        }
+        return {};
+    };
+
+    i64 posStart = 0;
+    i64 posFinish = rowsCount - 1;
+    TSortableBatchPosition position = forFound.BuildSame(batch, posStart);
+    {
+        // First row: if it is already past `forFound`, everything is.
+        position.InitPosition(posStart);
+        const auto cmp = position.Compare(forFound);
+        if (cmp == std::partial_ordering::greater) {
+            if (greater) {
+                return posStart;
+            } else {
+                return {};
+            }
+        } else if (cmp == std::partial_ordering::equivalent) {
+            return checkEqualBorder(posStart);
+        }
+    }
+    {
+        // Last row: if it is still before `forFound`, everything is.
+        position.InitPosition(posFinish);
+        const auto cmp = position.Compare(forFound);
+        if (cmp == std::partial_ordering::less) {
+            if (greater) {
+                return {};
+            } else {
+                return posFinish;
+            }
+        } else if (cmp == std::partial_ordering::equivalent) {
+            return checkEqualBorder(posFinish);
+        }
+    }
+    // Invariant from here on: row posStart is less than `forFound`, row posFinish is greater.
+    while (posFinish > posStart + 1) {
+        // Integer midpoint: same value as the former 0.5 * (posStart + posFinish) after
+        // truncation, but without the round-trip through double and without overflow.
+        const i64 posMiddle = posStart + (posFinish - posStart) / 2;
+        Y_VERIFY(position.InitPosition(posMiddle));
+        const auto comparison = position.Compare(forFound);
+        if (comparison == std::partial_ordering::less) {
+            posStart = position.Position;
+        } else if (comparison == std::partial_ordering::greater) {
+            posFinish = position.Position;
+        } else {
+            return checkEqualBorder(position.Position);
+        }
+    }
+    Y_VERIFY(posFinish != posStart);
+    if (greater) {
+        return posFinish;
+    } else {
+        return posStart;
+    }
+}
+
 void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) {
     Y_VERIFY(point);
     Y_VERIFY(point->IsSameSchema(SortSchema));

+ 23 - 0
ydb/core/tx/columnshard/engines/reader/read_filter_merger.h

@@ -17,6 +17,8 @@ protected:
     std::vector<std::shared_ptr<arrow::Array>> Columns;
     std::vector<std::shared_ptr<arrow::Field>> Fields;
     std::shared_ptr<arrow::RecordBatch> Batch;
+    static std::optional<ui64> FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool needGreater, const bool include); // row index in `batch` bordering `forFound`, or an empty optional when no row qualifies; `needGreater` is called `greater` in the definition
+
 public:
     TSortableBatchPosition() = default;
 
@@ -24,6 +26,27 @@ public:
 
     bool IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const;
 
+    // Creates a position over `batch` at row `position` that uses the same sort
+    // columns (by field name) and the same ReverseSort flag as this object.
+    TSortableBatchPosition BuildSame(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position) const {
+        std::vector<std::string> fieldNames;
+        // The final size is known up front, so allocate once instead of growing in the loop.
+        fieldNames.reserve(Fields.size());
+        for (const auto& field : Fields) {
+            fieldNames.emplace_back(field->name());
+        }
+        return TSortableBatchPosition(batch, position, fieldNames, ReverseSort);
+    }
+
+    // Returns the slice of `batch` bounded by the positions `from` and `to`;
+    // `includeFrom` / `includeTo` tell whether rows equivalent to the respective
+    // border belong to the result. `from` must not compare greater than `to` (verified).
+    // Returns nullptr for a null batch or when no row falls into the interval.
+    static std::shared_ptr<arrow::RecordBatch> SelectInterval(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& from, const TSortableBatchPosition& to, const bool includeFrom, const bool includeTo) {
+        if (!batch) {
+            return nullptr;
+        }
+        Y_VERIFY(from.Compare(to) != std::partial_ordering::greater);
+        // Keep the full ui64 width returned by FindPosition: storing the result in
+        // std::optional<ui32> silently truncated the row index.
+        const std::optional<ui64> idxFrom = FindPosition(batch, from, true, includeFrom);
+        const std::optional<ui64> idxTo = FindPosition(batch, to, false, includeTo);
+        if (!idxFrom || !idxTo || *idxTo < *idxFrom) {
+            return nullptr;
+        }
+        // Both borders are inclusive row indices, hence the +1 in the slice length.
+        return batch->Slice(*idxFrom, *idxTo - *idxFrom + 1);
+    }
+
     TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& columns, const bool reverseSort)
         : Position(position)
         , RecordsCount(batch->num_rows())