operation-inl.h 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931
  1. #pragma once
  2. #ifndef OPERATION_INL_H_
  3. #error "Direct inclusion of this file is not allowed, use operation.h"
  4. #include "operation.h"
  5. #endif
  6. #undef OPERATION_INL_H_
  7. #include "errors.h"
  8. #include <util/generic/bt_exception.h>
  9. #include <util/generic/singleton.h>
  10. #include <util/system/type_name.h>
  11. #include <util/stream/file.h>
  12. #include <util/stream/buffer.h>
  13. #include <util/string/subst.h>
  14. #include <typeindex>
  15. namespace NYT {
  16. namespace NDetail {
  17. ////////////////////////////////////////////////////////////////////////////////
  18. template<class T>
  19. void Assign(TVector<T>& array, size_t idx, const T& value) {
  20. array.resize(std::max(array.size(), idx + 1));
  21. array[idx] = value;
  22. }
  23. ////////////////////////////////////////////////////////////////////////////////
  24. template <typename TRow>
  25. TStructuredRowStreamDescription GetStructuredRowStreamDescription()
  26. {
  27. if constexpr (std::is_same_v<TRow, NYT::TNode>) {
  28. return TTNodeStructuredRowStream{};
  29. } else if constexpr (std::is_same_v<TRow, NYT::TYaMRRow>) {
  30. return TTYaMRRowStructuredRowStream{};
  31. } else if constexpr (std::is_same_v<::google::protobuf::Message, TRow>) {
  32. return TProtobufStructuredRowStream{nullptr};
  33. } else if constexpr (TIsBaseOf<::google::protobuf::Message, TRow>::Value) {
  34. return TProtobufStructuredRowStream{TRow::descriptor()};
  35. } else if constexpr (TIsProtoOneOf<TRow>::value) {
  36. return TProtobufStructuredRowStream{nullptr};
  37. } else {
  38. static_assert(TDependentFalse<TRow>, "Unknown row type");
  39. }
  40. }
  41. ////////////////////////////////////////////////////////////////////////////////
  42. } // namespace NDetail
  43. ////////////////////////////////////////////////////////////////////////////////
  44. template <typename TRow>
  45. TStructuredTablePath Structured(TRichYPath richYPath)
  46. {
  47. return TStructuredTablePath(std::move(richYPath), StructuredTableDescription<TRow>());
  48. }
  49. template <typename TRow>
  50. TTableStructure StructuredTableDescription()
  51. {
  52. if constexpr (std::is_same_v<TRow, NYT::TNode>) {
  53. return TUnspecifiedTableStructure{};
  54. } else if constexpr (std::is_same_v<TRow, NYT::TYaMRRow>) {
  55. return TUnspecifiedTableStructure{};
  56. } else if constexpr (std::is_base_of_v<::google::protobuf::Message, TRow>) {
  57. if constexpr (std::is_same_v<::google::protobuf::Message, TRow>) {
  58. static_assert(TDependentFalse<TRow>, "Cannot use ::google::protobuf::Message as table descriptor");
  59. } else {
  60. return TProtobufTableStructure{TRow::descriptor()};
  61. }
  62. } else {
  63. static_assert(TDependentFalse<TRow>, "Unknown row type");
  64. }
  65. }
  66. ////////////////////////////////////////////////////////////////////////////////
  67. template <typename TDerived>
  68. TDerived& TRawOperationIoTableSpec<TDerived>::AddInput(const TRichYPath& path)
  69. {
  70. Inputs_.push_back(path);
  71. return static_cast<TDerived&>(*this);
  72. }
  73. template <typename TDerived>
  74. TDerived& TRawOperationIoTableSpec<TDerived>::SetInput(size_t tableIndex, const TRichYPath& path)
  75. {
  76. NDetail::Assign(Inputs_, tableIndex, path);
  77. return static_cast<TDerived&>(*this);
  78. }
  79. template <typename TDerived>
  80. TDerived& TRawOperationIoTableSpec<TDerived>::AddOutput(const TRichYPath& path)
  81. {
  82. Outputs_.push_back(path);
  83. return static_cast<TDerived&>(*this);
  84. }
  85. template <typename TDerived>
  86. TDerived& TRawOperationIoTableSpec<TDerived>::SetOutput(size_t tableIndex, const TRichYPath& path)
  87. {
  88. NDetail::Assign(Outputs_, tableIndex, path);
  89. return static_cast<TDerived&>(*this);
  90. }
  91. template <typename TDerived>
  92. const TVector<TRichYPath>& TRawOperationIoTableSpec<TDerived>::GetInputs() const
  93. {
  94. return Inputs_;
  95. }
  96. template <typename TDerived>
  97. const TVector<TRichYPath>& TRawOperationIoTableSpec<TDerived>::GetOutputs() const
  98. {
  99. return Outputs_;
  100. }
  101. ////////////////////////////////////////////////////////////////////////////////
  102. template <typename TDerived>
  103. TDerived& TRawMapReduceOperationIoSpec<TDerived>::AddMapOutput(const TRichYPath& path)
  104. {
  105. MapOutputs_.push_back(path);
  106. return static_cast<TDerived&>(*this);
  107. }
  108. template <typename TDerived>
  109. TDerived& TRawMapReduceOperationIoSpec<TDerived>::SetMapOutput(size_t tableIndex, const TRichYPath& path)
  110. {
  111. NDetail::Assign(MapOutputs_, tableIndex, path);
  112. return static_cast<TDerived&>(*this);
  113. }
  114. template <typename TDerived>
  115. const TVector<TRichYPath>& TRawMapReduceOperationIoSpec<TDerived>::GetMapOutputs() const
  116. {
  117. return MapOutputs_;
  118. }
  119. ////////////////////////////////////////////////////////////////////////////////
  120. ::TIntrusivePtr<INodeReaderImpl> CreateJobNodeReader(TRawTableReaderPtr rawTableReader);
  121. ::TIntrusivePtr<IYaMRReaderImpl> CreateJobYaMRReader(TRawTableReaderPtr rawTableReader);
  122. ::TIntrusivePtr<IProtoReaderImpl> CreateJobProtoReader(TRawTableReaderPtr rawTableReader);
  123. ::TIntrusivePtr<INodeWriterImpl> CreateJobNodeWriter(THolder<IProxyOutput> rawTableWriter);
  124. ::TIntrusivePtr<IYaMRWriterImpl> CreateJobYaMRWriter(THolder<IProxyOutput> rawTableWriter);
  125. ::TIntrusivePtr<IProtoWriterImpl> CreateJobProtoWriter(THolder<IProxyOutput> rawTableWriter);
  126. ////////////////////////////////////////////////////////////////////////////////
  127. template <class T>
  128. inline ::TIntrusivePtr<typename TRowTraits<T>::IReaderImpl> CreateJobReaderImpl(TRawTableReaderPtr rawTableReader);
  129. template <>
  130. inline ::TIntrusivePtr<INodeReaderImpl> CreateJobReaderImpl<TNode>(TRawTableReaderPtr rawTableReader)
  131. {
  132. return CreateJobNodeReader(rawTableReader);
  133. }
  134. template <>
  135. inline ::TIntrusivePtr<IYaMRReaderImpl> CreateJobReaderImpl<TYaMRRow>(TRawTableReaderPtr rawTableReader)
  136. {
  137. return CreateJobYaMRReader(rawTableReader);
  138. }
  139. template <>
  140. inline ::TIntrusivePtr<IProtoReaderImpl> CreateJobReaderImpl<Message>(TRawTableReaderPtr rawTableReader)
  141. {
  142. return CreateJobProtoReader(rawTableReader);
  143. }
  144. template <class T>
  145. inline ::TIntrusivePtr<typename TRowTraits<T>::IReaderImpl> CreateJobReaderImpl(TRawTableReaderPtr rawTableReader)
  146. {
  147. if constexpr (TIsBaseOf<Message, T>::Value || NDetail::TIsProtoOneOf<T>::value) {
  148. return CreateJobProtoReader(rawTableReader);
  149. } else {
  150. static_assert(TDependentFalse<T>, "Unknown row type");
  151. }
  152. }
  153. template <class T>
  154. inline TTableReaderPtr<T> CreateJobReader(TRawTableReaderPtr rawTableReader)
  155. {
  156. return new TTableReader<T>(CreateJobReaderImpl<T>(rawTableReader));
  157. }
  158. ////////////////////////////////////////////////////////////////////////////////
  159. template <class T>
  160. TTableWriterPtr<T> CreateJobWriter(THolder<IProxyOutput> rawJobWriter);
  161. template <>
  162. inline TTableWriterPtr<TNode> CreateJobWriter<TNode>(THolder<IProxyOutput> rawJobWriter)
  163. {
  164. return new TTableWriter<TNode>(CreateJobNodeWriter(std::move(rawJobWriter)));
  165. }
  166. template <>
  167. inline TTableWriterPtr<TYaMRRow> CreateJobWriter<TYaMRRow>(THolder<IProxyOutput> rawJobWriter)
  168. {
  169. return new TTableWriter<TYaMRRow>(CreateJobYaMRWriter(std::move(rawJobWriter)));
  170. }
  171. template <>
  172. inline TTableWriterPtr<Message> CreateJobWriter<Message>(THolder<IProxyOutput> rawJobWriter)
  173. {
  174. return new TTableWriter<Message>(CreateJobProtoWriter(std::move(rawJobWriter)));
  175. }
  176. template <class T, class = void>
  177. struct TProtoWriterCreator;
  178. template <class T>
  179. struct TProtoWriterCreator<T, std::enable_if_t<TIsBaseOf<Message, T>::Value>>
  180. {
  181. static TTableWriterPtr<T> Create(::TIntrusivePtr<IProtoWriterImpl> writer)
  182. {
  183. return new TTableWriter<T>(writer);
  184. }
  185. };
  186. template <class T>
  187. inline TTableWriterPtr<T> CreateJobWriter(THolder<IProxyOutput> rawJobWriter)
  188. {
  189. if constexpr (TIsBaseOf<Message, T>::Value) {
  190. return TProtoWriterCreator<T>::Create(CreateJobProtoWriter(std::move(rawJobWriter)));
  191. } else {
  192. static_assert(TDependentFalse<T>, "Unknown row type");
  193. }
  194. }
  195. ////////////////////////////////////////////////////////////////////////////////
  196. template <class T>
  197. void TOperationInputSpecBase::AddInput(const TRichYPath& path)
  198. {
  199. Inputs_.push_back(path);
  200. StructuredInputs_.emplace_back(Structured<T>(path));
  201. }
  202. template <class T>
  203. void TOperationInputSpecBase::SetInput(size_t tableIndex, const TRichYPath& path)
  204. {
  205. NDetail::Assign(Inputs_, tableIndex, path);
  206. NDetail::Assign(StructuredInputs_, tableIndex, Structured<T>(path));
  207. }
  208. template <class T>
  209. void TOperationOutputSpecBase::AddOutput(const TRichYPath& path)
  210. {
  211. Outputs_.push_back(path);
  212. StructuredOutputs_.emplace_back(Structured<T>(path));
  213. }
  214. template <class T>
  215. void TOperationOutputSpecBase::SetOutput(size_t tableIndex, const TRichYPath& path)
  216. {
  217. NDetail::Assign(Outputs_, tableIndex, path);
  218. NDetail::Assign(StructuredOutputs_, tableIndex, Structured<T>(path));
  219. }
  220. template <class TDerived>
  221. template <class T>
  222. TDerived& TOperationIOSpec<TDerived>::AddInput(const TRichYPath& path)
  223. {
  224. static_assert(!std::is_same<T, Message>::value, "input type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
  225. TOperationInputSpecBase::AddInput<T>(path);
  226. return *static_cast<TDerived*>(this);
  227. }
  228. template <class TDerived>
  229. template <class T>
  230. TDerived& TOperationIOSpec<TDerived>::SetInput(size_t tableIndex, const TRichYPath& path)
  231. {
  232. static_assert(!std::is_same<T, Message>::value, "input type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
  233. TOperationInputSpecBase::SetInput<T>(tableIndex, path);
  234. return *static_cast<TDerived*>(this);
  235. }
  236. template <class TDerived>
  237. template <class T>
  238. TDerived& TOperationIOSpec<TDerived>::AddOutput(const TRichYPath& path)
  239. {
  240. static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
  241. TOperationOutputSpecBase::AddOutput<T>(path);
  242. return *static_cast<TDerived*>(this);
  243. }
  244. template <class TDerived>
  245. template <class T>
  246. TDerived& TOperationIOSpec<TDerived>::SetOutput(size_t tableIndex, const TRichYPath& path)
  247. {
  248. static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
  249. TOperationOutputSpecBase::SetOutput<T>(tableIndex, path);
  250. return *static_cast<TDerived*>(this);
  251. }
  252. template <class TDerived>
  253. TDerived& TOperationIOSpec<TDerived>::AddStructuredInput(TStructuredTablePath path)
  254. {
  255. TOperationInputSpecBase::AddStructuredInput(std::move(path));
  256. return *static_cast<TDerived*>(this);
  257. }
  258. template <class TDerived>
  259. TDerived& TOperationIOSpec<TDerived>::AddStructuredOutput(TStructuredTablePath path)
  260. {
  261. TOperationOutputSpecBase::AddStructuredOutput(std::move(path));
  262. return *static_cast<TDerived*>(this);
  263. }
  264. ////////////////////////////////////////////////////////////////////////////////
  265. template <class T>
  266. TVanillaTask& TVanillaTask::AddOutput(const TRichYPath& path)
  267. {
  268. static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
  269. TOperationOutputSpecBase::AddOutput<T>(path);
  270. return *this;
  271. }
  272. template <class T>
  273. TVanillaTask& TVanillaTask::SetOutput(size_t tableIndex, const TRichYPath& path)
  274. {
  275. static_assert(!std::is_same<T, Message>::value, "output type can't be Message, it can only be its strict subtype (see st.yandex-team.ru/YT-7609)");
  276. TOperationOutputSpecBase::SetOutput<T>(tableIndex, path);
  277. return *this;
  278. }
  279. ////////////////////////////////////////////////////////////////////////////////
  280. namespace NDetail {
  281. void ResetUseClientProtobuf(const char* methodName);
  282. } // namespace NDetail
  283. template <class TDerived>
  284. TDerived& TOperationIOSpec<TDerived>::AddProtobufInput_VerySlow_Deprecated(const TRichYPath& path)
  285. {
  286. NDetail::ResetUseClientProtobuf("AddProtobufInput_VerySlow_Deprecated");
  287. Inputs_.push_back(path);
  288. StructuredInputs_.emplace_back(TStructuredTablePath(path, TProtobufTableStructure{nullptr}));
  289. return *static_cast<TDerived*>(this);
  290. }
  291. template <class TDerived>
  292. TDerived& TOperationIOSpec<TDerived>::AddProtobufOutput_VerySlow_Deprecated(const TRichYPath& path)
  293. {
  294. NDetail::ResetUseClientProtobuf("AddProtobufOutput_VerySlow_Deprecated");
  295. Outputs_.push_back(path);
  296. StructuredOutputs_.emplace_back(TStructuredTablePath(path, TProtobufTableStructure{nullptr}));
  297. return *static_cast<TDerived*>(this);
  298. }
  299. ////////////////////////////////////////////////////////////////////////////////
  300. template <typename TRow>
  301. TJobOperationPreparer::TInputGroup& TJobOperationPreparer::TInputGroup::Description()
  302. {
  303. for (auto i : Indices_) {
  304. Preparer_.InputDescription<TRow>(i);
  305. }
  306. return *this;
  307. }
  308. template <typename TRow>
  309. TJobOperationPreparer::TOutputGroup& TJobOperationPreparer::TOutputGroup::Description(bool inferSchema)
  310. {
  311. for (auto i : Indices_) {
  312. Preparer_.OutputDescription<TRow>(i, inferSchema);
  313. }
  314. return *this;
  315. }
  316. ////////////////////////////////////////////////////////////////////////////////
  317. template <typename TCont>
  318. TJobOperationPreparer::TInputGroup TJobOperationPreparer::BeginInputGroup(const TCont& indices)
  319. {
  320. for (auto i : indices) {
  321. ValidateInputTableIndex(i, TStringBuf("BeginInputGroup()"));
  322. }
  323. return TInputGroup(*this, TVector<int>(std::begin(indices), std::end(indices)));
  324. }
  325. template <typename TCont>
  326. TJobOperationPreparer::TOutputGroup TJobOperationPreparer::BeginOutputGroup(const TCont& indices)
  327. {
  328. for (auto i : indices) {
  329. ValidateOutputTableIndex(i, TStringBuf("BeginOutputGroup()"));
  330. }
  331. return TOutputGroup(*this, indices);
  332. }
  333. template <typename TRow>
  334. TJobOperationPreparer& TJobOperationPreparer::InputDescription(int tableIndex)
  335. {
  336. ValidateMissingInputDescription(tableIndex);
  337. InputTableDescriptions_[tableIndex] = StructuredTableDescription<TRow>();
  338. return *this;
  339. }
  340. template <typename TRow>
  341. TJobOperationPreparer& TJobOperationPreparer::OutputDescription(int tableIndex, bool inferSchema)
  342. {
  343. ValidateMissingOutputDescription(tableIndex);
  344. OutputTableDescriptions_[tableIndex] = StructuredTableDescription<TRow>();
  345. if (inferSchema && !OutputSchemas_[tableIndex]) {
  346. OutputSchemas_[tableIndex] = CreateTableSchema<TRow>();
  347. }
  348. return *this;
  349. }
  350. ////////////////////////////////////////////////////////////////////////////////
  351. template <class TDerived>
  352. template <class TRow>
  353. TDerived& TIntermediateTablesHintSpec<TDerived>::HintMapOutput()
  354. {
  355. IntermediateMapOutputDescription_ = StructuredTableDescription<TRow>();
  356. return *static_cast<TDerived*>(this);
  357. }
  358. template <class TDerived>
  359. template <class TRow>
  360. TDerived& TIntermediateTablesHintSpec<TDerived>::AddMapOutput(const TRichYPath& path)
  361. {
  362. MapOutputs_.push_back(path);
  363. StructuredMapOutputs_.emplace_back(Structured<TRow>(path));
  364. return *static_cast<TDerived*>(this);
  365. }
  366. template <class TDerived>
  367. template <class TRow>
  368. TDerived& TIntermediateTablesHintSpec<TDerived>::HintReduceCombinerInput()
  369. {
  370. IntermediateReduceCombinerInputDescription_ = StructuredTableDescription<TRow>();
  371. return *static_cast<TDerived*>(this);
  372. }
  373. template <class TDerived>
  374. template <class TRow>
  375. TDerived& TIntermediateTablesHintSpec<TDerived>::HintReduceCombinerOutput()
  376. {
  377. IntermediateReduceCombinerOutputDescription_ = StructuredTableDescription<TRow>();
  378. return *static_cast<TDerived*>(this);
  379. }
  380. template <class TDerived>
  381. template <class TRow>
  382. TDerived& TIntermediateTablesHintSpec<TDerived>::HintReduceInput()
  383. {
  384. IntermediateReducerInputDescription_ = StructuredTableDescription<TRow>();
  385. return *static_cast<TDerived*>(this);
  386. }
  387. template <class TDerived>
  388. const TVector<TStructuredTablePath>& TIntermediateTablesHintSpec<TDerived>::GetStructuredMapOutputs() const
  389. {
  390. return StructuredMapOutputs_;
  391. }
  392. template <class TDerived>
  393. const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateMapOutputDescription() const
  394. {
  395. return IntermediateMapOutputDescription_;
  396. }
  397. template <class TDerived>
  398. const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateReduceCombinerInputDescription() const
  399. {
  400. return IntermediateReduceCombinerInputDescription_;
  401. }
  402. template <class TDerived>
  403. const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateReduceCombinerOutputDescription() const
  404. {
  405. return IntermediateReduceCombinerOutputDescription_;
  406. }
  407. template <class TDerived>
  408. const TMaybe<TTableStructure>& TIntermediateTablesHintSpec<TDerived>::GetIntermediateReducerInputDescription() const
  409. {
  410. return IntermediateReducerInputDescription_;
  411. }
  412. ////////////////////////////////////////////////////////////////////////////////
  413. struct TReducerContext
  414. {
  415. bool Break = false;
  416. static TReducerContext* Get() { return Singleton<TReducerContext>(); }
  417. };
  418. template <class TR, class TW>
  419. inline void IReducer<TR, TW>::Break()
  420. {
  421. TReducerContext::Get()->Break = true;
  422. }
  423. template <typename TReader, typename TWriter>
  424. void FeedJobInput(
  425. IMapper<TReader, TWriter>* mapper,
  426. typename TRowTraits<typename TReader::TRowType>::IReaderImpl* readerImpl,
  427. TWriter* writer)
  428. {
  429. using TInputRow = typename TReader::TRowType;
  430. auto reader = MakeIntrusive<TTableReader<TInputRow>>(readerImpl);
  431. mapper->Do(reader.Get(), writer);
  432. }
  433. template <typename TReader, typename TWriter>
  434. void FeedJobInput(
  435. IReducer<TReader, TWriter>* reducer,
  436. typename TRowTraits<typename TReader::TRowType>::IReaderImpl* readerImpl,
  437. TWriter* writer)
  438. {
  439. using TInputRow = typename TReader::TRowType;
  440. auto rangesReader = MakeIntrusive<TTableRangesReader<TInputRow>>(readerImpl);
  441. for (; rangesReader->IsValid(); rangesReader->Next()) {
  442. reducer->Do(&rangesReader->GetRange(), writer);
  443. if (TReducerContext::Get()->Break) {
  444. break;
  445. }
  446. }
  447. }
  448. template <typename TReader, typename TWriter>
  449. void FeedJobInput(
  450. IAggregatorReducer<TReader, TWriter>* reducer,
  451. typename TRowTraits<typename TReader::TRowType>::IReaderImpl* readerImpl,
  452. TWriter* writer)
  453. {
  454. using TInputRow = typename TReader::TRowType;
  455. auto rangesReader = MakeIntrusive<TTableRangesReader<TInputRow>>(readerImpl);
  456. reducer->Do(rangesReader.Get(), writer);
  457. }
  458. template <class TRawJob>
  459. int RunRawJob(size_t outputTableCount, IInputStream& jobStateStream)
  460. {
  461. TRawJobContext context(outputTableCount);
  462. TRawJob job;
  463. job.Load(jobStateStream);
  464. job.Do(context);
  465. return 0;
  466. }
  467. template <>
  468. inline int RunRawJob<TCommandRawJob>(size_t /* outputTableCount */, IInputStream& /* jobStateStream */)
  469. {
  470. Y_ABORT();
  471. }
  472. template <class TVanillaJob>
  473. int RunVanillaJob(size_t outputTableCount, IInputStream& jobStateStream)
  474. {
  475. TVanillaJob job;
  476. job.Load(jobStateStream);
  477. if constexpr (std::is_base_of<IVanillaJob<>, TVanillaJob>::value) {
  478. Y_ABORT_UNLESS(outputTableCount == 0, "Void vanilla job expects zero 'outputTableCount'");
  479. job.Do();
  480. } else {
  481. Y_ABORT_UNLESS(outputTableCount, "Vanilla job with table writer expects nonzero 'outputTableCount'");
  482. using TOutputRow = typename TVanillaJob::TWriter::TRowType;
  483. THolder<IProxyOutput> rawJobWriter;
  484. if (auto customWriter = job.CreateCustomRawJobWriter(outputTableCount)) {
  485. rawJobWriter = std::move(customWriter);
  486. } else {
  487. rawJobWriter = CreateRawJobWriter(outputTableCount);
  488. }
  489. auto writer = CreateJobWriter<TOutputRow>(std::move(rawJobWriter));
  490. job.Start(writer.Get());
  491. job.Do(writer.Get());
  492. job.Finish(writer.Get());
  493. writer->Finish();
  494. }
  495. return 0;
  496. }
  497. template <>
  498. inline int RunVanillaJob<TCommandVanillaJob>(size_t /* outputTableCount */, IInputStream& /* jobStateStream */)
  499. {
  500. Y_ABORT();
  501. }
  502. template <class TJob>
  503. requires TIsBaseOf<IStructuredJob, TJob>::Value
  504. int RunJob(size_t outputTableCount, IInputStream& jobStateStream)
  505. {
  506. using TInputRow = typename TJob::TReader::TRowType;
  507. using TOutputRow = typename TJob::TWriter::TRowType;
  508. auto job = MakeIntrusive<TJob>();
  509. job->Load(jobStateStream);
  510. TRawTableReaderPtr rawJobReader;
  511. if (auto customReader = job->CreateCustomRawJobReader(/*fd*/ 0)) {
  512. rawJobReader = customReader;
  513. } else {
  514. rawJobReader = CreateRawJobReader(/*fd*/ 0);
  515. }
  516. auto readerImpl = CreateJobReaderImpl<TInputRow>(rawJobReader);
  517. // Many users don't expect to have jobs with empty input so we skip such jobs.
  518. if (!readerImpl->IsValid()) {
  519. return 0;
  520. }
  521. THolder<IProxyOutput> rawJobWriter;
  522. if (auto customWriter = job->CreateCustomRawJobWriter(outputTableCount)) {
  523. rawJobWriter = std::move(customWriter);
  524. } else {
  525. rawJobWriter = CreateRawJobWriter(outputTableCount);
  526. }
  527. auto writer = CreateJobWriter<TOutputRow>(std::move(rawJobWriter));
  528. job->Start(writer.Get());
  529. FeedJobInput(job.Get(), readerImpl.Get(), writer.Get());
  530. job->Finish(writer.Get());
  531. writer->Finish();
  532. return 0;
  533. }
  534. //
  535. // We leave RunMapJob/RunReduceJob/RunAggregatorReducer for backward compatibility,
  536. // some user use them already. :(
  537. template <class TMapper>
  538. int RunMapJob(size_t outputTableCount, IInputStream& jobStateStream)
  539. {
  540. return RunJob<TMapper>(outputTableCount, jobStateStream);
  541. }
  542. template <class TReducer>
  543. int RunReduceJob(size_t outputTableCount, IInputStream& jobStateStream)
  544. {
  545. return RunJob<TReducer>(outputTableCount, jobStateStream);
  546. }
  547. template <class TReducer>
  548. int RunAggregatorReducer(size_t outputTableCount, IInputStream& jobStateStream)
  549. {
  550. return RunJob<TReducer>(outputTableCount, jobStateStream);
  551. }
  552. ////////////////////////////////////////////////////////////////////////////////
  553. template <typename T, typename = void>
  554. struct TIsConstructibleFromNode
  555. : std::false_type
  556. { };
  557. template <typename T>
  558. struct TIsConstructibleFromNode<T, std::void_t<decltype(T::FromNode(std::declval<TNode&>()))>>
  559. : std::true_type
  560. { };
  561. template <class TJob>
  562. ::TIntrusivePtr<NYT::IStructuredJob> ConstructJobFromNode(const TNode& node)
  563. {
  564. if constexpr (TIsConstructibleFromNode<TJob>::value) {
  565. Y_ENSURE(node.GetType() != TNode::Undefined,
  566. "job has FromNode method but constructor arguments were not provided");
  567. return TJob::FromNode(node);
  568. } else {
  569. Y_ENSURE(node.GetType() == TNode::Undefined,
  570. "constructor arguments provided but job does not contain FromNode method");
  571. return MakeIntrusive<TJob>();
  572. }
  573. }
  574. ////////////////////////////////////////////////////////////////////////////////
  575. using TJobFunction = int (*)(size_t, IInputStream&);
  576. using TConstructJobFunction = ::TIntrusivePtr<NYT::IStructuredJob> (*)(const TNode&);
  577. class TJobFactory
  578. {
  579. public:
  580. static TJobFactory* Get()
  581. {
  582. return Singleton<TJobFactory>();
  583. }
  584. template <class TJob>
  585. void RegisterJob(const char* name)
  586. {
  587. RegisterJobImpl<TJob>(name, RunJob<TJob>);
  588. JobConstructors[name] = ConstructJobFromNode<TJob>;
  589. }
  590. template <class TRawJob>
  591. void RegisterRawJob(const char* name)
  592. {
  593. RegisterJobImpl<TRawJob>(name, RunRawJob<TRawJob>);
  594. }
  595. template <class TVanillaJob>
  596. void RegisterVanillaJob(const char* name)
  597. {
  598. RegisterJobImpl<TVanillaJob>(name, RunVanillaJob<TVanillaJob>);
  599. }
  600. TString GetJobName(const IJob* job)
  601. {
  602. const auto typeIndex = std::type_index(typeid(*job));
  603. CheckJobRegistered(typeIndex);
  604. return JobNames[typeIndex];
  605. }
  606. TJobFunction GetJobFunction(const char* name)
  607. {
  608. CheckNameRegistered(name);
  609. return JobFunctions[name];
  610. }
  611. TConstructJobFunction GetConstructingFunction(const char* name)
  612. {
  613. CheckNameRegistered(name);
  614. return JobConstructors[name];
  615. }
  616. private:
  617. TMap<std::type_index, TString> JobNames;
  618. THashMap<TString, TJobFunction> JobFunctions;
  619. THashMap<TString, TConstructJobFunction> JobConstructors;
  620. template <typename TJob, typename TRunner>
  621. void RegisterJobImpl(const char* name, TRunner runner) {
  622. const auto typeIndex = std::type_index(typeid(TJob));
  623. CheckNotRegistered(typeIndex, name);
  624. JobNames[typeIndex] = name;
  625. JobFunctions[name] = runner;
  626. }
  627. void CheckNotRegistered(const std::type_index& typeIndex, const char* name)
  628. {
  629. Y_ENSURE(!JobNames.contains(typeIndex),
  630. "type_info '" << typeIndex.name() << "'"
  631. "is already registered under name '" << JobNames[typeIndex] << "'");
  632. Y_ENSURE(!JobFunctions.contains(name),
  633. "job with name '" << name << "' is already registered");
  634. }
  635. void CheckJobRegistered(const std::type_index& typeIndex)
  636. {
  637. Y_ENSURE(JobNames.contains(typeIndex),
  638. "type_info '" << typeIndex.name() << "' is not registered, use REGISTER_* macros");
  639. }
  640. void CheckNameRegistered(const char* name)
  641. {
  642. Y_ENSURE(JobFunctions.contains(name),
  643. "job with name '" << name << "' is not registered, use REGISTER_* macros");
  644. }
  645. };
  646. ////////////////////////////////////////////////////////////////////////////////
  647. template <class TMapper>
  648. struct TMapperRegistrator
  649. {
  650. TMapperRegistrator(const char* name)
  651. {
  652. static_assert(TMapper::JobType == IJob::EType::Mapper,
  653. "REGISTER_MAPPER is not compatible with this job class");
  654. NYT::TJobFactory::Get()->RegisterJob<TMapper>(name);
  655. }
  656. };
  657. template <class TReducer>
  658. struct TReducerRegistrator
  659. {
  660. TReducerRegistrator(const char* name)
  661. {
  662. static_assert(TReducer::JobType == IJob::EType::Reducer ||
  663. TReducer::JobType == IJob::EType::ReducerAggregator,
  664. "REGISTER_REDUCER is not compatible with this job class");
  665. NYT::TJobFactory::Get()->RegisterJob<TReducer>(name);
  666. }
  667. };
  668. template <class TRawJob>
  669. struct TRawJobRegistrator
  670. {
  671. TRawJobRegistrator(const char* name)
  672. {
  673. static_assert(TRawJob::JobType == IJob::EType::RawJob,
  674. "REGISTER_RAW_JOB is not compatible with this job class");
  675. NYT::TJobFactory::Get()->RegisterRawJob<TRawJob>(name);
  676. }
  677. };
  678. template <class TVanillaJob>
  679. struct TVanillaJobRegistrator
  680. {
  681. TVanillaJobRegistrator(const char* name)
  682. {
  683. static_assert(TVanillaJob::JobType == IJob::EType::VanillaJob,
  684. "REGISTER_VANILLA_JOB is not compatible with this job class");
  685. NYT::TJobFactory::Get()->RegisterVanillaJob<TVanillaJob>(name);
  686. }
  687. };
  688. ////////////////////////////////////////////////////////////////////////////////
  689. inline TString YtRegistryTypeName(const TString& name) {
  690. TString res = name;
  691. #ifdef _win_
  692. SubstGlobal(res, "class ", "");
  693. #endif
  694. return res;
  695. }
  696. ////////////////////////////////////////////////////////////////////////////////
  /// Registers mapper class `__VA_ARGS__` under the explicit registry name `jobName`.
  #define REGISTER_NAMED_MAPPER(jobName, ...) \
      static const NYT::TMapperRegistrator<__VA_ARGS__> Y_GENERATE_UNIQUE_ID(TJobRegistrator)(jobName);
  /// Registers mapper class `__VA_ARGS__` under its type name, normalized by NYT::YtRegistryTypeName.
  #define REGISTER_MAPPER(...) \
      REGISTER_NAMED_MAPPER((NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data()), __VA_ARGS__)
  /// Registers reducer class `__VA_ARGS__` under the explicit registry name `jobName`.
  #define REGISTER_NAMED_REDUCER(jobName, ...) \
      static const NYT::TReducerRegistrator<__VA_ARGS__> Y_GENERATE_UNIQUE_ID(TJobRegistrator)(jobName);
  /// Registers reducer class `__VA_ARGS__` under its type name, normalized by NYT::YtRegistryTypeName.
  #define REGISTER_REDUCER(...) \
      REGISTER_NAMED_REDUCER((NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data()), __VA_ARGS__)
  /// Registers raw job class `__VA_ARGS__` under the explicit registry name `jobName`.
  #define REGISTER_NAMED_RAW_JOB(jobName, ...) \
      static const NYT::TRawJobRegistrator<__VA_ARGS__> Y_GENERATE_UNIQUE_ID(TJobRegistrator)(jobName);
  /// Registers raw job class `__VA_ARGS__` under its type name, normalized by NYT::YtRegistryTypeName.
  #define REGISTER_RAW_JOB(...) \
      REGISTER_NAMED_RAW_JOB( \
          (NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data()), \
          __VA_ARGS__)
  /// Registers vanilla job class `__VA_ARGS__` under the explicit registry name `name`.
  /// The registrator object is `static const`, matching every other REGISTER_* macro.
  /// It exists only for its constructor's registration side effect.
  #define REGISTER_NAMED_VANILLA_JOB(name, ...) \
      static const NYT::TVanillaJobRegistrator<__VA_ARGS__> \
      Y_GENERATE_UNIQUE_ID(TJobRegistrator)(name);
  /// Registers vanilla job class `__VA_ARGS__` under its type name, normalized by NYT::YtRegistryTypeName.
  #define REGISTER_VANILLA_JOB(...) \
      REGISTER_NAMED_VANILLA_JOB((NYT::YtRegistryTypeName(TypeName<__VA_ARGS__>()).data()), __VA_ARGS__)
  719. ////////////////////////////////////////////////////////////////////////////////
  720. template <typename TReader, typename TWriter>
  721. TStructuredRowStreamDescription IMapper<TReader, TWriter>::GetInputRowStreamDescription() const
  722. {
  723. return NYT::NDetail::GetStructuredRowStreamDescription<typename TReader::TRowType>();
  724. }
  725. template <typename TReader, typename TWriter>
  726. TStructuredRowStreamDescription IMapper<TReader, TWriter>::GetOutputRowStreamDescription() const
  727. {
  728. return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>();
  729. }
  730. ////////////////////////////////////////////////////////////////////////////////
  731. template <typename TReader, typename TWriter>
  732. TStructuredRowStreamDescription IReducer<TReader, TWriter>::GetInputRowStreamDescription() const
  733. {
  734. return NYT::NDetail::GetStructuredRowStreamDescription<typename TReader::TRowType>();
  735. }
  736. template <typename TReader, typename TWriter>
  737. TStructuredRowStreamDescription IReducer<TReader, TWriter>::GetOutputRowStreamDescription() const
  738. {
  739. return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>();
  740. }
  741. ////////////////////////////////////////////////////////////////////////////////
  742. template <typename TReader, typename TWriter>
  743. TStructuredRowStreamDescription IAggregatorReducer<TReader, TWriter>::GetInputRowStreamDescription() const
  744. {
  745. return NYT::NDetail::GetStructuredRowStreamDescription<typename TReader::TRowType>();
  746. }
  747. template <typename TReader, typename TWriter>
  748. TStructuredRowStreamDescription IAggregatorReducer<TReader, TWriter>::GetOutputRowStreamDescription() const
  749. {
  750. return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>();
  751. }
  752. ////////////////////////////////////////////////////////////////////////////////
  753. template <typename TWriter>
  754. TStructuredRowStreamDescription IVanillaJob<TWriter>::GetInputRowStreamDescription() const
  755. {
  756. return TVoidStructuredRowStream();
  757. }
  758. template <typename TWriter>
  759. TStructuredRowStreamDescription IVanillaJob<TWriter>::GetOutputRowStreamDescription() const
  760. {
  761. return NYT::NDetail::GetStructuredRowStreamDescription<typename TWriter::TRowType>();
  762. }
  763. ////////////////////////////////////////////////////////////////////////////////
  764. } // namespace NYT