skiff.cpp 14 KB


  1. #include "skiff.h"
  2. #include <yt/cpp/mapreduce/common/retry_lib.h>
  3. #include <yt/cpp/mapreduce/http/retry_request.h>
  4. #include <yt/cpp/mapreduce/http/requests.h>
  5. #include <yt/cpp/mapreduce/interface/config.h>
  6. #include <yt/cpp/mapreduce/interface/common.h>
  7. #include <yt/cpp/mapreduce/interface/serialize.h>
  8. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  9. #include <library/cpp/yson/node/node_builder.h>
  10. #include <library/cpp/yson/node/node_io.h>
  11. #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
  12. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  13. #include <yt/cpp/mapreduce/skiff/skiff_schema.h>
  14. #include <library/cpp/yson/consumer.h>
  15. #include <library/cpp/yson/writer.h>
  16. #include <util/string/cast.h>
  17. #include <util/stream/str.h>
  18. #include <util/stream/file.h>
  19. #include <util/folder/path.h>
  20. namespace NYT {
  21. namespace NDetail {
  22. using namespace NRawClient;
  23. using ::ToString;
  24. ////////////////////////////////////////////////////////////////////////////////
  25. static NSkiff::TSkiffSchemaPtr ReadSkiffSchema(const TString& fileName)
  26. {
  27. if (!TFsPath(fileName).Exists()) {
  28. return nullptr;
  29. }
  30. TIFStream input(fileName);
  31. NSkiff::TSkiffSchemaPtr schema;
  32. Deserialize(schema, NodeFromYsonStream(&input));
  33. return schema;
  34. }
  35. NSkiff::TSkiffSchemaPtr GetJobInputSkiffSchema()
  36. {
  37. return ReadSkiffSchema("skiff_input");
  38. }
  39. NSkiff::EWireType ValueTypeToSkiffType(EValueType valueType)
  40. {
  41. using NSkiff::EWireType;
  42. switch (valueType) {
  43. case VT_INT64:
  44. case VT_INT32:
  45. case VT_INT16:
  46. case VT_INT8:
  47. return EWireType::Int64;
  48. case VT_UINT64:
  49. case VT_UINT32:
  50. case VT_UINT16:
  51. case VT_UINT8:
  52. return EWireType::Uint64;
  53. case VT_DOUBLE:
  54. case VT_FLOAT:
  55. return EWireType::Double;
  56. case VT_BOOLEAN:
  57. return EWireType::Boolean;
  58. case VT_STRING:
  59. case VT_UTF8:
  60. case VT_JSON:
  61. return EWireType::String32;
  62. case VT_ANY:
  63. return EWireType::Yson32;
  64. case VT_NULL:
  65. case VT_VOID:
  66. return EWireType::Nothing;
  67. case VT_DATE:
  68. case VT_DATETIME:
  69. case VT_TIMESTAMP:
  70. return EWireType::Uint64;
  71. case VT_INTERVAL:
  72. return EWireType::Int64;
  73. case VT_DATE32:
  74. case VT_DATETIME64:
  75. case VT_TIMESTAMP64:
  76. case VT_INTERVAL64:
  77. return EWireType::Int64;
  78. };
  79. ythrow yexception() << "Cannot convert EValueType '" << valueType << "' to NSkiff::EWireType";
  80. }
  81. NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
  82. const TTableSchema& schema,
  83. const TCreateSkiffSchemaOptions& options)
  84. {
  85. using namespace NSkiff;
  86. Y_ENSURE(schema.Strict(), "Cannot create Skiff schema for non-strict table schema");
  87. TVector<TSkiffSchemaPtr> skiffColumns;
  88. for (const auto& column: schema.Columns()) {
  89. TSkiffSchemaPtr skiffColumn;
  90. if (column.Deleted().Defined() && *column.Deleted()) {
  91. continue;
  92. }
  93. if (column.Type() == VT_ANY && *column.TypeV3() != *NTi::Optional(NTi::Yson())) {
  94. // We ignore all complex types until YT-12717 is done.
  95. return nullptr;
  96. }
  97. if (column.TypeV3()->IsDecimal() ||
  98. column.TypeV3()->IsOptional() && column.TypeV3()->AsOptional()->GetItemType()->IsDecimal())
  99. {
  100. // Complex logic for decimal types, ignore them for now.
  101. return nullptr;
  102. }
  103. if (column.Required() || NTi::IsSingular(column.TypeV3()->GetTypeName())) {
  104. skiffColumn = CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type()));
  105. } else {
  106. skiffColumn = CreateVariant8Schema({
  107. CreateSimpleTypeSchema(EWireType::Nothing),
  108. CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type()))});
  109. }
  110. if (options.RenameColumns_) {
  111. auto maybeName = options.RenameColumns_->find(column.Name());
  112. skiffColumn->SetName(maybeName == options.RenameColumns_->end() ? column.Name() : maybeName->second);
  113. } else {
  114. skiffColumn->SetName(column.Name());
  115. }
  116. skiffColumns.push_back(skiffColumn);
  117. }
  118. if (options.HasKeySwitch_) {
  119. skiffColumns.push_back(
  120. CreateSimpleTypeSchema(EWireType::Boolean)->SetName("$key_switch"));
  121. }
  122. if (options.HasRangeIndex_) {
  123. skiffColumns.push_back(
  124. CreateVariant8Schema({
  125. CreateSimpleTypeSchema(EWireType::Nothing),
  126. CreateSimpleTypeSchema(EWireType::Int64)})
  127. ->SetName("$range_index"));
  128. }
  129. skiffColumns.push_back(
  130. CreateVariant8Schema({
  131. CreateSimpleTypeSchema(EWireType::Nothing),
  132. CreateSimpleTypeSchema(EWireType::Int64)})
  133. ->SetName("$row_index"));
  134. return CreateTupleSchema(std::move(skiffColumns));
  135. }
  136. NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
  137. const TNode& schemaNode,
  138. const TCreateSkiffSchemaOptions& options)
  139. {
  140. TTableSchema schema;
  141. Deserialize(schema, schemaNode);
  142. return CreateSkiffSchema(schema, options);
  143. }
  144. void Serialize(const NSkiff::TSkiffSchemaPtr& schema, NYson::IYsonConsumer* consumer)
  145. {
  146. consumer->OnBeginMap();
  147. if (schema->GetName().size() > 0) {
  148. consumer->OnKeyedItem("name");
  149. consumer->OnStringScalar(schema->GetName());
  150. }
  151. consumer->OnKeyedItem("wire_type");
  152. consumer->OnStringScalar(ToString(schema->GetWireType()));
  153. if (schema->GetChildren().size() > 0) {
  154. consumer->OnKeyedItem("children");
  155. consumer->OnBeginList();
  156. for (const auto& child : schema->GetChildren()) {
  157. consumer->OnListItem();
  158. Serialize(child, consumer);
  159. }
  160. consumer->OnEndList();
  161. }
  162. consumer->OnEndMap();
  163. }
  164. void Deserialize(NSkiff::TSkiffSchemaPtr& schema, const TNode& node)
  165. {
  166. using namespace NSkiff;
  167. static auto createSchema = [](EWireType wireType, TVector<TSkiffSchemaPtr>&& children) -> TSkiffSchemaPtr {
  168. switch (wireType) {
  169. case EWireType::Tuple:
  170. return CreateTupleSchema(std::move(children));
  171. case EWireType::Variant8:
  172. return CreateVariant8Schema(std::move(children));
  173. case EWireType::Variant16:
  174. return CreateVariant16Schema(std::move(children));
  175. case EWireType::RepeatedVariant8:
  176. return CreateRepeatedVariant8Schema(std::move(children));
  177. case EWireType::RepeatedVariant16:
  178. return CreateRepeatedVariant16Schema(std::move(children));
  179. default:
  180. return CreateSimpleTypeSchema(wireType);
  181. }
  182. };
  183. const auto& map = node.AsMap();
  184. const auto* wireTypePtr = map.FindPtr("wire_type");
  185. Y_ENSURE(wireTypePtr, "'wire_type' is a required key");
  186. auto wireType = FromString<NSkiff::EWireType>(wireTypePtr->AsString());
  187. const auto* childrenPtr = map.FindPtr("children");
  188. Y_ENSURE(NSkiff::IsSimpleType(wireType) || childrenPtr,
  189. "'children' key is required for complex node '" << wireType << "'");
  190. TVector<TSkiffSchemaPtr> children;
  191. if (childrenPtr) {
  192. for (const auto& childNode : childrenPtr->AsList()) {
  193. TSkiffSchemaPtr childSchema;
  194. Deserialize(childSchema, childNode);
  195. children.push_back(std::move(childSchema));
  196. }
  197. }
  198. schema = createSchema(wireType, std::move(children));
  199. const auto* namePtr = map.FindPtr("name");
  200. if (namePtr) {
  201. schema->SetName(namePtr->AsString());
  202. }
  203. }
  204. TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) {
  205. Y_ENSURE(schema->GetWireType() == NSkiff::EWireType::Variant16,
  206. "Bad wire type for schema; expected 'variant16', got " << schema->GetWireType());
  207. THashMap<
  208. NSkiff::TSkiffSchemaPtr,
  209. size_t,
  210. NSkiff::TSkiffSchemaPtrHasher,
  211. NSkiff::TSkiffSchemaPtrEqual> schemasMap;
  212. size_t tableIndex = 0;
  213. auto config = TNode("skiff");
  214. config.Attributes()["table_skiff_schemas"] = TNode::CreateList();
  215. for (const auto& schemaChild : schema->GetChildren()) {
  216. auto [iter, inserted] = schemasMap.emplace(schemaChild, tableIndex);
  217. size_t currentIndex;
  218. if (inserted) {
  219. currentIndex = tableIndex;
  220. ++tableIndex;
  221. } else {
  222. currentIndex = iter->second;
  223. }
  224. config.Attributes()["table_skiff_schemas"].Add("$" + ToString(currentIndex));
  225. }
  226. config.Attributes()["skiff_schema_registry"] = TNode::CreateMap();
  227. for (const auto& [tableSchema, index] : schemasMap) {
  228. TNode node;
  229. TNodeBuilder nodeBuilder(&node);
  230. Serialize(tableSchema, &nodeBuilder);
  231. config.Attributes()["skiff_schema_registry"][ToString(index)] = std::move(node);
  232. }
  233. return TFormat(config);
  234. }
  235. NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
  236. const TClientContext& context,
  237. const IClientRetryPolicyPtr& clientRetryPolicy,
  238. const TTransactionId& transactionId,
  239. ENodeReaderFormat nodeReaderFormat,
  240. const TVector<TRichYPath>& tablePaths,
  241. const TCreateSkiffSchemaOptions& options)
  242. {
  243. if (nodeReaderFormat == ENodeReaderFormat::Yson) {
  244. return nullptr;
  245. }
  246. for (const auto& path : tablePaths) {
  247. if (path.Columns_) {
  248. switch (nodeReaderFormat) {
  249. case ENodeReaderFormat::Skiff:
  250. ythrow TApiUsageError() << "Cannot use Skiff format with column selectors";
  251. case ENodeReaderFormat::Auto:
  252. return nullptr;
  253. default:
  254. Y_ABORT("Unexpected node reader format: %d", static_cast<int>(nodeReaderFormat));
  255. }
  256. }
  257. }
  258. auto nodes = NRawClient::BatchTransform(
  259. clientRetryPolicy->CreatePolicyForGenericRequest(),
  260. context,
  261. NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths),
  262. [&] (TRawBatchRequest& batch, const TRichYPath& path) {
  263. auto getOptions = TGetOptions()
  264. .AttributeFilter(
  265. TAttributeFilter()
  266. .AddAttribute("schema")
  267. .AddAttribute("dynamic")
  268. .AddAttribute("type")
  269. );
  270. return batch.Get(transactionId, path.Path_, getOptions);
  271. });
  272. TVector<NSkiff::TSkiffSchemaPtr> schemas;
  273. for (size_t tableIndex = 0; tableIndex < nodes.size(); ++tableIndex) {
  274. const auto& tablePath = tablePaths[tableIndex].Path_;
  275. const auto& attributes = nodes[tableIndex].GetAttributes();
  276. Y_ENSURE_EX(attributes["type"] == TNode("table"),
  277. TApiUsageError() << "Operation input path " << tablePath << " is not a table");
  278. bool dynamic = attributes["dynamic"].AsBool();
  279. bool strict = attributes["schema"].GetAttributes()["strict"].AsBool();
  280. switch (nodeReaderFormat) {
  281. case ENodeReaderFormat::Skiff:
  282. Y_ENSURE_EX(strict,
  283. TApiUsageError() << "Cannot use skiff format for table with non-strict schema '" << tablePath << "'");
  284. Y_ENSURE_EX(!dynamic,
  285. TApiUsageError() << "Cannot use skiff format for dynamic table '" << tablePath << "'");
  286. break;
  287. case ENodeReaderFormat::Auto:
  288. if (dynamic || !strict) {
  289. YT_LOG_DEBUG("Cannot use skiff format for table '%v' as it is dynamic or has non-strict schema",
  290. tablePath);
  291. return nullptr;
  292. }
  293. break;
  294. default:
  295. Y_ABORT("Unexpected node reader format: %d", static_cast<int>(nodeReaderFormat));
  296. }
  297. NSkiff::TSkiffSchemaPtr curSkiffSchema;
  298. if (tablePaths[tableIndex].RenameColumns_) {
  299. auto customOptions = options;
  300. customOptions.RenameColumns(*tablePaths[tableIndex].RenameColumns_);
  301. curSkiffSchema = CreateSkiffSchema(attributes["schema"], customOptions);
  302. } else {
  303. curSkiffSchema = CreateSkiffSchema(attributes["schema"], options);
  304. }
  305. if (!curSkiffSchema) {
  306. return nullptr;
  307. }
  308. schemas.push_back(curSkiffSchema);
  309. }
  310. return NSkiff::CreateVariant16Schema(std::move(schemas));
  311. }
  312. ////////////////////////////////////////////////////////////////////////////////
  313. NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
  314. const TVector<NSkiff::TSkiffSchemaPtr>& tableSchemas,
  315. const TCreateSkiffSchemaOptions& options
  316. ) {
  317. constexpr auto KEY_SWITCH_COLUMN = "$key_switch";
  318. constexpr auto ROW_INDEX_COLUMN = "$row_index";
  319. constexpr auto RANGE_INDEX_COLUMN = "$range_index";
  320. TVector<NSkiff::TSkiffSchemaPtr> schemas;
  321. schemas.reserve(tableSchemas.size());
  322. for (const auto& tableSchema : tableSchemas) {
  323. Y_ENSURE(tableSchema->GetWireType() == NSkiff::EWireType::Tuple,
  324. "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'");
  325. const auto& children = tableSchema->GetChildren();
  326. NSkiff::TSkiffSchemaList columns;
  327. columns.reserve(children.size() + 3);
  328. if (options.HasKeySwitch_) {
  329. columns.push_back(
  330. CreateSimpleTypeSchema(NSkiff::EWireType::Boolean)->SetName(KEY_SWITCH_COLUMN));
  331. }
  332. columns.push_back(
  333. NSkiff::CreateVariant8Schema({
  334. CreateSimpleTypeSchema(NSkiff::EWireType::Nothing),
  335. CreateSimpleTypeSchema(NSkiff::EWireType::Int64)})
  336. ->SetName(ROW_INDEX_COLUMN));
  337. if (options.HasRangeIndex_) {
  338. columns.push_back(
  339. NSkiff::CreateVariant8Schema({
  340. CreateSimpleTypeSchema(NSkiff::EWireType::Nothing),
  341. CreateSimpleTypeSchema(NSkiff::EWireType::Int64)})
  342. ->SetName(RANGE_INDEX_COLUMN));
  343. }
  344. columns.insert(columns.end(), children.begin(), children.end());
  345. schemas.push_back(NSkiff::CreateTupleSchema(columns));
  346. }
  347. return NSkiff::CreateVariant16Schema(schemas);
  348. }
  349. ////////////////////////////////////////////////////////////////////////////////
  350. } // namespace NDetail
  351. } // namespace NYT