Browse Source

YT-13309: Add merge schemas
5471ff5bcd9f7c7a9325085249492d293a8df0bd

nadya02 7 months ago
parent
commit
607f5b8873

+ 1 - 4
yt/yt/client/complex_types/check_type_compatibility.cpp

@@ -352,10 +352,7 @@ TCompatibilityPair CheckDecimalTypeCompatibility(
     }
 }
 
-// Returns pair:
-//   1. Inner element that is neither optional nor tagged.
-//   2. How many times this element is wrapped into Optional type.
-static std::pair<TComplexTypeFieldDescriptor, int> UnwrapOptionalAndTagged(const TComplexTypeFieldDescriptor& descriptor)
+std::pair<TComplexTypeFieldDescriptor, int> UnwrapOptionalAndTagged(const TComplexTypeFieldDescriptor& descriptor)
 {
     int nesting = 0;
     auto current = descriptor;

+ 8 - 2
yt/yt/client/complex_types/check_type_compatibility.h

@@ -8,12 +8,18 @@ namespace NYT::NComplexTypes {
 
 ////////////////////////////////////////////////////////////////////////////////
 
+// Returns pair:
+//   1. Inner element that is neither optional nor tagged.
+//   2. How many times this element is wrapped into Optional type.
+std::pair<NTableClient::TComplexTypeFieldDescriptor, int> UnwrapOptionalAndTagged(
+    const NTableClient::TComplexTypeFieldDescriptor& descriptor);
+
 // Returned value is pair with elements
 //   1. Compatibility of types.
 //   2. If types are NOT FullyCompatible, error contains description of incompatibility.
 std::pair<NTableClient::ESchemaCompatibility, TError> CheckTypeCompatibility(
-    const NYT::NTableClient::TLogicalTypePtr& oldType,
-    const NYT::NTableClient::TLogicalTypePtr& newType);
+    const NTableClient::TLogicalTypePtr& oldType,
+    const NTableClient::TLogicalTypePtr& newType);
 
 ////////////////////////////////////////////////////////////////////////////////
 

+ 333 - 0
yt/yt/client/complex_types/merge_complex_types.cpp

@@ -0,0 +1,333 @@
+#include "merge_complex_types.h"
+
+#include "check_type_compatibility.h"
+
+#include <yt/yt/client/table_client/logical_type.h>
+
+namespace NYT::NComplexTypes {
+
+using namespace NYT::NTableClient;
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+std::vector<TLogicalTypePtr> MergeTupleTypes(
+    const TComplexTypeFieldDescriptor& firstDescriptor,
+    const TComplexTypeFieldDescriptor& secondDescriptor)
+{
+    YT_VERIFY(firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::Tuple
+        || firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::VariantTuple);
+
+    YT_VERIFY(firstDescriptor.GetType()->GetMetatype() == secondDescriptor.GetType()->GetMetatype());
+
+    auto allowNewElements = firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::VariantTuple;
+
+    auto firstSize = std::ssize(firstDescriptor.GetType()->GetElements());
+    auto secondSize = std::ssize(secondDescriptor.GetType()->GetElements());
+
+    if (firstSize > secondSize) {
+        return MergeTupleTypes(
+            secondDescriptor,
+            firstDescriptor);
+    }
+
+    if (!allowNewElements && firstSize != secondSize) {
+        THROW_ERROR_EXCEPTION(
+            "Tuple type of fields %Qv and %Qv of different size cannot be merged",
+            firstDescriptor.GetDescription(),
+            secondDescriptor.GetDescription());
+    }
+
+    std::vector<TLogicalTypePtr> resultElements;
+    resultElements.reserve(secondSize);
+
+    int elementIndex = 0;
+    for (; elementIndex < firstSize; ++elementIndex) {
+        auto mergedType = MergeTypes(
+            firstDescriptor.Element(elementIndex).GetType(),
+            secondDescriptor.Element(elementIndex).GetType());
+
+        resultElements.push_back(std::move(mergedType));
+    }
+    for (; elementIndex < secondSize; ++elementIndex) {
+        resultElements.push_back(secondDescriptor.Element(elementIndex).GetType());
+    }
+    return resultElements;
+}
+
+std::vector<TStructField> MergeStructTypes(
+    const TComplexTypeFieldDescriptor& firstDescriptor,
+    const TComplexTypeFieldDescriptor& secondDescriptor)
+{
+    YT_VERIFY(firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::Struct
+        || firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::VariantStruct);
+
+    YT_VERIFY(firstDescriptor.GetType()->GetMetatype() == secondDescriptor.GetType()->GetMetatype());
+
+    auto firstFields = firstDescriptor.GetType()->GetFields();
+    auto secondFields = secondDescriptor.GetType()->GetFields();
+
+    auto firstSize = std::ssize(firstFields);
+    auto secondSize = std::ssize(secondFields);
+
+    if (firstSize > secondSize) {
+        return MergeStructTypes(secondDescriptor, firstDescriptor);
+    }
+
+    auto makeNullability = firstDescriptor.GetType()->GetMetatype() == ELogicalMetatype::Struct;
+
+    std::vector<TStructField> resultFields;
+    resultFields.reserve(secondSize);
+
+    ssize_t fieldIndex = 0;
+    for (; fieldIndex < firstSize; ++fieldIndex) {
+        const auto& firstName = firstFields[fieldIndex].Name;
+        const auto& secondName = secondFields[fieldIndex].Name;
+        if (firstName != secondName) {
+            THROW_ERROR_EXCEPTION(
+                "Struct member name mismatch in %Qv",
+                firstDescriptor.GetDescription())
+                << TErrorAttribute("first_name", firstName)
+                << TErrorAttribute("second_name", secondName);
+        }
+        const auto& firstFieldDescriptor = firstDescriptor.Field(fieldIndex);
+        const auto& secondFieldDescriptor = secondDescriptor.Field(fieldIndex);
+        try {
+            auto mergedField = MergeTypes(
+                firstFieldDescriptor.GetType(),
+                secondFieldDescriptor.GetType());
+            resultFields.push_back(TStructField{
+                .Name = firstName,
+                .Type = std::move(mergedField),
+            });
+        } catch (const std::exception& ex) {
+            THROW_ERROR_EXCEPTION(
+                "Struct member type mismatch in %Qv",
+                firstDescriptor.GetDescription())
+                << ex;
+        }
+    }
+
+    for (; fieldIndex < secondSize; ++fieldIndex) {
+        const auto& secondFieldDescriptor = secondDescriptor.Field(fieldIndex);
+        if (!secondFieldDescriptor.GetType()->IsNullable() && makeNullability) {
+            resultFields.push_back(TStructField{
+                .Name = secondFields[fieldIndex].Name,
+                .Type = New<TOptionalLogicalType>(secondFieldDescriptor.GetType()),
+            });
+        } else {
+            resultFields.push_back(secondFields[fieldIndex]);
+        }
+    }
+
+    return resultFields;
+}
+
+TLogicalTypePtr MergeDictTypes(
+    const TComplexTypeFieldDescriptor& firstDescriptor,
+    const TComplexTypeFieldDescriptor& secondDescriptor)
+{
+    auto mergedKey = MergeTypes(
+        firstDescriptor.DictKey().GetType(),
+        secondDescriptor.DictKey().GetType());
+
+    auto mergedValue = MergeTypes(
+        firstDescriptor.DictValue().GetType(),
+        secondDescriptor.DictValue().GetType());
+
+    return New<TDictLogicalType>(std::move(mergedKey), std::move(mergedValue));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TLogicalTypePtr UnwrapOptionalType(const TLogicalTypePtr& type)
+{
+    if (type->GetMetatype() == ELogicalMetatype::Optional) {
+        auto descriptor = TComplexTypeFieldDescriptor(type);
+        return descriptor.OptionalElement().GetType();
+    }
+    return type;
+}
+
+TLogicalTypePtr UnwrapTaggedType(const TLogicalTypePtr& type)
+{
+    if (type->GetMetatype() == ELogicalMetatype::Tagged) {
+        auto descriptor = TComplexTypeFieldDescriptor(type);
+        return descriptor.TaggedElement().GetType();
+    }
+    return type;
+}
+
+TString GetTag(const TLogicalTypePtr& type)
+{
+    return type->AsTaggedTypeRef().GetTag();
+}
+
+TString ExtractTagFromOneOfTypes(
+    const TLogicalTypePtr& firstType,
+    const TLogicalTypePtr& secondType)
+{
+    if (firstType->GetMetatype() == ELogicalMetatype::Tagged) {
+        return GetTag(firstType);
+    } else if (secondType->GetMetatype() == ELogicalMetatype::Tagged) {
+        return GetTag(secondType);
+    }
+    YT_ABORT();
+}
+
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
+TLogicalTypePtr MergeTypes(
+    const TLogicalTypePtr& firstType,
+    const TLogicalTypePtr& secondType)
+{
+    auto firstDescriptor = TComplexTypeFieldDescriptor(firstType);
+    auto secondDescriptor = TComplexTypeFieldDescriptor(secondType);
+
+    const auto firstMetatype = firstDescriptor.GetType()->GetMetatype();
+    const auto secondMetatype = secondDescriptor.GetType()->GetMetatype();
+
+    // It needs to handle tag before optional.
+    if (firstMetatype == ELogicalMetatype::Tagged
+        || secondMetatype == ELogicalMetatype::Tagged)
+    {
+        if (firstMetatype == ELogicalMetatype::Tagged
+            && secondMetatype == ELogicalMetatype::Tagged
+            && GetTag(firstType) != GetTag(secondType))
+        {
+                THROW_ERROR_EXCEPTION(
+                    "The type tags do not match: first tag %Qv and second tag %Qv in %Qv",
+                    GetTag(firstType),
+                    GetTag(secondType),
+                    firstDescriptor.GetDescription());
+        }
+        auto mergedType = MergeTypes(
+            UnwrapTaggedType(firstType),
+            UnwrapTaggedType(secondType));
+
+        return New<TTaggedLogicalType>(
+            ExtractTagFromOneOfTypes(firstType, secondType),
+            std::move(mergedType));
+    }
+
+    if (firstMetatype == ELogicalMetatype::Optional
+        || secondMetatype == ELogicalMetatype::Optional)
+    {
+        int firstLayerCount = UnwrapOptionalAndTagged(firstDescriptor).second;
+        int secondLayerCount = UnwrapOptionalAndTagged(secondDescriptor).second;
+
+        if (firstLayerCount != secondLayerCount && (firstLayerCount > 1 || secondLayerCount > 1)) {
+            THROW_ERROR_EXCEPTION(
+                "Type of fields %Qv and %Qv cannot be merged",
+                firstDescriptor.GetDescription(),
+                secondDescriptor.GetDescription())
+                << TErrorAttribute("first_type", ToString(*firstDescriptor.GetType()))
+                << TErrorAttribute("second_type", ToString(*secondDescriptor.GetType()));
+        }
+
+        auto mergedType = MergeTypes(
+            UnwrapOptionalType(firstType),
+            UnwrapOptionalType(secondType));
+
+        return New<TOptionalLogicalType>(std::move(mergedType));
+    }
+
+    if (firstMetatype != secondMetatype) {
+        THROW_ERROR_EXCEPTION(
+            "Type of %Qv field cannot be merged: metatypes are incompatible",
+            firstDescriptor.GetDescription())
+            << TErrorAttribute("first_type", ToString(*firstDescriptor.GetType()))
+            << TErrorAttribute("second_type", ToString(*secondDescriptor.GetType()));
+    }
+
+    switch (firstMetatype) {
+        case ELogicalMetatype::Simple:
+        {
+            if (CheckTypeCompatibility(firstType, secondType).first == ESchemaCompatibility::FullyCompatible) {
+                return secondType;
+            }
+            if (CheckTypeCompatibility(secondType, firstType).first == ESchemaCompatibility::FullyCompatible) {
+                return firstType;
+            }
+            THROW_ERROR_EXCEPTION(
+                "Type of fields %Qv and %Qv cannot be merged",
+                firstDescriptor.GetDescription(),
+                secondDescriptor.GetDescription())
+                << TErrorAttribute("first_type", ToString(*firstDescriptor.GetType()))
+                << TErrorAttribute("second_type", ToString(*secondDescriptor.GetType()));
+
+        }
+        case ELogicalMetatype::List:
+        {
+            auto mergedType = MergeTypes(
+                firstType->AsListTypeRef().GetElement(),
+                secondType->AsListTypeRef().GetElement());
+
+            return New<TListLogicalType>(mergedType);
+        }
+        case ELogicalMetatype::VariantStruct:
+        {
+            auto mergedFields = MergeStructTypes(
+                firstDescriptor,
+                secondDescriptor);
+
+            return New<TVariantStructLogicalType>(mergedFields);
+        }
+
+        case ELogicalMetatype::Struct:
+        {
+            auto mergedFields = MergeStructTypes(
+                firstDescriptor,
+                secondDescriptor);
+
+            return New<TStructLogicalType>(mergedFields);
+        }
+
+        case ELogicalMetatype::Tuple:
+        {
+            auto mergedElements = MergeTupleTypes(
+                firstDescriptor,
+                secondDescriptor);
+
+            return New<TTupleLogicalType>(mergedElements);
+        }
+
+        case ELogicalMetatype::VariantTuple:
+        {
+            auto mergedElements = MergeTupleTypes(
+                firstDescriptor,
+                secondDescriptor);
+
+            return New<TVariantTupleLogicalType>(mergedElements);
+        }
+
+        case ELogicalMetatype::Dict:
+            return MergeDictTypes(firstDescriptor, secondDescriptor);
+
+        case ELogicalMetatype::Decimal:
+        {
+            if (*firstDescriptor.GetType() == *secondDescriptor.GetType()) {
+                return firstType;
+            } else {
+                THROW_ERROR_EXCEPTION(
+                    "Type of fields %Qv and %Qv cannot be merged",
+                    firstDescriptor.GetDescription(),
+                    secondDescriptor.GetDescription())
+                    << TErrorAttribute("first_type", ToString(*firstDescriptor.GetType()))
+                    << TErrorAttribute("second_type", ToString(*secondDescriptor.GetType()));
+            }
+        }
+
+        case ELogicalMetatype::Optional:
+        case ELogicalMetatype::Tagged:
+            // Optional and Tagged cases were checked earlier in this function.
+            YT_ABORT();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NComplexTypes

+ 15 - 0
yt/yt/client/complex_types/merge_complex_types.h

@@ -0,0 +1,15 @@
+#pragma once
+
+#include <yt/yt/client/table_client/public.h>
+
+namespace NYT::NComplexTypes {
+
+////////////////////////////////////////////////////////////////////////////////
+
+NTableClient::TLogicalTypePtr MergeTypes(
+    const NTableClient::TLogicalTypePtr& firstType,
+    const NTableClient::TLogicalTypePtr& secondType);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NComplexTypes

+ 132 - 0
yt/yt/client/table_client/merge_table_schemas.cpp

@@ -0,0 +1,132 @@
+#include "merge_table_schemas.h"
+
+#include "check_schema_compatibility.h"
+#include "comparator.h"
+#include "logical_type.h"
+#include "schema.h"
+
+#include <yt/yt/client/complex_types/check_type_compatibility.h>
+#include <yt/yt/client/complex_types/merge_complex_types.h>
+
+#include <yt/yt/core/ytree/convert.h>
+
+namespace NYT::NTableClient {
+
+using namespace NComplexTypes;
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+TColumnSchema MakeOptionalSchema(const TColumnSchema& columnSchema)
+{
+    if (columnSchema.LogicalType()->GetMetatype() == ELogicalMetatype::Optional) {
+        return columnSchema;
+    }
+    auto optionalType = New<TOptionalLogicalType>(columnSchema.LogicalType());
+    auto resultSchema = TColumnSchema(
+        columnSchema.Name(),
+        optionalType,
+        columnSchema.SortOrder());
+    resultSchema.SetStableName(columnSchema.StableName());
+    return resultSchema;
+}
+
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
+TTableSchemaPtr MergeTableSchemas(
+    const TTableSchemaPtr& firstSchema,
+    const TTableSchemaPtr& secondSchema)
+{
+    std::vector<TColumnSchema> resultColumns;
+    resultColumns.reserve(std::max(secondSchema->Columns().size(), firstSchema->Columns().size()));
+
+    for (const auto& secondSchemaColumn : secondSchema->Columns()) {
+        const auto* firstSchemaColumn = firstSchema->FindColumn(secondSchemaColumn.Name());
+
+        if (firstSchemaColumn) {
+            if (firstSchemaColumn->StableName() != secondSchemaColumn.StableName()) {
+                THROW_ERROR_EXCEPTION("Mismatching stable names in column %Qv: %Qv and %Qv",
+                    firstSchemaColumn->Name(),
+                    firstSchemaColumn->StableName(),
+                    secondSchemaColumn.StableName());
+            }
+            if (firstSchemaColumn->SortOrder() != secondSchemaColumn.SortOrder()) {
+                THROW_ERROR_EXCEPTION("Mismatching sort orders in column %Qv: %Qv and %Qv",
+                    firstSchemaColumn->Name());
+            }
+
+            try {
+                auto mergedType = MergeTypes(
+                    firstSchemaColumn->LogicalType(),
+                    secondSchemaColumn.LogicalType());
+
+                auto resultSchema = TColumnSchema(
+                    firstSchemaColumn->Name(),
+                    mergedType,
+                    firstSchemaColumn->SortOrder());
+
+                resultSchema.SetStableName(firstSchemaColumn->StableName());
+                resultColumns.push_back(std::move(resultSchema));
+
+            } catch(const std::exception& ex) {
+                THROW_ERROR_EXCEPTION(
+                    "Column %v first schema type is incompatible with second schema type",
+                    firstSchemaColumn->GetDiagnosticNameString())
+                    << ex;
+            }
+
+        } else if (!firstSchema->GetStrict()) {
+            THROW_ERROR_EXCEPTION("Column %v is present in second schema and is missing in non-strict first schema",
+                secondSchemaColumn.GetDiagnosticNameString());
+        } else {
+            resultColumns.push_back(MakeOptionalSchema(secondSchemaColumn));
+        }
+    }
+
+    for (const auto& firstSchemaColumn : firstSchema->Columns()) {
+        if (!secondSchema->FindColumn(firstSchemaColumn.Name())) {
+            if (!secondSchema->GetStrict()) {
+                THROW_ERROR_EXCEPTION("Column %v is present in first schema and is missing in non-strict second schema",
+                    firstSchemaColumn.GetDiagnosticNameString());
+            }
+            resultColumns.push_back(MakeOptionalSchema(firstSchemaColumn));
+        }
+    }
+
+    auto getDeletedColumnsStableNames = [] (const std::vector<TDeletedColumn>& deletedColumns) {
+        THashSet<TColumnStableName> stableNames;
+        for (const auto& column: deletedColumns) {
+            stableNames.insert(column.StableName());
+        }
+        return stableNames;
+    };
+
+    auto firstDeletedStableNames = getDeletedColumnsStableNames(firstSchema->DeletedColumns());
+    auto secondDeletedStableNames = getDeletedColumnsStableNames(secondSchema->DeletedColumns());
+
+    if (firstDeletedStableNames == secondDeletedStableNames) {
+        // If the deleted columns completely match, then the table can be teleported.
+        return {
+            New<TTableSchema>(
+                resultColumns,
+                /*strict*/ firstSchema->GetStrict() && secondSchema->GetStrict(),
+                firstSchema->GetUniqueKeys() && secondSchema->GetUniqueKeys(),
+                ETableSchemaModification::None,
+                firstSchema->DeletedColumns())
+        };
+    } else {
+        return {
+            New<TTableSchema>(
+                resultColumns,
+                /*strict*/ firstSchema->GetStrict() && secondSchema->GetStrict(),
+                firstSchema->GetUniqueKeys() && secondSchema->GetUniqueKeys())
+        };
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTableClient

+ 16 - 0
yt/yt/client/table_client/merge_table_schemas.h

@@ -0,0 +1,16 @@
+#pragma once
+
+#include "public.h"
+
+namespace NYT::NTableClient {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Create schema that match both two schemas.
+TTableSchemaPtr MergeTableSchemas(
+    const TTableSchemaPtr& firstSchema,
+    const TTableSchemaPtr& secondSchema);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTableClient

+ 4 - 2
yt/yt/client/ya.make

@@ -124,6 +124,7 @@ SRCS(
     table_client/schemaless_dynamic_table_writer.cpp
     table_client/serialize.cpp
     table_client/logical_type.cpp
+    table_client/merge_table_schemas.cpp
     table_client/name_table.cpp
     table_client/wire_protocol.cpp
     table_client/columnar_statistics.cpp
@@ -177,9 +178,10 @@ SRCS(
     complex_types/check_yson_token.cpp
     complex_types/check_type_compatibility.cpp
     complex_types/infinite_entity.cpp
-    complex_types/yson_format_conversion.cpp
-    complex_types/uuid_text.cpp
+    complex_types/merge_complex_types.cpp
     complex_types/time_text.cpp
+    complex_types/uuid_text.cpp
+    complex_types/yson_format_conversion.cpp
 
     zookeeper/packet.cpp
     zookeeper/protocol.cpp