schema_serialization_helpers.cpp 12 KB


  1. #include "comparator.h"
  2. #include <yt/yt/core/ytree/fluent.h>
  3. #include <yt/yt/client/table_client/schema_serialization_helpers.h>
  4. namespace NYT::NTableClient {
  5. void Deserialize(TMaybeDeletedColumnSchema& schema, NYson::TYsonPullParserCursor* cursor)
  6. {
  7. TSerializableColumnSchema wrapper;
  8. wrapper.DeserializeFromCursor(cursor);
  9. schema = wrapper;
  10. }
  11. void Deserialize(TMaybeDeletedColumnSchema& schema, NYTree::INodePtr node)
  12. {
  13. TSerializableColumnSchema wrapper;
  14. Deserialize(static_cast<NYTree::TYsonStructLite&>(wrapper), node);
  15. schema = static_cast<TMaybeDeletedColumnSchema>(wrapper);
  16. }
  17. TDeletedColumn TMaybeDeletedColumnSchema::GetDeletedColumnSchema() const
  18. {
  19. return TDeletedColumn(StableName());
  20. }
  21. void TSerializableColumnSchema::Register(TRegistrar registrar)
  22. {
  23. registrar.BaseClassParameter("name", &TThis::Name_)
  24. .Default();
  25. registrar.Parameter("stable_name", &TThis::SerializedStableName_)
  26. .Default();
  27. registrar.Parameter("type", &TThis::LogicalTypeV1_)
  28. .Default(std::nullopt);
  29. registrar.Parameter("required", &TThis::RequiredV1_)
  30. .Default(std::nullopt);
  31. registrar.Parameter("type_v3", &TThis::LogicalTypeV3_)
  32. .Default();
  33. registrar.BaseClassParameter("lock", &TThis::Lock_)
  34. .Default();
  35. registrar.BaseClassParameter("expression", &TThis::Expression_)
  36. .Default();
  37. registrar.BaseClassParameter("aggregate", &TThis::Aggregate_)
  38. .Default();
  39. registrar.BaseClassParameter("sort_order", &TThis::SortOrder_)
  40. .Default();
  41. registrar.BaseClassParameter("group", &TThis::Group_)
  42. .Default();
  43. registrar.BaseClassParameter("max_inline_hunk_size", &TThis::MaxInlineHunkSize_)
  44. .Default();
  45. registrar.BaseClassParameter("deleted", &TThis::Deleted_).Default(std::nullopt);
  46. registrar.Postprocessor([] (TSerializableColumnSchema* schema) {
  47. schema->RunPostprocessor();
  48. });
  49. }
  50. void TSerializableColumnSchema::DeserializeFromCursor(NYson::TYsonPullParserCursor* cursor)
  51. {
  52. cursor->ParseMap([&] (NYson::TYsonPullParserCursor* cursor) {
  53. EnsureYsonToken("column schema attribute key", *cursor, NYson::EYsonItemType::StringValue);
  54. auto key = (*cursor)->UncheckedAsString();
  55. if (key == TStringBuf("name")) {
  56. cursor->Next();
  57. SetName(ExtractTo<TString>(cursor));
  58. } else if (key == TStringBuf("required")) {
  59. cursor->Next();
  60. RequiredV1_ = ExtractTo<bool>(cursor);
  61. } else if (key == TStringBuf("type")) {
  62. cursor->Next();
  63. LogicalTypeV1_ = ExtractTo<ESimpleLogicalValueType>(cursor);
  64. } else if (key == TStringBuf("type_v3")) {
  65. cursor->Next();
  66. LogicalTypeV3_ = TTypeV3LogicalTypeWrapper();
  67. Deserialize(*LogicalTypeV3_, cursor);
  68. } else if (key == TStringBuf("lock")) {
  69. cursor->Next();
  70. SetLock(ExtractTo<std::optional<TString>>(cursor));
  71. } else if (key == TStringBuf("expression")) {
  72. cursor->Next();
  73. SetExpression(ExtractTo<std::optional<TString>>(cursor));
  74. } else if (key == TStringBuf("aggregate")) {
  75. cursor->Next();
  76. SetAggregate(ExtractTo<std::optional<TString>>(cursor));
  77. } else if (key == TStringBuf("sort_order")) {
  78. cursor->Next();
  79. SetSortOrder(ExtractTo<std::optional<ESortOrder>>(cursor));
  80. } else if (key == TStringBuf("group")) {
  81. cursor->Next();
  82. SetGroup(ExtractTo<std::optional<TString>>(cursor));
  83. } else if (key == TStringBuf("max_inline_hunk_size")) {
  84. cursor->Next();
  85. SetMaxInlineHunkSize(ExtractTo<std::optional<i64>>(cursor));
  86. } else if (key == TStringBuf("stable_name")) {
  87. cursor->Next();
  88. SerializedStableName_ = ExtractTo<TColumnStableName>(cursor);
  89. } else if (key == TStringBuf("deleted")) {
  90. cursor->Next();
  91. Deleted_ = ExtractTo<bool>(cursor);
  92. } else {
  93. cursor->Next();
  94. cursor->SkipComplexValue();
  95. }
  96. });
  97. RunPostprocessor();
  98. }
  99. void TSerializableColumnSchema::SetColumnSchema(const TColumnSchema& columnSchema)
  100. {
  101. static_cast<TColumnSchema&>(*this) = columnSchema;
  102. if (IsRenamed()) {
  103. SerializedStableName_ = StableName();
  104. }
  105. LogicalTypeV1_ = columnSchema.CastToV1Type();
  106. RequiredV1_ = columnSchema.Required();
  107. LogicalTypeV3_ = TTypeV3LogicalTypeWrapper{columnSchema.LogicalType()};
  108. }
  109. void TSerializableColumnSchema::SetDeletedColumnSchema(
  110. const TDeletedColumn& deletedColumnSchema)
  111. {
  112. Deleted_ = true;
  113. StableName_ = deletedColumnSchema.StableName();
  114. }
  115. void TSerializableColumnSchema::RunPostprocessor()
  116. {
  117. if (Deleted() && *Deleted()) {
  118. if (!SerializedStableName_) {
  119. THROW_ERROR_EXCEPTION("Stable name should be set for a deleted column");
  120. }
  121. SetStableName(*SerializedStableName_);
  122. return;
  123. }
  124. // Name
  125. if (Name().empty()) {
  126. THROW_ERROR_EXCEPTION("Column name cannot be empty");
  127. }
  128. if (SerializedStableName_) {
  129. ValidateColumnName(SerializedStableName_->Underlying());
  130. SetStableName(*SerializedStableName_);
  131. } else {
  132. SetStableName(TColumnStableName(Name()));
  133. }
  134. try {
  135. int setTypeVersion = 0;
  136. if (LogicalTypeV3_) {
  137. SetLogicalType(LogicalTypeV3_->LogicalType);
  138. setTypeVersion = 3;
  139. }
  140. if (LogicalTypeV1_) {
  141. if (setTypeVersion == 0) {
  142. SetLogicalType(MakeLogicalType(*LogicalTypeV1_, RequiredV1_.value_or(false)));
  143. setTypeVersion = 1;
  144. } else {
  145. if (*LogicalTypeV1_ != CastToV1Type()) {
  146. THROW_ERROR_EXCEPTION(
  147. "\"type_v%v\" does not match \"type\"; \"type_v%v\": %Qv \"type\": %Qlv expected \"type\": %Qlv",
  148. setTypeVersion,
  149. setTypeVersion,
  150. *LogicalType(),
  151. *LogicalTypeV1_,
  152. CastToV1Type());
  153. }
  154. }
  155. }
  156. if (RequiredV1_ && setTypeVersion > 1 && *RequiredV1_ != Required()) {
  157. THROW_ERROR_EXCEPTION(
  158. "\"type_v%v\" does not match \"required\"; \"type_v%v\": %Qv \"required\": %Qlv",
  159. setTypeVersion,
  160. setTypeVersion,
  161. *LogicalType(),
  162. *RequiredV1_);
  163. }
  164. if (setTypeVersion == 0) {
  165. THROW_ERROR_EXCEPTION("Column type is not specified");
  166. }
  167. if (*DetagLogicalType(LogicalType()) == *SimpleLogicalType(ESimpleLogicalValueType::Any)) {
  168. THROW_ERROR_EXCEPTION("Column of type %Qlv cannot be \"required\"",
  169. ESimpleLogicalValueType::Any);
  170. }
  171. // Lock
  172. if (Lock() && Lock()->empty()) {
  173. THROW_ERROR_EXCEPTION("Lock name cannot be empty");
  174. }
  175. // Group
  176. if (Group() && Group()->empty()) {
  177. THROW_ERROR_EXCEPTION("Group name cannot be empty");
  178. }
  179. } catch (const std::exception& ex) {
  180. THROW_ERROR_EXCEPTION("Error validating column %Qv in table schema",
  181. GetDiagnosticNameString())
  182. << ex;
  183. }
  184. }
  185. void Serialize(const TColumnSchema& schema, NYson::IYsonConsumer* consumer)
  186. {
  187. TSerializableColumnSchema wrapper;
  188. wrapper.SetColumnSchema(schema);
  189. Serialize(static_cast<const NYTree::TYsonStructLite&>(wrapper), consumer);
  190. }
  191. void Serialize(const TDeletedColumn& schema, NYson::IYsonConsumer* consumer)
  192. {
  193. consumer->OnBeginMap();
  194. consumer->OnKeyedItem("stable_name");
  195. consumer->OnStringScalar(schema.StableName().Underlying());
  196. consumer->OnKeyedItem("deleted");
  197. consumer->OnBooleanScalar(true);
  198. consumer->OnEndMap();
  199. }
  200. void Serialize(const TTableSchema& schema, NYson::IYsonConsumer* consumer)
  201. {
  202. auto position = NYTree::BuildYsonFluently(consumer)
  203. .BeginAttributes()
  204. .Item("strict").Value(schema.GetStrict())
  205. .Item("unique_keys").Value(schema.GetUniqueKeys())
  206. .DoIf(schema.HasNontrivialSchemaModification(), [&] (NYTree::TFluentMap fluent) {
  207. fluent.Item("schema_modification").Value(schema.GetSchemaModification());
  208. })
  209. .EndAttributes()
  210. .BeginList();
  211. for (const auto& column : schema.Columns()) {
  212. Serialize(column, position.Item().GetConsumer());
  213. }
  214. for (const auto& deletedColumn : schema.DeletedColumns()) {
  215. Serialize(deletedColumn, position.Item().GetConsumer());
  216. }
  217. position.EndList();
  218. }
  219. void Serialize(const TTableSchemaPtr& schema, NYson::IYsonConsumer* consumer)
  220. {
  221. Serialize(*schema, consumer);
  222. }
  223. void Deserialize(TTableSchema& schema, NYTree::INodePtr node)
  224. {
  225. auto childNodes = node->AsList()->GetChildren();
  226. std::vector<TColumnSchema> columns;
  227. std::vector<TDeletedColumn> deletedColumns;
  228. for (auto childNode : childNodes) {
  229. TSerializableColumnSchema wrapper;
  230. Deserialize(static_cast<NYTree::TYsonStructLite&>(wrapper), childNode);
  231. if (wrapper.Deleted() && *wrapper.Deleted()) {
  232. deletedColumns.push_back(TDeletedColumn(wrapper.StableName()));
  233. } else {
  234. columns.push_back(wrapper);
  235. }
  236. }
  237. schema = TTableSchema(
  238. columns,
  239. node->Attributes().Get<bool>("strict", true),
  240. node->Attributes().Get<bool>("unique_keys", false),
  241. node->Attributes().Get<ETableSchemaModification>(
  242. "schema_modification",
  243. ETableSchemaModification::None),
  244. deletedColumns);
  245. }
  246. void Deserialize(TTableSchema& schema, NYson::TYsonPullParserCursor* cursor)
  247. {
  248. bool strict = true;
  249. bool uniqueKeys = false;
  250. ETableSchemaModification modification = ETableSchemaModification::None;
  251. if ((*cursor)->GetType() == NYson::EYsonItemType::BeginAttributes) {
  252. cursor->ParseAttributes([&] (NYson::TYsonPullParserCursor* cursor) {
  253. EnsureYsonToken(TStringBuf("table schema attribute key"), *cursor, NYson::EYsonItemType::StringValue);
  254. auto key = (*cursor)->UncheckedAsString();
  255. if (key == TStringBuf("strict")) {
  256. cursor->Next();
  257. strict = ExtractTo<bool>(cursor);
  258. } else if (key == TStringBuf("unique_keys")) {
  259. cursor->Next();
  260. uniqueKeys = ExtractTo<bool>(cursor);
  261. } else if (key == TStringBuf("schema_modification")) {
  262. cursor->Next();
  263. modification = ExtractTo<ETableSchemaModification>(cursor);
  264. } else {
  265. cursor->Next();
  266. cursor->SkipComplexValue();
  267. }
  268. });
  269. }
  270. EnsureYsonToken(TStringBuf("table schema"), *cursor, NYson::EYsonItemType::BeginList);
  271. auto maybeDeletedColumns = ExtractTo<std::vector<TMaybeDeletedColumnSchema>>(cursor);
  272. std::vector<TColumnSchema> columns;
  273. std::vector<TDeletedColumn> deletedColumns;
  274. for (const auto& maybeDeletedColumn : maybeDeletedColumns) {
  275. if (maybeDeletedColumn.Deleted() && *maybeDeletedColumn.Deleted()) {
  276. deletedColumns.push_back(maybeDeletedColumn.GetDeletedColumnSchema());
  277. } else {
  278. columns.push_back(static_cast<TColumnSchema>(maybeDeletedColumn));
  279. }
  280. }
  281. schema = TTableSchema(columns, strict, uniqueKeys, modification, deletedColumns);
  282. }
  283. void Deserialize(TTableSchemaPtr& schema, NYTree::INodePtr node)
  284. {
  285. TTableSchema actualSchema;
  286. Deserialize(actualSchema, node);
  287. schema = New<TTableSchema>(std::move(actualSchema));
  288. }
  289. void Deserialize(TTableSchemaPtr& schema, NYson::TYsonPullParserCursor* cursor)
  290. {
  291. TTableSchema actualSchema;
  292. Deserialize(actualSchema, cursor);
  293. schema = New<TTableSchema>(std::move(actualSchema));
  294. }
  295. ////////////////////////////////////////////////////////////////////////////////
  296. } // namespace NYT::NTableClient