skiff.cpp 14 KB


  1. #include "skiff.h"
  2. #include <yt/cpp/mapreduce/common/retry_lib.h>
  3. #include <yt/cpp/mapreduce/http/retry_request.h>
  4. #include <yt/cpp/mapreduce/http/requests.h>
  5. #include <yt/cpp/mapreduce/interface/config.h>
  6. #include <yt/cpp/mapreduce/interface/common.h>
  7. #include <yt/cpp/mapreduce/interface/serialize.h>
  8. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  9. #include <library/cpp/yson/node/node_builder.h>
  10. #include <library/cpp/yson/node/node_io.h>
  11. #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
  12. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  13. #include <yt/cpp/mapreduce/skiff/skiff_schema.h>
  14. #include <library/cpp/yson/consumer.h>
  15. #include <library/cpp/yson/writer.h>
  16. #include <util/string/cast.h>
  17. #include <util/stream/str.h>
  18. #include <util/stream/file.h>
  19. #include <util/folder/path.h>
  20. namespace NYT {
  21. namespace NDetail {
  22. using namespace NRawClient;
  23. using ::ToString;
  24. ////////////////////////////////////////////////////////////////////////////////
  25. static NSkiff::TSkiffSchemaPtr ReadSkiffSchema(const TString& fileName)
  26. {
  27. if (!TFsPath(fileName).Exists()) {
  28. return nullptr;
  29. }
  30. TIFStream input(fileName);
  31. NSkiff::TSkiffSchemaPtr schema;
  32. Deserialize(schema, NodeFromYsonStream(&input));
  33. return schema;
  34. }
  35. NSkiff::TSkiffSchemaPtr GetJobInputSkiffSchema()
  36. {
  37. return ReadSkiffSchema("skiff_input");
  38. }
  39. NSkiff::EWireType ValueTypeToSkiffType(EValueType valueType)
  40. {
  41. using NSkiff::EWireType;
  42. switch (valueType) {
  43. case VT_INT64:
  44. case VT_INT32:
  45. case VT_INT16:
  46. case VT_INT8:
  47. return EWireType::Int64;
  48. case VT_UINT64:
  49. case VT_UINT32:
  50. case VT_UINT16:
  51. case VT_UINT8:
  52. return EWireType::Uint64;
  53. case VT_DOUBLE:
  54. case VT_FLOAT:
  55. return EWireType::Double;
  56. case VT_BOOLEAN:
  57. return EWireType::Boolean;
  58. case VT_STRING:
  59. case VT_UTF8:
  60. case VT_JSON:
  61. return EWireType::String32;
  62. case VT_ANY:
  63. return EWireType::Yson32;
  64. case VT_NULL:
  65. case VT_VOID:
  66. return EWireType::Nothing;
  67. case VT_DATE:
  68. case VT_DATETIME:
  69. case VT_TIMESTAMP:
  70. return EWireType::Uint64;
  71. case VT_INTERVAL:
  72. return EWireType::Int64;
  73. };
  74. ythrow yexception() << "Cannot convert EValueType '" << valueType << "' to NSkiff::EWireType";
  75. }
  76. NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
  77. const TTableSchema& schema,
  78. const TCreateSkiffSchemaOptions& options)
  79. {
  80. using namespace NSkiff;
  81. Y_ENSURE(schema.Strict(), "Cannot create Skiff schema for non-strict table schema");
  82. TVector<TSkiffSchemaPtr> skiffColumns;
  83. for (const auto& column: schema.Columns()) {
  84. TSkiffSchemaPtr skiffColumn;
  85. if (column.Deleted().Defined() && *column.Deleted()) {
  86. continue;
  87. }
  88. if (column.Type() == VT_ANY && *column.TypeV3() != *NTi::Optional(NTi::Yson())) {
  89. // We ignore all complex types until YT-12717 is done.
  90. return nullptr;
  91. }
  92. if (column.TypeV3()->IsDecimal() ||
  93. column.TypeV3()->IsOptional() && column.TypeV3()->AsOptional()->GetItemType()->IsDecimal())
  94. {
  95. // Complex logic for decimal types, ignore them for now.
  96. return nullptr;
  97. }
  98. if (column.Required() || NTi::IsSingular(column.TypeV3()->GetTypeName())) {
  99. skiffColumn = CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type()));
  100. } else {
  101. skiffColumn = CreateVariant8Schema({
  102. CreateSimpleTypeSchema(EWireType::Nothing),
  103. CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type()))});
  104. }
  105. if (options.RenameColumns_) {
  106. auto maybeName = options.RenameColumns_->find(column.Name());
  107. skiffColumn->SetName(maybeName == options.RenameColumns_->end() ? column.Name() : maybeName->second);
  108. } else {
  109. skiffColumn->SetName(column.Name());
  110. }
  111. skiffColumns.push_back(skiffColumn);
  112. }
  113. if (options.HasKeySwitch_) {
  114. skiffColumns.push_back(
  115. CreateSimpleTypeSchema(EWireType::Boolean)->SetName("$key_switch"));
  116. }
  117. if (options.HasRangeIndex_) {
  118. skiffColumns.push_back(
  119. CreateVariant8Schema({
  120. CreateSimpleTypeSchema(EWireType::Nothing),
  121. CreateSimpleTypeSchema(EWireType::Int64)})
  122. ->SetName("$range_index"));
  123. }
  124. skiffColumns.push_back(
  125. CreateVariant8Schema({
  126. CreateSimpleTypeSchema(EWireType::Nothing),
  127. CreateSimpleTypeSchema(EWireType::Int64)})
  128. ->SetName("$row_index"));
  129. return CreateTupleSchema(std::move(skiffColumns));
  130. }
  131. NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
  132. const TNode& schemaNode,
  133. const TCreateSkiffSchemaOptions& options)
  134. {
  135. TTableSchema schema;
  136. Deserialize(schema, schemaNode);
  137. return CreateSkiffSchema(schema, options);
  138. }
  139. void Serialize(const NSkiff::TSkiffSchemaPtr& schema, NYson::IYsonConsumer* consumer)
  140. {
  141. consumer->OnBeginMap();
  142. if (schema->GetName().size() > 0) {
  143. consumer->OnKeyedItem("name");
  144. consumer->OnStringScalar(schema->GetName());
  145. }
  146. consumer->OnKeyedItem("wire_type");
  147. consumer->OnStringScalar(ToString(schema->GetWireType()));
  148. if (schema->GetChildren().size() > 0) {
  149. consumer->OnKeyedItem("children");
  150. consumer->OnBeginList();
  151. for (const auto& child : schema->GetChildren()) {
  152. consumer->OnListItem();
  153. Serialize(child, consumer);
  154. }
  155. consumer->OnEndList();
  156. }
  157. consumer->OnEndMap();
  158. }
  159. void Deserialize(NSkiff::TSkiffSchemaPtr& schema, const TNode& node)
  160. {
  161. using namespace NSkiff;
  162. static auto createSchema = [](EWireType wireType, TVector<TSkiffSchemaPtr>&& children) -> TSkiffSchemaPtr {
  163. switch (wireType) {
  164. case EWireType::Tuple:
  165. return CreateTupleSchema(std::move(children));
  166. case EWireType::Variant8:
  167. return CreateVariant8Schema(std::move(children));
  168. case EWireType::Variant16:
  169. return CreateVariant16Schema(std::move(children));
  170. case EWireType::RepeatedVariant8:
  171. return CreateRepeatedVariant8Schema(std::move(children));
  172. case EWireType::RepeatedVariant16:
  173. return CreateRepeatedVariant16Schema(std::move(children));
  174. default:
  175. return CreateSimpleTypeSchema(wireType);
  176. }
  177. };
  178. const auto& map = node.AsMap();
  179. const auto* wireTypePtr = map.FindPtr("wire_type");
  180. Y_ENSURE(wireTypePtr, "'wire_type' is a required key");
  181. auto wireType = FromString<NSkiff::EWireType>(wireTypePtr->AsString());
  182. const auto* childrenPtr = map.FindPtr("children");
  183. Y_ENSURE(NSkiff::IsSimpleType(wireType) || childrenPtr,
  184. "'children' key is required for complex node '" << wireType << "'");
  185. TVector<TSkiffSchemaPtr> children;
  186. if (childrenPtr) {
  187. for (const auto& childNode : childrenPtr->AsList()) {
  188. TSkiffSchemaPtr childSchema;
  189. Deserialize(childSchema, childNode);
  190. children.push_back(std::move(childSchema));
  191. }
  192. }
  193. schema = createSchema(wireType, std::move(children));
  194. const auto* namePtr = map.FindPtr("name");
  195. if (namePtr) {
  196. schema->SetName(namePtr->AsString());
  197. }
  198. }
  199. TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) {
  200. Y_ENSURE(schema->GetWireType() == NSkiff::EWireType::Variant16,
  201. "Bad wire type for schema; expected 'variant16', got " << schema->GetWireType());
  202. THashMap<
  203. NSkiff::TSkiffSchemaPtr,
  204. size_t,
  205. NSkiff::TSkiffSchemaPtrHasher,
  206. NSkiff::TSkiffSchemaPtrEqual> schemasMap;
  207. size_t tableIndex = 0;
  208. auto config = TNode("skiff");
  209. config.Attributes()["table_skiff_schemas"] = TNode::CreateList();
  210. for (const auto& schemaChild : schema->GetChildren()) {
  211. auto [iter, inserted] = schemasMap.emplace(schemaChild, tableIndex);
  212. size_t currentIndex;
  213. if (inserted) {
  214. currentIndex = tableIndex;
  215. ++tableIndex;
  216. } else {
  217. currentIndex = iter->second;
  218. }
  219. config.Attributes()["table_skiff_schemas"].Add("$" + ToString(currentIndex));
  220. }
  221. config.Attributes()["skiff_schema_registry"] = TNode::CreateMap();
  222. for (const auto& [tableSchema, index] : schemasMap) {
  223. TNode node;
  224. TNodeBuilder nodeBuilder(&node);
  225. Serialize(tableSchema, &nodeBuilder);
  226. config.Attributes()["skiff_schema_registry"][ToString(index)] = std::move(node);
  227. }
  228. return TFormat(config);
  229. }
  230. NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
  231. const TClientContext& context,
  232. const IClientRetryPolicyPtr& clientRetryPolicy,
  233. const TTransactionId& transactionId,
  234. ENodeReaderFormat nodeReaderFormat,
  235. const TVector<TRichYPath>& tablePaths,
  236. const TCreateSkiffSchemaOptions& options)
  237. {
  238. if (nodeReaderFormat == ENodeReaderFormat::Yson) {
  239. return nullptr;
  240. }
  241. for (const auto& path : tablePaths) {
  242. if (path.Columns_) {
  243. switch (nodeReaderFormat) {
  244. case ENodeReaderFormat::Skiff:
  245. ythrow TApiUsageError() << "Cannot use Skiff format with column selectors";
  246. case ENodeReaderFormat::Auto:
  247. return nullptr;
  248. default:
  249. Y_ABORT("Unexpected node reader format: %d", static_cast<int>(nodeReaderFormat));
  250. }
  251. }
  252. }
  253. auto nodes = NRawClient::BatchTransform(
  254. clientRetryPolicy->CreatePolicyForGenericRequest(),
  255. context,
  256. NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths),
  257. [&] (TRawBatchRequest& batch, const TRichYPath& path) {
  258. auto getOptions = TGetOptions()
  259. .AttributeFilter(
  260. TAttributeFilter()
  261. .AddAttribute("schema")
  262. .AddAttribute("dynamic")
  263. .AddAttribute("type")
  264. );
  265. return batch.Get(transactionId, path.Path_, getOptions);
  266. });
  267. TVector<NSkiff::TSkiffSchemaPtr> schemas;
  268. for (size_t tableIndex = 0; tableIndex < nodes.size(); ++tableIndex) {
  269. const auto& tablePath = tablePaths[tableIndex].Path_;
  270. const auto& attributes = nodes[tableIndex].GetAttributes();
  271. Y_ENSURE_EX(attributes["type"] == TNode("table"),
  272. TApiUsageError() << "Operation input path " << tablePath << " is not a table");
  273. bool dynamic = attributes["dynamic"].AsBool();
  274. bool strict = attributes["schema"].GetAttributes()["strict"].AsBool();
  275. switch (nodeReaderFormat) {
  276. case ENodeReaderFormat::Skiff:
  277. Y_ENSURE_EX(strict,
  278. TApiUsageError() << "Cannot use skiff format for table with non-strict schema '" << tablePath << "'");
  279. Y_ENSURE_EX(!dynamic,
  280. TApiUsageError() << "Cannot use skiff format for dynamic table '" << tablePath << "'");
  281. break;
  282. case ENodeReaderFormat::Auto:
  283. if (dynamic || !strict) {
  284. YT_LOG_DEBUG("Cannot use skiff format for table '%v' as it is dynamic or has non-strict schema",
  285. tablePath);
  286. return nullptr;
  287. }
  288. break;
  289. default:
  290. Y_ABORT("Unexpected node reader format: %d", static_cast<int>(nodeReaderFormat));
  291. }
  292. NSkiff::TSkiffSchemaPtr curSkiffSchema;
  293. if (tablePaths[tableIndex].RenameColumns_) {
  294. auto customOptions = options;
  295. customOptions.RenameColumns(*tablePaths[tableIndex].RenameColumns_);
  296. curSkiffSchema = CreateSkiffSchema(attributes["schema"], customOptions);
  297. } else {
  298. curSkiffSchema = CreateSkiffSchema(attributes["schema"], options);
  299. }
  300. if (!curSkiffSchema) {
  301. return nullptr;
  302. }
  303. schemas.push_back(curSkiffSchema);
  304. }
  305. return NSkiff::CreateVariant16Schema(std::move(schemas));
  306. }
  307. ////////////////////////////////////////////////////////////////////////////////
  308. NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
  309. const TVector<NSkiff::TSkiffSchemaPtr>& tableSchemas,
  310. const TCreateSkiffSchemaOptions& options
  311. ) {
  312. constexpr auto KEY_SWITCH_COLUMN = "$key_switch";
  313. constexpr auto ROW_INDEX_COLUMN = "$row_index";
  314. constexpr auto RANGE_INDEX_COLUMN = "$range_index";
  315. TVector<NSkiff::TSkiffSchemaPtr> schemas;
  316. schemas.reserve(tableSchemas.size());
  317. for (const auto& tableSchema : tableSchemas) {
  318. Y_ENSURE(tableSchema->GetWireType() == NSkiff::EWireType::Tuple,
  319. "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'");
  320. const auto& children = tableSchema->GetChildren();
  321. NSkiff::TSkiffSchemaList columns;
  322. columns.reserve(children.size() + 3);
  323. if (options.HasKeySwitch_) {
  324. columns.push_back(
  325. CreateSimpleTypeSchema(NSkiff::EWireType::Boolean)->SetName(KEY_SWITCH_COLUMN));
  326. }
  327. columns.push_back(
  328. NSkiff::CreateVariant8Schema({
  329. CreateSimpleTypeSchema(NSkiff::EWireType::Nothing),
  330. CreateSimpleTypeSchema(NSkiff::EWireType::Int64)})
  331. ->SetName(ROW_INDEX_COLUMN));
  332. if (options.HasRangeIndex_) {
  333. columns.push_back(
  334. NSkiff::CreateVariant8Schema({
  335. CreateSimpleTypeSchema(NSkiff::EWireType::Nothing),
  336. CreateSimpleTypeSchema(NSkiff::EWireType::Int64)})
  337. ->SetName(RANGE_INDEX_COLUMN));
  338. }
  339. columns.insert(columns.end(), children.begin(), children.end());
  340. schemas.push_back(NSkiff::CreateTupleSchema(columns));
  341. }
  342. return NSkiff::CreateVariant16Schema(schemas);
  343. }
  344. ////////////////////////////////////////////////////////////////////////////////
  345. } // namespace NDetail
  346. } // namespace NYT