skiff.cpp 14 KB


  1. #include "skiff.h"
  2. #include <yt/cpp/mapreduce/common/retry_lib.h>
  3. #include <yt/cpp/mapreduce/http/retry_request.h>
  4. #include <yt/cpp/mapreduce/http/requests.h>
  5. #include <yt/cpp/mapreduce/interface/config.h>
  6. #include <yt/cpp/mapreduce/interface/common.h>
  7. #include <yt/cpp/mapreduce/interface/serialize.h>
  8. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  9. #include <library/cpp/yson/node/node_builder.h>
  10. #include <library/cpp/yson/node/node_io.h>
  11. #include <yt/cpp/mapreduce/http_client/raw_requests.h>
  12. #include <yt/cpp/mapreduce/skiff/skiff_schema.h>
  13. #include <library/cpp/yson/consumer.h>
  14. #include <library/cpp/yson/writer.h>
  15. #include <util/string/cast.h>
  16. #include <util/stream/str.h>
  17. #include <util/stream/file.h>
  18. #include <util/folder/path.h>
  19. namespace NYT {
  20. namespace NDetail {
  21. using namespace NRawClient;
  22. using ::ToString;
  23. ////////////////////////////////////////////////////////////////////////////////
  24. static NSkiff::TSkiffSchemaPtr ReadSkiffSchema(const TString& fileName)
  25. {
  26. if (!TFsPath(fileName).Exists()) {
  27. return nullptr;
  28. }
  29. TIFStream input(fileName);
  30. NSkiff::TSkiffSchemaPtr schema;
  31. Deserialize(schema, NodeFromYsonStream(&input));
  32. return schema;
  33. }
  34. NSkiff::TSkiffSchemaPtr GetJobInputSkiffSchema()
  35. {
  36. return ReadSkiffSchema("skiff_input");
  37. }
  38. NSkiff::EWireType ValueTypeToSkiffType(EValueType valueType)
  39. {
  40. using NSkiff::EWireType;
  41. switch (valueType) {
  42. case VT_INT64:
  43. case VT_INT32:
  44. case VT_INT16:
  45. case VT_INT8:
  46. return EWireType::Int64;
  47. case VT_UINT64:
  48. case VT_UINT32:
  49. case VT_UINT16:
  50. case VT_UINT8:
  51. return EWireType::Uint64;
  52. case VT_DOUBLE:
  53. case VT_FLOAT:
  54. return EWireType::Double;
  55. case VT_BOOLEAN:
  56. return EWireType::Boolean;
  57. case VT_STRING:
  58. case VT_UTF8:
  59. case VT_JSON:
  60. case VT_UUID:
  61. return EWireType::String32;
  62. case VT_ANY:
  63. return EWireType::Yson32;
  64. case VT_NULL:
  65. case VT_VOID:
  66. return EWireType::Nothing;
  67. case VT_DATE:
  68. case VT_DATETIME:
  69. case VT_TIMESTAMP:
  70. return EWireType::Uint64;
  71. case VT_INTERVAL:
  72. return EWireType::Int64;
  73. case VT_DATE32:
  74. case VT_DATETIME64:
  75. case VT_TIMESTAMP64:
  76. case VT_INTERVAL64:
  77. return EWireType::Int64;
  78. };
  79. ythrow yexception() << "Cannot convert EValueType '" << valueType << "' to NSkiff::EWireType";
  80. }
  81. NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
  82. const TTableSchema& schema,
  83. const TCreateSkiffSchemaOptions& options)
  84. {
  85. using namespace NSkiff;
  86. Y_ENSURE(schema.Strict(), "Cannot create Skiff schema for non-strict table schema");
  87. TVector<TSkiffSchemaPtr> skiffColumns;
  88. for (const auto& column: schema.Columns()) {
  89. TSkiffSchemaPtr skiffColumn;
  90. if (column.Deleted().Defined() && *column.Deleted()) {
  91. continue;
  92. }
  93. if (column.Type() == VT_ANY && *column.TypeV3() != *NTi::Optional(NTi::Yson())) {
  94. // We ignore all complex types until YT-12717 is done.
  95. return nullptr;
  96. }
  97. if (column.TypeV3()->IsDecimal() ||
  98. column.TypeV3()->IsOptional() && column.TypeV3()->AsOptional()->GetItemType()->IsDecimal())
  99. {
  100. // Complex logic for decimal types, ignore them for now.
  101. return nullptr;
  102. }
  103. if (column.Required() || NTi::IsSingular(column.TypeV3()->GetTypeName())) {
  104. skiffColumn = CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type()));
  105. } else {
  106. skiffColumn = CreateVariant8Schema({
  107. CreateSimpleTypeSchema(EWireType::Nothing),
  108. CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type()))});
  109. }
  110. if (options.RenameColumns_) {
  111. auto maybeName = options.RenameColumns_->find(column.Name());
  112. skiffColumn->SetName(maybeName == options.RenameColumns_->end() ? column.Name() : maybeName->second);
  113. } else {
  114. skiffColumn->SetName(column.Name());
  115. }
  116. skiffColumns.push_back(skiffColumn);
  117. }
  118. if (options.HasKeySwitch_) {
  119. skiffColumns.push_back(
  120. CreateSimpleTypeSchema(EWireType::Boolean)->SetName("$key_switch"));
  121. }
  122. if (options.HasRangeIndex_) {
  123. skiffColumns.push_back(
  124. CreateVariant8Schema({
  125. CreateSimpleTypeSchema(EWireType::Nothing),
  126. CreateSimpleTypeSchema(EWireType::Int64)})
  127. ->SetName("$range_index"));
  128. }
  129. skiffColumns.push_back(
  130. CreateVariant8Schema({
  131. CreateSimpleTypeSchema(EWireType::Nothing),
  132. CreateSimpleTypeSchema(EWireType::Int64)})
  133. ->SetName("$row_index"));
  134. return CreateTupleSchema(std::move(skiffColumns));
  135. }
  136. NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
  137. const TNode& schemaNode,
  138. const TCreateSkiffSchemaOptions& options)
  139. {
  140. TTableSchema schema;
  141. Deserialize(schema, schemaNode);
  142. return CreateSkiffSchema(schema, options);
  143. }
  144. void Serialize(const NSkiff::TSkiffSchemaPtr& schema, NYson::IYsonConsumer* consumer)
  145. {
  146. consumer->OnBeginMap();
  147. if (schema->GetName().size() > 0) {
  148. consumer->OnKeyedItem("name");
  149. consumer->OnStringScalar(schema->GetName());
  150. }
  151. consumer->OnKeyedItem("wire_type");
  152. consumer->OnStringScalar(ToString(schema->GetWireType()));
  153. if (schema->GetChildren().size() > 0) {
  154. consumer->OnKeyedItem("children");
  155. consumer->OnBeginList();
  156. for (const auto& child : schema->GetChildren()) {
  157. consumer->OnListItem();
  158. Serialize(child, consumer);
  159. }
  160. consumer->OnEndList();
  161. }
  162. consumer->OnEndMap();
  163. }
  164. void Deserialize(NSkiff::TSkiffSchemaPtr& schema, const TNode& node)
  165. {
  166. using namespace NSkiff;
  167. static auto createSchema = [](EWireType wireType, TVector<TSkiffSchemaPtr>&& children) -> TSkiffSchemaPtr {
  168. switch (wireType) {
  169. case EWireType::Tuple:
  170. return CreateTupleSchema(std::move(children));
  171. case EWireType::Variant8:
  172. return CreateVariant8Schema(std::move(children));
  173. case EWireType::Variant16:
  174. return CreateVariant16Schema(std::move(children));
  175. case EWireType::RepeatedVariant8:
  176. return CreateRepeatedVariant8Schema(std::move(children));
  177. case EWireType::RepeatedVariant16:
  178. return CreateRepeatedVariant16Schema(std::move(children));
  179. default:
  180. return CreateSimpleTypeSchema(wireType);
  181. }
  182. };
  183. const auto& map = node.AsMap();
  184. const auto* wireTypePtr = map.FindPtr("wire_type");
  185. Y_ENSURE(wireTypePtr, "'wire_type' is a required key");
  186. auto wireType = FromString<NSkiff::EWireType>(wireTypePtr->AsString());
  187. const auto* childrenPtr = map.FindPtr("children");
  188. Y_ENSURE(NSkiff::IsSimpleType(wireType) || childrenPtr,
  189. "'children' key is required for complex node '" << wireType << "'");
  190. TVector<TSkiffSchemaPtr> children;
  191. if (childrenPtr) {
  192. for (const auto& childNode : childrenPtr->AsList()) {
  193. TSkiffSchemaPtr childSchema;
  194. Deserialize(childSchema, childNode);
  195. children.push_back(std::move(childSchema));
  196. }
  197. }
  198. schema = createSchema(wireType, std::move(children));
  199. const auto* namePtr = map.FindPtr("name");
  200. if (namePtr) {
  201. schema->SetName(namePtr->AsString());
  202. }
  203. }
  204. TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) {
  205. Y_ENSURE(schema->GetWireType() == NSkiff::EWireType::Variant16,
  206. "Bad wire type for schema; expected 'variant16', got " << schema->GetWireType());
  207. THashMap<
  208. NSkiff::TSkiffSchemaPtr,
  209. size_t,
  210. NSkiff::TSkiffSchemaPtrHasher,
  211. NSkiff::TSkiffSchemaPtrEqual> schemasMap;
  212. size_t tableIndex = 0;
  213. auto config = TNode("skiff");
  214. config.Attributes()["table_skiff_schemas"] = TNode::CreateList();
  215. for (const auto& schemaChild : schema->GetChildren()) {
  216. auto [iter, inserted] = schemasMap.emplace(schemaChild, tableIndex);
  217. size_t currentIndex;
  218. if (inserted) {
  219. currentIndex = tableIndex;
  220. ++tableIndex;
  221. } else {
  222. currentIndex = iter->second;
  223. }
  224. config.Attributes()["table_skiff_schemas"].Add("$" + ToString(currentIndex));
  225. }
  226. config.Attributes()["skiff_schema_registry"] = TNode::CreateMap();
  227. for (const auto& [tableSchema, index] : schemasMap) {
  228. TNode node;
  229. TNodeBuilder nodeBuilder(&node);
  230. Serialize(tableSchema, &nodeBuilder);
  231. config.Attributes()["skiff_schema_registry"][ToString(index)] = std::move(node);
  232. }
  233. return TFormat(config);
  234. }
  235. NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
  236. const IRawClientPtr& rawClient,
  237. const TTransactionId& transactionId,
  238. ENodeReaderFormat nodeReaderFormat,
  239. const TVector<TRichYPath>& tablePaths,
  240. const TCreateSkiffSchemaOptions& options)
  241. {
  242. if (nodeReaderFormat == ENodeReaderFormat::Yson) {
  243. return nullptr;
  244. }
  245. for (const auto& path : tablePaths) {
  246. if (path.Columns_) {
  247. switch (nodeReaderFormat) {
  248. case ENodeReaderFormat::Skiff:
  249. ythrow TApiUsageError() << "Cannot use Skiff format with column selectors";
  250. case ENodeReaderFormat::Auto:
  251. return nullptr;
  252. default:
  253. Y_ABORT("Unexpected node reader format: %d", static_cast<int>(nodeReaderFormat));
  254. }
  255. }
  256. }
  257. auto nodes = NRawClient::BatchTransform(
  258. rawClient,
  259. NRawClient::CanonizeYPaths(rawClient, tablePaths),
  260. [&] (IRawBatchRequestPtr batch, const TRichYPath& path) {
  261. auto getOptions = TGetOptions()
  262. .AttributeFilter(
  263. TAttributeFilter()
  264. .AddAttribute("schema")
  265. .AddAttribute("dynamic")
  266. .AddAttribute("type")
  267. );
  268. return batch->Get(transactionId, path.Path_, getOptions);
  269. });
  270. TVector<NSkiff::TSkiffSchemaPtr> schemas;
  271. for (size_t tableIndex = 0; tableIndex < nodes.size(); ++tableIndex) {
  272. const auto& tablePath = tablePaths[tableIndex].Path_;
  273. const auto& attributes = nodes[tableIndex].GetAttributes();
  274. Y_ENSURE_EX(attributes["type"] == TNode("table"),
  275. TApiUsageError() << "Operation input path " << tablePath << " is not a table");
  276. bool dynamic = attributes["dynamic"].AsBool();
  277. bool strict = attributes["schema"].GetAttributes()["strict"].AsBool();
  278. switch (nodeReaderFormat) {
  279. case ENodeReaderFormat::Skiff:
  280. Y_ENSURE_EX(strict,
  281. TApiUsageError() << "Cannot use skiff format for table with non-strict schema '" << tablePath << "'");
  282. Y_ENSURE_EX(!dynamic,
  283. TApiUsageError() << "Cannot use skiff format for dynamic table '" << tablePath << "'");
  284. break;
  285. case ENodeReaderFormat::Auto:
  286. if (dynamic || !strict) {
  287. YT_LOG_DEBUG("Cannot use skiff format for table '%v' as it is dynamic or has non-strict schema",
  288. tablePath);
  289. return nullptr;
  290. }
  291. break;
  292. default:
  293. Y_ABORT("Unexpected node reader format: %d", static_cast<int>(nodeReaderFormat));
  294. }
  295. NSkiff::TSkiffSchemaPtr curSkiffSchema;
  296. if (tablePaths[tableIndex].RenameColumns_) {
  297. auto customOptions = options;
  298. customOptions.RenameColumns(*tablePaths[tableIndex].RenameColumns_);
  299. curSkiffSchema = CreateSkiffSchema(attributes["schema"], customOptions);
  300. } else {
  301. curSkiffSchema = CreateSkiffSchema(attributes["schema"], options);
  302. }
  303. if (!curSkiffSchema) {
  304. return nullptr;
  305. }
  306. schemas.push_back(curSkiffSchema);
  307. }
  308. return NSkiff::CreateVariant16Schema(std::move(schemas));
  309. }
  310. ////////////////////////////////////////////////////////////////////////////////
  311. NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
  312. const TVector<NSkiff::TSkiffSchemaPtr>& tableSchemas,
  313. const TCreateSkiffSchemaOptions& options
  314. ) {
  315. constexpr auto KEY_SWITCH_COLUMN = "$key_switch";
  316. constexpr auto ROW_INDEX_COLUMN = "$row_index";
  317. constexpr auto RANGE_INDEX_COLUMN = "$range_index";
  318. TVector<NSkiff::TSkiffSchemaPtr> schemas;
  319. schemas.reserve(tableSchemas.size());
  320. for (const auto& tableSchema : tableSchemas) {
  321. Y_ENSURE(tableSchema->GetWireType() == NSkiff::EWireType::Tuple,
  322. "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'");
  323. const auto& children = tableSchema->GetChildren();
  324. NSkiff::TSkiffSchemaList columns;
  325. columns.reserve(children.size() + 3);
  326. if (options.HasKeySwitch_) {
  327. columns.push_back(
  328. CreateSimpleTypeSchema(NSkiff::EWireType::Boolean)->SetName(KEY_SWITCH_COLUMN));
  329. }
  330. columns.push_back(
  331. NSkiff::CreateVariant8Schema({
  332. CreateSimpleTypeSchema(NSkiff::EWireType::Nothing),
  333. CreateSimpleTypeSchema(NSkiff::EWireType::Int64)})
  334. ->SetName(ROW_INDEX_COLUMN));
  335. if (options.HasRangeIndex_) {
  336. columns.push_back(
  337. NSkiff::CreateVariant8Schema({
  338. CreateSimpleTypeSchema(NSkiff::EWireType::Nothing),
  339. CreateSimpleTypeSchema(NSkiff::EWireType::Int64)})
  340. ->SetName(RANGE_INDEX_COLUMN));
  341. }
  342. columns.insert(columns.end(), children.begin(), children.end());
  343. schemas.push_back(NSkiff::CreateTupleSchema(columns));
  344. }
  345. return NSkiff::CreateVariant16Schema(schemas);
  346. }
  347. ////////////////////////////////////////////////////////////////////////////////
  348. } // namespace NDetail
  349. } // namespace NYT