operation.cpp 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. #include "operation.h"
  2. #include <util/generic/iterator_range.h>
  3. namespace NYT {
  4. ////////////////////////////////////////////////////////////////////////////////
  5. namespace NDetail {
  6. i64 OutputTableCount = -1;
  7. } // namespace NDetail
  8. ////////////////////////////////////////////////////////////////////////////////
  9. TTaskName::TTaskName(TString taskName)
  10. : TaskName_(std::move(taskName))
  11. { }
  12. TTaskName::TTaskName(const char* taskName)
  13. : TaskName_(taskName)
  14. { }
  15. TTaskName::TTaskName(ETaskName taskName)
  16. : TaskName_(ToString(taskName))
  17. { }
  18. const TString& TTaskName::Get() const
  19. {
  20. return TaskName_;
  21. }
  22. ////////////////////////////////////////////////////////////////////////////////
  23. TCommandRawJob::TCommandRawJob(TStringBuf command)
  24. : Command_(command)
  25. { }
  26. const TString& TCommandRawJob::GetCommand() const
  27. {
  28. return Command_;
  29. }
  30. void TCommandRawJob::Do(const TRawJobContext& /* jobContext */)
  31. {
  32. Y_ABORT("TCommandRawJob::Do must not be called");
  33. }
  34. REGISTER_NAMED_RAW_JOB("NYT::TCommandRawJob", TCommandRawJob)
  35. ////////////////////////////////////////////////////////////////////////////////
  36. TCommandVanillaJob::TCommandVanillaJob(TStringBuf command)
  37. : Command_(command)
  38. { }
  39. const TString& TCommandVanillaJob::GetCommand() const
  40. {
  41. return Command_;
  42. }
  43. void TCommandVanillaJob::Do()
  44. {
  45. Y_ABORT("TCommandVanillaJob::Do must not be called");
  46. }
  47. REGISTER_NAMED_VANILLA_JOB("NYT::TCommandVanillaJob", TCommandVanillaJob);
  48. ////////////////////////////////////////////////////////////////////////////////
  49. bool operator==(const TUnspecifiedTableStructure&, const TUnspecifiedTableStructure&)
  50. {
  51. return true;
  52. }
  53. bool operator==(const TProtobufTableStructure& lhs, const TProtobufTableStructure& rhs)
  54. {
  55. return lhs.Descriptor == rhs.Descriptor;
  56. }
  57. ////////////////////////////////////////////////////////////////////////////////
  58. const TVector<TStructuredTablePath>& TOperationInputSpecBase::GetStructuredInputs() const
  59. {
  60. return StructuredInputs_;
  61. }
  62. const TVector<TStructuredTablePath>& TOperationOutputSpecBase::GetStructuredOutputs() const
  63. {
  64. return StructuredOutputs_;
  65. }
  66. void TOperationInputSpecBase::AddStructuredInput(TStructuredTablePath path)
  67. {
  68. Inputs_.push_back(path.RichYPath);
  69. StructuredInputs_.push_back(std::move(path));
  70. }
  71. void TOperationOutputSpecBase::AddStructuredOutput(TStructuredTablePath path)
  72. {
  73. Outputs_.push_back(path.RichYPath);
  74. StructuredOutputs_.push_back(std::move(path));
  75. }
  76. ////////////////////////////////////////////////////////////////////////////////
  77. TVanillaTask& TVanillaTask::AddStructuredOutput(TStructuredTablePath path)
  78. {
  79. TOperationOutputSpecBase::AddStructuredOutput(std::move(path));
  80. return *this;
  81. }
  82. ////////////////////////////////////////////////////////////////////////////////
  83. TStructuredRowStreamDescription IVanillaJob<void>::GetInputRowStreamDescription() const
  84. {
  85. return TVoidStructuredRowStream();
  86. }
  87. TStructuredRowStreamDescription IVanillaJob<void>::GetOutputRowStreamDescription() const
  88. {
  89. return TVoidStructuredRowStream();
  90. }
  91. ////////////////////////////////////////////////////////////////////////////////
  92. TRawJobContext::TRawJobContext(size_t outputTableCount)
  93. : InputFile_(Duplicate(0))
  94. {
  95. for (size_t i = 0; i != outputTableCount; ++i) {
  96. OutputFileList_.emplace_back(Duplicate(3 * i + 1));
  97. }
  98. }
  99. const TFile& TRawJobContext::GetInputFile() const
  100. {
  101. return InputFile_;
  102. }
  103. const TVector<TFile>& TRawJobContext::GetOutputFileList() const
  104. {
  105. return OutputFileList_;
  106. }
  107. ////////////////////////////////////////////////////////////////////////////////
  108. TUserJobSpec& TUserJobSpec::AddLocalFile(
  109. const TLocalFilePath& path,
  110. const TAddLocalFileOptions& options)
  111. {
  112. LocalFiles_.emplace_back(path, options);
  113. return *this;
  114. }
  115. TUserJobSpec& TUserJobSpec::JobBinaryLocalPath(TString path, TMaybe<TString> md5)
  116. {
  117. JobBinary_ = TJobBinaryLocalPath{path, md5};
  118. return *this;
  119. }
  120. TUserJobSpec& TUserJobSpec::JobBinaryCypressPath(TString path, TMaybe<TTransactionId> transactionId)
  121. {
  122. JobBinary_ = TJobBinaryCypressPath{path, transactionId};
  123. return *this;
  124. }
  125. const TJobBinaryConfig& TUserJobSpec::GetJobBinary() const
  126. {
  127. return JobBinary_;
  128. }
  129. TVector<std::tuple<TLocalFilePath, TAddLocalFileOptions>> TUserJobSpec::GetLocalFiles() const
  130. {
  131. return LocalFiles_;
  132. }
  133. ////////////////////////////////////////////////////////////////////////////////
  134. TJobOperationPreparer::TInputGroup::TInputGroup(TJobOperationPreparer& preparer, TVector<int> indices)
  135. : Preparer_(preparer)
  136. , Indices_(std::move(indices))
  137. { }
  138. TJobOperationPreparer::TInputGroup& TJobOperationPreparer::TInputGroup::ColumnRenaming(const THashMap<TString, TString>& renaming)
  139. {
  140. for (auto i : Indices_) {
  141. Preparer_.InputColumnRenaming(i, renaming);
  142. }
  143. return *this;
  144. }
  145. TJobOperationPreparer::TInputGroup& TJobOperationPreparer::TInputGroup::ColumnFilter(const TVector<TString>& columns)
  146. {
  147. for (auto i : Indices_) {
  148. Preparer_.InputColumnFilter(i, columns);
  149. }
  150. return *this;
  151. }
  152. TJobOperationPreparer& TJobOperationPreparer::TInputGroup::EndInputGroup()
  153. {
  154. return Preparer_;
  155. }
  156. TJobOperationPreparer::TOutputGroup::TOutputGroup(TJobOperationPreparer& preparer, TVector<int> indices)
  157. : Preparer_(preparer)
  158. , Indices_(std::move(indices))
  159. { }
  160. TJobOperationPreparer::TOutputGroup& TJobOperationPreparer::TOutputGroup::Schema(const TTableSchema &schema)
  161. {
  162. for (auto i : Indices_) {
  163. Preparer_.OutputSchema(i, schema);
  164. }
  165. return *this;
  166. }
  167. TJobOperationPreparer::TOutputGroup& TJobOperationPreparer::TOutputGroup::NoSchema()
  168. {
  169. for (auto i : Indices_) {
  170. Preparer_.NoOutputSchema(i);
  171. }
  172. return *this;
  173. }
  174. TJobOperationPreparer& TJobOperationPreparer::TOutputGroup::EndOutputGroup()
  175. {
  176. return Preparer_;
  177. }
  178. ////////////////////////////////////////////////////////////////////////////////
  179. TJobOperationPreparer::TJobOperationPreparer(const IOperationPreparationContext& context)
  180. : Context_(context)
  181. , OutputSchemas_(context.GetOutputCount())
  182. , InputColumnRenamings_(context.GetInputCount())
  183. , InputColumnFilters_(context.GetInputCount())
  184. , InputTableDescriptions_(context.GetInputCount())
  185. , OutputTableDescriptions_(context.GetOutputCount())
  186. { }
  187. TJobOperationPreparer::TInputGroup TJobOperationPreparer::BeginInputGroup(int begin, int end)
  188. {
  189. Y_ENSURE_EX(begin <= end, TApiUsageError()
  190. << "BeginInputGroup(): begin must not exceed end, got " << begin << ", " << end);
  191. TVector<int> indices;
  192. for (int i = begin; i < end; ++i) {
  193. ValidateInputTableIndex(i, TStringBuf("BeginInputGroup()"));
  194. indices.push_back(i);
  195. }
  196. return TInputGroup(*this, std::move(indices));
  197. }
  198. TJobOperationPreparer::TOutputGroup TJobOperationPreparer::BeginOutputGroup(int begin, int end)
  199. {
  200. Y_ENSURE_EX(begin <= end, TApiUsageError()
  201. << "BeginOutputGroup(): begin must not exceed end, got " << begin << ", " << end);
  202. TVector<int> indices;
  203. for (int i = begin; i < end; ++i) {
  204. ValidateOutputTableIndex(i, TStringBuf("BeginOutputGroup()"));
  205. indices.push_back(i);
  206. }
  207. return TOutputGroup(*this, std::move(indices));
  208. }
  209. TJobOperationPreparer& TJobOperationPreparer::NodeOutput(int tableIndex)
  210. {
  211. ValidateMissingOutputDescription(tableIndex);
  212. OutputTableDescriptions_[tableIndex] = StructuredTableDescription<TNode>();
  213. return *this;
  214. }
  215. TJobOperationPreparer& TJobOperationPreparer::OutputSchema(int tableIndex, TTableSchema schema)
  216. {
  217. ValidateMissingOutputSchema(tableIndex);
  218. OutputSchemas_[tableIndex] = std::move(schema);
  219. return *this;
  220. }
  221. TJobOperationPreparer& TJobOperationPreparer::NoOutputSchema(int tableIndex)
  222. {
  223. ValidateMissingOutputSchema(tableIndex);
  224. OutputSchemas_[tableIndex] = EmptyNonstrictSchema();
  225. return *this;
  226. }
  227. TJobOperationPreparer& TJobOperationPreparer::InputColumnRenaming(
  228. int tableIndex,
  229. const THashMap<TString,TString>& renaming)
  230. {
  231. ValidateInputTableIndex(tableIndex, TStringBuf("InputColumnRenaming()"));
  232. InputColumnRenamings_[tableIndex] = renaming;
  233. return *this;
  234. }
  235. TJobOperationPreparer& TJobOperationPreparer::InputColumnFilter(int tableIndex, const TVector<TString>& columns)
  236. {
  237. ValidateInputTableIndex(tableIndex, TStringBuf("InputColumnFilter()"));
  238. InputColumnFilters_[tableIndex] = columns;
  239. return *this;
  240. }
  241. TJobOperationPreparer& TJobOperationPreparer::FormatHints(TUserJobFormatHints newFormatHints)
  242. {
  243. FormatHints_ = newFormatHints;
  244. return *this;
  245. }
  246. void TJobOperationPreparer::Finish()
  247. {
  248. FinallyValidate();
  249. }
  250. TVector<TTableSchema> TJobOperationPreparer::GetOutputSchemas()
  251. {
  252. TVector<TTableSchema> result;
  253. result.reserve(OutputSchemas_.size());
  254. for (auto& schema : OutputSchemas_) {
  255. Y_ABORT_UNLESS(schema.Defined());
  256. result.push_back(std::move(*schema));
  257. schema.Clear();
  258. }
  259. return result;
  260. }
  261. void TJobOperationPreparer::FinallyValidate() const
  262. {
  263. TVector<int> illegallyMissingSchemaIndices;
  264. for (int i = 0; i < static_cast<int>(OutputSchemas_.size()); ++i) {
  265. if (!OutputSchemas_[i]) {
  266. illegallyMissingSchemaIndices.push_back(i);
  267. }
  268. }
  269. if (illegallyMissingSchemaIndices.empty()) {
  270. return;
  271. }
  272. TApiUsageError error;
  273. error << "Output table schemas are missing: ";
  274. for (auto i : illegallyMissingSchemaIndices) {
  275. error << "no. " << i;
  276. if (auto path = Context_.GetInputPath(i)) {
  277. error << "(" << *path << ")";
  278. }
  279. error << "; ";
  280. }
  281. ythrow std::move(error);
  282. }
  283. ////////////////////////////////////////////////////////////////////////////////
  284. void TJobOperationPreparer::ValidateInputTableIndex(int tableIndex, TStringBuf message) const
  285. {
  286. Y_ENSURE_EX(
  287. 0 <= tableIndex && tableIndex < static_cast<int>(Context_.GetInputCount()),
  288. TApiUsageError() <<
  289. message << ": input table index " << tableIndex << " us out of range [0;" <<
  290. OutputSchemas_.size() << ")");
  291. }
  292. void TJobOperationPreparer::ValidateOutputTableIndex(int tableIndex, TStringBuf message) const
  293. {
  294. Y_ENSURE_EX(
  295. 0 <= tableIndex && tableIndex < static_cast<int>(Context_.GetOutputCount()),
  296. TApiUsageError() <<
  297. message << ": output table index " << tableIndex << " us out of range [0;" <<
  298. OutputSchemas_.size() << ")");
  299. }
  300. void TJobOperationPreparer::ValidateMissingOutputSchema(int tableIndex) const
  301. {
  302. ValidateOutputTableIndex(tableIndex, "ValidateMissingOutputSchema()");
  303. Y_ENSURE_EX(!OutputSchemas_[tableIndex],
  304. TApiUsageError() <<
  305. "Output table schema no. " << tableIndex << " " <<
  306. "(" << Context_.GetOutputPath(tableIndex).GetOrElse("<unknown path>") << ") " <<
  307. "is already set");
  308. }
  309. void TJobOperationPreparer::ValidateMissingInputDescription(int tableIndex) const
  310. {
  311. ValidateInputTableIndex(tableIndex, "ValidateMissingInputDescription()");
  312. Y_ENSURE_EX(!InputTableDescriptions_[tableIndex],
  313. TApiUsageError() <<
  314. "Description for input no. " << tableIndex << " " <<
  315. "(" << Context_.GetOutputPath(tableIndex).GetOrElse("<unknown path>") << ") " <<
  316. "is already set");
  317. }
  318. void TJobOperationPreparer::ValidateMissingOutputDescription(int tableIndex) const
  319. {
  320. ValidateOutputTableIndex(tableIndex, "ValidateMissingOutputDescription()");
  321. Y_ENSURE_EX(!OutputTableDescriptions_[tableIndex],
  322. TApiUsageError() <<
  323. "Description for output no. " << tableIndex << " " <<
  324. "(" << Context_.GetOutputPath(tableIndex).GetOrElse("<unknown path>") << ") " <<
  325. "is already set");
  326. }
  327. TTableSchema TJobOperationPreparer::EmptyNonstrictSchema() {
  328. return TTableSchema().Strict(false);
  329. }
  330. ////////////////////////////////////////////////////////////////////////////////
  331. const TVector<THashMap<TString, TString>>& TJobOperationPreparer::GetInputColumnRenamings() const
  332. {
  333. return InputColumnRenamings_;
  334. }
  335. const TVector<TMaybe<TVector<TString>>>& TJobOperationPreparer::GetInputColumnFilters() const
  336. {
  337. return InputColumnFilters_;
  338. }
  339. const TVector<TMaybe<TTableStructure>>& TJobOperationPreparer::GetInputDescriptions() const
  340. {
  341. return InputTableDescriptions_;
  342. }
  343. const TVector<TMaybe<TTableStructure>>& TJobOperationPreparer::GetOutputDescriptions() const
  344. {
  345. return OutputTableDescriptions_;
  346. }
  347. const TUserJobFormatHints& TJobOperationPreparer::GetFormatHints() const
  348. {
  349. return FormatHints_;
  350. }
  351. TJobOperationPreparer& TJobOperationPreparer::InputFormatHints(TFormatHints hints)
  352. {
  353. FormatHints_.InputFormatHints(hints);
  354. return *this;
  355. }
  356. TJobOperationPreparer& TJobOperationPreparer::OutputFormatHints(TFormatHints hints)
  357. {
  358. FormatHints_.OutputFormatHints(hints);
  359. return *this;
  360. }
  361. ////////////////////////////////////////////////////////////////////////////////
  362. void IJob::PrepareOperation(const IOperationPreparationContext& context, TJobOperationPreparer& resultBuilder) const
  363. {
  364. for (int i = 0; i < context.GetOutputCount(); ++i) {
  365. resultBuilder.NoOutputSchema(i);
  366. }
  367. }
  368. ////////////////////////////////////////////////////////////////////////////////
  369. IOperationPtr IOperationClient::Map(
  370. const TMapOperationSpec& spec,
  371. ::TIntrusivePtr<IMapperBase> mapper,
  372. const TOperationOptions& options)
  373. {
  374. Y_ABORT_UNLESS(mapper.Get());
  375. return DoMap(
  376. spec,
  377. std::move(mapper),
  378. options);
  379. }
  380. IOperationPtr IOperationClient::Map(
  381. ::TIntrusivePtr<IMapperBase> mapper,
  382. const TOneOrMany<TStructuredTablePath>& input,
  383. const TOneOrMany<TStructuredTablePath>& output,
  384. const TMapOperationSpec& spec,
  385. const TOperationOptions& options)
  386. {
  387. Y_ENSURE_EX(spec.Inputs_.empty(),
  388. TApiUsageError() << "TMapOperationSpec::Inputs MUST be empty");
  389. Y_ENSURE_EX(spec.Outputs_.empty(),
  390. TApiUsageError() << "TMapOperationSpec::Outputs MUST be empty");
  391. auto mapSpec = spec;
  392. for (const auto& inputPath : input.Parts_) {
  393. mapSpec.AddStructuredInput(inputPath);
  394. }
  395. for (const auto& outputPath : output.Parts_) {
  396. mapSpec.AddStructuredOutput(outputPath);
  397. }
  398. return Map(mapSpec, std::move(mapper), options);
  399. }
  400. IOperationPtr IOperationClient::Reduce(
  401. const TReduceOperationSpec& spec,
  402. ::TIntrusivePtr<IReducerBase> reducer,
  403. const TOperationOptions& options)
  404. {
  405. Y_ABORT_UNLESS(reducer.Get());
  406. return DoReduce(
  407. spec,
  408. std::move(reducer),
  409. options);
  410. }
  411. IOperationPtr IOperationClient::Reduce(
  412. ::TIntrusivePtr<IReducerBase> reducer,
  413. const TOneOrMany<TStructuredTablePath>& input,
  414. const TOneOrMany<TStructuredTablePath>& output,
  415. const TSortColumns& reduceBy,
  416. const TReduceOperationSpec& spec,
  417. const TOperationOptions& options)
  418. {
  419. Y_ENSURE_EX(spec.Inputs_.empty(),
  420. TApiUsageError() << "TReduceOperationSpec::Inputs MUST be empty");
  421. Y_ENSURE_EX(spec.Outputs_.empty(),
  422. TApiUsageError() << "TReduceOperationSpec::Outputs MUST be empty");
  423. Y_ENSURE_EX(spec.ReduceBy_.Parts_.empty(),
  424. TApiUsageError() << "TReduceOperationSpec::ReduceBy MUST be empty");
  425. auto reduceSpec = spec;
  426. for (const auto& inputPath : input.Parts_) {
  427. reduceSpec.AddStructuredInput(inputPath);
  428. }
  429. for (const auto& outputPath : output.Parts_) {
  430. reduceSpec.AddStructuredOutput(outputPath);
  431. }
  432. reduceSpec.ReduceBy(reduceBy);
  433. return Reduce(reduceSpec, std::move(reducer), options);
  434. }
  435. IOperationPtr IOperationClient::JoinReduce(
  436. const TJoinReduceOperationSpec& spec,
  437. ::TIntrusivePtr<IReducerBase> reducer,
  438. const TOperationOptions& options)
  439. {
  440. Y_ABORT_UNLESS(reducer.Get());
  441. return DoJoinReduce(
  442. spec,
  443. std::move(reducer),
  444. options);
  445. }
  446. IOperationPtr IOperationClient::MapReduce(
  447. const TMapReduceOperationSpec& spec,
  448. ::TIntrusivePtr<IMapperBase> mapper,
  449. ::TIntrusivePtr<IReducerBase> reducer,
  450. const TOperationOptions& options)
  451. {
  452. Y_ABORT_UNLESS(reducer.Get());
  453. return DoMapReduce(
  454. spec,
  455. std::move(mapper),
  456. nullptr,
  457. std::move(reducer),
  458. options);
  459. }
  460. IOperationPtr IOperationClient::MapReduce(
  461. const TMapReduceOperationSpec& spec,
  462. ::TIntrusivePtr<IMapperBase> mapper,
  463. ::TIntrusivePtr<IReducerBase> reduceCombiner,
  464. ::TIntrusivePtr<IReducerBase> reducer,
  465. const TOperationOptions& options)
  466. {
  467. Y_ABORT_UNLESS(reducer.Get());
  468. return DoMapReduce(
  469. spec,
  470. std::move(mapper),
  471. std::move(reduceCombiner),
  472. std::move(reducer),
  473. options);
  474. }
  475. IOperationPtr IOperationClient::MapReduce(
  476. ::TIntrusivePtr<IMapperBase> mapper,
  477. ::TIntrusivePtr<IReducerBase> reducer,
  478. const TOneOrMany<TStructuredTablePath>& input,
  479. const TOneOrMany<TStructuredTablePath>& output,
  480. const TSortColumns& reduceBy,
  481. TMapReduceOperationSpec spec,
  482. const TOperationOptions& options)
  483. {
  484. Y_ENSURE_EX(spec.Inputs_.empty(),
  485. TApiUsageError() << "TMapReduceOperationSpec::Inputs MUST be empty");
  486. Y_ENSURE_EX(spec.Outputs_.empty(),
  487. TApiUsageError() << "TMapReduceOperationSpec::Outputs MUST be empty");
  488. Y_ENSURE_EX(spec.ReduceBy_.Parts_.empty(),
  489. TApiUsageError() << "TMapReduceOperationSpec::ReduceBy MUST be empty");
  490. for (const auto& inputPath : input.Parts_) {
  491. spec.AddStructuredInput(inputPath);
  492. }
  493. for (const auto& outputPath : output.Parts_) {
  494. spec.AddStructuredOutput(outputPath);
  495. }
  496. spec.ReduceBy(reduceBy);
  497. return MapReduce(spec, std::move(mapper), std::move(reducer), options);
  498. }
  499. IOperationPtr IOperationClient::MapReduce(
  500. ::TIntrusivePtr<IMapperBase> mapper,
  501. ::TIntrusivePtr<IReducerBase> reduceCombiner,
  502. ::TIntrusivePtr<IReducerBase> reducer,
  503. const TOneOrMany<TStructuredTablePath>& input,
  504. const TOneOrMany<TStructuredTablePath>& output,
  505. const TSortColumns& reduceBy,
  506. TMapReduceOperationSpec spec,
  507. const TOperationOptions& options)
  508. {
  509. Y_ENSURE_EX(spec.Inputs_.empty(),
  510. TApiUsageError() << "TMapReduceOperationSpec::Inputs MUST be empty");
  511. Y_ENSURE_EX(spec.Outputs_.empty(),
  512. TApiUsageError() << "TMapReduceOperationSpec::Outputs MUST be empty");
  513. Y_ENSURE_EX(spec.ReduceBy_.Parts_.empty(),
  514. TApiUsageError() << "TMapReduceOperationSpec::ReduceBy MUST be empty");
  515. for (const auto& inputPath : input.Parts_) {
  516. spec.AddStructuredInput(inputPath);
  517. }
  518. for (const auto& outputPath : output.Parts_) {
  519. spec.AddStructuredOutput(outputPath);
  520. }
  521. spec.ReduceBy(reduceBy);
  522. return MapReduce(spec, std::move(mapper), std::move(reduceCombiner), std::move(reducer), options);
  523. }
  524. IOperationPtr IOperationClient::Sort(
  525. const TOneOrMany<TRichYPath>& input,
  526. const TRichYPath& output,
  527. const TSortColumns& sortBy,
  528. const TSortOperationSpec& spec,
  529. const TOperationOptions& options)
  530. {
  531. Y_ENSURE_EX(spec.Inputs_.empty(),
  532. TApiUsageError() << "TSortOperationSpec::Inputs MUST be empty");
  533. Y_ENSURE_EX(spec.Output_.Path_.empty(),
  534. TApiUsageError() << "TSortOperationSpec::Output MUST be empty");
  535. Y_ENSURE_EX(spec.SortBy_.Parts_.empty(),
  536. TApiUsageError() << "TSortOperationSpec::SortBy MUST be empty");
  537. auto sortSpec = spec;
  538. for (const auto& inputPath : input.Parts_) {
  539. sortSpec.AddInput(inputPath);
  540. }
  541. sortSpec.Output(output);
  542. sortSpec.SortBy(sortBy);
  543. return Sort(sortSpec, options);
  544. }
  545. ////////////////////////////////////////////////////////////////////////////////
  546. TRawTableReaderPtr IStructuredJob::CreateCustomRawJobReader(int) const
  547. {
  548. return nullptr;
  549. }
  550. THolder<IProxyOutput> IStructuredJob::CreateCustomRawJobWriter(size_t) const
  551. {
  552. return nullptr;
  553. }
  554. ////////////////////////////////////////////////////////////////////////////////
  555. } // namespace NYT