Просмотр исходного кода

Utf8 possibility for JsonDocument type providing

ivanmorozov 2 лет назад
Родитель
Сommit
269126dcce

+ 23 - 12
ydb/core/formats/arrow_helpers.cpp

@@ -15,6 +15,7 @@
 #include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
 #include <contrib/libs/apache/arrow/cpp/src/arrow/type_traits.h>
 #include <library/cpp/containers/stack_vector/stack_vec.h>
+#include <library/cpp/actors/core/log.h>
 #include <memory>
 
 #define Y_VERIFY_OK(status) Y_VERIFY(status.ok(), "%s", status.ToString().c_str())
@@ -1079,14 +1080,16 @@ static bool ConvertData(TCell& cell, const NScheme::TTypeInfo& colType, TMemoryP
     return true;
 }
 
-static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow::Array>& column,
+static bool ConvertColumn(std::shared_ptr<arrow::Array>& column, std::shared_ptr<arrow::Field>& field,
                                                    NScheme::TTypeInfo colType) {
     if (colType.GetTypeId() == NScheme::NTypeIds::Decimal) {
-        return {};
+        return false;
     }
 
-    if (column->type()->id() != arrow::Type::BINARY) {
-        return {};
+    if (colType.GetTypeId() == NScheme::NTypeIds::JsonDocument && (column->type()->id() == arrow::Type::BINARY || column->type()->id() == arrow::Type::STRING)) {
+
+    } else if (column->type()->id() != arrow::Type::BINARY) {
+        return false;
     }
 
     auto& binaryArray = static_cast<arrow::BinaryArray&>(*column);
@@ -1100,7 +1103,7 @@ static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow::
                 auto value = binaryArray.Value(i);
                 const auto dyNumber = NDyNumber::ParseDyNumberString(TStringBuf(value.data(), value.size()));
                 if (!dyNumber.Defined() || !builder.Append((*dyNumber).data(), (*dyNumber).size()).ok()) {
-                    return {};
+                    return false;
                 }
             }
         }
@@ -1109,7 +1112,8 @@ static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow::
                 auto value = binaryArray.Value(i);
                 const auto binaryJson = NBinaryJson::SerializeToBinaryJson(TStringBuf(value.data(), value.size()));
                 if (!binaryJson.Defined() || !builder.Append(binaryJson->Data(), binaryJson->Size()).ok()) {
-                    return {};
+                    ALS_ERROR(0) << "NOT PARSED JSON: " << TStringBuf(value.data(), value.size());
+                    return false;
                 }
             }
         }
@@ -1119,26 +1123,33 @@ static std::shared_ptr<arrow::Array> ConvertColumn(const std::shared_ptr<arrow::
 
     std::shared_ptr<arrow::BinaryArray> result;
     if (!builder.Finish(&result).ok()) {
-        return {};
+        return false;
+    }
+
+    column = result;
+    if (colType.GetTypeId() == NScheme::NTypeIds::JsonDocument && field->type()->id() == arrow::Type::STRING) {
+        field = std::make_shared<arrow::Field>(field->name(), std::make_shared<arrow::BinaryType>());
     }
-    return result;
+
+    return true;
 }
 
 std::shared_ptr<arrow::RecordBatch> ConvertColumns(const std::shared_ptr<arrow::RecordBatch>& batch,
                                                    const THashMap<TString, NScheme::TTypeInfo>& columnsToConvert)
 {
     std::vector<std::shared_ptr<arrow::Array>> columns = batch->columns();
+    std::vector<std::shared_ptr<arrow::Field>> fields = batch->schema()->fields();
+    Y_VERIFY(columns.size() == fields.size());
     for (i32 i = 0; i < batch->num_columns(); ++i) {
         auto& colName = batch->column_name(i);
         auto it = columnsToConvert.find(TString(colName.data(), colName.size()));
         if (it != columnsToConvert.end()) {
-            columns[i] = ConvertColumn(columns[i], it->second);
-            if (!columns[i]) {
+            if (!ConvertColumn(columns[i], fields[i], it->second)) {
                 return {};
             }
         }
     }
-    return arrow::RecordBatch::Make(batch->schema(), batch->num_rows(), columns);
+    return arrow::RecordBatch::Make(std::make_shared<arrow::Schema>(fields), batch->num_rows(), columns);
 }
 
 static std::shared_ptr<arrow::Array> InplaceConvertColumn(const std::shared_ptr<arrow::Array>& column,
@@ -1154,7 +1165,7 @@ static std::shared_ptr<arrow::Array> InplaceConvertColumn(const std::shared_ptr<
             Y_VERIFY(arrow::bit_width(column->type()->id()) == 32);
             return std::make_shared<arrow::Date32Array>(column->data());
         }
-        default: 
+        default:
             return {};
     }
 }

+ 10 - 0
ydb/core/formats/arrow_helpers.h

@@ -200,6 +200,16 @@ public:
         return false;
     }
 
+    static bool NeedConversion(const NScheme::TTypeInfo& typeInRequest, const NScheme::TTypeInfo& expectedType) {
+        switch (expectedType.GetTypeId()) {
+            case NScheme::NTypeIds::JsonDocument:
+                return typeInRequest.GetTypeId() == NScheme::NTypeIds::Utf8;
+            default:
+                break;
+        }
+        return false;
+    }
+
     TArrowToYdbConverter(const TVector<std::pair<TString, NScheme::TTypeInfo>>& ydbSchema, IRowWriter& rowWriter)
         : YdbSchema(ydbSchema)
         , RowWriter(rowWriter)

+ 6 - 3
ydb/core/tx/tx_proxy/upload_rows_common_impl.h

@@ -280,14 +280,17 @@ private:
     static bool SameDstType(NScheme::TTypeInfo type1, NScheme::TTypeInfo type2, bool allowConvert) {
         bool res = (type1 == type2);
         if (!res && allowConvert) {
-            res = NArrow::GetArrowType(type1)->id() == NArrow::GetArrowType(type2)->id();
+            res = (NArrow::GetArrowType(type1)->id() == NArrow::GetArrowType(type2)->id());
         }
         return res;
     }
 
     static bool SameOrConvertableDstType(NScheme::TTypeInfo type1, NScheme::TTypeInfo type2, bool allowConvert) {
-        bool ok = SameDstType(type1, type2, allowConvert);
-        return ok || NArrow::TArrowToYdbConverter::NeedInplaceConversion(type1, type2);
+        bool ok = SameDstType(type1, type2, allowConvert) || NArrow::TArrowToYdbConverter::NeedInplaceConversion(type1, type2);
+        if (!ok && allowConvert) {
+            ok = NArrow::TArrowToYdbConverter::NeedConversion(type1, type2);
+        }
+        return ok;
     }
 
     bool BuildSchema(const NActors::TActorContext& ctx, TString& errorMessage, bool makeYqbSchema) {