operation.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. #include "operation.h"
  2. #include <yt/cpp/mapreduce/interface/helpers.h>
  3. #include <util/generic/iterator_range.h>
  4. namespace NYT {
  5. ////////////////////////////////////////////////////////////////////////////////
  6. namespace NDetail {
  7. i64 OutputTableCount = -1;
  8. } // namespace NDetail
  9. ////////////////////////////////////////////////////////////////////////////////
  10. TTaskName::TTaskName(TString taskName)
  11. : TaskName_(std::move(taskName))
  12. { }
  13. TTaskName::TTaskName(const char* taskName)
  14. : TaskName_(taskName)
  15. { }
  16. TTaskName::TTaskName(ETaskName taskName)
  17. : TaskName_(ToString(taskName))
  18. { }
  19. const TString& TTaskName::Get() const
  20. {
  21. return TaskName_;
  22. }
  23. ////////////////////////////////////////////////////////////////////////////////
  24. TCommandRawJob::TCommandRawJob(TStringBuf command)
  25. : Command_(command)
  26. { }
  27. const TString& TCommandRawJob::GetCommand() const
  28. {
  29. return Command_;
  30. }
  31. void TCommandRawJob::Do(const TRawJobContext& /* jobContext */)
  32. {
  33. Y_ABORT("TCommandRawJob::Do must not be called");
  34. }
  35. REGISTER_NAMED_RAW_JOB("NYT::TCommandRawJob", TCommandRawJob)
  36. ////////////////////////////////////////////////////////////////////////////////
  37. TCommandVanillaJob::TCommandVanillaJob(TStringBuf command)
  38. : Command_(command)
  39. { }
  40. const TString& TCommandVanillaJob::GetCommand() const
  41. {
  42. return Command_;
  43. }
  44. void TCommandVanillaJob::Do()
  45. {
  46. Y_ABORT("TCommandVanillaJob::Do must not be called");
  47. }
  48. REGISTER_NAMED_VANILLA_JOB("NYT::TCommandVanillaJob", TCommandVanillaJob);
  49. ////////////////////////////////////////////////////////////////////////////////
  50. bool operator==(const TUnspecifiedTableStructure&, const TUnspecifiedTableStructure&)
  51. {
  52. return true;
  53. }
  54. bool operator==(const TProtobufTableStructure& lhs, const TProtobufTableStructure& rhs)
  55. {
  56. return lhs.Descriptor == rhs.Descriptor;
  57. }
  58. ////////////////////////////////////////////////////////////////////////////////
  59. const TVector<TStructuredTablePath>& TOperationInputSpecBase::GetStructuredInputs() const
  60. {
  61. return StructuredInputs_;
  62. }
  63. const TVector<TStructuredTablePath>& TOperationOutputSpecBase::GetStructuredOutputs() const
  64. {
  65. return StructuredOutputs_;
  66. }
  67. void TOperationInputSpecBase::AddStructuredInput(TStructuredTablePath path)
  68. {
  69. Inputs_.push_back(path.RichYPath);
  70. StructuredInputs_.push_back(std::move(path));
  71. }
  72. void TOperationOutputSpecBase::AddStructuredOutput(TStructuredTablePath path)
  73. {
  74. Outputs_.push_back(path.RichYPath);
  75. StructuredOutputs_.push_back(std::move(path));
  76. }
  77. ////////////////////////////////////////////////////////////////////////////////
  78. TVanillaTask& TVanillaTask::AddStructuredOutput(TStructuredTablePath path)
  79. {
  80. TOperationOutputSpecBase::AddStructuredOutput(std::move(path));
  81. return *this;
  82. }
  83. ////////////////////////////////////////////////////////////////////////////////
  84. TStructuredRowStreamDescription IVanillaJob<void>::GetInputRowStreamDescription() const
  85. {
  86. return TVoidStructuredRowStream();
  87. }
  88. TStructuredRowStreamDescription IVanillaJob<void>::GetOutputRowStreamDescription() const
  89. {
  90. return TVoidStructuredRowStream();
  91. }
  92. ////////////////////////////////////////////////////////////////////////////////
  93. TRawJobContext::TRawJobContext(size_t outputTableCount)
  94. : InputFile_(Duplicate(0))
  95. {
  96. for (size_t i = 0; i != outputTableCount; ++i) {
  97. OutputFileList_.emplace_back(Duplicate(3 * i + GetJobFirstOutputTableFD()));
  98. }
  99. }
  100. const TFile& TRawJobContext::GetInputFile() const
  101. {
  102. return InputFile_;
  103. }
  104. const TVector<TFile>& TRawJobContext::GetOutputFileList() const
  105. {
  106. return OutputFileList_;
  107. }
  108. ////////////////////////////////////////////////////////////////////////////////
  109. TUserJobSpec& TUserJobSpec::AddLocalFile(
  110. const TLocalFilePath& path,
  111. const TAddLocalFileOptions& options)
  112. {
  113. LocalFiles_.emplace_back(path, options);
  114. return *this;
  115. }
  116. TUserJobSpec& TUserJobSpec::JobBinaryLocalPath(TString path, TMaybe<TString> md5)
  117. {
  118. JobBinary_ = TJobBinaryLocalPath{path, md5};
  119. return *this;
  120. }
  121. TUserJobSpec& TUserJobSpec::JobBinaryCypressPath(TString path, TMaybe<TTransactionId> transactionId)
  122. {
  123. JobBinary_ = TJobBinaryCypressPath{path, transactionId};
  124. return *this;
  125. }
  126. const TJobBinaryConfig& TUserJobSpec::GetJobBinary() const
  127. {
  128. return JobBinary_;
  129. }
  130. TVector<std::tuple<TLocalFilePath, TAddLocalFileOptions>> TUserJobSpec::GetLocalFiles() const
  131. {
  132. return LocalFiles_;
  133. }
  134. ////////////////////////////////////////////////////////////////////////////////
  135. TJobOperationPreparer::TInputGroup::TInputGroup(TJobOperationPreparer& preparer, TVector<int> indices)
  136. : Preparer_(preparer)
  137. , Indices_(std::move(indices))
  138. { }
  139. TJobOperationPreparer::TInputGroup& TJobOperationPreparer::TInputGroup::ColumnRenaming(const THashMap<TString, TString>& renaming)
  140. {
  141. for (auto i : Indices_) {
  142. Preparer_.InputColumnRenaming(i, renaming);
  143. }
  144. return *this;
  145. }
  146. TJobOperationPreparer::TInputGroup& TJobOperationPreparer::TInputGroup::ColumnFilter(const TVector<TString>& columns)
  147. {
  148. for (auto i : Indices_) {
  149. Preparer_.InputColumnFilter(i, columns);
  150. }
  151. return *this;
  152. }
  153. TJobOperationPreparer& TJobOperationPreparer::TInputGroup::EndInputGroup()
  154. {
  155. return Preparer_;
  156. }
  157. TJobOperationPreparer::TOutputGroup::TOutputGroup(TJobOperationPreparer& preparer, TVector<int> indices)
  158. : Preparer_(preparer)
  159. , Indices_(std::move(indices))
  160. { }
  161. TJobOperationPreparer::TOutputGroup& TJobOperationPreparer::TOutputGroup::Schema(const TTableSchema &schema)
  162. {
  163. for (auto i : Indices_) {
  164. Preparer_.OutputSchema(i, schema);
  165. }
  166. return *this;
  167. }
  168. TJobOperationPreparer::TOutputGroup& TJobOperationPreparer::TOutputGroup::NoSchema()
  169. {
  170. for (auto i : Indices_) {
  171. Preparer_.NoOutputSchema(i);
  172. }
  173. return *this;
  174. }
  175. TJobOperationPreparer& TJobOperationPreparer::TOutputGroup::EndOutputGroup()
  176. {
  177. return Preparer_;
  178. }
  179. ////////////////////////////////////////////////////////////////////////////////
  180. TJobOperationPreparer::TJobOperationPreparer(const IOperationPreparationContext& context)
  181. : Context_(context)
  182. , OutputSchemas_(context.GetOutputCount())
  183. , InputColumnRenamings_(context.GetInputCount())
  184. , InputColumnFilters_(context.GetInputCount())
  185. , InputTableDescriptions_(context.GetInputCount())
  186. , OutputTableDescriptions_(context.GetOutputCount())
  187. { }
  188. TJobOperationPreparer::TInputGroup TJobOperationPreparer::BeginInputGroup(int begin, int end)
  189. {
  190. Y_ENSURE_EX(begin <= end, TApiUsageError()
  191. << "BeginInputGroup(): begin must not exceed end, got " << begin << ", " << end);
  192. TVector<int> indices;
  193. for (int i = begin; i < end; ++i) {
  194. ValidateInputTableIndex(i, TStringBuf("BeginInputGroup()"));
  195. indices.push_back(i);
  196. }
  197. return TInputGroup(*this, std::move(indices));
  198. }
  199. TJobOperationPreparer::TOutputGroup TJobOperationPreparer::BeginOutputGroup(int begin, int end)
  200. {
  201. Y_ENSURE_EX(begin <= end, TApiUsageError()
  202. << "BeginOutputGroup(): begin must not exceed end, got " << begin << ", " << end);
  203. TVector<int> indices;
  204. for (int i = begin; i < end; ++i) {
  205. ValidateOutputTableIndex(i, TStringBuf("BeginOutputGroup()"));
  206. indices.push_back(i);
  207. }
  208. return TOutputGroup(*this, std::move(indices));
  209. }
  210. TJobOperationPreparer& TJobOperationPreparer::NodeOutput(int tableIndex)
  211. {
  212. ValidateMissingOutputDescription(tableIndex);
  213. OutputTableDescriptions_[tableIndex] = StructuredTableDescription<TNode>();
  214. return *this;
  215. }
  216. TJobOperationPreparer& TJobOperationPreparer::OutputSchema(int tableIndex, TTableSchema schema)
  217. {
  218. ValidateMissingOutputSchema(tableIndex);
  219. OutputSchemas_[tableIndex] = std::move(schema);
  220. return *this;
  221. }
  222. TJobOperationPreparer& TJobOperationPreparer::NoOutputSchema(int tableIndex)
  223. {
  224. ValidateMissingOutputSchema(tableIndex);
  225. OutputSchemas_[tableIndex] = EmptyNonstrictSchema();
  226. return *this;
  227. }
  228. TJobOperationPreparer& TJobOperationPreparer::InputColumnRenaming(
  229. int tableIndex,
  230. const THashMap<TString,TString>& renaming)
  231. {
  232. ValidateInputTableIndex(tableIndex, TStringBuf("InputColumnRenaming()"));
  233. InputColumnRenamings_[tableIndex] = renaming;
  234. return *this;
  235. }
  236. TJobOperationPreparer& TJobOperationPreparer::InputColumnFilter(int tableIndex, const TVector<TString>& columns)
  237. {
  238. ValidateInputTableIndex(tableIndex, TStringBuf("InputColumnFilter()"));
  239. InputColumnFilters_[tableIndex] = columns;
  240. return *this;
  241. }
  242. TJobOperationPreparer& TJobOperationPreparer::FormatHints(TUserJobFormatHints newFormatHints)
  243. {
  244. FormatHints_ = newFormatHints;
  245. return *this;
  246. }
  247. void TJobOperationPreparer::Finish()
  248. {
  249. FinallyValidate();
  250. }
  251. TVector<TTableSchema> TJobOperationPreparer::GetOutputSchemas()
  252. {
  253. TVector<TTableSchema> result;
  254. result.reserve(OutputSchemas_.size());
  255. for (auto& schema : OutputSchemas_) {
  256. Y_ABORT_UNLESS(schema.Defined());
  257. result.push_back(std::move(*schema));
  258. schema.Clear();
  259. }
  260. return result;
  261. }
  262. void TJobOperationPreparer::FinallyValidate() const
  263. {
  264. TVector<int> illegallyMissingSchemaIndices;
  265. for (int i = 0; i < static_cast<int>(OutputSchemas_.size()); ++i) {
  266. if (!OutputSchemas_[i]) {
  267. illegallyMissingSchemaIndices.push_back(i);
  268. }
  269. }
  270. if (illegallyMissingSchemaIndices.empty()) {
  271. return;
  272. }
  273. TApiUsageError error;
  274. error << "Output table schemas are missing: ";
  275. for (auto i : illegallyMissingSchemaIndices) {
  276. error << "no. " << i << " (" << Context_.GetOutputPath(i).GetOrElse("<unknown path>") << "); ";
  277. }
  278. ythrow std::move(error);
  279. }
  280. ////////////////////////////////////////////////////////////////////////////////
  281. void TJobOperationPreparer::ValidateInputTableIndex(int tableIndex, TStringBuf message) const
  282. {
  283. Y_ENSURE_EX(
  284. 0 <= tableIndex && tableIndex < static_cast<int>(Context_.GetInputCount()),
  285. TApiUsageError() <<
  286. message << ": input table index " << tableIndex << " us out of range [0;" <<
  287. OutputSchemas_.size() << ")");
  288. }
  289. void TJobOperationPreparer::ValidateOutputTableIndex(int tableIndex, TStringBuf message) const
  290. {
  291. Y_ENSURE_EX(
  292. 0 <= tableIndex && tableIndex < static_cast<int>(Context_.GetOutputCount()),
  293. TApiUsageError() <<
  294. message << ": output table index " << tableIndex << " us out of range [0;" <<
  295. OutputSchemas_.size() << ")");
  296. }
  297. void TJobOperationPreparer::ValidateMissingOutputSchema(int tableIndex) const
  298. {
  299. ValidateOutputTableIndex(tableIndex, "ValidateMissingOutputSchema()");
  300. Y_ENSURE_EX(!OutputSchemas_[tableIndex],
  301. TApiUsageError() <<
  302. "Output table schema no. " << tableIndex << " " <<
  303. "(" << Context_.GetOutputPath(tableIndex).GetOrElse("<unknown path>") << ") " <<
  304. "is already set");
  305. }
  306. void TJobOperationPreparer::ValidateMissingInputDescription(int tableIndex) const
  307. {
  308. ValidateInputTableIndex(tableIndex, "ValidateMissingInputDescription()");
  309. Y_ENSURE_EX(!InputTableDescriptions_[tableIndex],
  310. TApiUsageError() <<
  311. "Description for input no. " << tableIndex << " " <<
  312. "(" << Context_.GetOutputPath(tableIndex).GetOrElse("<unknown path>") << ") " <<
  313. "is already set");
  314. }
  315. void TJobOperationPreparer::ValidateMissingOutputDescription(int tableIndex) const
  316. {
  317. ValidateOutputTableIndex(tableIndex, "ValidateMissingOutputDescription()");
  318. Y_ENSURE_EX(!OutputTableDescriptions_[tableIndex],
  319. TApiUsageError() <<
  320. "Description for output no. " << tableIndex << " " <<
  321. "(" << Context_.GetOutputPath(tableIndex).GetOrElse("<unknown path>") << ") " <<
  322. "is already set");
  323. }
  324. TTableSchema TJobOperationPreparer::EmptyNonstrictSchema() {
  325. return TTableSchema().Strict(false);
  326. }
  327. ////////////////////////////////////////////////////////////////////////////////
  328. const TVector<THashMap<TString, TString>>& TJobOperationPreparer::GetInputColumnRenamings() const
  329. {
  330. return InputColumnRenamings_;
  331. }
  332. const TVector<TMaybe<TVector<TString>>>& TJobOperationPreparer::GetInputColumnFilters() const
  333. {
  334. return InputColumnFilters_;
  335. }
  336. const TVector<TMaybe<TTableStructure>>& TJobOperationPreparer::GetInputDescriptions() const
  337. {
  338. return InputTableDescriptions_;
  339. }
  340. const TVector<TMaybe<TTableStructure>>& TJobOperationPreparer::GetOutputDescriptions() const
  341. {
  342. return OutputTableDescriptions_;
  343. }
  344. const TUserJobFormatHints& TJobOperationPreparer::GetFormatHints() const
  345. {
  346. return FormatHints_;
  347. }
  348. TJobOperationPreparer& TJobOperationPreparer::InputFormatHints(TFormatHints hints)
  349. {
  350. FormatHints_.InputFormatHints(hints);
  351. return *this;
  352. }
  353. TJobOperationPreparer& TJobOperationPreparer::OutputFormatHints(TFormatHints hints)
  354. {
  355. FormatHints_.OutputFormatHints(hints);
  356. return *this;
  357. }
  358. ////////////////////////////////////////////////////////////////////////////////
  359. void IJob::PrepareOperation(const IOperationPreparationContext& context, TJobOperationPreparer& resultBuilder) const
  360. {
  361. for (int i = 0; i < context.GetOutputCount(); ++i) {
  362. resultBuilder.NoOutputSchema(i);
  363. }
  364. }
  365. ////////////////////////////////////////////////////////////////////////////////
  366. IOperationPtr IOperationClient::Map(
  367. const TMapOperationSpec& spec,
  368. ::TIntrusivePtr<IMapperBase> mapper,
  369. const TOperationOptions& options)
  370. {
  371. Y_ABORT_UNLESS(mapper.Get());
  372. return DoMap(
  373. spec,
  374. std::move(mapper),
  375. options);
  376. }
  377. IOperationPtr IOperationClient::Map(
  378. ::TIntrusivePtr<IMapperBase> mapper,
  379. const TOneOrMany<TStructuredTablePath>& input,
  380. const TOneOrMany<TStructuredTablePath>& output,
  381. const TMapOperationSpec& spec,
  382. const TOperationOptions& options)
  383. {
  384. Y_ENSURE_EX(spec.Inputs_.empty(),
  385. TApiUsageError() << "TMapOperationSpec::Inputs MUST be empty");
  386. Y_ENSURE_EX(spec.Outputs_.empty(),
  387. TApiUsageError() << "TMapOperationSpec::Outputs MUST be empty");
  388. auto mapSpec = spec;
  389. for (const auto& inputPath : input.Parts_) {
  390. mapSpec.AddStructuredInput(inputPath);
  391. }
  392. for (const auto& outputPath : output.Parts_) {
  393. mapSpec.AddStructuredOutput(outputPath);
  394. }
  395. return Map(mapSpec, std::move(mapper), options);
  396. }
  397. IOperationPtr IOperationClient::Reduce(
  398. const TReduceOperationSpec& spec,
  399. ::TIntrusivePtr<IReducerBase> reducer,
  400. const TOperationOptions& options)
  401. {
  402. Y_ABORT_UNLESS(reducer.Get());
  403. return DoReduce(
  404. spec,
  405. std::move(reducer),
  406. options);
  407. }
  408. IOperationPtr IOperationClient::Reduce(
  409. ::TIntrusivePtr<IReducerBase> reducer,
  410. const TOneOrMany<TStructuredTablePath>& input,
  411. const TOneOrMany<TStructuredTablePath>& output,
  412. const TSortColumns& reduceBy,
  413. const TReduceOperationSpec& spec,
  414. const TOperationOptions& options)
  415. {
  416. Y_ENSURE_EX(spec.Inputs_.empty(),
  417. TApiUsageError() << "TReduceOperationSpec::Inputs MUST be empty");
  418. Y_ENSURE_EX(spec.Outputs_.empty(),
  419. TApiUsageError() << "TReduceOperationSpec::Outputs MUST be empty");
  420. Y_ENSURE_EX(spec.ReduceBy_.Parts_.empty(),
  421. TApiUsageError() << "TReduceOperationSpec::ReduceBy MUST be empty");
  422. auto reduceSpec = spec;
  423. for (const auto& inputPath : input.Parts_) {
  424. reduceSpec.AddStructuredInput(inputPath);
  425. }
  426. for (const auto& outputPath : output.Parts_) {
  427. reduceSpec.AddStructuredOutput(outputPath);
  428. }
  429. reduceSpec.ReduceBy(reduceBy);
  430. return Reduce(reduceSpec, std::move(reducer), options);
  431. }
  432. IOperationPtr IOperationClient::JoinReduce(
  433. const TJoinReduceOperationSpec& spec,
  434. ::TIntrusivePtr<IReducerBase> reducer,
  435. const TOperationOptions& options)
  436. {
  437. Y_ABORT_UNLESS(reducer.Get());
  438. return DoJoinReduce(
  439. spec,
  440. std::move(reducer),
  441. options);
  442. }
  443. IOperationPtr IOperationClient::MapReduce(
  444. const TMapReduceOperationSpec& spec,
  445. ::TIntrusivePtr<IMapperBase> mapper,
  446. ::TIntrusivePtr<IReducerBase> reducer,
  447. const TOperationOptions& options)
  448. {
  449. Y_ABORT_UNLESS(reducer.Get());
  450. return DoMapReduce(
  451. spec,
  452. std::move(mapper),
  453. nullptr,
  454. std::move(reducer),
  455. options);
  456. }
  457. IOperationPtr IOperationClient::MapReduce(
  458. const TMapReduceOperationSpec& spec,
  459. ::TIntrusivePtr<IMapperBase> mapper,
  460. ::TIntrusivePtr<IReducerBase> reduceCombiner,
  461. ::TIntrusivePtr<IReducerBase> reducer,
  462. const TOperationOptions& options)
  463. {
  464. Y_ABORT_UNLESS(reducer.Get());
  465. return DoMapReduce(
  466. spec,
  467. std::move(mapper),
  468. std::move(reduceCombiner),
  469. std::move(reducer),
  470. options);
  471. }
  472. IOperationPtr IOperationClient::MapReduce(
  473. ::TIntrusivePtr<IMapperBase> mapper,
  474. ::TIntrusivePtr<IReducerBase> reducer,
  475. const TOneOrMany<TStructuredTablePath>& input,
  476. const TOneOrMany<TStructuredTablePath>& output,
  477. const TSortColumns& reduceBy,
  478. TMapReduceOperationSpec spec,
  479. const TOperationOptions& options)
  480. {
  481. Y_ENSURE_EX(spec.Inputs_.empty(),
  482. TApiUsageError() << "TMapReduceOperationSpec::Inputs MUST be empty");
  483. Y_ENSURE_EX(spec.Outputs_.empty(),
  484. TApiUsageError() << "TMapReduceOperationSpec::Outputs MUST be empty");
  485. Y_ENSURE_EX(spec.ReduceBy_.Parts_.empty(),
  486. TApiUsageError() << "TMapReduceOperationSpec::ReduceBy MUST be empty");
  487. for (const auto& inputPath : input.Parts_) {
  488. spec.AddStructuredInput(inputPath);
  489. }
  490. for (const auto& outputPath : output.Parts_) {
  491. spec.AddStructuredOutput(outputPath);
  492. }
  493. spec.ReduceBy(reduceBy);
  494. return MapReduce(spec, std::move(mapper), std::move(reducer), options);
  495. }
  496. IOperationPtr IOperationClient::MapReduce(
  497. ::TIntrusivePtr<IMapperBase> mapper,
  498. ::TIntrusivePtr<IReducerBase> reduceCombiner,
  499. ::TIntrusivePtr<IReducerBase> reducer,
  500. const TOneOrMany<TStructuredTablePath>& input,
  501. const TOneOrMany<TStructuredTablePath>& output,
  502. const TSortColumns& reduceBy,
  503. TMapReduceOperationSpec spec,
  504. const TOperationOptions& options)
  505. {
  506. Y_ENSURE_EX(spec.Inputs_.empty(),
  507. TApiUsageError() << "TMapReduceOperationSpec::Inputs MUST be empty");
  508. Y_ENSURE_EX(spec.Outputs_.empty(),
  509. TApiUsageError() << "TMapReduceOperationSpec::Outputs MUST be empty");
  510. Y_ENSURE_EX(spec.ReduceBy_.Parts_.empty(),
  511. TApiUsageError() << "TMapReduceOperationSpec::ReduceBy MUST be empty");
  512. for (const auto& inputPath : input.Parts_) {
  513. spec.AddStructuredInput(inputPath);
  514. }
  515. for (const auto& outputPath : output.Parts_) {
  516. spec.AddStructuredOutput(outputPath);
  517. }
  518. spec.ReduceBy(reduceBy);
  519. return MapReduce(spec, std::move(mapper), std::move(reduceCombiner), std::move(reducer), options);
  520. }
  521. IOperationPtr IOperationClient::Sort(
  522. const TOneOrMany<TRichYPath>& input,
  523. const TRichYPath& output,
  524. const TSortColumns& sortBy,
  525. const TSortOperationSpec& spec,
  526. const TOperationOptions& options)
  527. {
  528. Y_ENSURE_EX(spec.Inputs_.empty(),
  529. TApiUsageError() << "TSortOperationSpec::Inputs MUST be empty");
  530. Y_ENSURE_EX(spec.Output_.Path_.empty(),
  531. TApiUsageError() << "TSortOperationSpec::Output MUST be empty");
  532. Y_ENSURE_EX(spec.SortBy_.Parts_.empty(),
  533. TApiUsageError() << "TSortOperationSpec::SortBy MUST be empty");
  534. auto sortSpec = spec;
  535. for (const auto& inputPath : input.Parts_) {
  536. sortSpec.AddInput(inputPath);
  537. }
  538. sortSpec.Output(output);
  539. sortSpec.SortBy(sortBy);
  540. return Sort(sortSpec, options);
  541. }
  542. ////////////////////////////////////////////////////////////////////////////////
  543. TRawTableReaderPtr IStructuredJob::CreateCustomRawJobReader(int) const
  544. {
  545. return nullptr;
  546. }
  547. THolder<IProxyOutput> IStructuredJob::CreateCustomRawJobWriter(size_t) const
  548. {
  549. return nullptr;
  550. }
  551. ////////////////////////////////////////////////////////////////////////////////
  552. } // namespace NYT