#include "skiff.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace NYT { namespace NDetail { using namespace NRawClient; using ::ToString; //////////////////////////////////////////////////////////////////////////////// static NSkiff::TSkiffSchemaPtr ReadSkiffSchema(const TString& fileName) { if (!TFsPath(fileName).Exists()) { return nullptr; } TIFStream input(fileName); NSkiff::TSkiffSchemaPtr schema; Deserialize(schema, NodeFromYsonStream(&input)); return schema; } NSkiff::TSkiffSchemaPtr GetJobInputSkiffSchema() { return ReadSkiffSchema("skiff_input"); } NSkiff::EWireType ValueTypeToSkiffType(EValueType valueType) { using NSkiff::EWireType; switch (valueType) { case VT_INT64: case VT_INT32: case VT_INT16: case VT_INT8: return EWireType::Int64; case VT_UINT64: case VT_UINT32: case VT_UINT16: case VT_UINT8: return EWireType::Uint64; case VT_DOUBLE: case VT_FLOAT: return EWireType::Double; case VT_BOOLEAN: return EWireType::Boolean; case VT_STRING: case VT_UTF8: case VT_JSON: return EWireType::String32; case VT_ANY: return EWireType::Yson32; case VT_NULL: case VT_VOID: return EWireType::Nothing; case VT_DATE: case VT_DATETIME: case VT_TIMESTAMP: return EWireType::Uint64; case VT_INTERVAL: return EWireType::Int64; case VT_DATE32: case VT_DATETIME64: case VT_TIMESTAMP64: case VT_INTERVAL64: return EWireType::Int64; }; ythrow yexception() << "Cannot convert EValueType '" << valueType << "' to NSkiff::EWireType"; } NSkiff::TSkiffSchemaPtr CreateSkiffSchema( const TTableSchema& schema, const TCreateSkiffSchemaOptions& options) { using namespace NSkiff; Y_ENSURE(schema.Strict(), "Cannot create Skiff schema for non-strict table schema"); TVector skiffColumns; for (const auto& column: schema.Columns()) { TSkiffSchemaPtr skiffColumn; if (column.Deleted().Defined() && *column.Deleted()) { continue; } if (column.Type() == VT_ANY && *column.TypeV3() != *NTi::Optional(NTi::Yson())) { // We ignore all complex types until YT-12717 is done. return nullptr; } if (column.TypeV3()->IsDecimal() || column.TypeV3()->IsOptional() && column.TypeV3()->AsOptional()->GetItemType()->IsDecimal()) { // Complex logic for decimal types, ignore them for now. return nullptr; } if (column.Required() || NTi::IsSingular(column.TypeV3()->GetTypeName())) { skiffColumn = CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type())); } else { skiffColumn = CreateVariant8Schema({ CreateSimpleTypeSchema(EWireType::Nothing), CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type()))}); } if (options.RenameColumns_) { auto maybeName = options.RenameColumns_->find(column.Name()); skiffColumn->SetName(maybeName == options.RenameColumns_->end() ? column.Name() : maybeName->second); } else { skiffColumn->SetName(column.Name()); } skiffColumns.push_back(skiffColumn); } if (options.HasKeySwitch_) { skiffColumns.push_back( CreateSimpleTypeSchema(EWireType::Boolean)->SetName("$key_switch")); } if (options.HasRangeIndex_) { skiffColumns.push_back( CreateVariant8Schema({ CreateSimpleTypeSchema(EWireType::Nothing), CreateSimpleTypeSchema(EWireType::Int64)}) ->SetName("$range_index")); } skiffColumns.push_back( CreateVariant8Schema({ CreateSimpleTypeSchema(EWireType::Nothing), CreateSimpleTypeSchema(EWireType::Int64)}) ->SetName("$row_index")); return CreateTupleSchema(std::move(skiffColumns)); } NSkiff::TSkiffSchemaPtr CreateSkiffSchema( const TNode& schemaNode, const TCreateSkiffSchemaOptions& options) { TTableSchema schema; Deserialize(schema, schemaNode); return CreateSkiffSchema(schema, options); } void Serialize(const NSkiff::TSkiffSchemaPtr& schema, NYson::IYsonConsumer* consumer) { consumer->OnBeginMap(); if (schema->GetName().size() > 0) { consumer->OnKeyedItem("name"); consumer->OnStringScalar(schema->GetName()); } consumer->OnKeyedItem("wire_type"); consumer->OnStringScalar(ToString(schema->GetWireType())); if (schema->GetChildren().size() > 0) { consumer->OnKeyedItem("children"); consumer->OnBeginList(); for (const auto& child : schema->GetChildren()) { consumer->OnListItem(); Serialize(child, consumer); } consumer->OnEndList(); } consumer->OnEndMap(); } void Deserialize(NSkiff::TSkiffSchemaPtr& schema, const TNode& node) { using namespace NSkiff; static auto createSchema = [](EWireType wireType, TVector&& children) -> TSkiffSchemaPtr { switch (wireType) { case EWireType::Tuple: return CreateTupleSchema(std::move(children)); case EWireType::Variant8: return CreateVariant8Schema(std::move(children)); case EWireType::Variant16: return CreateVariant16Schema(std::move(children)); case EWireType::RepeatedVariant8: return CreateRepeatedVariant8Schema(std::move(children)); case EWireType::RepeatedVariant16: return CreateRepeatedVariant16Schema(std::move(children)); default: return CreateSimpleTypeSchema(wireType); } }; const auto& map = node.AsMap(); const auto* wireTypePtr = map.FindPtr("wire_type"); Y_ENSURE(wireTypePtr, "'wire_type' is a required key"); auto wireType = FromString(wireTypePtr->AsString()); const auto* childrenPtr = map.FindPtr("children"); Y_ENSURE(NSkiff::IsSimpleType(wireType) || childrenPtr, "'children' key is required for complex node '" << wireType << "'"); TVector children; if (childrenPtr) { for (const auto& childNode : childrenPtr->AsList()) { TSkiffSchemaPtr childSchema; Deserialize(childSchema, childNode); children.push_back(std::move(childSchema)); } } schema = createSchema(wireType, std::move(children)); const auto* namePtr = map.FindPtr("name"); if (namePtr) { schema->SetName(namePtr->AsString()); } } TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) { Y_ENSURE(schema->GetWireType() == NSkiff::EWireType::Variant16, "Bad wire type for schema; expected 'variant16', got " << schema->GetWireType()); THashMap< NSkiff::TSkiffSchemaPtr, size_t, NSkiff::TSkiffSchemaPtrHasher, NSkiff::TSkiffSchemaPtrEqual> schemasMap; size_t tableIndex = 0; auto config = TNode("skiff"); config.Attributes()["table_skiff_schemas"] = TNode::CreateList(); for (const auto& schemaChild : schema->GetChildren()) { auto [iter, inserted] = schemasMap.emplace(schemaChild, tableIndex); size_t currentIndex; if (inserted) { currentIndex = tableIndex; ++tableIndex; } else { currentIndex = iter->second; } config.Attributes()["table_skiff_schemas"].Add("$" + ToString(currentIndex)); } config.Attributes()["skiff_schema_registry"] = TNode::CreateMap(); for (const auto& [tableSchema, index] : schemasMap) { TNode node; TNodeBuilder nodeBuilder(&node); Serialize(tableSchema, &nodeBuilder); config.Attributes()["skiff_schema_registry"][ToString(index)] = std::move(node); } return TFormat(config); } NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( const TClientContext& context, const IClientRetryPolicyPtr& clientRetryPolicy, const TTransactionId& transactionId, ENodeReaderFormat nodeReaderFormat, const TVector& tablePaths, const TCreateSkiffSchemaOptions& options) { if (nodeReaderFormat == ENodeReaderFormat::Yson) { return nullptr; } for (const auto& path : tablePaths) { if (path.Columns_) { switch (nodeReaderFormat) { case ENodeReaderFormat::Skiff: ythrow TApiUsageError() << "Cannot use Skiff format with column selectors"; case ENodeReaderFormat::Auto: return nullptr; default: Y_ABORT("Unexpected node reader format: %d", static_cast(nodeReaderFormat)); } } } auto nodes = NRawClient::BatchTransform( clientRetryPolicy->CreatePolicyForGenericRequest(), context, NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths), [&] (TRawBatchRequest& batch, const TRichYPath& path) { auto getOptions = TGetOptions() .AttributeFilter( TAttributeFilter() .AddAttribute("schema") .AddAttribute("dynamic") .AddAttribute("type") ); return batch.Get(transactionId, path.Path_, getOptions); }); TVector schemas; for (size_t tableIndex = 0; tableIndex < nodes.size(); ++tableIndex) { const auto& tablePath = tablePaths[tableIndex].Path_; const auto& attributes = nodes[tableIndex].GetAttributes(); Y_ENSURE_EX(attributes["type"] == TNode("table"), TApiUsageError() << "Operation input path " << tablePath << " is not a table"); bool dynamic = attributes["dynamic"].AsBool(); bool strict = attributes["schema"].GetAttributes()["strict"].AsBool(); switch (nodeReaderFormat) { case ENodeReaderFormat::Skiff: Y_ENSURE_EX(strict, TApiUsageError() << "Cannot use skiff format for table with non-strict schema '" << tablePath << "'"); Y_ENSURE_EX(!dynamic, TApiUsageError() << "Cannot use skiff format for dynamic table '" << tablePath << "'"); break; case ENodeReaderFormat::Auto: if (dynamic || !strict) { YT_LOG_DEBUG("Cannot use skiff format for table '%v' as it is dynamic or has non-strict schema", tablePath); return nullptr; } break; default: Y_ABORT("Unexpected node reader format: %d", static_cast(nodeReaderFormat)); } NSkiff::TSkiffSchemaPtr curSkiffSchema; if (tablePaths[tableIndex].RenameColumns_) { auto customOptions = options; customOptions.RenameColumns(*tablePaths[tableIndex].RenameColumns_); curSkiffSchema = CreateSkiffSchema(attributes["schema"], customOptions); } else { curSkiffSchema = CreateSkiffSchema(attributes["schema"], options); } if (!curSkiffSchema) { return nullptr; } schemas.push_back(curSkiffSchema); } return NSkiff::CreateVariant16Schema(std::move(schemas)); } //////////////////////////////////////////////////////////////////////////////// NSkiff::TSkiffSchemaPtr CreateSkiffSchema( const TVector& tableSchemas, const TCreateSkiffSchemaOptions& options ) { constexpr auto KEY_SWITCH_COLUMN = "$key_switch"; constexpr auto ROW_INDEX_COLUMN = "$row_index"; constexpr auto RANGE_INDEX_COLUMN = "$range_index"; TVector schemas; schemas.reserve(tableSchemas.size()); for (const auto& tableSchema : tableSchemas) { Y_ENSURE(tableSchema->GetWireType() == NSkiff::EWireType::Tuple, "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'"); const auto& children = tableSchema->GetChildren(); NSkiff::TSkiffSchemaList columns; columns.reserve(children.size() + 3); if (options.HasKeySwitch_) { columns.push_back( CreateSimpleTypeSchema(NSkiff::EWireType::Boolean)->SetName(KEY_SWITCH_COLUMN)); } columns.push_back( NSkiff::CreateVariant8Schema({ CreateSimpleTypeSchema(NSkiff::EWireType::Nothing), CreateSimpleTypeSchema(NSkiff::EWireType::Int64)}) ->SetName(ROW_INDEX_COLUMN)); if (options.HasRangeIndex_) { columns.push_back( NSkiff::CreateVariant8Schema({ CreateSimpleTypeSchema(NSkiff::EWireType::Nothing), CreateSimpleTypeSchema(NSkiff::EWireType::Int64)}) ->SetName(RANGE_INDEX_COLUMN)); } columns.insert(columns.end(), children.begin(), children.end()); schemas.push_back(NSkiff::CreateTupleSchema(columns)); } return NSkiff::CreateVariant16Schema(schemas); } //////////////////////////////////////////////////////////////////////////////// } // namespace NDetail } // namespace NYT