skiff.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. #include "skiff.h"
  2. #include <yt/cpp/mapreduce/common/retry_lib.h>
  3. #include <yt/cpp/mapreduce/http/retry_request.h>
  4. #include <yt/cpp/mapreduce/http/requests.h>
  5. #include <yt/cpp/mapreduce/interface/config.h>
  6. #include <yt/cpp/mapreduce/interface/common.h>
  7. #include <yt/cpp/mapreduce/interface/serialize.h>
  8. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  9. #include <library/cpp/yson/node/node_builder.h>
  10. #include <library/cpp/yson/node/node_io.h>
  11. #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
  12. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  13. #include <yt/cpp/mapreduce/skiff/skiff_schema.h>
  14. #include <library/cpp/yson/consumer.h>
  15. #include <library/cpp/yson/writer.h>
  16. #include <util/string/cast.h>
  17. #include <util/stream/str.h>
  18. #include <util/stream/file.h>
  19. #include <util/folder/path.h>
  20. namespace NYT {
  21. namespace NDetail {
  22. using namespace NRawClient;
  23. using ::ToString;
  24. ////////////////////////////////////////////////////////////////////////////////
  25. static NSkiff::TSkiffSchemaPtr ReadSkiffSchema(const TString& fileName)
  26. {
  27. if (!TFsPath(fileName).Exists()) {
  28. return nullptr;
  29. }
  30. TIFStream input(fileName);
  31. NSkiff::TSkiffSchemaPtr schema;
  32. Deserialize(schema, NodeFromYsonStream(&input));
  33. return schema;
  34. }
  35. NSkiff::TSkiffSchemaPtr GetJobInputSkiffSchema()
  36. {
  37. return ReadSkiffSchema("skiff_input");
  38. }
  39. NSkiff::EWireType ValueTypeToSkiffType(EValueType valueType)
  40. {
  41. using NSkiff::EWireType;
  42. switch (valueType) {
  43. case VT_INT64:
  44. case VT_INT32:
  45. case VT_INT16:
  46. case VT_INT8:
  47. return EWireType::Int64;
  48. case VT_UINT64:
  49. case VT_UINT32:
  50. case VT_UINT16:
  51. case VT_UINT8:
  52. return EWireType::Uint64;
  53. case VT_DOUBLE:
  54. case VT_FLOAT:
  55. return EWireType::Double;
  56. case VT_BOOLEAN:
  57. return EWireType::Boolean;
  58. case VT_STRING:
  59. case VT_UTF8:
  60. case VT_JSON:
  61. case VT_UUID:
  62. return EWireType::String32;
  63. case VT_ANY:
  64. return EWireType::Yson32;
  65. case VT_NULL:
  66. case VT_VOID:
  67. return EWireType::Nothing;
  68. case VT_DATE:
  69. case VT_DATETIME:
  70. case VT_TIMESTAMP:
  71. return EWireType::Uint64;
  72. case VT_INTERVAL:
  73. return EWireType::Int64;
  74. case VT_DATE32:
  75. case VT_DATETIME64:
  76. case VT_TIMESTAMP64:
  77. case VT_INTERVAL64:
  78. return EWireType::Int64;
  79. };
  80. ythrow yexception() << "Cannot convert EValueType '" << valueType << "' to NSkiff::EWireType";
  81. }
  82. NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
  83. const TTableSchema& schema,
  84. const TCreateSkiffSchemaOptions& options)
  85. {
  86. using namespace NSkiff;
  87. Y_ENSURE(schema.Strict(), "Cannot create Skiff schema for non-strict table schema");
  88. TVector<TSkiffSchemaPtr> skiffColumns;
  89. for (const auto& column: schema.Columns()) {
  90. TSkiffSchemaPtr skiffColumn;
  91. if (column.Deleted().Defined() && *column.Deleted()) {
  92. continue;
  93. }
  94. if (column.Type() == VT_ANY && *column.TypeV3() != *NTi::Optional(NTi::Yson())) {
  95. // We ignore all complex types until YT-12717 is done.
  96. return nullptr;
  97. }
  98. if (column.TypeV3()->IsDecimal() ||
  99. column.TypeV3()->IsOptional() && column.TypeV3()->AsOptional()->GetItemType()->IsDecimal())
  100. {
  101. // Complex logic for decimal types, ignore them for now.
  102. return nullptr;
  103. }
  104. if (column.Required() || NTi::IsSingular(column.TypeV3()->GetTypeName())) {
  105. skiffColumn = CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type()));
  106. } else {
  107. skiffColumn = CreateVariant8Schema({
  108. CreateSimpleTypeSchema(EWireType::Nothing),
  109. CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type()))});
  110. }
  111. if (options.RenameColumns_) {
  112. auto maybeName = options.RenameColumns_->find(column.Name());
  113. skiffColumn->SetName(maybeName == options.RenameColumns_->end() ? column.Name() : maybeName->second);
  114. } else {
  115. skiffColumn->SetName(column.Name());
  116. }
  117. skiffColumns.push_back(skiffColumn);
  118. }
  119. if (options.HasKeySwitch_) {
  120. skiffColumns.push_back(
  121. CreateSimpleTypeSchema(EWireType::Boolean)->SetName("$key_switch"));
  122. }
  123. if (options.HasRangeIndex_) {
  124. skiffColumns.push_back(
  125. CreateVariant8Schema({
  126. CreateSimpleTypeSchema(EWireType::Nothing),
  127. CreateSimpleTypeSchema(EWireType::Int64)})
  128. ->SetName("$range_index"));
  129. }
  130. skiffColumns.push_back(
  131. CreateVariant8Schema({
  132. CreateSimpleTypeSchema(EWireType::Nothing),
  133. CreateSimpleTypeSchema(EWireType::Int64)})
  134. ->SetName("$row_index"));
  135. return CreateTupleSchema(std::move(skiffColumns));
  136. }
  137. NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
  138. const TNode& schemaNode,
  139. const TCreateSkiffSchemaOptions& options)
  140. {
  141. TTableSchema schema;
  142. Deserialize(schema, schemaNode);
  143. return CreateSkiffSchema(schema, options);
  144. }
  145. void Serialize(const NSkiff::TSkiffSchemaPtr& schema, NYson::IYsonConsumer* consumer)
  146. {
  147. consumer->OnBeginMap();
  148. if (schema->GetName().size() > 0) {
  149. consumer->OnKeyedItem("name");
  150. consumer->OnStringScalar(schema->GetName());
  151. }
  152. consumer->OnKeyedItem("wire_type");
  153. consumer->OnStringScalar(ToString(schema->GetWireType()));
  154. if (schema->GetChildren().size() > 0) {
  155. consumer->OnKeyedItem("children");
  156. consumer->OnBeginList();
  157. for (const auto& child : schema->GetChildren()) {
  158. consumer->OnListItem();
  159. Serialize(child, consumer);
  160. }
  161. consumer->OnEndList();
  162. }
  163. consumer->OnEndMap();
  164. }
  165. void Deserialize(NSkiff::TSkiffSchemaPtr& schema, const TNode& node)
  166. {
  167. using namespace NSkiff;
  168. static auto createSchema = [](EWireType wireType, TVector<TSkiffSchemaPtr>&& children) -> TSkiffSchemaPtr {
  169. switch (wireType) {
  170. case EWireType::Tuple:
  171. return CreateTupleSchema(std::move(children));
  172. case EWireType::Variant8:
  173. return CreateVariant8Schema(std::move(children));
  174. case EWireType::Variant16:
  175. return CreateVariant16Schema(std::move(children));
  176. case EWireType::RepeatedVariant8:
  177. return CreateRepeatedVariant8Schema(std::move(children));
  178. case EWireType::RepeatedVariant16:
  179. return CreateRepeatedVariant16Schema(std::move(children));
  180. default:
  181. return CreateSimpleTypeSchema(wireType);
  182. }
  183. };
  184. const auto& map = node.AsMap();
  185. const auto* wireTypePtr = map.FindPtr("wire_type");
  186. Y_ENSURE(wireTypePtr, "'wire_type' is a required key");
  187. auto wireType = FromString<NSkiff::EWireType>(wireTypePtr->AsString());
  188. const auto* childrenPtr = map.FindPtr("children");
  189. Y_ENSURE(NSkiff::IsSimpleType(wireType) || childrenPtr,
  190. "'children' key is required for complex node '" << wireType << "'");
  191. TVector<TSkiffSchemaPtr> children;
  192. if (childrenPtr) {
  193. for (const auto& childNode : childrenPtr->AsList()) {
  194. TSkiffSchemaPtr childSchema;
  195. Deserialize(childSchema, childNode);
  196. children.push_back(std::move(childSchema));
  197. }
  198. }
  199. schema = createSchema(wireType, std::move(children));
  200. const auto* namePtr = map.FindPtr("name");
  201. if (namePtr) {
  202. schema->SetName(namePtr->AsString());
  203. }
  204. }
  205. TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) {
  206. Y_ENSURE(schema->GetWireType() == NSkiff::EWireType::Variant16,
  207. "Bad wire type for schema; expected 'variant16', got " << schema->GetWireType());
  208. THashMap<
  209. NSkiff::TSkiffSchemaPtr,
  210. size_t,
  211. NSkiff::TSkiffSchemaPtrHasher,
  212. NSkiff::TSkiffSchemaPtrEqual> schemasMap;
  213. size_t tableIndex = 0;
  214. auto config = TNode("skiff");
  215. config.Attributes()["table_skiff_schemas"] = TNode::CreateList();
  216. for (const auto& schemaChild : schema->GetChildren()) {
  217. auto [iter, inserted] = schemasMap.emplace(schemaChild, tableIndex);
  218. size_t currentIndex;
  219. if (inserted) {
  220. currentIndex = tableIndex;
  221. ++tableIndex;
  222. } else {
  223. currentIndex = iter->second;
  224. }
  225. config.Attributes()["table_skiff_schemas"].Add("$" + ToString(currentIndex));
  226. }
  227. config.Attributes()["skiff_schema_registry"] = TNode::CreateMap();
  228. for (const auto& [tableSchema, index] : schemasMap) {
  229. TNode node;
  230. TNodeBuilder nodeBuilder(&node);
  231. Serialize(tableSchema, &nodeBuilder);
  232. config.Attributes()["skiff_schema_registry"][ToString(index)] = std::move(node);
  233. }
  234. return TFormat(config);
  235. }
  236. NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
  237. const TClientContext& context,
  238. const IClientRetryPolicyPtr& clientRetryPolicy,
  239. const TTransactionId& transactionId,
  240. ENodeReaderFormat nodeReaderFormat,
  241. const TVector<TRichYPath>& tablePaths,
  242. const TCreateSkiffSchemaOptions& options)
  243. {
  244. if (nodeReaderFormat == ENodeReaderFormat::Yson) {
  245. return nullptr;
  246. }
  247. for (const auto& path : tablePaths) {
  248. if (path.Columns_) {
  249. switch (nodeReaderFormat) {
  250. case ENodeReaderFormat::Skiff:
  251. ythrow TApiUsageError() << "Cannot use Skiff format with column selectors";
  252. case ENodeReaderFormat::Auto:
  253. return nullptr;
  254. default:
  255. Y_ABORT("Unexpected node reader format: %d", static_cast<int>(nodeReaderFormat));
  256. }
  257. }
  258. }
  259. auto nodes = NRawClient::BatchTransform(
  260. clientRetryPolicy->CreatePolicyForGenericRequest(),
  261. context,
  262. NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths),
  263. [&] (TRawBatchRequest& batch, const TRichYPath& path) {
  264. auto getOptions = TGetOptions()
  265. .AttributeFilter(
  266. TAttributeFilter()
  267. .AddAttribute("schema")
  268. .AddAttribute("dynamic")
  269. .AddAttribute("type")
  270. );
  271. return batch.Get(transactionId, path.Path_, getOptions);
  272. });
  273. TVector<NSkiff::TSkiffSchemaPtr> schemas;
  274. for (size_t tableIndex = 0; tableIndex < nodes.size(); ++tableIndex) {
  275. const auto& tablePath = tablePaths[tableIndex].Path_;
  276. const auto& attributes = nodes[tableIndex].GetAttributes();
  277. Y_ENSURE_EX(attributes["type"] == TNode("table"),
  278. TApiUsageError() << "Operation input path " << tablePath << " is not a table");
  279. bool dynamic = attributes["dynamic"].AsBool();
  280. bool strict = attributes["schema"].GetAttributes()["strict"].AsBool();
  281. switch (nodeReaderFormat) {
  282. case ENodeReaderFormat::Skiff:
  283. Y_ENSURE_EX(strict,
  284. TApiUsageError() << "Cannot use skiff format for table with non-strict schema '" << tablePath << "'");
  285. Y_ENSURE_EX(!dynamic,
  286. TApiUsageError() << "Cannot use skiff format for dynamic table '" << tablePath << "'");
  287. break;
  288. case ENodeReaderFormat::Auto:
  289. if (dynamic || !strict) {
  290. YT_LOG_DEBUG("Cannot use skiff format for table '%v' as it is dynamic or has non-strict schema",
  291. tablePath);
  292. return nullptr;
  293. }
  294. break;
  295. default:
  296. Y_ABORT("Unexpected node reader format: %d", static_cast<int>(nodeReaderFormat));
  297. }
  298. NSkiff::TSkiffSchemaPtr curSkiffSchema;
  299. if (tablePaths[tableIndex].RenameColumns_) {
  300. auto customOptions = options;
  301. customOptions.RenameColumns(*tablePaths[tableIndex].RenameColumns_);
  302. curSkiffSchema = CreateSkiffSchema(attributes["schema"], customOptions);
  303. } else {
  304. curSkiffSchema = CreateSkiffSchema(attributes["schema"], options);
  305. }
  306. if (!curSkiffSchema) {
  307. return nullptr;
  308. }
  309. schemas.push_back(curSkiffSchema);
  310. }
  311. return NSkiff::CreateVariant16Schema(std::move(schemas));
  312. }
  313. ////////////////////////////////////////////////////////////////////////////////
  314. NSkiff::TSkiffSchemaPtr CreateSkiffSchema(
  315. const TVector<NSkiff::TSkiffSchemaPtr>& tableSchemas,
  316. const TCreateSkiffSchemaOptions& options
  317. ) {
  318. constexpr auto KEY_SWITCH_COLUMN = "$key_switch";
  319. constexpr auto ROW_INDEX_COLUMN = "$row_index";
  320. constexpr auto RANGE_INDEX_COLUMN = "$range_index";
  321. TVector<NSkiff::TSkiffSchemaPtr> schemas;
  322. schemas.reserve(tableSchemas.size());
  323. for (const auto& tableSchema : tableSchemas) {
  324. Y_ENSURE(tableSchema->GetWireType() == NSkiff::EWireType::Tuple,
  325. "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'");
  326. const auto& children = tableSchema->GetChildren();
  327. NSkiff::TSkiffSchemaList columns;
  328. columns.reserve(children.size() + 3);
  329. if (options.HasKeySwitch_) {
  330. columns.push_back(
  331. CreateSimpleTypeSchema(NSkiff::EWireType::Boolean)->SetName(KEY_SWITCH_COLUMN));
  332. }
  333. columns.push_back(
  334. NSkiff::CreateVariant8Schema({
  335. CreateSimpleTypeSchema(NSkiff::EWireType::Nothing),
  336. CreateSimpleTypeSchema(NSkiff::EWireType::Int64)})
  337. ->SetName(ROW_INDEX_COLUMN));
  338. if (options.HasRangeIndex_) {
  339. columns.push_back(
  340. NSkiff::CreateVariant8Schema({
  341. CreateSimpleTypeSchema(NSkiff::EWireType::Nothing),
  342. CreateSimpleTypeSchema(NSkiff::EWireType::Int64)})
  343. ->SetName(RANGE_INDEX_COLUMN));
  344. }
  345. columns.insert(columns.end(), children.begin(), children.end());
  346. schemas.push_back(NSkiff::CreateTupleSchema(columns));
  347. }
  348. return NSkiff::CreateVariant16Schema(schemas);
  349. }
  350. ////////////////////////////////////////////////////////////////////////////////
  351. } // namespace NDetail
  352. } // namespace NYT