123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- #include "skiff.h"
- #include <yt/cpp/mapreduce/common/retry_lib.h>
- #include <yt/cpp/mapreduce/http/retry_request.h>
- #include <yt/cpp/mapreduce/http/requests.h>
- #include <yt/cpp/mapreduce/interface/config.h>
- #include <yt/cpp/mapreduce/interface/common.h>
- #include <yt/cpp/mapreduce/interface/serialize.h>
- #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
- #include <library/cpp/yson/node/node_builder.h>
- #include <library/cpp/yson/node/node_io.h>
- #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
- #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
- #include <yt/cpp/mapreduce/skiff/skiff_schema.h>
- #include <library/cpp/yson/consumer.h>
- #include <library/cpp/yson/writer.h>
- #include <util/string/cast.h>
- #include <util/stream/str.h>
- #include <util/stream/file.h>
- #include <util/folder/path.h>
- namespace NYT {
- namespace NDetail {
- using namespace NRawClient;
- using ::ToString;
- ////////////////////////////////////////////////////////////////////////////////
- static NSkiff::TSkiffSchemaPtr ReadSkiffSchema(const TString& fileName)
- {
- if (!TFsPath(fileName).Exists()) {
- return nullptr;
- }
- TIFStream input(fileName);
- NSkiff::TSkiffSchemaPtr schema;
- Deserialize(schema, NodeFromYsonStream(&input));
- return schema;
- }
- NSkiff::TSkiffSchemaPtr GetJobInputSkiffSchema()
- {
- return ReadSkiffSchema("skiff_input");
- }
- NSkiff::EWireType ValueTypeToSkiffType(EValueType valueType)
- {
- using NSkiff::EWireType;
- switch (valueType) {
- case VT_INT64:
- case VT_INT32:
- case VT_INT16:
- case VT_INT8:
- return EWireType::Int64;
- case VT_UINT64:
- case VT_UINT32:
- case VT_UINT16:
- case VT_UINT8:
- return EWireType::Uint64;
- case VT_DOUBLE:
- case VT_FLOAT:
- return EWireType::Double;
- case VT_BOOLEAN:
- return EWireType::Boolean;
- case VT_STRING:
- case VT_UTF8:
- case VT_JSON:
- return EWireType::String32;
- case VT_ANY:
- return EWireType::Yson32;
- case VT_NULL:
- case VT_VOID:
- return EWireType::Nothing;
- case VT_DATE:
- case VT_DATETIME:
- case VT_TIMESTAMP:
- return EWireType::Uint64;
- case VT_INTERVAL:
- return EWireType::Int64;
- };
- ythrow yexception() << "Cannot convert EValueType '" << valueType << "' to NSkiff::EWireType";
- }
- NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
- const TTableSchema& schema,
- const TCreateSkiffSchemaOptions& options)
- {
- using namespace NSkiff;
- Y_ENSURE(schema.Strict(), "Cannot create Skiff schema for non-strict table schema");
- TVector<TSkiffSchemaPtr> skiffColumns;
- for (const auto& column: schema.Columns()) {
- TSkiffSchemaPtr skiffColumn;
- if (column.Deleted().Defined() && *column.Deleted()) {
- continue;
- }
- if (column.Type() == VT_ANY && *column.TypeV3() != *NTi::Optional(NTi::Yson())) {
- // We ignore all complex types until YT-12717 is done.
- return nullptr;
- }
- if (column.TypeV3()->IsDecimal() ||
- column.TypeV3()->IsOptional() && column.TypeV3()->AsOptional()->GetItemType()->IsDecimal())
- {
- // Complex logic for decimal types, ignore them for now.
- return nullptr;
- }
- if (column.Required() || NTi::IsSingular(column.TypeV3()->GetTypeName())) {
- skiffColumn = CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type()));
- } else {
- skiffColumn = CreateVariant8Schema({
- CreateSimpleTypeSchema(EWireType::Nothing),
- CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type()))});
- }
- if (options.RenameColumns_) {
- auto maybeName = options.RenameColumns_->find(column.Name());
- skiffColumn->SetName(maybeName == options.RenameColumns_->end() ? column.Name() : maybeName->second);
- } else {
- skiffColumn->SetName(column.Name());
- }
- skiffColumns.push_back(skiffColumn);
- }
- if (options.HasKeySwitch_) {
- skiffColumns.push_back(
- CreateSimpleTypeSchema(EWireType::Boolean)->SetName("$key_switch"));
- }
- if (options.HasRangeIndex_) {
- skiffColumns.push_back(
- CreateVariant8Schema({
- CreateSimpleTypeSchema(EWireType::Nothing),
- CreateSimpleTypeSchema(EWireType::Int64)})
- ->SetName("$range_index"));
- }
- skiffColumns.push_back(
- CreateVariant8Schema({
- CreateSimpleTypeSchema(EWireType::Nothing),
- CreateSimpleTypeSchema(EWireType::Int64)})
- ->SetName("$row_index"));
- return CreateTupleSchema(std::move(skiffColumns));
- }
- NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
- const TNode& schemaNode,
- const TCreateSkiffSchemaOptions& options)
- {
- TTableSchema schema;
- Deserialize(schema, schemaNode);
- return CreateSkiffSchema(schema, options);
- }
- void Serialize(const NSkiff::TSkiffSchemaPtr& schema, NYson::IYsonConsumer* consumer)
- {
- consumer->OnBeginMap();
- if (schema->GetName().size() > 0) {
- consumer->OnKeyedItem("name");
- consumer->OnStringScalar(schema->GetName());
- }
- consumer->OnKeyedItem("wire_type");
- consumer->OnStringScalar(ToString(schema->GetWireType()));
- if (schema->GetChildren().size() > 0) {
- consumer->OnKeyedItem("children");
- consumer->OnBeginList();
- for (const auto& child : schema->GetChildren()) {
- consumer->OnListItem();
- Serialize(child, consumer);
- }
- consumer->OnEndList();
- }
- consumer->OnEndMap();
- }
- void Deserialize(NSkiff::TSkiffSchemaPtr& schema, const TNode& node)
- {
- using namespace NSkiff;
- static auto createSchema = [](EWireType wireType, TVector<TSkiffSchemaPtr>&& children) -> TSkiffSchemaPtr {
- switch (wireType) {
- case EWireType::Tuple:
- return CreateTupleSchema(std::move(children));
- case EWireType::Variant8:
- return CreateVariant8Schema(std::move(children));
- case EWireType::Variant16:
- return CreateVariant16Schema(std::move(children));
- case EWireType::RepeatedVariant8:
- return CreateRepeatedVariant8Schema(std::move(children));
- case EWireType::RepeatedVariant16:
- return CreateRepeatedVariant16Schema(std::move(children));
- default:
- return CreateSimpleTypeSchema(wireType);
- }
- };
- const auto& map = node.AsMap();
- const auto* wireTypePtr = map.FindPtr("wire_type");
- Y_ENSURE(wireTypePtr, "'wire_type' is a required key");
- auto wireType = FromString<NSkiff::EWireType>(wireTypePtr->AsString());
- const auto* childrenPtr = map.FindPtr("children");
- Y_ENSURE(NSkiff::IsSimpleType(wireType) || childrenPtr,
- "'children' key is required for complex node '" << wireType << "'");
- TVector<TSkiffSchemaPtr> children;
- if (childrenPtr) {
- for (const auto& childNode : childrenPtr->AsList()) {
- TSkiffSchemaPtr childSchema;
- Deserialize(childSchema, childNode);
- children.push_back(std::move(childSchema));
- }
- }
- schema = createSchema(wireType, std::move(children));
- const auto* namePtr = map.FindPtr("name");
- if (namePtr) {
- schema->SetName(namePtr->AsString());
- }
- }
- TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) {
- Y_ENSURE(schema->GetWireType() == NSkiff::EWireType::Variant16,
- "Bad wire type for schema; expected 'variant16', got " << schema->GetWireType());
- THashMap<
- NSkiff::TSkiffSchemaPtr,
- size_t,
- NSkiff::TSkiffSchemaPtrHasher,
- NSkiff::TSkiffSchemaPtrEqual> schemasMap;
- size_t tableIndex = 0;
- auto config = TNode("skiff");
- config.Attributes()["table_skiff_schemas"] = TNode::CreateList();
- for (const auto& schemaChild : schema->GetChildren()) {
- auto [iter, inserted] = schemasMap.emplace(schemaChild, tableIndex);
- size_t currentIndex;
- if (inserted) {
- currentIndex = tableIndex;
- ++tableIndex;
- } else {
- currentIndex = iter->second;
- }
- config.Attributes()["table_skiff_schemas"].Add("$" + ToString(currentIndex));
- }
- config.Attributes()["skiff_schema_registry"] = TNode::CreateMap();
- for (const auto& [tableSchema, index] : schemasMap) {
- TNode node;
- TNodeBuilder nodeBuilder(&node);
- Serialize(tableSchema, &nodeBuilder);
- config.Attributes()["skiff_schema_registry"][ToString(index)] = std::move(node);
- }
- return TFormat(config);
- }
- NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
- const TClientContext& context,
- const IClientRetryPolicyPtr& clientRetryPolicy,
- const TTransactionId& transactionId,
- ENodeReaderFormat nodeReaderFormat,
- const TVector<TRichYPath>& tablePaths,
- const TCreateSkiffSchemaOptions& options)
- {
- if (nodeReaderFormat == ENodeReaderFormat::Yson) {
- return nullptr;
- }
- for (const auto& path : tablePaths) {
- if (path.Columns_) {
- switch (nodeReaderFormat) {
- case ENodeReaderFormat::Skiff:
- ythrow TApiUsageError() << "Cannot use Skiff format with column selectors";
- case ENodeReaderFormat::Auto:
- return nullptr;
- default:
- Y_ABORT("Unexpected node reader format: %d", static_cast<int>(nodeReaderFormat));
- }
- }
- }
- auto nodes = NRawClient::BatchTransform(
- clientRetryPolicy->CreatePolicyForGenericRequest(),
- context,
- NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths),
- [&] (TRawBatchRequest& batch, const TRichYPath& path) {
- auto getOptions = TGetOptions()
- .AttributeFilter(
- TAttributeFilter()
- .AddAttribute("schema")
- .AddAttribute("dynamic")
- .AddAttribute("type")
- );
- return batch.Get(transactionId, path.Path_, getOptions);
- });
- TVector<NSkiff::TSkiffSchemaPtr> schemas;
- for (size_t tableIndex = 0; tableIndex < nodes.size(); ++tableIndex) {
- const auto& tablePath = tablePaths[tableIndex].Path_;
- const auto& attributes = nodes[tableIndex].GetAttributes();
- Y_ENSURE_EX(attributes["type"] == TNode("table"),
- TApiUsageError() << "Operation input path " << tablePath << " is not a table");
- bool dynamic = attributes["dynamic"].AsBool();
- bool strict = attributes["schema"].GetAttributes()["strict"].AsBool();
- switch (nodeReaderFormat) {
- case ENodeReaderFormat::Skiff:
- Y_ENSURE_EX(strict,
- TApiUsageError() << "Cannot use skiff format for table with non-strict schema '" << tablePath << "'");
- Y_ENSURE_EX(!dynamic,
- TApiUsageError() << "Cannot use skiff format for dynamic table '" << tablePath << "'");
- break;
- case ENodeReaderFormat::Auto:
- if (dynamic || !strict) {
- YT_LOG_DEBUG("Cannot use skiff format for table '%v' as it is dynamic or has non-strict schema",
- tablePath);
- return nullptr;
- }
- break;
- default:
- Y_ABORT("Unexpected node reader format: %d", static_cast<int>(nodeReaderFormat));
- }
- NSkiff::TSkiffSchemaPtr curSkiffSchema;
- if (tablePaths[tableIndex].RenameColumns_) {
- auto customOptions = options;
- customOptions.RenameColumns(*tablePaths[tableIndex].RenameColumns_);
- curSkiffSchema = CreateSkiffSchema(attributes["schema"], customOptions);
- } else {
- curSkiffSchema = CreateSkiffSchema(attributes["schema"], options);
- }
- if (!curSkiffSchema) {
- return nullptr;
- }
- schemas.push_back(curSkiffSchema);
- }
- return NSkiff::CreateVariant16Schema(std::move(schemas));
- }
- ////////////////////////////////////////////////////////////////////////////////
- NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
- const TVector<NSkiff::TSkiffSchemaPtr>& tableSchemas,
- const TCreateSkiffSchemaOptions& options
- ) {
- constexpr auto KEY_SWITCH_COLUMN = "$key_switch";
- constexpr auto ROW_INDEX_COLUMN = "$row_index";
- constexpr auto RANGE_INDEX_COLUMN = "$range_index";
- TVector<NSkiff::TSkiffSchemaPtr> schemas;
- schemas.reserve(tableSchemas.size());
- for (const auto& tableSchema : tableSchemas) {
- Y_ENSURE(tableSchema->GetWireType() == NSkiff::EWireType::Tuple,
- "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'");
- const auto& children = tableSchema->GetChildren();
- NSkiff::TSkiffSchemaList columns;
- columns.reserve(children.size() + 3);
- if (options.HasKeySwitch_) {
- columns.push_back(
- CreateSimpleTypeSchema(NSkiff::EWireType::Boolean)->SetName(KEY_SWITCH_COLUMN));
- }
- columns.push_back(
- NSkiff::CreateVariant8Schema({
- CreateSimpleTypeSchema(NSkiff::EWireType::Nothing),
- CreateSimpleTypeSchema(NSkiff::EWireType::Int64)})
- ->SetName(ROW_INDEX_COLUMN));
- if (options.HasRangeIndex_) {
- columns.push_back(
- NSkiff::CreateVariant8Schema({
- CreateSimpleTypeSchema(NSkiff::EWireType::Nothing),
- CreateSimpleTypeSchema(NSkiff::EWireType::Int64)})
- ->SetName(RANGE_INDEX_COLUMN));
- }
- columns.insert(columns.end(), children.begin(), children.end());
- schemas.push_back(NSkiff::CreateTupleSchema(columns));
- }
- return NSkiff::CreateVariant16Schema(schemas);
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NDetail
- } // namespace NYT
|