structured_table_formats.cpp 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582
  1. #include "structured_table_formats.h"
  2. #include "format_hints.h"
  3. #include "skiff.h"
  4. #include <yt/cpp/mapreduce/common/retry_lib.h>
  5. #include <yt/cpp/mapreduce/common/retry_request.h>
  6. #include <yt/cpp/mapreduce/interface/common.h>
  7. #include <yt/cpp/mapreduce/interface/raw_client.h>
  8. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  9. #include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
  10. #include <yt/cpp/mapreduce/http_client/raw_requests.h>
  11. #include <library/cpp/type_info/type_info.h>
  12. #include <library/cpp/yson/writer.h>
  13. namespace NYT {
  14. ////////////////////////////////////////////////////////////////////////////////
  15. TMaybe<TNode> GetCommonTableFormat(
  16. const TVector<TMaybe<TNode>>& formats)
  17. {
  18. TMaybe<TNode> result;
  19. bool start = true;
  20. for (auto& format : formats) {
  21. if (start) {
  22. result = format;
  23. start = false;
  24. continue;
  25. }
  26. if (result.Defined() != format.Defined()) {
  27. ythrow yexception() << "Different formats of input tables";
  28. }
  29. if (!result.Defined()) {
  30. continue;
  31. }
  32. auto& resultAttrs = result.Get()->GetAttributes();
  33. auto& formatAttrs = format.Get()->GetAttributes();
  34. if (resultAttrs["key_column_names"] != formatAttrs["key_column_names"]) {
  35. ythrow yexception() << "Different formats of input tables";
  36. }
  37. bool hasSubkeyColumns = resultAttrs.HasKey("subkey_column_names");
  38. if (hasSubkeyColumns != formatAttrs.HasKey("subkey_column_names")) {
  39. ythrow yexception() << "Different formats of input tables";
  40. }
  41. if (hasSubkeyColumns &&
  42. resultAttrs["subkey_column_names"] != formatAttrs["subkey_column_names"])
  43. {
  44. ythrow yexception() << "Different formats of input tables";
  45. }
  46. }
  47. return result;
  48. }
  49. TMaybe<TNode> GetTableFormat(
  50. const IClientRetryPolicyPtr& retryPolicy,
  51. const IRawClientPtr& rawClient,
  52. const TTransactionId& transactionId,
  53. const TRichYPath& path)
  54. {
  55. auto formatPath = path.Path_ + "/@_format";
  56. auto exists = NDetail::RequestWithRetry<bool>(
  57. retryPolicy->CreatePolicyForGenericRequest(),
  58. [&rawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) {
  59. return rawClient->Exists(transactionId, formatPath);
  60. });
  61. if (!exists) {
  62. return TMaybe<TNode>();
  63. }
  64. auto format = NDetail::RequestWithRetry<TMaybe<TNode>>(
  65. retryPolicy->CreatePolicyForGenericRequest(),
  66. [&rawClient, &transactionId, &formatPath] (TMutationId /*mutationId*/) {
  67. return rawClient->Get(transactionId, formatPath);
  68. });
  69. if (format.Get()->AsString() != "yamred_dsv") {
  70. return TMaybe<TNode>();
  71. }
  72. auto& formatAttrs = format.Get()->Attributes();
  73. if (!formatAttrs.HasKey("key_column_names")) {
  74. ythrow yexception() <<
  75. "Table '" << path.Path_ << "': attribute 'key_column_names' is missing";
  76. }
  77. formatAttrs["has_subkey"] = "true";
  78. formatAttrs["lenval"] = "true";
  79. return format;
  80. }
  81. TMaybe<TNode> GetTableFormats(
  82. const IClientRetryPolicyPtr& clientRetryPolicy,
  83. const IRawClientPtr& rawClient,
  84. const TTransactionId& transactionId,
  85. const TVector<TRichYPath>& inputs)
  86. {
  87. TVector<TMaybe<TNode>> formats;
  88. for (auto& table : inputs) {
  89. formats.push_back(GetTableFormat(clientRetryPolicy, rawClient, transactionId, table));
  90. }
  91. return GetCommonTableFormat(formats);
  92. }
  93. ////////////////////////////////////////////////////////////////////////////////
  94. namespace NDetail {
  95. ////////////////////////////////////////////////////////////////////////////////
  96. NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
  97. const IRawClientPtr& rawClient,
  98. const TTransactionId& transactionId,
  99. const TVector<TRichYPath>& tables,
  100. const TOperationOptions& options,
  101. ENodeReaderFormat nodeReaderFormat)
  102. {
  103. bool hasInputQuery = options.Spec_.Defined() && options.Spec_->IsMap() && options.Spec_->HasKey("input_query");
  104. if (hasInputQuery) {
  105. Y_ENSURE_EX(nodeReaderFormat != ENodeReaderFormat::Skiff,
  106. TApiUsageError() << "Cannot use Skiff format for operations with 'input_query' in spec");
  107. return nullptr;
  108. }
  109. return CreateSkiffSchemaIfNecessary(
  110. rawClient,
  111. transactionId,
  112. nodeReaderFormat,
  113. tables,
  114. TCreateSkiffSchemaOptions()
  115. .HasKeySwitch(true)
  116. .HasRangeIndex(true));
  117. }
  118. TString CreateSkiffConfig(const NSkiff::TSkiffSchemaPtr& schema)
  119. {
  120. TString result;
  121. TStringOutput stream(result);
  122. ::NYson::TYsonWriter writer(&stream);
  123. Serialize(schema, &writer);
  124. return result;
  125. }
  126. TString CreateProtoConfig(const TVector<const ::google::protobuf::Descriptor*>& descriptorList)
  127. {
  128. TString result;
  129. TStringOutput messageTypeList(result);
  130. for (const auto& descriptor : descriptorList) {
  131. messageTypeList << descriptor->full_name() << Endl;
  132. }
  133. return result;
  134. }
  135. ////////////////////////////////////////////////////////////////////////////////
  136. struct TGetTableStructureDescriptionStringImpl {
  137. template<typename T>
  138. TString operator()(const T& description) {
  139. if constexpr (std::is_same_v<T, TUnspecifiedTableStructure>) {
  140. return "Unspecified";
  141. } else if constexpr (std::is_same_v<T, TProtobufTableStructure>) {
  142. TString res;
  143. TStringStream out(res);
  144. if (description.Descriptor) {
  145. out << description.Descriptor->full_name();
  146. } else {
  147. out << "<unknown>";
  148. }
  149. out << " protobuf message";
  150. return res;
  151. } else {
  152. static_assert(TDependentFalse<T>, "Unknown type");
  153. }
  154. }
  155. };
  156. TString GetTableStructureDescriptionString(const TTableStructure& tableStructure)
  157. {
  158. return std::visit(TGetTableStructureDescriptionStringImpl(), tableStructure);
  159. }
  160. ////////////////////////////////////////////////////////////////////////////////
  161. TString JobTablePathString(const TStructuredJobTable& jobTable)
  162. {
  163. if (jobTable.RichYPath) {
  164. return jobTable.RichYPath->Path_;
  165. } else {
  166. return "<intermediate-table>";
  167. }
  168. }
  169. TStructuredJobTableList ToStructuredJobTableList(const TVector<TStructuredTablePath>& tableList)
  170. {
  171. TStructuredJobTableList result;
  172. for (const auto& table : tableList) {
  173. result.push_back(TStructuredJobTable{table.Description, table.RichYPath});
  174. }
  175. return result;
  176. }
  177. TStructuredJobTableList CanonizeStructuredTableList(const IRawClientPtr& rawClient, const TVector<TStructuredTablePath>& tableList)
  178. {
  179. TVector<TRichYPath> toCanonize;
  180. toCanonize.reserve(tableList.size());
  181. for (const auto& table : tableList) {
  182. toCanonize.emplace_back(table.RichYPath);
  183. }
  184. const auto canonized = NRawClient::CanonizeYPaths(rawClient, toCanonize);
  185. Y_ABORT_UNLESS(canonized.size() == tableList.size());
  186. TStructuredJobTableList result;
  187. result.reserve(tableList.size());
  188. for (size_t i = 0; i != canonized.size(); ++i) {
  189. result.emplace_back(TStructuredJobTable{tableList[i].Description, canonized[i]});
  190. }
  191. return result;
  192. }
  193. TVector<TRichYPath> GetPathList(
  194. const TStructuredJobTableList& tableList,
  195. const TMaybe<TVector<TTableSchema>>& jobSchemaInferenceResult,
  196. bool inferSchemaFromDescriptions)
  197. {
  198. Y_ABORT_UNLESS(!jobSchemaInferenceResult || tableList.size() == jobSchemaInferenceResult->size());
  199. auto maybeInferSchema = [&] (const TStructuredJobTable& table, ui32 tableIndex) -> TMaybe<TTableSchema> {
  200. if (jobSchemaInferenceResult && !jobSchemaInferenceResult->at(tableIndex).Empty()) {
  201. return jobSchemaInferenceResult->at(tableIndex);
  202. }
  203. if (inferSchemaFromDescriptions) {
  204. return GetTableSchema(table.Description);
  205. }
  206. return Nothing();
  207. };
  208. TVector<TRichYPath> result;
  209. result.reserve(tableList.size());
  210. for (size_t tableIndex = 0; tableIndex != tableList.size(); ++tableIndex) {
  211. const auto& table = tableList[tableIndex];
  212. Y_ABORT_UNLESS(table.RichYPath, "Cannot get path for intermediate table");
  213. auto richYPath = *table.RichYPath;
  214. if (!richYPath.Schema_) {
  215. if (auto schema = maybeInferSchema(table, tableIndex)) {
  216. richYPath.Schema(std::move(*schema));
  217. }
  218. }
  219. result.emplace_back(std::move(richYPath));
  220. }
  221. return result;
  222. }
  223. TStructuredRowStreamDescription GetJobStreamDescription(
  224. const IStructuredJob& job,
  225. EIODirection direction)
  226. {
  227. switch (direction) {
  228. case EIODirection::Input:
  229. return job.GetInputRowStreamDescription();
  230. case EIODirection::Output:
  231. return job.GetOutputRowStreamDescription();
  232. default:
  233. Y_ABORT("unreachable");
  234. }
  235. }
  236. TString GetSuffix(EIODirection direction)
  237. {
  238. switch (direction) {
  239. case EIODirection::Input:
  240. return "_input";
  241. case EIODirection::Output:
  242. return "_output";
  243. }
  244. Y_ABORT("unreachable");
  245. }
  246. TString GetAddIOMethodName(EIODirection direction)
  247. {
  248. switch (direction) {
  249. case EIODirection::Input:
  250. return "AddInput<>";
  251. case EIODirection::Output:
  252. return "AddOutput<>";
  253. }
  254. Y_ABORT("unreachable");
  255. }
  256. ////////////////////////////////////////////////////////////////////////////////
  257. struct TFormatBuilder::TFormatSwitcher
  258. {
  259. template <typename T>
  260. auto operator() (const T& /*t*/) {
  261. if constexpr (std::is_same_v<T, TTNodeStructuredRowStream>) {
  262. return &TFormatBuilder::CreateNodeFormat;
  263. } else if constexpr (std::is_same_v<T, TTYaMRRowStructuredRowStream>) {
  264. return &TFormatBuilder::CreateYamrFormat;
  265. } else if constexpr (std::is_same_v<T, TProtobufStructuredRowStream>) {
  266. return &TFormatBuilder::CreateProtobufFormat;
  267. } else if constexpr (std::is_same_v<T, TVoidStructuredRowStream>) {
  268. return &TFormatBuilder::CreateVoidFormat;
  269. } else {
  270. static_assert(TDependentFalse<T>, "unknown stream description");
  271. }
  272. }
  273. };
  274. TFormatBuilder::TFormatBuilder(
  275. IRawClientPtr rawClient,
  276. IClientRetryPolicyPtr clientRetryPolicy,
  277. TClientContext context,
  278. TTransactionId transactionId,
  279. TOperationOptions operationOptions)
  280. : RawClient_(std::move(rawClient))
  281. , ClientRetryPolicy_(std::move(clientRetryPolicy))
  282. , Context_(std::move(context))
  283. , TransactionId_(transactionId)
  284. , OperationOptions_(std::move(operationOptions))
  285. { }
  286. std::pair <TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateFormat(
  287. const IStructuredJob& job,
  288. const EIODirection& direction,
  289. const TStructuredJobTableList& structuredTableList,
  290. const TMaybe <TFormatHints>& formatHints,
  291. ENodeReaderFormat nodeReaderFormat,
  292. bool allowFormatFromTableAttribute)
  293. {
  294. auto jobStreamDescription = GetJobStreamDescription(job, direction);
  295. auto method = std::visit(TFormatSwitcher(), jobStreamDescription);
  296. return (this->*method)(
  297. job,
  298. direction,
  299. structuredTableList,
  300. formatHints,
  301. nodeReaderFormat,
  302. allowFormatFromTableAttribute);
  303. }
  304. std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateVoidFormat(
  305. const IStructuredJob& /*job*/,
  306. const EIODirection& /*direction*/,
  307. const TStructuredJobTableList& /*structuredTableList*/,
  308. const TMaybe<TFormatHints>& /*formatHints*/,
  309. ENodeReaderFormat /*nodeReaderFormat*/,
  310. bool /*allowFormatFromTableAttribute*/)
  311. {
  312. return {
  313. TFormat(),
  314. Nothing()
  315. };
  316. }
  317. std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateYamrFormat(
  318. const IStructuredJob& job,
  319. const EIODirection& direction,
  320. const TStructuredJobTableList& structuredTableList,
  321. const TMaybe<TFormatHints>& /*formatHints*/,
  322. ENodeReaderFormat /*nodeReaderFormat*/,
  323. bool allowFormatFromTableAttribute)
  324. {
  325. for (const auto& table: structuredTableList) {
  326. if (!std::holds_alternative<TUnspecifiedTableStructure>(table.Description)) {
  327. ythrow TApiUsageError()
  328. << "cannot use " << direction << " table '" << JobTablePathString(table)
  329. << "' with job " << TJobFactory::Get()->GetJobName(&job) << "; "
  330. << "table has unsupported structure description; check " << GetAddIOMethodName(direction) << " for this table";
  331. }
  332. }
  333. TMaybe<TNode> formatFromTableAttributes;
  334. if (allowFormatFromTableAttribute && OperationOptions_.UseTableFormats_) {
  335. TVector<TRichYPath> tableList;
  336. for (const auto& table: structuredTableList) {
  337. Y_ABORT_UNLESS(table.RichYPath, "Cannot use format from table for intermediate table");
  338. tableList.push_back(*table.RichYPath);
  339. }
  340. formatFromTableAttributes = GetTableFormats(ClientRetryPolicy_, RawClient_, TransactionId_, tableList);
  341. }
  342. if (formatFromTableAttributes) {
  343. return {
  344. TFormat(*formatFromTableAttributes),
  345. Nothing()
  346. };
  347. } else {
  348. auto formatNode = TNode("yamr");
  349. formatNode.Attributes() = TNode()
  350. ("lenval", true)
  351. ("has_subkey", true)
  352. ("enable_table_index", true);
  353. return {
  354. TFormat(formatNode),
  355. Nothing()
  356. };
  357. }
  358. }
  359. std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat(
  360. const IStructuredJob& job,
  361. const EIODirection& direction,
  362. const TStructuredJobTableList& structuredTableList,
  363. const TMaybe<TFormatHints>& formatHints,
  364. ENodeReaderFormat nodeReaderFormat,
  365. bool /*allowFormatFromTableAttribute*/)
  366. {
  367. for (const auto& table: structuredTableList) {
  368. if (!std::holds_alternative<TUnspecifiedTableStructure>(table.Description)) {
  369. ythrow TApiUsageError()
  370. << "cannot use " << direction << " table '" << JobTablePathString(table)
  371. << "' with job " << TJobFactory::Get()->GetJobName(&job) << "; "
  372. << "table has unsupported structure description; check AddInput<> / AddOutput<> for this table";
  373. }
  374. }
  375. NSkiff::TSkiffSchemaPtr skiffSchema = nullptr;
  376. if (nodeReaderFormat != ENodeReaderFormat::Yson) {
  377. TVector<TRichYPath> tableList;
  378. for (const auto& table: structuredTableList) {
  379. Y_ABORT_UNLESS(table.RichYPath, "Cannot use skiff with temporary tables");
  380. tableList.emplace_back(*table.RichYPath);
  381. }
  382. skiffSchema = TryCreateSkiffSchema(
  383. RawClient_,
  384. TransactionId_,
  385. tableList,
  386. OperationOptions_,
  387. nodeReaderFormat);
  388. }
  389. if (skiffSchema) {
  390. auto format = CreateSkiffFormat(skiffSchema);
  391. NYT::NDetail::ApplyFormatHints<TNode>(&format, formatHints);
  392. return {
  393. format,
  394. TSmallJobFile{
  395. TString("skiff") + GetSuffix(direction),
  396. CreateSkiffConfig(skiffSchema)
  397. }
  398. };
  399. } else {
  400. auto format = TFormat::YsonBinary();
  401. NYT::NDetail::ApplyFormatHints<TNode>(&format, formatHints);
  402. return {
  403. format,
  404. Nothing()
  405. };
  406. }
  407. }
  408. [[noreturn]] static void ThrowUnsupportedStructureDescription(
  409. const EIODirection& direction,
  410. const TStructuredJobTable& table,
  411. const IStructuredJob& job)
  412. {
  413. ythrow TApiUsageError()
  414. << "cannot use " << direction << " table '" << JobTablePathString(table)
  415. << "' with job " << TJobFactory::Get()->GetJobName(&job) << "; "
  416. << "table has unsupported structure description; check " << GetAddIOMethodName(direction) << " for this table";
  417. }
  418. [[noreturn]] static void ThrowTypeDeriveFail(
  419. const EIODirection& direction,
  420. const IStructuredJob& job,
  421. const TString& type)
  422. {
  423. ythrow TApiUsageError()
  424. << "Cannot derive exact " << type << " type for intermediate " << direction << " table for job "
  425. << TJobFactory::Get()->GetJobName(&job)
  426. << "; use one of TMapReduceOperationSpec::Hint* methods to specify intermediate table structure";
  427. }
  428. [[noreturn]] static void ThrowUnexpectedDifferentDescriptors(
  429. const EIODirection& direction,
  430. const TStructuredJobTable& table,
  431. const IStructuredJob& job,
  432. const TMaybe<TStringBuf> jobDescriptorName,
  433. const TMaybe<TStringBuf> descriptorName)
  434. {
  435. ythrow TApiUsageError()
  436. << "Job " << TJobFactory::Get()->GetJobName(&job) << " expects "
  437. << jobDescriptorName << " as " << direction << ", but table " << JobTablePathString(table)
  438. << " is tagged with " << descriptorName;
  439. }
  440. std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateProtobufFormat(
  441. const IStructuredJob& job,
  442. const EIODirection& direction,
  443. const TStructuredJobTableList& structuredTableList,
  444. const TMaybe<TFormatHints>& /*formatHints*/,
  445. ENodeReaderFormat /*nodeReaderFormat*/,
  446. bool /*allowFormatFromTableAttribute*/)
  447. {
  448. if (Context_.Config->UseClientProtobuf) {
  449. return {
  450. TFormat::YsonBinary(),
  451. TSmallJobFile{
  452. TString("proto") + GetSuffix(direction),
  453. CreateProtoConfig({}),
  454. },
  455. };
  456. }
  457. const ::google::protobuf::Descriptor* const jobDescriptor =
  458. std::get<TProtobufStructuredRowStream>(GetJobStreamDescription(job, direction)).Descriptor;
  459. Y_ENSURE(!structuredTableList.empty(),
  460. "empty " << direction << " tables for job " << TJobFactory::Get()->GetJobName(&job));
  461. TVector<const ::google::protobuf::Descriptor*> descriptorList;
  462. for (const auto& table : structuredTableList) {
  463. const ::google::protobuf::Descriptor* descriptor = nullptr;
  464. if (std::holds_alternative<TProtobufTableStructure>(table.Description)) {
  465. descriptor = std::get<TProtobufTableStructure>(table.Description).Descriptor;
  466. } else if (table.RichYPath) {
  467. ThrowUnsupportedStructureDescription(direction, table, job);
  468. }
  469. if (!descriptor) {
  470. // It must be intermediate table, because there is no proper way to add such table to spec
  471. // (AddInput requires to specify proper message).
  472. Y_ABORT_UNLESS(!table.RichYPath, "Descriptors for all tables except intermediate must be known");
  473. if (jobDescriptor) {
  474. descriptor = jobDescriptor;
  475. } else {
  476. ThrowTypeDeriveFail(direction, job, "protobuf");
  477. }
  478. }
  479. if (jobDescriptor && descriptor != jobDescriptor) {
  480. ThrowUnexpectedDifferentDescriptors(
  481. direction,
  482. table,
  483. job,
  484. jobDescriptor->full_name(),
  485. descriptor->full_name());
  486. }
  487. descriptorList.push_back(descriptor);
  488. }
  489. Y_ABORT_UNLESS(!descriptorList.empty(), "Messages for proto format are unknown (empty ProtoDescriptors)");
  490. return {
  491. TFormat::Protobuf(descriptorList, Context_.Config->ProtobufFormatWithDescriptors),
  492. TSmallJobFile{
  493. TString("proto") + GetSuffix(direction),
  494. CreateProtoConfig(descriptorList)
  495. },
  496. };
  497. }
  498. ////////////////////////////////////////////////////////////////////////////////
  499. struct TGetTableSchemaImpl
  500. {
  501. template <typename T>
  502. TMaybe<TTableSchema> operator() (const T& description) {
  503. if constexpr (std::is_same_v<T, TUnspecifiedTableStructure>) {
  504. return Nothing();
  505. } else if constexpr (std::is_same_v<T, TProtobufTableStructure>) {
  506. if (!description.Descriptor) {
  507. return Nothing();
  508. }
  509. return CreateTableSchema(*description.Descriptor);
  510. } else {
  511. static_assert(TDependentFalse<T>, "unknown type");
  512. }
  513. }
  514. };
  515. TMaybe<TTableSchema> GetTableSchema(const TTableStructure& tableStructure)
  516. {
  517. return std::visit(TGetTableSchemaImpl(), tableStructure);
  518. }
  519. ////////////////////////////////////////////////////////////////////////////////
  520. } // namespace NDetail
  521. } // namespace NYT