// File: operation-inl.h (29 KB)
  1. #pragma once
  2. #ifndef OPERATION_INL_H_
  3. #error "Direct inclusion of this file is not allowed, use operation.h"
  4. #include "operation.h"
  5. #endif
  6. #undef OPERATION_INL_H_
  7. #include "errors.h"
  8. #include <util/generic/bt_exception.h>
  9. #include <util/generic/singleton.h>
  10. #include <util/system/type_name.h>
  11. #include <util/stream/file.h>
  12. #include <util/stream/buffer.h>
  13. #include <util/string/subst.h>
  14. #include <typeindex>
  15. namespace NYT {
  16. namespace NDetail {
  17. ////////////////////////////////////////////////////////////////////////////////
  18. template<class T>
  19. void Assign(TVector<T>& array, size_t idx, const T& value) {
  20. array.resize(std::max(array.size(), idx + 1));
  21. array[idx] = value;
  22. }
  23. ////////////////////////////////////////////////////////////////////////////////
  24. template <typename TRow>
  25. TStructuredRowStreamDescription GetStructuredRowStreamDescription()
  26. {
  27. if constexpr (std::is_same_v<TRow, NYT::TNode>) {
  28. return TTNodeStructuredRowStream{};
  29. } else if constexpr (std::is_same_v<TRow, NYT::TYaMRRow>) {
  30. return TTYaMRRowStructuredRowStream{};
  31. } else if constexpr (std::is_same_v<::google::protobuf::Message, TRow>) {
  32. return TProtobufStructuredRowStream{nullptr};
  33. } else if constexpr (TIsBaseOf<::google::protobuf::Message, TRow>::Value) {
  34. return TProtobufStructuredRowStream{TRow::descriptor()};
  35. } else if constexpr (TIsProtoOneOf<TRow>::value) {
  36. return TProtobufStructuredRowStream{nullptr};
  37. } else {
  38. static_assert(TDependentFalse<TRow>, "Unknown row type");
  39. }
  40. }
  41. ////////////////////////////////////////////////////////////////////////////////
  42. } // namespace NDetail
  43. ////////////////////////////////////////////////////////////////////////////////
  44. template <typename TRow>
  45. TStructuredTablePath Structured(TRichYPath richYPath)
  46. {
  47. return TStructuredTablePath(std::move(richYPath), StructuredTableDescription<TRow>());
  48. }
  49. template <typename TRow>
  50. TTableStructure StructuredTableDescription()
  51. {
  52. if constexpr (std::is_same_v<TRow, NYT::TNode>) {
  53. return TUnspecifiedTableStructure{};
  54. } else if constexpr (std::is_same_v<TRow, NYT::TYaMRRow>) {
  55. return TUnspecifiedTableStructure{};
  56. } else if constexpr (std::is_base_of_v<::google::protobuf::Message, TRow>) {
  57. if constexpr (std::is_same_v<::google::protobuf::Message, TRow>) {
  58. static_assert(TDependentFalse<TRow>, "Cannot use ::google::protobuf::Message as table descriptor");
  59. } else {
  60. return TProtobufTableStructure{TRow::descriptor()};
  61. }
  62. } else {
  63. static_assert(TDependentFalse<TRow>, "Unknown row type");
  64. }
  65. }
  66. ////////////////////////////////////////////////////////////////////////////////
  67. template <typename TDerived>
  68. TDerived& TRawOperationIoTableSpec<TDerived>::AddInput(const TRichYPath& path)
  69. {
  70. Inputs_.push_back(path);
  71. return static_cast<TDerived&>(*this);
  72. }
  73. template <typename TDerived>
  74. TDerived& TRawOperationIoTableSpec<TDerived>::SetInput(size_t tableIndex, const TRichYPath& path)
  75. {
  76. NDetail::Assign(Inputs_, tableIndex, path);
  77. }
  78. template <typename TDerived>
  79. TDerived& TRawOperationIoTableSpec<TDerived>::AddOutput(const TRichYPath& path)
  80. {
  81. Outputs_.push_back(path);
  82. return static_cast<TDerived&>(*this);
  83. }
  84. template <typename TDerived>
  85. TDerived& TRawOperationIoTableSpec<TDerived>::SetOutput(size_t tableIndex, const TRichYPath& path)
  86. {
  87. NDetail::Assign(Outputs_, tableIndex, path);
  88. }
  89. template <typename TDerived>
  90. const TVector<TRichYPath>& TRawOperationIoTableSpec<TDerived>::GetInputs() const
  91. {
  92. return Inputs_;
  93. }
  94. template <typename TDerived>
  95. const TVector<TRichYPath>& TRawOperationIoTableSpec<TDerived>::GetOutputs() const
  96. {
  97. return Outputs_;
  98. }
  99. ////////////////////////////////////////////////////////////////////////////////
  100. template <typename TDerived>
  101. TDerived& TRawMapReduceOperationIoSpec<TDerived>::AddMapOutput(const TRichYPath& path)
  102. {
  103. MapOutputs_.push_back(path);
  104. return static_cast<TDerived&>(*this);
  105. }
  106. template <typename TDerived>
  107. TDerived& TRawMapReduceOperationIoSpec<TDerived>::SetMapOutput(size_t tableIndex, const TRichYPath& path)
  108. {
  109. NDetail::Assign(MapOutputs_, tableIndex, path);
  110. }
  111. template <typename TDerived>
  112. const TVector<TRichYPath>& TRawMapReduceOperationIoSpec<TDerived>::GetMapOutputs() const
  113. {
  114. return MapOutputs_;
  115. }
  116. ////////////////////////////////////////////////////////////////////////////////
  117. ::TIntrusivePtr<INodeReaderImpl> CreateJobNodeReader(TRawTableReaderPtr rawTableReader);
  118. ::TIntrusivePtr<IYaMRReaderImpl> CreateJobYaMRReader(TRawTableReaderPtr rawTableReader);
  119. ::TIntrusivePtr<IProtoReaderImpl> CreateJobProtoReader(TRawTableReaderPtr rawTableReader);
  120. ::TIntrusivePtr<INodeWriterImpl> CreateJobNodeWriter(THolder<IProxyOutput> rawTableWriter);
  121. ::TIntrusivePtr<IYaMRWriterImpl> CreateJobYaMRWriter(THolder<IProxyOutput> rawTableWriter);
  122. ::TIntrusivePtr<IProtoWriterImpl> CreateJobProtoWriter(THolder<IProxyOutput> rawTableWriter);
  123. ////////////////////////////////////////////////////////////////////////////////
  124. template <class T>
  125. inline ::TIntrusivePtr<typename TRowTraits<T>::IReaderImpl> CreateJobReaderImpl(TRawTableReaderPtr rawTableReader);
  126. template <>
  127. inline ::TIntrusivePtr<INodeReaderImpl> CreateJobReaderImpl<TNode>(TRawTableReaderPtr rawTableReader)
  128. {
  129. return CreateJobNodeReader(rawTableReader);
  130. }
  131. template <>
  132. inline ::TIntrusivePtr<IYaMRReaderImpl> CreateJobReaderImpl<TYaMRRow>(TRawTableReaderPtr rawTableReader)
  133. {
  134. return CreateJobYaMRReader(rawTableReader);
  135. }
  136. template <>
  137. inline ::TIntrusivePtr<IProtoReaderImpl> CreateJobReaderImpl<Message>(TRawTableReaderPtr rawTableReader)
  138. {
  139. return CreateJobProtoReader(rawTableReader);
  140. }
  141. template <class T>
  142. inline ::TIntrusivePtr<typename TRowTraits<T>::IReaderImpl> CreateJobReaderImpl(TRawTableReaderPtr rawTableReader)
  143. {
  144. if constexpr (TIsBaseOf<Message, T>::Value || NDetail::TIsProtoOneOf<T>::value) {
  145. return CreateJobProtoReader(rawTableReader);
  146. } else {
  147. static_assert(TDependentFalse<T>, "Unknown row type");
  148. }
  149. }
  150. template <class T>
  151. inline TTableReaderPtr<T> CreateJobReader(TRawTableReaderPtr rawTableReader)
  152. {
  153. return new TTableReader<T>(CreateJobReaderImpl<T>(rawTableReader));
  154. }
  155. ////////////////////////////////////////////////////////////////////////////////
  156. template <class T>
  157. TTableWriterPtr<T> CreateJobWriter(THolder<IProxyOutput> rawJobWriter);
  158. template <>
  159. inline TTableWriterPtr<TNode> CreateJobWriter<TNode>(THolder<IProxyOutput> rawJobWriter)
  160. {
  161. return new TTableWriter<TNode>(CreateJobNodeWriter(std::move(rawJobWriter)));
  162. }
  163. template <>
  164. inline TTableWriterPtr<TYaMRRow> CreateJobWriter<TYaMRRow>(THolder<IProxyOutput> rawJobWriter)
  165. {
  166. return new TTableWriter<TYaMRRow>(CreateJobYaMRWriter(std::move(rawJobWriter)));
  167. }
  168. template <>
  169. inline TTableWriterPtr<Message> CreateJobWriter<Message>(THolder<IProxyOutput> rawJobWriter)
  170. {
  171. return new TTableWriter<Message>(CreateJobProtoWriter(std::move(rawJobWriter)));
  172. }
  173. template <class T, class = void>
  174. struct TProtoWriterCreator;
  175. template <class T>
  176. struct TProtoWriterCreator<T, std::enable_if_t<TIsBaseOf<Message, T>::Value>>
  177. {
  178. static TTableWriterPtr<T> Create(::TIntrusivePtr<IProtoWriterImpl> writer)
  179. {
  180. return new TTableWriter<T>(writer);
  181. }
  182. };
  183. template <class T>
  184. inline TTableWriterPtr<T> CreateJobWriter(THolder<IProxyOutput> rawJobWriter)
  185. {
  186. if constexpr (TIsBaseOf<Message, T>::Value) {
  187. return TProtoWriterCreator<T>::Create(CreateJobProtoWriter(std::move(rawJobWriter)));
  188. } else {
  189. static_assert(TDependentFalse<T>, "Unknown row type");
  190. }
  191. }
  192. ////////////////////////////////////////////////////////////////////////////////
  193. template <class T>
  194. void TOperationInputSpecBase::AddInput(const TRichYPath& path)
  195. {
  196. Inputs_.push_back(path);
  197. StructuredInputs_.emplace_back(Structured<T>(path));
  198. }
  199. template <class T>
  200. void TOperationInputSpecBase::SetInput(size_t tableIndex, const TRichYPath& path)
  201. {
  202. NDetail::Assign(Inputs_, tableIndex, path);
  203. NDetail::Assign(StructuredInputs_, tableIndex, Structured<T>(path));
  204. }
  205. template <class T>
  206. void TOperationOutputSpecBase::AddOutput(const TRichYPath& path)
  207. {
  208. Outputs_.push_back(path);
  209. StructuredOutputs_.emplace_back(Structured<T>(path));
  210. }
  211. template <class T>
  212. void TOperationOutputSpecBase::SetOutput(size_t tableIndex, const TRichYPath& path)
  213. {
  214. NDetail::Assign(Outputs_, tableIndex, path);
  215. NDetail::Assign(StructuredOutputs_, tableIndex, Structured<T>(path));
  216. }
  217. template <class TDerived>
  218. template <class T>
  219. TDerived& TOperationIOSpec<TDerived>::AddInput(const TRichYPath& path)
  220. {
  221. static_assert(!std::is_same<T, Message>::value, "input type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
  222. TOperationInputSpecBase::AddInput<T>(path);
  223. return *static_cast<TDerived*>(this);
  224. }
  225. template <class TDerived>
  226. template <class T>
  227. TDerived& TOperationIOSpec<TDerived>::SetInput(size_t tableIndex, const TRichYPath& path)
  228. {
  229. static_assert(!std::is_same<T, Message>::value, "input type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
  230. TOperationInputSpecBase::SetInput<T>(tableIndex, path);
  231. return *static_cast<TDerived*>(this);
  232. }
  233. template <class TDerived>
  234. template <class T>
  235. TDerived& TOperationIOSpec<TDerived>::AddOutput(const TRichYPath& path)
  236. {
  237. static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
  238. TOperationOutputSpecBase::AddOutput<T>(path);
  239. return *static_cast<TDerived*>(this);
  240. }
  241. template <class TDerived>
  242. template <class T>
  243. TDerived& TOperationIOSpec<TDerived>::SetOutput(size_t tableIndex, const TRichYPath& path)
  244. {
  245. static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
  246. TOperationOutputSpecBase::SetOutput<T>(tableIndex, path);
  247. return *static_cast<TDerived*>(this);
  248. }
  249. template <class TDerived>
  250. TDerived& TOperationIOSpec<TDerived>::AddStructuredInput(TStructuredTablePath path)
  251. {
  252. TOperationInputSpecBase::AddStructuredInput(std::move(path));
  253. return *static_cast<TDerived*>(this);
  254. }
  255. template <class TDerived>
  256. TDerived& TOperationIOSpec<TDerived>::AddStructuredOutput(TStructuredTablePath path)
  257. {
  258. TOperationOutputSpecBase::AddStructuredOutput(std::move(path));
  259. return *static_cast<TDerived*>(this);
  260. }
  261. ////////////////////////////////////////////////////////////////////////////////
  262. template <class T>
  263. TVanillaTask& TVanillaTask::AddOutput(const TRichYPath& path)
  264. {
  265. static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
  266. TOperationOutputSpecBase::AddOutput<T>(path);
  267. return *this;
  268. }
  269. template <class T>
  270. TVanillaTask& TVanillaTask::SetOutput(size_t tableIndex, const TRichYPath& path)
  271. {
  272. static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
  273. TOperationOutputSpecBase::SetOutput<T>(tableIndex, path);
  274. return *this;
  275. }
  276. ////////////////////////////////////////////////////////////////////////////////
  namespace NDetail {

  // Declared here, defined elsewhere. Called by the deprecated protobuf
  // methods below with their own name as `methodName`.
  void ResetUseClientProtobuf(const char* methodName);

  } // namespace NDetail
  280. template <class TDerived>
  281. TDerived& TOperationIOSpec<TDerived>::AddProtobufInput_VerySlow_Deprecated(const TRichYPath& path)
  282. {
  283. NDetail::ResetUseClientProtobuf("AddProtobufInput_VerySlow_Deprecated");
  284. Inputs_.push_back(path);
  285. StructuredInputs_.emplace_back(TStructuredTablePath(path, TProtobufTableStructure{nullptr}));
  286. return *static_cast<TDerived*>(this);
  287. }
  288. template <class TDerived>
  289. TDerived& TOperationIOSpec<TDerived>::AddProtobufOutput_VerySlow_Deprecated(const TRichYPath& path)
  290. {
  291. NDetail::ResetUseClientProtobuf("AddProtobufOutput_VerySlow_Deprecated");
  292. Outputs_.push_back(path);
  293. StructuredOutputs_.emplace_back(TStructuredTablePath(path, TProtobufTableStructure{nullptr}));
  294. return *static_cast<TDerived*>(this);
  295. }
  296. ////////////////////////////////////////////////////////////////////////////////
  297. template <typename TRow>
  298. TJobOperationPreparer::TInputGroup& TJobOperationPreparer::TInputGroup::Description()
  299. {
  300. for (auto i : Indices_) {
  301. Preparer_.InputDescription<TRow>(i);
  302. }
  303. return *this;
  304. }
  305. template <typename TRow>
  306. TJobOperationPreparer::TOutputGroup& TJobOperationPreparer::TOutputGroup::Description(bool inferSchema)
  307. {
  308. for (auto i : Indices_) {
  309. Preparer_.OutputDescription<TRow>(i, inferSchema);
  310. }
  311. return *this;
  312. }
  313. ////////////////////////////////////////////////////////////////////////////////
  314. template <typename TCont>
  315. TJobOperationPreparer::TInputGroup TJobOperationPreparer::BeginInputGroup(const TCont& indices)
  316. {
  317. for (auto i : indices) {
  318. ValidateInputTableIndex(i, TStringBuf("BeginInputGroup()"));
  319. }
  320. return TInputGroup(*this, TVector<int>(std::begin(indices), std::end(indices)));
  321. }
  322. template <typename TCont>
  323. TJobOperationPreparer::TOutputGroup TJobOperationPreparer::BeginOutputGroup(const TCont& indices)
  324. {
  325. for (auto i : indices) {
  326. ValidateOutputTableIndex(i, TStringBuf("BeginOutputGroup()"));
  327. }
  328. return TOutputGroup(*this, indices);
  329. }
  330. template <typename TRow>
  331. TJobOperationPreparer& TJobOperationPreparer::InputDescription(int tableIndex)
  332. {
  333. ValidateMissingInputDescription(tableIndex);
  334. InputTableDescriptions_[tableIndex] = StructuredTableDescription<TRow>();
  335. return *this;
  336. }
  337. template <typename TRow>
  338. TJobOperationPreparer& TJobOperationPreparer::OutputDescription(int tableIndex, bool inferSchema)
  339. {
  340. ValidateMissingOutputDescription(tableIndex);
  341. OutputTableDescriptions_[tableIndex] = StructuredTableDescription<TRow>();
  342. if (inferSchema && !OutputSchemas_[tableIndex]) {
  343. OutputSchemas_[tableIndex] = CreateTableSchema<TRow>();
  344. }
  345. return *this;
  346. }
  347. ////////////////////////////////////////////////////////////////////////////////
  348. template <class TDerived>
  349. template <class TRow>
  350. TDerived& TIntermediateTablesHintSpec<TDerived>::HintMapOutput()
  351. {
  352. IntermediateMapOutputDescription_ = StructuredTableDescription<TRow>();
  353. return *static_cast<TDerived*>(this);
  354. }
  355. template <class TDerived>
  356. template <class TRow>
  357. TDerived& TIntermediateTablesHintSpec<TDerived>::AddMapOutput(const TRichYPath& path)
  358. {
  359. MapOutputs_.push_back(path);
  360. StructuredMapOutputs_.emplace_back(Structured<TRow>(path));
  361. return *static_cast<TDerived*>(this);
  362. }
  363. template <class TDerived>
  364. template <class TRow>
  365. TDerived& TIntermediateTablesHintSpec<TDerived>::HintReduceCombinerInput()
  366. {
  367. IntermediateReduceCombinerInputDescription_ = StructuredTableDescription<TRow>();
  368. return *static_cast<TDerived*>(this);
  369. }
  370. template <class TDerived>
  371. template <class TRow>
  372. TDerived& TIntermediateTablesHintSpec<TDerived>::HintReduceCombinerOutput()
  373. {
  374. IntermediateReduceCombinerOutputDescription_ = StructuredTableDescription<TRow>();
  375. return *static_cast<TDerived*>(this);
  376. }
  377. template <class TDerived>
  378. template <class TRow>
  379. TDerived& TIntermediateTablesHintSpec<TDerived>::HintReduceInput()
  380. {
  381. IntermediateReducerInputDescription_ = StructuredTableDescription<TRow>();
  382. return *static_cast<TDerived*>(this);
  383. }
  384. template <class TDerived>
  385. const TVector<TStructuredTablePath>& TIntermediateTablesHintSpec<TDerived>::GetStructuredMapOutputs() const
  386. {
  387. return StructuredMapOutputs_;
  388. }
  389. template <class TDerived>
  390. const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateMapOutputDescription() const
  391. {
  392. return IntermediateMapOutputDescription_;
  393. }
  394. template <class TDerived>
  395. const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateReduceCombinerInputDescription() const
  396. {
  397. return IntermediateReduceCombinerInputDescription_;
  398. }
  399. template <class TDerived>
  400. const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateReduceCombinerOutputDescription() const
  401. {
  402. return IntermediateReduceCombinerOutputDescription_;
  403. }
  404. template <class TDerived>
  405. const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateReducerInputDescription() const
  406. {
  407. return IntermediateReducerInputDescription_;
  408. }
  409. ////////////////////////////////////////////////////////////////////////////////
  410. struct TReducerContext
  411. {
  412. bool Break = false;
  413. static TReducerContext* Get() { return Singleton<TReducerContext>(); }
  414. };
  415. template <class TR, class TW>
  416. inline void IReducer<TR, TW>::Break()
  417. {
  418. TReducerContext::Get()->Break = true;
  419. }
  420. template <typename TReader, typename TWriter>
  421. void FeedJobInput(
  422. IMapper<TReader, TWriter>* mapper,
  423. typename TRowTraits<typename TReader::TRowType>::IReaderImpl* readerImpl,
  424. TWriter* writer)
  425. {
  426. using TInputRow = typename TReader::TRowType;
  427. auto reader = MakeIntrusive<TTableReader<TInputRow>>(readerImpl);
  428. mapper->Do(reader.Get(), writer);
  429. }
  430. template <typename TReader, typename TWriter>
  431. void FeedJobInput(
  432. IReducer<TReader, TWriter>* reducer,
  433. typename TRowTraits<typename TReader::TRowType>::IReaderImpl* readerImpl,
  434. TWriter* writer)
  435. {
  436. using TInputRow = typename TReader::TRowType;
  437. auto rangesReader = MakeIntrusive<TTableRangesReader<TInputRow>>(readerImpl);
  438. for (; rangesReader->IsValid(); rangesReader->Next()) {
  439. reducer->Do(&rangesReader->GetRange(), writer);
  440. if (TReducerContext::Get()->Break) {
  441. break;
  442. }
  443. }
  444. }
  445. template <typename TReader, typename TWriter>
  446. void FeedJobInput(
  447. IAggregatorReducer<TReader, TWriter>* reducer,
  448. typename TRowTraits<typename TReader::TRowType>::IReaderImpl* readerImpl,
  449. TWriter* writer)
  450. {
  451. using TInputRow = typename TReader::TRowType;
  452. auto rangesReader = MakeIntrusive<TTableRangesReader<TInputRow>>(readerImpl);
  453. reducer->Do(rangesReader.Get(), writer);
  454. }
  455. template <class TRawJob>
  456. int RunRawJob(size_t outputTableCount, IInputStream& jobStateStream)
  457. {
  458. TRawJobContext context(outputTableCount);
  459. TRawJob job;
  460. job.Load(jobStateStream);
  461. job.Do(context);
  462. return 0;
  463. }
  464. template <>
  465. inline int RunRawJob<TCommandRawJob>(size_t /* outputTableCount */, IInputStream& /* jobStateStream */)
  466. {
  467. Y_ABORT();
  468. }
  469. template <class TVanillaJob>
  470. int RunVanillaJob(size_t outputTableCount, IInputStream& jobStateStream)
  471. {
  472. TVanillaJob job;
  473. job.Load(jobStateStream);
  474. if constexpr (std::is_base_of<IVanillaJob<>, TVanillaJob>::value) {
  475. Y_ABORT_UNLESS(outputTableCount == 0, "Void vanilla job expects zero 'outputTableCount'");
  476. job.Do();
  477. } else {
  478. Y_ABORT_UNLESS(outputTableCount, "Vanilla job with table writer expects nonzero 'outputTableCount'");
  479. using TOutputRow = typename TVanillaJob::TWriter::TRowType;
  480. THolder<IProxyOutput> rawJobWriter;
  481. if (auto customWriter = job.CreateCustomRawJobWriter(outputTableCount)) {
  482. rawJobWriter = std::move(customWriter);
  483. } else {
  484. rawJobWriter = CreateRawJobWriter(outputTableCount);
  485. }
  486. auto writer = CreateJobWriter<TOutputRow>(std::move(rawJobWriter));
  487. job.Start(writer.Get());
  488. job.Do(writer.Get());
  489. job.Finish(writer.Get());
  490. writer->Finish();
  491. }
  492. return 0;
  493. }
  494. template <>
  495. inline int RunVanillaJob<TCommandVanillaJob>(size_t /* outputTableCount */, IInputStream& /* jobStateStream */)
  496. {
  497. Y_ABORT();
  498. }
  499. template <class TJob>
  500. requires TIsBaseOf<IStructuredJob, TJob>::Value
  501. int RunJob(size_t outputTableCount, IInputStream& jobStateStream)
  502. {
  503. using TInputRow = typename TJob::TReader::TRowType;
  504. using TOutputRow = typename TJob::TWriter::TRowType;
  505. auto job = MakeIntrusive<TJob>();
  506. job->Load(jobStateStream);
  507. TRawTableReaderPtr rawJobReader;
  508. if (auto customReader = job->CreateCustomRawJobReader(/*fd*/ 0)) {
  509. rawJobReader = customReader;
  510. } else {
  511. rawJobReader = CreateRawJobReader(/*fd*/ 0);
  512. }
  513. auto readerImpl = CreateJobReaderImpl<TInputRow>(rawJobReader);
  514. // Many users don't expect to have jobs with empty input so we skip such jobs.
  515. if (!readerImpl->IsValid()) {
  516. return 0;
  517. }
  518. THolder<IProxyOutput> rawJobWriter;
  519. if (auto customWriter = job->CreateCustomRawJobWriter(outputTableCount)) {
  520. rawJobWriter = std::move(customWriter);
  521. } else {
  522. rawJobWriter = CreateRawJobWriter(outputTableCount);
  523. }
  524. auto writer = CreateJobWriter<TOutputRow>(std::move(rawJobWriter));
  525. job->Start(writer.Get());
  526. FeedJobInput(job.Get(), readerImpl.Get(), writer.Get());
  527. job->Finish(writer.Get());
  528. writer->Finish();
  529. return 0;
  530. }
  531. //
  // We keep RunMapJob/RunReduceJob/RunAggregatorReducer for backward compatibility:
  // some users already rely on them. :(
  534. template <class TMapper>
  535. int RunMapJob(size_t outputTableCount, IInputStream& jobStateStream)
  536. {
  537. return RunJob<TMapper>(outputTableCount, jobStateStream);
  538. }
  539. template <class TReducer>
  540. int RunReduceJob(size_t outputTableCount, IInputStream& jobStateStream)
  541. {
  542. return RunJob<TReducer>(outputTableCount, jobStateStream);
  543. }
  544. template <class TReducer>
  545. int RunAggregatorReducer(size_t outputTableCount, IInputStream& jobStateStream)
  546. {
  547. return RunJob<TReducer>(outputTableCount, jobStateStream);
  548. }
  549. ////////////////////////////////////////////////////////////////////////////////
  550. template <typename T, typename = void>
  551. struct TIsConstructibleFromNode
  552. : std::false_type
  553. { };
  554. template <typename T>
  555. struct TIsConstructibleFromNode<T, std::void_t<decltype(T::FromNode(std::declval<TNode&>()))>>
  556. : std::true_type
  557. { };
  558. template <class TJob>
  559. ::TIntrusivePtr<NYT::IStructuredJob> ConstructJobFromNode(const TNode& node)
  560. {
  561. if constexpr (TIsConstructibleFromNode<TJob>::value) {
  562. Y_ENSURE(node.GetType() != TNode::Undefined,
  563. "job has FromNode method but constructor arguments were not provided");
  564. return TJob::FromNode(node);
  565. } else {
  566. Y_ENSURE(node.GetType() == TNode::Undefined,
  567. "constructor arguments provided but job does not contain FromNode method");
  568. return MakeIntrusive<TJob>();
  569. }
  570. }
  571. ////////////////////////////////////////////////////////////////////////////////
  572. using TJobFunction = int (*)(size_t, IInputStream&);
  573. using TConstructJobFunction = ::TIntrusivePtr<NYT::IStructuredJob> (*)(const TNode&);
  574. class TJobFactory
  575. {
  576. public:
  577. static TJobFactory* Get()
  578. {
  579. return Singleton<TJobFactory>();
  580. }
  581. template <class TJob>
  582. void RegisterJob(const char* name)
  583. {
  584. RegisterJobImpl<TJob>(name, RunJob<TJob>);
  585. JobConstructors[name] = ConstructJobFromNode<TJob>;
  586. }
  587. template <class TRawJob>
  588. void RegisterRawJob(const char* name)
  589. {
  590. RegisterJobImpl<TRawJob>(name, RunRawJob<TRawJob>);
  591. }
  592. template <class TVanillaJob>
  593. void RegisterVanillaJob(const char* name)
  594. {
  595. RegisterJobImpl<TVanillaJob>(name, RunVanillaJob<TVanillaJob>);
  596. }
  597. TString GetJobName(const IJob* job)
  598. {
  599. const auto typeIndex = std::type_index(typeid(*job));
  600. CheckJobRegistered(typeIndex);
  601. return JobNames[typeIndex];
  602. }
  603. TJobFunction GetJobFunction(const char* name)
  604. {
  605. CheckNameRegistered(name);
  606. return JobFunctions[name];
  607. }
  608. TConstructJobFunction GetConstructingFunction(const char* name)
  609. {
  610. CheckNameRegistered(name);
  611. return JobConstructors[name];
  612. }
  613. private:
  614. TMap<std::type_index, TString> JobNames;
  615. THashMap<TString, TJobFunction> JobFunctions;
  616. THashMap<TString, TConstructJobFunction> JobConstructors;
  617. template <typename TJob, typename TRunner>
  618. void RegisterJobImpl(const char* name, TRunner runner) {
  619. const auto typeIndex = std::type_index(typeid(TJob));
  620. CheckNotRegistered(typeIndex, name);
  621. JobNames[typeIndex] = name;
  622. JobFunctions[name] = runner;
  623. }
  624. void CheckNotRegistered(const std::type_index& typeIndex, const char* name)
  625. {
  626. Y_ENSURE(!JobNames.contains(typeIndex),
  627. "type_info '" << typeIndex.name() << "'"
  628. "is already registered under name '" << JobNames[typeIndex] << "'");
  629. Y_ENSURE(!JobFunctions.contains(name),
  630. "job with name '" << name << "' is already registered");
  631. }
  632. void CheckJobRegistered(const std::type_index& typeIndex)
  633. {
  634. Y_ENSURE(JobNames.contains(typeIndex),
  635. "type_info '" << typeIndex.name() << "' is not registered, use REGISTER_* macros");
  636. }
  637. void CheckNameRegistered(const char* name)
  638. {
  639. Y_ENSURE(JobFunctions.contains(name),
  640. "job with name '" << name << "' is not registered, use REGISTER_* macros");
  641. }
  642. };
  643. ////////////////////////////////////////////////////////////////////////////////
  644. template <class TMapper>
  645. struct TMapperRegistrator
  646. {
  647. TMapperRegistrator(const char* name)
  648. {
  649. static_assert(TMapper::JobType == IJob::EType::Mapper,
  650. "REGISTER_MAPPER is not compatible with this job class");
  651. NYT::TJobFactory::Get()->RegisterJob<TMapper>(name);
  652. }
  653. };
  654. template <class TReducer>
  655. struct TReducerRegistrator
  656. {
  657. TReducerRegistrator(const char* name)
  658. {
  659. static_assert(TReducer::JobType == IJob::EType::Reducer ||
  660. TReducer::JobType == IJob::EType::ReducerAggregator,
  661. "REGISTER_REDUCER is not compatible with this job class");
  662. NYT::TJobFactory::Get()->RegisterJob<TReducer>(name);
  663. }
  664. };
  665. template <class TRawJob>
  666. struct TRawJobRegistrator
  667. {
  668. TRawJobRegistrator(const char* name)
  669. {
  670. static_assert(TRawJob::JobType == IJob::EType::RawJob,
  671. "REGISTER_RAW_JOB is not compatible with this job class");
  672. NYT::TJobFactory::Get()->RegisterRawJob<TRawJob>(name);
  673. }
  674. };
  675. template <class TVanillaJob>
  676. struct TVanillaJobRegistrator
  677. {
  678. TVanillaJobRegistrator(const char* name)
  679. {
  680. static_assert(TVanillaJob::JobType == IJob::EType::VanillaJob,
  681. "REGISTER_VANILLA_JOB is not compatible with this job class");
  682. NYT::TJobFactory::Get()->RegisterVanillaJob<TVanillaJob>(name);
  683. }
  684. };
  685. ////////////////////////////////////////////////////////////////////////////////
  686. inline TString YtRegistryTypeName(const TString& name) {
  687. TString res = name;
  688. #ifdef _win_
  689. SubstGlobal(res, "class ", "");
  690. #endif
  691. return res;
  692. }
  693. ////////////////////////////////////////////////////////////////////////////////
  // Job registration macros.
  //
  // Each macro defines a file-local static registrator object whose
  // constructor registers the job class in NYT::TJobFactory. The REGISTER_*
  // forms derive the name from TypeName<...>() passed through
  // NYT::YtRegistryTypeName; the REGISTER_NAMED_* forms take an explicit name.
  // The job class is passed through __VA_ARGS__ so that template types
  // containing commas can be used.

  #define REGISTER_MAPPER(...) \
      static const NYT::TMapperRegistrator<__VA_ARGS__> \
      Y_GENERATE_UNIQUE_ID(TJobRegistrator)(NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data());

  #define REGISTER_NAMED_MAPPER(name, ...) \
      static const NYT::TMapperRegistrator<__VA_ARGS__> \
      Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name);

  #define REGISTER_REDUCER(...) \
      static const NYT::TReducerRegistrator<__VA_ARGS__> \
      Y_GENERATE_UNIQUE_ID(TJobRegistrator)(NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data());

  #define REGISTER_NAMED_REDUCER(name, ...) \
      static const NYT::TReducerRegistrator<__VA_ARGS__> \
      Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name);

  #define REGISTER_NAMED_RAW_JOB(name, ...) \
      static const NYT::TRawJobRegistrator<__VA_ARGS__> \
      Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name);

  #define REGISTER_RAW_JOB(...) \
      REGISTER_NAMED_RAW_JOB((NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data()), __VA_ARGS__)

  // Declared `static const` like every other registrator above (the original
  // omitted `const` for this macro only).
  #define REGISTER_NAMED_VANILLA_JOB(name, ...) \
      static const NYT::TVanillaJobRegistrator<__VA_ARGS__> \
      Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name);

  #define REGISTER_VANILLA_JOB(...) \
      REGISTER_NAMED_VANILLA_JOB((NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data()), __VA_ARGS__)
  716. ////////////////////////////////////////////////////////////////////////////////
  717. template <typename TReader, typename TWriter>
  718. TStructuredRowStreamDescription IMapper<TReader, TWriter>::GetInputRowStreamDescription() const
  719. {
  720. return NYT::NDetail::GetStructuredRowStreamDescription<typename TReader::TRowType>();
  721. }
  722. template <typename TReader, typename TWriter>
  723. TStructuredRowStreamDescription IMapper<TReader, TWriter>::GetOutputRowStreamDescription() const
  724. {
  725. return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>();
  726. }
  727. ////////////////////////////////////////////////////////////////////////////////
  728. template <typename TReader, typename TWriter>
  729. TStructuredRowStreamDescription IReducer<TReader, TWriter>::GetInputRowStreamDescription() const
  730. {
  731. return NYT::NDetail::GetStructuredRowStreamDescription<typename TReader::TRowType>();
  732. }
  733. template <typename TReader, typename TWriter>
  734. TStructuredRowStreamDescription IReducer<TReader, TWriter>::GetOutputRowStreamDescription() const
  735. {
  736. return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>();
  737. }
  738. ////////////////////////////////////////////////////////////////////////////////
  739. template <typename TReader, typename TWriter>
  740. TStructuredRowStreamDescription IAggregatorReducer<TReader, TWriter>::GetInputRowStreamDescription() const
  741. {
  742. return NYT::NDetail::GetStructuredRowStreamDescription<typename TReader::TRowType>();
  743. }
  744. template <typename TReader, typename TWriter>
  745. TStructuredRowStreamDescription IAggregatorReducer<TReader, TWriter>::GetOutputRowStreamDescription() const
  746. {
  747. return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>();
  748. }
  749. ////////////////////////////////////////////////////////////////////////////////
  750. template <typename TWriter>
  751. TStructuredRowStreamDescription IVanillaJob<TWriter>::GetInputRowStreamDescription() const
  752. {
  753. return TVoidStructuredRowStream();
  754. }
  755. template <typename TWriter>
  756. TStructuredRowStreamDescription IVanillaJob<TWriter>::GetOutputRowStreamDescription() const
  757. {
  758. return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>();
  759. }
  760. ////////////////////////////////////////////////////////////////////////////////
  761. } // namespace NYT