operation.cpp 103 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046
  1. #include "operation.h"
  2. #include "abortable_registry.h"
  3. #include "client.h"
  4. #include "operation_helpers.h"
  5. #include "operation_tracker.h"
  6. #include "prepare_operation.h"
  7. #include "skiff.h"
  8. #include "structured_table_formats.h"
  9. #include "yt_poller.h"
  10. #include <yt/cpp/mapreduce/common/helpers.h>
  11. #include <yt/cpp/mapreduce/common/retry_lib.h>
  12. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  13. #include <yt/cpp/mapreduce/interface/config.h>
  14. #include <yt/cpp/mapreduce/interface/errors.h>
  15. #include <yt/cpp/mapreduce/interface/fluent.h>
  16. #include <yt/cpp/mapreduce/interface/format.h>
  17. #include <yt/cpp/mapreduce/interface/job_statistics.h>
  18. #include <yt/cpp/mapreduce/interface/protobuf_format.h>
  19. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  20. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  21. #include <yt/cpp/mapreduce/http/requests.h>
  22. #include <yt/cpp/mapreduce/http/retry_request.h>
  23. #include <yt/cpp/mapreduce/io/job_reader.h>
  24. #include <yt/cpp/mapreduce/io/job_writer.h>
  25. #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
  26. #include <yt/cpp/mapreduce/io/yamr_table_writer.h>
  27. #include <yt/cpp/mapreduce/io/node_table_reader.h>
  28. #include <yt/cpp/mapreduce/io/node_table_writer.h>
  29. #include <yt/cpp/mapreduce/io/proto_table_reader.h>
  30. #include <yt/cpp/mapreduce/io/proto_table_writer.h>
  31. #include <yt/cpp/mapreduce/io/proto_helpers.h>
  32. #include <yt/cpp/mapreduce/io/skiff_table_reader.h>
  33. #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
  34. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  35. #include <library/cpp/yson/node/serialize.h>
  36. #include <util/generic/hash_set.h>
  37. #include <util/string/builder.h>
  38. #include <util/string/cast.h>
  39. #include <util/system/thread.h>
  40. #include <util/system/env.h>
  41. #include <util/system/fs.h>
  42. namespace NYT {
  43. namespace NDetail {
  44. using namespace NRawClient;
  45. using ::ToString;
  46. ////////////////////////////////////////////////////////////////////////////////
  47. static const ui64 DefaultExrtaTmpfsSize = 1024LL * 1024LL;
  48. ////////////////////////////////////////////////////////////////////////////////
  49. namespace {
  50. ////////////////////////////////////////////////////////////////////////////////
  51. struct TMapReduceOperationIo
  52. {
  53. TVector<TRichYPath> Inputs;
  54. TVector<TRichYPath> MapOutputs;
  55. TVector<TRichYPath> Outputs;
  56. TMaybe<TFormat> MapperInputFormat;
  57. TMaybe<TFormat> MapperOutputFormat;
  58. TMaybe<TFormat> ReduceCombinerInputFormat;
  59. TMaybe<TFormat> ReduceCombinerOutputFormat;
  60. TFormat ReducerInputFormat = TFormat::YsonBinary();
  61. TFormat ReducerOutputFormat = TFormat::YsonBinary();
  62. TVector<TSmallJobFile> MapperJobFiles;
  63. TVector<TSmallJobFile> ReduceCombinerJobFiles;
  64. TVector<TSmallJobFile> ReducerJobFiles;
  65. };
  66. template <typename T>
  67. void VerifyHasElements(const TVector<T>& paths, TStringBuf name)
  68. {
  69. if (paths.empty()) {
  70. ythrow TApiUsageError() << "no " << name << " table is specified";
  71. }
  72. }
  73. ////////////////////////////////////////////////////////////////////////////////
  74. TVector<TSmallJobFile> CreateFormatConfig(
  75. TMaybe<TSmallJobFile> inputConfig,
  76. const TMaybe<TSmallJobFile>& outputConfig)
  77. {
  78. TVector<TSmallJobFile> result;
  79. if (inputConfig) {
  80. result.push_back(std::move(*inputConfig));
  81. }
  82. if (outputConfig) {
  83. result.push_back(std::move(*outputConfig));
  84. }
  85. return result;
  86. }
  87. template <typename T>
  88. ENodeReaderFormat NodeReaderFormatFromHintAndGlobalConfig(const TUserJobFormatHintsBase<T>& formatHints)
  89. {
  90. auto result = TConfig::Get()->NodeReaderFormat;
  91. if (formatHints.InputFormatHints_ && formatHints.InputFormatHints_->SkipNullValuesForTNode_) {
  92. Y_ENSURE_EX(
  93. result != ENodeReaderFormat::Skiff,
  94. TApiUsageError() << "skiff format doesn't support SkipNullValuesForTNode format hint");
  95. result = ENodeReaderFormat::Yson;
  96. }
  97. return result;
  98. }
  99. template <class TSpec>
  100. const TVector<TStructuredTablePath>& GetStructuredInputs(const TSpec& spec)
  101. {
  102. if constexpr (std::is_same_v<TSpec, TVanillaTask>) {
  103. static const TVector<TStructuredTablePath> empty;
  104. return empty;
  105. } else {
  106. return spec.GetStructuredInputs();
  107. }
  108. }
  109. template <class TSpec>
  110. const TVector<TStructuredTablePath>& GetStructuredOutputs(const TSpec& spec)
  111. {
  112. return spec.GetStructuredOutputs();
  113. }
  114. template <class TSpec>
  115. const TMaybe<TFormatHints>& GetInputFormatHints(const TSpec& spec)
  116. {
  117. if constexpr (std::is_same_v<TSpec, TVanillaTask>) {
  118. static const TMaybe<TFormatHints> empty = Nothing();
  119. return empty;
  120. } else {
  121. return spec.InputFormatHints_;
  122. }
  123. }
  124. template <class TSpec>
  125. const TMaybe<TFormatHints>& GetOutputFormatHints(const TSpec& spec)
  126. {
  127. return spec.OutputFormatHints_;
  128. }
  129. template <class TSpec>
  130. ENodeReaderFormat GetNodeReaderFormat(const TSpec& spec, bool allowSkiff)
  131. {
  132. if constexpr (std::is_same<TSpec, TVanillaTask>::value) {
  133. return ENodeReaderFormat::Yson;
  134. } else {
  135. return allowSkiff
  136. ? NodeReaderFormatFromHintAndGlobalConfig(spec)
  137. : ENodeReaderFormat::Yson;
  138. }
  139. }
  140. static void SortColumnsToNames(const TSortColumns& sortColumns, THashSet<TString>* result)
  141. {
  142. auto names = sortColumns.GetNames();
  143. result->insert(names.begin(), names.end());
  144. }
  145. static THashSet<TString> SortColumnsToNames(const TSortColumns& sortColumns)
  146. {
  147. THashSet<TString> columnNames;
  148. SortColumnsToNames(sortColumns, &columnNames);
  149. return columnNames;
  150. }
  151. THashSet<TString> GetColumnsUsedInOperation(const TJoinReduceOperationSpec& spec)
  152. {
  153. return SortColumnsToNames(spec.JoinBy_);
  154. }
  155. THashSet<TString> GetColumnsUsedInOperation(const TReduceOperationSpec& spec) {
  156. auto result = SortColumnsToNames(spec.SortBy_);
  157. SortColumnsToNames(spec.ReduceBy_, &result);
  158. if (spec.JoinBy_) {
  159. SortColumnsToNames(*spec.JoinBy_, &result);
  160. }
  161. return result;
  162. }
  163. THashSet<TString> GetColumnsUsedInOperation(const TMapReduceOperationSpec& spec)
  164. {
  165. auto result = SortColumnsToNames(spec.SortBy_);
  166. SortColumnsToNames(spec.ReduceBy_, &result);
  167. return result;
  168. }
  169. THashSet<TString> GetColumnsUsedInOperation(const TMapOperationSpec&)
  170. {
  171. return THashSet<TString>();
  172. }
  173. THashSet<TString> GetColumnsUsedInOperation(const TVanillaTask&)
  174. {
  175. return THashSet<TString>();
  176. }
  177. TStructuredJobTableList ApplyProtobufColumnFilters(
  178. const TStructuredJobTableList& tableList,
  179. const TOperationPreparer& preparer,
  180. const THashSet<TString>& columnsUsedInOperations,
  181. const TOperationOptions& options)
  182. {
  183. bool hasInputQuery = options.Spec_.Defined() && options.Spec_->IsMap() && options.Spec_->HasKey("input_query");
  184. if (hasInputQuery) {
  185. return tableList;
  186. }
  187. auto isDynamic = BatchTransform(
  188. CreateDefaultRequestRetryPolicy(preparer.GetContext().Config),
  189. preparer.GetContext(),
  190. tableList,
  191. [&] (TRawBatchRequest& batch, const auto& table) {
  192. return batch.Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
  193. });
  194. auto newTableList = tableList;
  195. for (size_t tableIndex = 0; tableIndex < tableList.size(); ++tableIndex) {
  196. if (isDynamic[tableIndex].AsBool()) {
  197. continue;
  198. }
  199. auto& table = newTableList[tableIndex];
  200. Y_ABORT_UNLESS(table.RichYPath);
  201. if (table.RichYPath->Columns_) {
  202. continue;
  203. }
  204. if (!std::holds_alternative<TProtobufTableStructure>(table.Description)) {
  205. continue;
  206. }
  207. const auto& descriptor = std::get<TProtobufTableStructure>(table.Description).Descriptor;
  208. if (!descriptor) {
  209. continue;
  210. }
  211. auto fromDescriptor = NDetail::InferColumnFilter(*descriptor);
  212. if (!fromDescriptor) {
  213. continue;
  214. }
  215. THashSet<TString> columns(fromDescriptor->begin(), fromDescriptor->end());
  216. columns.insert(columnsUsedInOperations.begin(), columnsUsedInOperations.end());
  217. table.RichYPath->Columns(TVector<TString>(columns.begin(), columns.end()));
  218. }
  219. return newTableList;
  220. }
  221. template <class TSpec>
  222. TSimpleOperationIo CreateSimpleOperationIo(
  223. const IStructuredJob& structuredJob,
  224. const TOperationPreparer& preparer,
  225. const TSpec& spec,
  226. const TOperationOptions& options,
  227. bool allowSkiff)
  228. {
  229. if (!std::holds_alternative<TVoidStructuredRowStream>(structuredJob.GetInputRowStreamDescription())) {
  230. VerifyHasElements(GetStructuredInputs(spec), "input");
  231. }
  232. TUserJobFormatHints hints;
  233. hints.InputFormatHints_ = GetInputFormatHints(spec);
  234. hints.OutputFormatHints_ = GetOutputFormatHints(spec);
  235. ENodeReaderFormat nodeReaderFormat = GetNodeReaderFormat(spec, allowSkiff);
  236. return CreateSimpleOperationIoHelper(
  237. structuredJob,
  238. preparer,
  239. options,
  240. CanonizeStructuredTableList(preparer.GetContext(), GetStructuredInputs(spec)),
  241. CanonizeStructuredTableList(preparer.GetContext(), GetStructuredOutputs(spec)),
  242. hints,
  243. nodeReaderFormat,
  244. GetColumnsUsedInOperation(spec));
  245. }
  246. template <class T>
  247. TSimpleOperationIo CreateSimpleOperationIo(
  248. const IJob& job,
  249. const TOperationPreparer& preparer,
  250. const TSimpleRawOperationIoSpec<T>& spec)
  251. {
  252. auto getFormatOrDefault = [&] (const TMaybe<TFormat>& maybeFormat, const char* formatName) {
  253. if (maybeFormat) {
  254. return *maybeFormat;
  255. } else if (spec.Format_) {
  256. return *spec.Format_;
  257. } else {
  258. ythrow TApiUsageError() << "Neither " << formatName << "format nor default format is specified for raw operation";
  259. }
  260. };
  261. auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetInputs());
  262. auto outputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetOutputs());
  263. VerifyHasElements(inputs, "input");
  264. VerifyHasElements(outputs, "output");
  265. TUserJobFormatHints hints;
  266. auto outputSchemas = PrepareOperation(
  267. job,
  268. TOperationPreparationContext(
  269. inputs,
  270. outputs,
  271. preparer.GetContext(),
  272. preparer.GetClientRetryPolicy(),
  273. preparer.GetTransactionId()),
  274. &inputs,
  275. &outputs,
  276. hints);
  277. Y_ABORT_UNLESS(outputs.size() == outputSchemas.size());
  278. for (int i = 0; i < static_cast<int>(outputs.size()); ++i) {
  279. if (!outputs[i].Schema_ && !outputSchemas[i].Columns().empty()) {
  280. outputs[i].Schema_ = outputSchemas[i];
  281. }
  282. }
  283. return TSimpleOperationIo {
  284. inputs,
  285. outputs,
  286. getFormatOrDefault(spec.InputFormat_, "input"),
  287. getFormatOrDefault(spec.OutputFormat_, "output"),
  288. TVector<TSmallJobFile>{},
  289. };
  290. }
  291. ////////////////////////////////////////////////////////////////////////////////
  292. TString GetJobStderrWithRetriesAndIgnoreErrors(
  293. const IRequestRetryPolicyPtr& retryPolicy,
  294. const TClientContext& context,
  295. const TOperationId& operationId,
  296. const TJobId& jobId,
  297. const size_t stderrTailSize,
  298. const TGetJobStderrOptions& options = TGetJobStderrOptions())
  299. {
  300. TString jobStderr;
  301. try {
  302. jobStderr = GetJobStderrWithRetries(
  303. retryPolicy,
  304. context,
  305. operationId,
  306. jobId,
  307. options);
  308. } catch (const TErrorResponse& e) {
  309. YT_LOG_ERROR("Cannot get job stderr (OperationId: %v, JobId: %v, Error: %v)",
  310. operationId,
  311. jobId,
  312. e.what());
  313. }
  314. if (jobStderr.size() > stderrTailSize) {
  315. jobStderr = jobStderr.substr(jobStderr.size() - stderrTailSize, stderrTailSize);
  316. }
  317. return jobStderr;
  318. }
  319. TVector<TFailedJobInfo> GetFailedJobInfo(
  320. const IClientRetryPolicyPtr& clientRetryPolicy,
  321. const TClientContext& context,
  322. const TOperationId& operationId,
  323. const TGetFailedJobInfoOptions& options)
  324. {
  325. const auto listJobsResult = ListJobs(
  326. clientRetryPolicy->CreatePolicyForGenericRequest(),
  327. context,
  328. operationId,
  329. TListJobsOptions()
  330. .State(EJobState::Failed)
  331. .Limit(options.MaxJobCount_));
  332. const auto stderrTailSize = options.StderrTailSize_;
  333. TVector<TFailedJobInfo> result;
  334. for (const auto& job : listJobsResult.Jobs) {
  335. auto& info = result.emplace_back();
  336. Y_ENSURE(job.Id);
  337. info.JobId = *job.Id;
  338. info.Error = job.Error.GetOrElse(TYtError(TString("unknown error")));
  339. if (job.StderrSize.GetOrElse(0) != 0) {
  340. // There are cases when due to bad luck we cannot read stderr even if
  341. // list_jobs reports that stderr_size > 0.
  342. //
  343. // Such errors don't have special error code
  344. // so we ignore all errors and try our luck on other jobs.
  345. info.Stderr = GetJobStderrWithRetriesAndIgnoreErrors(
  346. clientRetryPolicy->CreatePolicyForGenericRequest(),
  347. context,
  348. operationId,
  349. *job.Id,
  350. stderrTailSize);
  351. }
  352. }
  353. return result;
  354. }
  355. struct TGetJobsStderrOptions
  356. {
  357. using TSelf = TGetJobsStderrOptions;
  358. // How many jobs to download. Which jobs will be chosen is undefined.
  359. FLUENT_FIELD_DEFAULT(ui64, MaxJobCount, 10);
  360. // How much of stderr should be downloaded.
  361. FLUENT_FIELD_DEFAULT(ui64, StderrTailSize, 64 * 1024);
  362. };
  363. static TVector<TString> GetJobsStderr(
  364. const IClientRetryPolicyPtr& clientRetryPolicy,
  365. const TClientContext& context,
  366. const TOperationId& operationId,
  367. const TGetJobsStderrOptions& options = TGetJobsStderrOptions())
  368. {
  369. const auto listJobsResult = ListJobs(
  370. clientRetryPolicy->CreatePolicyForGenericRequest(),
  371. context,
  372. operationId,
  373. TListJobsOptions().Limit(options.MaxJobCount_).WithStderr(true));
  374. const auto stderrTailSize = options.StderrTailSize_;
  375. TVector<TString> result;
  376. for (const auto& job : listJobsResult.Jobs) {
  377. result.push_back(
  378. // There are cases when due to bad luck we cannot read stderr even if
  379. // list_jobs reports that stderr_size > 0.
  380. //
  381. // Such errors don't have special error code
  382. // so we ignore all errors and try our luck on other jobs.
  383. GetJobStderrWithRetriesAndIgnoreErrors(
  384. clientRetryPolicy->CreatePolicyForGenericRequest(),
  385. context,
  386. operationId,
  387. *job.Id,
  388. stderrTailSize)
  389. );
  390. }
  391. return result;
  392. }
  393. int CountIntermediateTables(const TStructuredJobTableList& tables)
  394. {
  395. int result = 0;
  396. for (const auto& table : tables) {
  397. if (table.RichYPath) {
  398. break;
  399. }
  400. ++result;
  401. }
  402. return result;
  403. }
  404. ////////////////////////////////////////////////////////////////////////////////
  405. } // namespace
  406. ////////////////////////////////////////////////////////////////////////////////
  407. TSimpleOperationIo CreateSimpleOperationIoHelper(
  408. const IStructuredJob& structuredJob,
  409. const TOperationPreparer& preparer,
  410. const TOperationOptions& options,
  411. TStructuredJobTableList structuredInputs,
  412. TStructuredJobTableList structuredOutputs,
  413. TUserJobFormatHints hints,
  414. ENodeReaderFormat nodeReaderFormat,
  415. const THashSet<TString>& columnsUsedInOperations)
  416. {
  417. auto intermediateInputTableCount = CountIntermediateTables(structuredInputs);
  418. auto intermediateOutputTableCount = CountIntermediateTables(structuredOutputs);
  419. auto jobSchemaInferenceResult = PrepareOperation(
  420. structuredJob,
  421. TOperationPreparationContext(
  422. structuredInputs,
  423. structuredOutputs,
  424. preparer.GetContext(),
  425. preparer.GetClientRetryPolicy(),
  426. preparer.GetTransactionId()),
  427. &structuredInputs,
  428. &structuredOutputs,
  429. hints);
  430. TVector<TSmallJobFile> formatConfigList;
  431. TFormatBuilder formatBuilder(preparer.GetClientRetryPolicy(), preparer.GetContext(), preparer.GetTransactionId(), options);
  432. auto [inputFormat, inputFormatConfig] = formatBuilder.CreateFormat(
  433. structuredJob,
  434. EIODirection::Input,
  435. structuredInputs,
  436. hints.InputFormatHints_,
  437. nodeReaderFormat,
  438. /* allowFormatFromTableAttribute = */ true);
  439. auto [outputFormat, outputFormatConfig] = formatBuilder.CreateFormat(
  440. structuredJob,
  441. EIODirection::Output,
  442. structuredOutputs,
  443. hints.OutputFormatHints_,
  444. ENodeReaderFormat::Yson,
  445. /* allowFormatFromTableAttribute = */ false);
  446. const bool inferOutputSchema = options.InferOutputSchema_.GetOrElse(preparer.GetContext().Config->InferTableSchema);
  447. auto outputPaths = GetPathList(
  448. TStructuredJobTableList(structuredOutputs.begin() + intermediateOutputTableCount, structuredOutputs.end()),
  449. TVector<TTableSchema>(jobSchemaInferenceResult.begin() + intermediateOutputTableCount, jobSchemaInferenceResult.end()),
  450. inferOutputSchema);
  451. auto inputPaths = GetPathList(
  452. ApplyProtobufColumnFilters(
  453. TStructuredJobTableList(structuredInputs.begin() + intermediateInputTableCount, structuredInputs.end()),
  454. preparer,
  455. columnsUsedInOperations,
  456. options),
  457. /*schemaInferenceResult*/ Nothing(),
  458. /*inferSchema*/ false);
  459. return TSimpleOperationIo {
  460. inputPaths,
  461. outputPaths,
  462. inputFormat,
  463. outputFormat,
  464. CreateFormatConfig(inputFormatConfig, outputFormatConfig)
  465. };
  466. }
  467. EOperationBriefState CheckOperation(
  468. const IClientRetryPolicyPtr& clientRetryPolicy,
  469. const TClientContext& context,
  470. const TOperationId& operationId)
  471. {
  472. auto attributes = GetOperation(
  473. clientRetryPolicy->CreatePolicyForGenericRequest(),
  474. context,
  475. operationId,
  476. TGetOperationOptions().AttributeFilter(TOperationAttributeFilter()
  477. .Add(EOperationAttribute::State)
  478. .Add(EOperationAttribute::Result)));
  479. Y_ABORT_UNLESS(attributes.BriefState,
  480. "get_operation for operation %s has not returned \"state\" field",
  481. GetGuidAsString(operationId).data());
  482. if (*attributes.BriefState == EOperationBriefState::Completed) {
  483. return EOperationBriefState::Completed;
  484. } else if (*attributes.BriefState == EOperationBriefState::Aborted || *attributes.BriefState == EOperationBriefState::Failed) {
  485. YT_LOG_ERROR("Operation %v %v (%v)",
  486. operationId,
  487. ToString(*attributes.BriefState),
  488. ToString(TOperationExecutionTimeTracker::Get()->Finish(operationId)));
  489. auto failedJobInfoList = GetFailedJobInfo(
  490. clientRetryPolicy,
  491. context,
  492. operationId,
  493. TGetFailedJobInfoOptions());
  494. Y_ABORT_UNLESS(attributes.Result && attributes.Result->Error);
  495. ythrow TOperationFailedError(
  496. *attributes.BriefState == EOperationBriefState::Aborted
  497. ? TOperationFailedError::Aborted
  498. : TOperationFailedError::Failed,
  499. operationId,
  500. *attributes.Result->Error,
  501. failedJobInfoList);
  502. }
  503. return EOperationBriefState::InProgress;
  504. }
  505. void WaitForOperation(
  506. const IClientRetryPolicyPtr& clientRetryPolicy,
  507. const TClientContext& context,
  508. const TOperationId& operationId)
  509. {
  510. const TDuration checkOperationStateInterval =
  511. UseLocalModeOptimization(context, clientRetryPolicy)
  512. ? Min(TDuration::MilliSeconds(100), context.Config->OperationTrackerPollPeriod)
  513. : context.Config->OperationTrackerPollPeriod;
  514. while (true) {
  515. auto status = CheckOperation(clientRetryPolicy, context, operationId);
  516. if (status == EOperationBriefState::Completed) {
  517. YT_LOG_INFO("Operation %v completed (%v)",
  518. operationId,
  519. TOperationExecutionTimeTracker::Get()->Finish(operationId));
  520. break;
  521. }
  522. TWaitProxy::Get()->Sleep(checkOperationStateInterval);
  523. }
  524. }
  525. ////////////////////////////////////////////////////////////////////////////////
  526. namespace {
  527. TNode BuildAutoMergeSpec(const TAutoMergeSpec& options)
  528. {
  529. TNode result;
  530. if (options.Mode_) {
  531. result["mode"] = ToString(*options.Mode_);
  532. }
  533. if (options.MaxIntermediateChunkCount_) {
  534. result["max_intermediate_chunk_count"] = *options.MaxIntermediateChunkCount_;
  535. }
  536. if (options.ChunkCountPerMergeJob_) {
  537. result["chunk_count_per_merge_job"] = *options.ChunkCountPerMergeJob_;
  538. }
  539. if (options.ChunkSizeThreshold_) {
  540. result["chunk_size_threshold"] = *options.ChunkSizeThreshold_;
  541. }
  542. return result;
  543. }
  544. TNode BuildJobProfilerSpec(const TJobProfilerSpec& profilerSpec)
  545. {
  546. TNode result;
  547. if (profilerSpec.ProfilingBinary_) {
  548. result["binary"] = ToString(*profilerSpec.ProfilingBinary_);
  549. }
  550. if (profilerSpec.ProfilerType_) {
  551. result["type"] = ToString(*profilerSpec.ProfilerType_);
  552. }
  553. if (profilerSpec.ProfilingProbability_) {
  554. result["profiling_probability"] = *profilerSpec.ProfilingProbability_;
  555. }
  556. if (profilerSpec.SamplingFrequency_) {
  557. result["sampling_frequency"] = *profilerSpec.SamplingFrequency_;
  558. }
  559. return result;
  560. }
  561. // Returns undefined node if resources doesn't contain any meaningful field
  562. TNode BuildSchedulerResourcesSpec(const TSchedulerResources& resources)
  563. {
  564. TNode result;
  565. if (resources.UserSlots().Defined()) {
  566. result["user_slots"] = *resources.UserSlots();
  567. }
  568. if (resources.Cpu().Defined()) {
  569. result["cpu"] = *resources.Cpu();
  570. }
  571. if (resources.Memory().Defined()) {
  572. result["memory"] = *resources.Memory();
  573. }
  574. return result;
  575. }
// Writes the user job section (mapper / reducer / combiner) of an operation spec into `fluent`.
//
// `preparer` supplies the prepared job: files, layers, command, class name and the user job spec.
// `inputFormat` / `outputFormat` are written as `input_format` / `output_format` only when defined.
// Optional user job settings are emitted only when they are set; `file_paths`, `command`,
// `class_name`, `profilers` and `redirect_stdout_to_stderr` are always emitted.
void BuildUserJobFluently(
    const TJobPreparer& preparer,
    const TMaybe<TFormat>& inputFormat,
    const TMaybe<TFormat>& outputFormat,
    TFluentMap fluent)
{
    const auto& userJobSpec = preparer.GetSpec();
    // Local copy: the memory limit may be adjusted below when the sandbox is mounted into tmpfs.
    TMaybe<i64> memoryLimit = userJobSpec.MemoryLimit_;
    TMaybe<double> cpuLimit = userJobSpec.CpuLimit_;
    TMaybe<ui16> portCount = userJobSpec.PortCount_;

    // Use 1MB extra tmpfs size by default, it helps to detect job sandbox as tmp directory
    // for standard python libraries. See YTADMINREQ-14505 for more details.
    auto tmpfsSize = preparer.GetSpec().ExtraTmpfsSize_.GetOrElse(DefaultExrtaTmpfsSize);
    if (preparer.ShouldMountSandbox()) {
        // The sandbox lives in tmpfs, so it has to hold all job files as well.
        tmpfsSize += preparer.GetTotalFileSize();
        if (tmpfsSize == 0) {
            // This can be a case for example when it is local mode and we don't upload binary.
            // NOTE: YT doesn't like zero tmpfs size.
            tmpfsSize = RoundUpFileSize(1);
        }
        // The tmpfs size is added on top of the memory limit; 512 MiB is used as the base
        // when the user did not set a limit.
        memoryLimit = memoryLimit.GetOrElse(512ll << 20) + tmpfsSize;
    }

    fluent
        .Item("file_paths").List(preparer.GetFiles())
        .DoIf(!preparer.GetLayers().empty(), [&] (TFluentMap fluentMap) {
            fluentMap.Item("layer_paths").List(preparer.GetLayers());
        })
        .Item("command").Value(preparer.GetCommand())
        .Item("class_name").Value(preparer.GetClassName())
        .DoIf(!userJobSpec.Environment_.empty(), [&] (TFluentMap fluentMap) {
            // Environment is written as a map node: variable name -> value.
            TNode environment;
            for (const auto& item : userJobSpec.Environment_) {
                environment[item.first] = item.second;
            }
            fluentMap.Item("environment").Value(environment);
        })
        .DoIf(userJobSpec.DiskSpaceLimit_.Defined(), [&] (TFluentMap fluentMap) {
            fluentMap.Item("disk_space_limit").Value(*userJobSpec.DiskSpaceLimit_);
        })
        .DoIf(inputFormat.Defined(), [&] (TFluentMap fluentMap) {
            fluentMap.Item("input_format").Value(inputFormat->Config);
        })
        .DoIf(outputFormat.Defined(), [&] (TFluentMap fluentMap) {
            fluentMap.Item("output_format").Value(outputFormat->Config);
        })
        .DoIf(memoryLimit.Defined(), [&] (TFluentMap fluentMap) {
            fluentMap.Item("memory_limit").Value(*memoryLimit);
        })
        .DoIf(userJobSpec.MemoryReserveFactor_.Defined(), [&] (TFluentMap fluentMap) {
            fluentMap.Item("memory_reserve_factor").Value(*userJobSpec.MemoryReserveFactor_);
        })
        .DoIf(cpuLimit.Defined(), [&] (TFluentMap fluentMap) {
            fluentMap.Item("cpu_limit").Value(*cpuLimit);
        })
        .DoIf(portCount.Defined(), [&] (TFluentMap fluentMap) {
            fluentMap.Item("port_count").Value(*portCount);
        })
        .DoIf(userJobSpec.JobTimeLimit_.Defined(), [&] (TFluentMap fluentMap) {
            // The time limit is written in milliseconds.
            fluentMap.Item("job_time_limit").Value(userJobSpec.JobTimeLimit_->MilliSeconds());
        })
        .DoIf(userJobSpec.DiskRequest_.Defined(), [&] (TFluentMap fluentMap) {
            // `disk_request` is always a map (possibly empty) once a disk request is defined.
            const auto& diskRequest = *userJobSpec.DiskRequest_;
            TNode diskRequestNode = TNode::CreateMap();
            if (diskRequest.DiskSpace_.Defined()) {
                diskRequestNode["disk_space"] = *diskRequest.DiskSpace_;
            }
            if (diskRequest.InodeCount_.Defined()) {
                diskRequestNode["inode_count"] = *diskRequest.InodeCount_;
            }
            if (diskRequest.Account_.Defined()) {
                diskRequestNode["account"] = *diskRequest.Account_;
            }
            if (diskRequest.MediumName_.Defined()) {
                diskRequestNode["medium_name"] = *diskRequest.MediumName_;
            }
            fluentMap.Item("disk_request").Value(diskRequestNode);
        })
        .DoIf(userJobSpec.NetworkProject_.Defined(), [&] (TFluentMap fluentMap) {
            fluentMap.Item("network_project").Value(*userJobSpec.NetworkProject_);
        })
        .DoIf(preparer.ShouldMountSandbox(), [&] (TFluentMap fluentMap) {
            // Mount the whole sandbox (".") into tmpfs with the size computed above.
            fluentMap.Item("tmpfs_path").Value(".");
            fluentMap.Item("tmpfs_size").Value(tmpfsSize);
            fluentMap.Item("copy_files").Value(true);
        })
        // The `profilers` list is emitted unconditionally; it is empty when no profilers are set.
        .Item("profilers")
            .BeginList()
            .DoFor(userJobSpec.JobProfilers_, [&] (TFluentList list, const auto& jobProfiler) {
                list.Item().Value(BuildJobProfilerSpec(jobProfiler));
            })
            .EndList()
        .Item("redirect_stdout_to_stderr").Value(preparer.ShouldRedirectStdoutToStderr());
}
  669. // Might return undefined value.
  670. TNode GetNirvanaBlockUrlFromContext()
  671. {
  672. auto filePath = TString("/slot/sandbox/j/job_context.json");
  673. auto nvYtOperationId = GetEnv("NV_YT_OPERATION_ID");
  674. if (nvYtOperationId.empty()) {
  675. return {};
  676. }
  677. if (!NFs::Exists(filePath)) {
  678. return {};
  679. }
  680. NJson::TJsonValue json;
  681. try {
  682. auto inf = TIFStream(filePath);
  683. json = NJson::ReadJsonTree(&inf, /*throwOnError*/ true);
  684. } catch (const std::exception& ex) {
  685. YT_LOG_ERROR("Failed to load nirvana job context: %v", ex.what());
  686. return {};
  687. }
  688. const auto* url = json.GetValueByPath("meta.blockURL");
  689. if (!url || !url->IsString()) {
  690. return {};
  691. }
  692. TNode result = url->GetString();
  693. result.Attributes()["_type_tag"] = "url";
  694. return result;
  695. }
  696. template <typename T>
  697. void BuildCommonOperationPart(
  698. const TConfigPtr& config,
  699. const TOperationSpecBase<T>& baseSpec,
  700. const TOperationOptions& options,
  701. TNode* specNode)
  702. {
  703. const TProcessState* properties = TProcessState::Get();
  704. auto& startedBySpec = (*specNode)["started_by"];
  705. startedBySpec["hostname"] = properties->FqdnHostName,
  706. startedBySpec["pid"] = properties->Pid;
  707. startedBySpec["user"] = properties->UserName;
  708. startedBySpec["wrapper_version"] = properties->ClientVersion;
  709. startedBySpec["binary"] = properties->BinaryPath;
  710. startedBySpec["binary_name"] = properties->BinaryName;
  711. auto nirvanaBlockUrl = GetNirvanaBlockUrlFromContext();
  712. if (!nirvanaBlockUrl.IsUndefined()) {
  713. startedBySpec["nirvana_block_url"] = nirvanaBlockUrl;
  714. }
  715. TString pool;
  716. if (baseSpec.Pool_) {
  717. pool = *baseSpec.Pool_;
  718. } else {
  719. pool = config->Pool;
  720. }
  721. if (!pool.empty()) {
  722. (*specNode)["pool"] = pool;
  723. }
  724. if (baseSpec.Weight_.Defined()) {
  725. (*specNode)["weight"] = *baseSpec.Weight_;
  726. }
  727. if (baseSpec.TimeLimit_.Defined()) {
  728. (*specNode)["time_limit"] = baseSpec.TimeLimit_->MilliSeconds();
  729. }
  730. if (baseSpec.PoolTrees().Defined()) {
  731. TNode poolTreesSpec = TNode::CreateList();
  732. for (const auto& tree : *baseSpec.PoolTrees()) {
  733. poolTreesSpec.Add(tree);
  734. }
  735. (*specNode)["pool_trees"] = std::move(poolTreesSpec);
  736. }
  737. if (baseSpec.ResourceLimits().Defined()) {
  738. auto resourceLimitsSpec = BuildSchedulerResourcesSpec(*baseSpec.ResourceLimits());
  739. if (!resourceLimitsSpec.IsUndefined()) {
  740. (*specNode)["resource_limits"] = std::move(resourceLimitsSpec);
  741. }
  742. }
  743. if (options.SecureVault_.Defined()) {
  744. Y_ENSURE(options.SecureVault_->IsMap(),
  745. "SecureVault must be a map node, got " << options.SecureVault_->GetType());
  746. (*specNode)["secure_vault"] = *options.SecureVault_;
  747. }
  748. if (baseSpec.Title_.Defined()) {
  749. (*specNode)["title"] = *baseSpec.Title_;
  750. }
  751. }
  752. template <typename TSpec>
  753. void BuildCommonUserOperationPart(const TSpec& baseSpec, TNode* spec)
  754. {
  755. if (baseSpec.MaxFailedJobCount_.Defined()) {
  756. (*spec)["max_failed_job_count"] = *baseSpec.MaxFailedJobCount_;
  757. }
  758. if (baseSpec.FailOnJobRestart_.Defined()) {
  759. (*spec)["fail_on_job_restart"] = *baseSpec.FailOnJobRestart_;
  760. }
  761. if (baseSpec.StderrTablePath_.Defined()) {
  762. (*spec)["stderr_table_path"] = *baseSpec.StderrTablePath_;
  763. }
  764. if (baseSpec.CoreTablePath_.Defined()) {
  765. (*spec)["core_table_path"] = *baseSpec.CoreTablePath_;
  766. }
  767. if (baseSpec.WaitingJobTimeout_.Defined()) {
  768. (*spec)["waiting_job_timeout"] = baseSpec.WaitingJobTimeout_->MilliSeconds();
  769. }
  770. }
  771. template <typename TSpec>
  772. void BuildJobCountOperationPart(const TSpec& spec, TNode* nodeSpec)
  773. {
  774. if (spec.JobCount_.Defined()) {
  775. (*nodeSpec)["job_count"] = *spec.JobCount_;
  776. }
  777. if (spec.DataSizePerJob_.Defined()) {
  778. (*nodeSpec)["data_size_per_job"] = *spec.DataSizePerJob_;
  779. }
  780. }
  781. template <typename TSpec>
  782. void BuildPartitionCountOperationPart(const TSpec& spec, TNode* nodeSpec)
  783. {
  784. if (spec.PartitionCount_.Defined()) {
  785. (*nodeSpec)["partition_count"] = *spec.PartitionCount_;
  786. }
  787. if (spec.PartitionDataSize_.Defined()) {
  788. (*nodeSpec)["partition_data_size"] = *spec.PartitionDataSize_;
  789. }
  790. }
  791. template <typename TSpec>
  792. void BuildDataSizePerSortJobPart(const TSpec& spec, TNode* nodeSpec)
  793. {
  794. if (spec.DataSizePerSortJob_.Defined()) {
  795. (*nodeSpec)["data_size_per_sort_job"] = *spec.DataSizePerSortJob_;
  796. }
  797. }
  798. template <typename TSpec>
  799. void BuildPartitionJobCountOperationPart(const TSpec& spec, TNode* nodeSpec)
  800. {
  801. if (spec.PartitionJobCount_.Defined()) {
  802. (*nodeSpec)["partition_job_count"] = *spec.PartitionJobCount_;
  803. }
  804. if (spec.DataSizePerPartitionJob_.Defined()) {
  805. (*nodeSpec)["data_size_per_partition_job"] = *spec.DataSizePerPartitionJob_;
  806. }
  807. }
  808. template <typename TSpec>
  809. void BuildMapJobCountOperationPart(const TSpec& spec, TNode* nodeSpec)
  810. {
  811. if (spec.MapJobCount_.Defined()) {
  812. (*nodeSpec)["map_job_count"] = *spec.MapJobCount_;
  813. }
  814. if (spec.DataSizePerMapJob_.Defined()) {
  815. (*nodeSpec)["data_size_per_map_job"] = *spec.DataSizePerMapJob_;
  816. }
  817. }
  818. template <typename TSpec>
  819. void BuildIntermediateDataPart(const TSpec& spec, TNode* nodeSpec)
  820. {
  821. if (spec.IntermediateDataAccount_.Defined()) {
  822. (*nodeSpec)["intermediate_data_account"] = *spec.IntermediateDataAccount_;
  823. }
  824. if (spec.IntermediateDataReplicationFactor_.Defined()) {
  825. (*nodeSpec)["intermediate_data_replication_factor"] = *spec.IntermediateDataReplicationFactor_;
  826. }
  827. }
  828. ////////////////////////////////////////////////////////////////////////////////
  829. TNode MergeSpec(TNode dst, TNode spec, const TOperationOptions& options)
  830. {
  831. MergeNodes(dst["spec"], spec);
  832. if (options.Spec_) {
  833. MergeNodes(dst["spec"], *options.Spec_);
  834. }
  835. return dst;
  836. }
  837. template <typename TSpec>
  838. void CreateDebugOutputTables(const TSpec& spec, const TOperationPreparer& preparer)
  839. {
  840. if (spec.StderrTablePath_.Defined()) {
  841. NYT::NDetail::Create(
  842. preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  843. preparer.GetContext(),
  844. TTransactionId(),
  845. *spec.StderrTablePath_,
  846. NT_TABLE,
  847. TCreateOptions()
  848. .IgnoreExisting(true)
  849. .Recursive(true));
  850. }
  851. if (spec.CoreTablePath_.Defined()) {
  852. NYT::NDetail::Create(
  853. preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  854. preparer.GetContext(),
  855. TTransactionId(),
  856. *spec.CoreTablePath_,
  857. NT_TABLE,
  858. TCreateOptions()
  859. .IgnoreExisting(true)
  860. .Recursive(true));
  861. }
  862. }
  863. void CreateOutputTable(
  864. const TOperationPreparer& preparer,
  865. const TRichYPath& path)
  866. {
  867. Y_ENSURE(path.Path_, "Output table is not set");
  868. if (!path.Create_.Defined()) {
  869. // If `create` attribute is defined
  870. Create(
  871. preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  872. preparer.GetContext(), preparer.GetTransactionId(), path.Path_, NT_TABLE,
  873. TCreateOptions()
  874. .IgnoreExisting(true)
  875. .Recursive(true));
  876. }
  877. }
  878. void CreateOutputTables(
  879. const TOperationPreparer& preparer,
  880. const TVector<TRichYPath>& paths)
  881. {
  882. for (auto& path : paths) {
  883. CreateOutputTable(preparer, path);
  884. }
  885. }
  886. void CheckInputTablesExist(
  887. const TOperationPreparer& preparer,
  888. const TVector<TRichYPath>& paths)
  889. {
  890. Y_ENSURE(!paths.empty(), "Input tables are not set");
  891. for (auto& path : paths) {
  892. auto curTransactionId = path.TransactionId_.GetOrElse(preparer.GetTransactionId());
  893. Y_ENSURE_EX(
  894. Exists(
  895. preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  896. preparer.GetContext(),
  897. curTransactionId,
  898. path.Path_),
  899. TApiUsageError() << "Input table '" << path.Path_ << "' doesn't exist");
  900. }
  901. }
  902. void LogJob(const TOperationId& opId, const IJob* job, const char* type)
  903. {
  904. if (job) {
  905. YT_LOG_INFO("Operation %v; %v = %v",
  906. opId,
  907. type,
  908. TJobFactory::Get()->GetJobName(job));
  909. }
  910. }
  911. void LogYPaths(const TOperationId& opId, const TVector<TRichYPath>& paths, const char* type)
  912. {
  913. for (size_t i = 0; i < paths.size(); ++i) {
  914. YT_LOG_INFO("Operation %v; %v[%v] = %v",
  915. opId,
  916. type,
  917. i,
  918. paths[i].Path_);
  919. }
  920. }
  921. void LogYPath(const TOperationId& opId, const TRichYPath& path, const char* type)
  922. {
  923. YT_LOG_INFO("Operation %v; %v = %v",
  924. opId,
  925. type,
  926. path.Path_);
  927. }
  928. TString AddModeToTitleIfDebug(const TString& title) {
  929. #ifndef NDEBUG
  930. return title + " (debug build)";
  931. #else
  932. return title;
  933. #endif
  934. }
  935. } // namespace
  936. ////////////////////////////////////////////////////////////////////////////////
// Prepares a map operation and registers its delayed start function on `operation`.
//
// Steps:
//   1. Optionally creates debug (stderr/core) tables and output tables, checking that
//      inputs exist, as requested by `options`.
//   2. Prepares the mapper job (TJobPreparer) and builds the operation spec node.
//   3. Stores a start function in `operation`; the operation is actually started
//      (preparer->StartOperation) only when that function is invoked.
//
// `spec` is taken by value because its title is filled with a default below.
template <typename T>
void DoExecuteMap(
    const TOperationPtr& operation,
    const TOperationPreparerPtr& preparer,
    const TSimpleOperationIo& operationIo,
    TMapOperationSpecBase<T> spec,
    const IJobPtr& mapper,
    const TOperationOptions& options)
{
    if (options.CreateDebugOutputTables_) {
        CreateDebugOutputTables(spec, *preparer);
    }
    if (options.CreateOutputTables_) {
        CheckInputTablesExist(*preparer, operationIo.Inputs);
        CreateOutputTables(*preparer, operationIo.Outputs);
    }

    TJobPreparer map(
        *preparer,
        spec.MapperSpec_,
        *mapper,
        operationIo.Outputs.size(),
        operationIo.JobFiles,
        options);

    // Default title: the mapper class name (with a debug-build marker when applicable).
    spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(map.GetClassName()));

    TNode specNode = BuildYsonNodeFluently()
    .BeginMap().Item("spec").BeginMap()
        .Item("mapper").DoMap([&] (TFluentMap fluent) {
            BuildUserJobFluently(
                map,
                operationIo.InputFormat,
                operationIo.OutputFormat,
                fluent);
        })
        .DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) {
            // Skip `auto_merge` when no auto-merge option is actually set.
            auto autoMergeSpec = BuildAutoMergeSpec(*spec.AutoMerge_);
            if (!autoMergeSpec.IsUndefined()) {
                fluent.Item("auto_merge").Value(std::move(autoMergeSpec));
            }
        })
        .Item("input_table_paths").List(operationIo.Inputs)
        .Item("output_table_paths").List(operationIo.Outputs)
        .DoIf(spec.Ordered_.Defined(), [&] (TFluentMap fluent) {
            fluent.Item("ordered").Value(spec.Ordered_.GetRef());
        })
    .EndMap().EndMap();

    BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);

    // Row and range index control attributes are always enabled for map jobs.
    specNode["spec"]["job_io"]["control_attributes"]["enable_row_index"] = TNode(true);
    specNode["spec"]["job_io"]["control_attributes"]["enable_range_index"] = TNode(true);
    if (!preparer->GetContext().Config->TableWriter.Empty()) {
        specNode["spec"]["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter;
    }

    BuildCommonUserOperationPart(spec, &specNode["spec"]);
    BuildJobCountOperationPart(spec, &specNode["spec"]);

    // The final spec (merged with the config-level spec and `options.Spec_`) is computed now
    // and captured by value. `operation` is captured as a raw pointer.
    // NOTE(review): presumably raw to avoid a reference cycle, since the function is stored
    // inside the operation itself -- confirm.
    auto startOperation = [
        operation=operation.Get(),
        spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
        preparer,
        operationIo,
        mapper
    ] () {
        auto operationId = preparer->StartOperation(operation, "map", spec);

        LogJob(operationId, mapper.Get(), "mapper");
        LogYPaths(operationId, operationIo.Inputs, "input");
        LogYPaths(operationId, operationIo.Outputs, "output");

        return operationId;
    };

    operation->SetDelayedStartFunction(std::move(startOperation));
}
  1005. void ExecuteMap(
  1006. const TOperationPtr& operation,
  1007. const TOperationPreparerPtr& preparer,
  1008. const TMapOperationSpec& spec,
  1009. const ::TIntrusivePtr<IStructuredJob>& mapper,
  1010. const TOperationOptions& options)
  1011. {
  1012. YT_LOG_DEBUG("Starting map operation (PreparationId: %v)",
  1013. preparer->GetPreparationId());
  1014. auto operationIo = CreateSimpleOperationIo(*mapper, *preparer, spec, options, /* allowSkiff = */ true);
  1015. DoExecuteMap(
  1016. operation,
  1017. preparer,
  1018. operationIo,
  1019. spec,
  1020. mapper,
  1021. options);
  1022. }
  1023. void ExecuteRawMap(
  1024. const TOperationPtr& operation,
  1025. const TOperationPreparerPtr& preparer,
  1026. const TRawMapOperationSpec& spec,
  1027. const ::TIntrusivePtr<IRawJob>& mapper,
  1028. const TOperationOptions& options)
  1029. {
  1030. YT_LOG_DEBUG("Starting raw map operation (PreparationId: %v)",
  1031. preparer->GetPreparationId());
  1032. auto operationIo = CreateSimpleOperationIo(*mapper, *preparer, spec);
  1033. DoExecuteMap(
  1034. operation,
  1035. preparer,
  1036. operationIo,
  1037. spec,
  1038. mapper,
  1039. options);
  1040. }
  1041. ////////////////////////////////////////////////////////////////////////////////
  1042. template <typename T>
  1043. void DoExecuteReduce(
  1044. const TOperationPtr& operation,
  1045. const TOperationPreparerPtr& preparer,
  1046. const TSimpleOperationIo& operationIo,
  1047. TReduceOperationSpecBase<T> spec,
  1048. const IJobPtr& reducer,
  1049. const TOperationOptions& options)
  1050. {
  1051. if (options.CreateDebugOutputTables_) {
  1052. CreateDebugOutputTables(spec, *preparer);
  1053. }
  1054. if (options.CreateOutputTables_) {
  1055. CheckInputTablesExist(*preparer, operationIo.Inputs);
  1056. CreateOutputTables(*preparer, operationIo.Outputs);
  1057. }
  1058. TJobPreparer reduce(
  1059. *preparer,
  1060. spec.ReducerSpec_,
  1061. *reducer,
  1062. operationIo.Outputs.size(),
  1063. operationIo.JobFiles,
  1064. options);
  1065. spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName()));
  1066. TNode specNode = BuildYsonNodeFluently()
  1067. .BeginMap().Item("spec").BeginMap()
  1068. .Item("reducer").DoMap([&] (TFluentMap fluent) {
  1069. BuildUserJobFluently(
  1070. reduce,
  1071. operationIo.InputFormat,
  1072. operationIo.OutputFormat,
  1073. fluent);
  1074. })
  1075. .DoIf(!spec.SortBy_.Parts_.empty(), [&] (TFluentMap fluent) {
  1076. fluent.Item("sort_by").Value(spec.SortBy_);
  1077. })
  1078. .Item("reduce_by").Value(spec.ReduceBy_)
  1079. .DoIf(spec.JoinBy_.Defined(), [&] (TFluentMap fluent) {
  1080. fluent.Item("join_by").Value(spec.JoinBy_.GetRef());
  1081. })
  1082. .DoIf(spec.EnableKeyGuarantee_.Defined(), [&] (TFluentMap fluent) {
  1083. fluent.Item("enable_key_guarantee").Value(spec.EnableKeyGuarantee_.GetRef());
  1084. })
  1085. .Item("input_table_paths").List(operationIo.Inputs)
  1086. .Item("output_table_paths").List(operationIo.Outputs)
  1087. .Item("job_io").BeginMap()
  1088. .Item("control_attributes").BeginMap()
  1089. .Item("enable_key_switch").Value(true)
  1090. .Item("enable_row_index").Value(true)
  1091. .Item("enable_range_index").Value(true)
  1092. .EndMap()
  1093. .DoIf(!preparer->GetContext().Config->TableWriter.Empty(), [&] (TFluentMap fluent) {
  1094. fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
  1095. })
  1096. .EndMap()
  1097. .DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) {
  1098. fluent.Item("auto_merge").Value(BuildAutoMergeSpec(*spec.AutoMerge_));
  1099. })
  1100. .EndMap().EndMap();
  1101. BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
  1102. BuildCommonUserOperationPart(spec, &specNode["spec"]);
  1103. BuildJobCountOperationPart(spec, &specNode["spec"]);
  1104. auto startOperation = [
  1105. operation=operation.Get(),
  1106. spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
  1107. preparer,
  1108. operationIo,
  1109. reducer
  1110. ] () {
  1111. auto operationId = preparer->StartOperation(operation, "reduce", spec);
  1112. LogJob(operationId, reducer.Get(), "reducer");
  1113. LogYPaths(operationId, operationIo.Inputs, "input");
  1114. LogYPaths(operationId, operationIo.Outputs, "output");
  1115. return operationId;
  1116. };
  1117. operation->SetDelayedStartFunction(std::move(startOperation));
  1118. }
  1119. void ExecuteReduce(
  1120. const TOperationPtr& operation,
  1121. const TOperationPreparerPtr& preparer,
  1122. const TReduceOperationSpec& spec,
  1123. const ::TIntrusivePtr<IStructuredJob>& reducer,
  1124. const TOperationOptions& options)
  1125. {
  1126. YT_LOG_DEBUG("Starting reduce operation (PreparationId: %v)",
  1127. preparer->GetPreparationId());
  1128. auto operationIo = CreateSimpleOperationIo(*reducer, *preparer, spec, options, /* allowSkiff = */ false);
  1129. DoExecuteReduce(
  1130. operation,
  1131. preparer,
  1132. operationIo,
  1133. spec,
  1134. reducer,
  1135. options);
  1136. }
  1137. void ExecuteRawReduce(
  1138. const TOperationPtr& operation,
  1139. const TOperationPreparerPtr& preparer,
  1140. const TRawReduceOperationSpec& spec,
  1141. const ::TIntrusivePtr<IRawJob>& reducer,
  1142. const TOperationOptions& options)
  1143. {
  1144. YT_LOG_DEBUG("Starting raw reduce operation (PreparationId: %v)",
  1145. preparer->GetPreparationId());
  1146. auto operationIo = CreateSimpleOperationIo(*reducer, *preparer, spec);
  1147. DoExecuteReduce(
  1148. operation,
  1149. preparer,
  1150. operationIo,
  1151. spec,
  1152. reducer,
  1153. options);
  1154. }
  1155. ////////////////////////////////////////////////////////////////////////////////
// Prepares a join-reduce operation and registers its delayed start function on `operation`.
//
// Steps:
//   1. Optionally creates debug (stderr/core) tables and output tables, checking that
//      inputs exist, as requested by `options`.
//   2. Prepares the reducer job (TJobPreparer) and builds the operation spec node.
//   3. Stores a start function in `operation`; the operation is actually started
//      (preparer->StartOperation) only when that function is invoked.
//
// `spec` is taken by value because its title is filled with a default below.
template <typename T>
void DoExecuteJoinReduce(
    const TOperationPtr& operation,
    const TOperationPreparerPtr& preparer,
    const TSimpleOperationIo& operationIo,
    TJoinReduceOperationSpecBase<T> spec,
    const IJobPtr& reducer,
    const TOperationOptions& options)
{
    if (options.CreateDebugOutputTables_) {
        CreateDebugOutputTables(spec, *preparer);
    }
    if (options.CreateOutputTables_) {
        CheckInputTablesExist(*preparer, operationIo.Inputs);
        CreateOutputTables(*preparer, operationIo.Outputs);
    }

    TJobPreparer reduce(
        *preparer,
        spec.ReducerSpec_,
        *reducer,
        operationIo.Outputs.size(),
        operationIo.JobFiles,
        options);

    // Default title: the reducer class name (with a debug-build marker when applicable).
    spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName()));

    TNode specNode = BuildYsonNodeFluently()
    .BeginMap().Item("spec").BeginMap()
        .Item("reducer").DoMap([&] (TFluentMap fluent) {
            BuildUserJobFluently(
                reduce,
                operationIo.InputFormat,
                operationIo.OutputFormat,
                fluent);
        })
        // Unlike the plain reduce spec, `join_by` is written unconditionally here.
        .Item("join_by").Value(spec.JoinBy_)
        .Item("input_table_paths").List(operationIo.Inputs)
        .Item("output_table_paths").List(operationIo.Outputs)
        .Item("job_io").BeginMap()
            .Item("control_attributes").BeginMap()
                .Item("enable_key_switch").Value(true)
                .Item("enable_row_index").Value(true)
                .Item("enable_range_index").Value(true)
            .EndMap()
            .DoIf(!preparer->GetContext().Config->TableWriter.Empty(), [&] (TFluentMap fluent) {
                fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
            })
        .EndMap()
    .EndMap().EndMap();

    BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
    BuildCommonUserOperationPart(spec, &specNode["spec"]);
    BuildJobCountOperationPart(spec, &specNode["spec"]);

    // The final spec (merged with the config-level spec and `options.Spec_`) is computed now
    // and captured by value. `operation` is captured as a raw pointer.
    // NOTE(review): presumably raw to avoid a reference cycle, since the function is stored
    // inside the operation itself -- confirm.
    auto startOperation = [
        operation=operation.Get(),
        spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
        preparer,
        reducer,
        operationIo
    ] () {
        auto operationId = preparer->StartOperation(operation, "join_reduce", spec);

        LogJob(operationId, reducer.Get(), "reducer");
        LogYPaths(operationId, operationIo.Inputs, "input");
        LogYPaths(operationId, operationIo.Outputs, "output");

        return operationId;
    };

    operation->SetDelayedStartFunction(std::move(startOperation));
}
  1221. void ExecuteJoinReduce(
  1222. const TOperationPtr& operation,
  1223. const TOperationPreparerPtr& preparer,
  1224. const TJoinReduceOperationSpec& spec,
  1225. const ::TIntrusivePtr<IStructuredJob>& reducer,
  1226. const TOperationOptions& options)
  1227. {
  1228. YT_LOG_DEBUG("Starting join reduce operation (PreparationId: %v)",
  1229. preparer->GetPreparationId());
  1230. auto operationIo = CreateSimpleOperationIo(*reducer, *preparer, spec, options, /* allowSkiff = */ false);
  1231. return DoExecuteJoinReduce(
  1232. operation,
  1233. preparer,
  1234. operationIo,
  1235. spec,
  1236. reducer,
  1237. options);
  1238. }
  1239. void ExecuteRawJoinReduce(
  1240. const TOperationPtr& operation,
  1241. const TOperationPreparerPtr& preparer,
  1242. const TRawJoinReduceOperationSpec& spec,
  1243. const ::TIntrusivePtr<IRawJob>& reducer,
  1244. const TOperationOptions& options)
  1245. {
  1246. YT_LOG_DEBUG("Starting raw join reduce operation (PreparationId: %v)",
  1247. preparer->GetPreparationId());
  1248. auto operationIo = CreateSimpleOperationIo(*reducer, *preparer, spec);
  1249. return DoExecuteJoinReduce(
  1250. operation,
  1251. preparer,
  1252. operationIo,
  1253. spec,
  1254. reducer,
  1255. options);
  1256. }
  1257. ////////////////////////////////////////////////////////////////////////////////
  1258. template <typename T>
  1259. void DoExecuteMapReduce(
  1260. const TOperationPtr& operation,
  1261. const TOperationPreparerPtr& preparer,
  1262. const TMapReduceOperationIo& operationIo,
  1263. TMapReduceOperationSpecBase<T> spec,
  1264. const IJobPtr& mapper,
  1265. const IJobPtr& reduceCombiner,
  1266. const IJobPtr& reducer,
  1267. const TOperationOptions& options)
  1268. {
  1269. TVector<TRichYPath> allOutputs;
  1270. allOutputs.insert(allOutputs.end(), operationIo.MapOutputs.begin(), operationIo.MapOutputs.end());
  1271. allOutputs.insert(allOutputs.end(), operationIo.Outputs.begin(), operationIo.Outputs.end());
  1272. if (options.CreateDebugOutputTables_) {
  1273. CreateDebugOutputTables(spec, *preparer);
  1274. }
  1275. if (options.CreateOutputTables_) {
  1276. CheckInputTablesExist(*preparer, operationIo.Inputs);
  1277. CreateOutputTables(*preparer, allOutputs);
  1278. }
  1279. TSortColumns sortBy = spec.SortBy_;
  1280. TSortColumns reduceBy = spec.ReduceBy_;
  1281. if (sortBy.Parts_.empty()) {
  1282. sortBy = reduceBy;
  1283. }
  1284. const bool hasMapper = mapper != nullptr;
  1285. const bool hasCombiner = reduceCombiner != nullptr;
  1286. TVector<TRichYPath> files;
  1287. TJobPreparer reduce(
  1288. *preparer,
  1289. spec.ReducerSpec_,
  1290. *reducer,
  1291. operationIo.Outputs.size(),
  1292. operationIo.ReducerJobFiles,
  1293. options);
  1294. TString title;
  1295. TNode specNode = BuildYsonNodeFluently()
  1296. .BeginMap().Item("spec").BeginMap()
  1297. .DoIf(hasMapper, [&] (TFluentMap fluent) {
  1298. TJobPreparer map(
  1299. *preparer,
  1300. spec.MapperSpec_,
  1301. *mapper,
  1302. 1 + operationIo.MapOutputs.size(),
  1303. operationIo.MapperJobFiles,
  1304. options);
  1305. fluent.Item("mapper").DoMap([&] (TFluentMap fluent) {
  1306. BuildUserJobFluently(
  1307. std::cref(map),
  1308. *operationIo.MapperInputFormat,
  1309. *operationIo.MapperOutputFormat,
  1310. fluent);
  1311. });
  1312. title = "mapper:" + map.GetClassName() + " ";
  1313. })
  1314. .DoIf(hasCombiner, [&] (TFluentMap fluent) {
  1315. TJobPreparer combine(
  1316. *preparer,
  1317. spec.ReduceCombinerSpec_,
  1318. *reduceCombiner,
  1319. size_t(1),
  1320. operationIo.ReduceCombinerJobFiles,
  1321. options);
  1322. fluent.Item("reduce_combiner").DoMap([&] (TFluentMap fluent) {
  1323. BuildUserJobFluently(
  1324. combine,
  1325. *operationIo.ReduceCombinerInputFormat,
  1326. *operationIo.ReduceCombinerOutputFormat,
  1327. fluent);
  1328. });
  1329. title += "combiner:" + combine.GetClassName() + " ";
  1330. })
  1331. .Item("reducer").DoMap([&] (TFluentMap fluent) {
  1332. BuildUserJobFluently(
  1333. reduce,
  1334. operationIo.ReducerInputFormat,
  1335. operationIo.ReducerOutputFormat,
  1336. fluent);
  1337. })
  1338. .Item("sort_by").Value(sortBy)
  1339. .Item("reduce_by").Value(reduceBy)
  1340. .Item("input_table_paths").List(operationIo.Inputs)
  1341. .Item("output_table_paths").List(allOutputs)
  1342. .Item("mapper_output_table_count").Value(operationIo.MapOutputs.size())
  1343. .DoIf(spec.ForceReduceCombiners_.Defined(), [&] (TFluentMap fluent) {
  1344. fluent.Item("force_reduce_combiners").Value(*spec.ForceReduceCombiners_);
  1345. })
  1346. .Item("map_job_io").BeginMap()
  1347. .Item("control_attributes").BeginMap()
  1348. .Item("enable_row_index").Value(true)
  1349. .Item("enable_range_index").Value(true)
  1350. .EndMap()
  1351. .DoIf(!preparer->GetContext().Config->TableWriter.Empty(), [&] (TFluentMap fluent) {
  1352. fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
  1353. })
  1354. .EndMap()
  1355. .Item("sort_job_io").BeginMap()
  1356. .Item("control_attributes").BeginMap()
  1357. .Item("enable_key_switch").Value(true)
  1358. .EndMap()
  1359. .DoIf(!preparer->GetContext().Config->TableWriter.Empty(), [&] (TFluentMap fluent) {
  1360. fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
  1361. })
  1362. .EndMap()
  1363. .Item("reduce_job_io").BeginMap()
  1364. .Item("control_attributes").BeginMap()
  1365. .Item("enable_key_switch").Value(true)
  1366. .EndMap()
  1367. .DoIf(!preparer->GetContext().Config->TableWriter.Empty(), [&] (TFluentMap fluent) {
  1368. fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
  1369. })
  1370. .EndMap()
  1371. .Do([&] (TFluentMap) {
  1372. spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(title + "reducer:" + reduce.GetClassName()));
  1373. })
  1374. .EndMap().EndMap();
  1375. if (spec.Ordered_) {
  1376. specNode["spec"]["ordered"] = *spec.Ordered_;
  1377. }
  1378. BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
  1379. BuildCommonUserOperationPart(spec, &specNode["spec"]);
  1380. BuildMapJobCountOperationPart(spec, &specNode["spec"]);
  1381. BuildPartitionCountOperationPart(spec, &specNode["spec"]);
  1382. BuildIntermediateDataPart(spec, &specNode["spec"]);
  1383. BuildDataSizePerSortJobPart(spec, &specNode["spec"]);
  1384. auto startOperation = [
  1385. operation=operation.Get(),
  1386. spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
  1387. preparer,
  1388. mapper,
  1389. reduceCombiner,
  1390. reducer,
  1391. inputs=operationIo.Inputs,
  1392. allOutputs
  1393. ] () {
  1394. auto operationId = preparer->StartOperation(operation, "map_reduce", spec);
  1395. LogJob(operationId, mapper.Get(), "mapper");
  1396. LogJob(operationId, reduceCombiner.Get(), "reduce_combiner");
  1397. LogJob(operationId, reducer.Get(), "reducer");
  1398. LogYPaths(operationId, inputs, "input");
  1399. LogYPaths(operationId, allOutputs, "output");
  1400. return operationId;
  1401. };
  1402. operation->SetDelayedStartFunction(std::move(startOperation));
  1403. }
  1404. void ExecuteMapReduce(
  1405. const TOperationPtr& operation,
  1406. const TOperationPreparerPtr& preparer,
  1407. const TMapReduceOperationSpec& spec_,
  1408. const ::TIntrusivePtr<IStructuredJob>& mapper,
  1409. const ::TIntrusivePtr<IStructuredJob>& reduceCombiner,
  1410. const ::TIntrusivePtr<IStructuredJob>& reducer,
  1411. const TOperationOptions& options)
  1412. {
  1413. YT_LOG_DEBUG("Starting map-reduce operation (PreparationId: %v)",
  1414. preparer->GetPreparationId());
  1415. TMapReduceOperationSpec spec = spec_;
  1416. TMapReduceOperationIo operationIo;
  1417. auto structuredInputs = CanonizeStructuredTableList(preparer->GetContext(), spec.GetStructuredInputs());
  1418. auto structuredMapOutputs = CanonizeStructuredTableList(preparer->GetContext(), spec.GetStructuredMapOutputs());
  1419. auto structuredOutputs = CanonizeStructuredTableList(preparer->GetContext(), spec.GetStructuredOutputs());
  1420. const bool inferOutputSchema = options.InferOutputSchema_.GetOrElse(preparer->GetContext().Config->InferTableSchema);
  1421. TVector<TTableSchema> currentInferenceResult;
  1422. auto fixSpec = [&] (const TFormat& format) {
  1423. if (format.IsYamredDsv()) {
  1424. spec.SortBy_.Parts_.clear();
  1425. spec.ReduceBy_.Parts_.clear();
  1426. const TYamredDsvAttributes attributes = format.GetYamredDsvAttributes();
  1427. for (auto& column : attributes.KeyColumnNames) {
  1428. spec.SortBy_.Parts_.push_back(column);
  1429. spec.ReduceBy_.Parts_.push_back(column);
  1430. }
  1431. for (const auto& column : attributes.SubkeyColumnNames) {
  1432. spec.SortBy_.Parts_.push_back(column);
  1433. }
  1434. }
  1435. };
  1436. VerifyHasElements(structuredInputs, "inputs");
  1437. TFormatBuilder formatBuilder(
  1438. preparer->GetClientRetryPolicy(),
  1439. preparer->GetContext(),
  1440. preparer->GetTransactionId(),
  1441. options);
  1442. if (mapper) {
  1443. auto mapperOutputDescription =
  1444. spec.GetIntermediateMapOutputDescription()
  1445. .GetOrElse(TUnspecifiedTableStructure());
  1446. TStructuredJobTableList mapperOutput = {
  1447. TStructuredJobTable::Intermediate(mapperOutputDescription),
  1448. };
  1449. for (const auto& table : structuredMapOutputs) {
  1450. mapperOutput.push_back(TStructuredJobTable{table.Description, table.RichYPath});
  1451. }
  1452. auto hints = spec.MapperFormatHints_;
  1453. auto mapperInferenceResult = PrepareOperation<TStructuredJobTableList>(
  1454. *mapper,
  1455. TOperationPreparationContext(
  1456. structuredInputs,
  1457. mapperOutput,
  1458. preparer->GetContext(),
  1459. preparer->GetClientRetryPolicy(),
  1460. preparer->GetTransactionId()),
  1461. &structuredInputs,
  1462. /* outputs */ nullptr,
  1463. hints);
  1464. auto nodeReaderFormat = NodeReaderFormatFromHintAndGlobalConfig(spec.MapperFormatHints_);
  1465. auto [inputFormat, inputFormatConfig] = formatBuilder.CreateFormat(
  1466. *mapper,
  1467. EIODirection::Input,
  1468. structuredInputs,
  1469. hints.InputFormatHints_,
  1470. nodeReaderFormat,
  1471. /* allowFormatFromTableAttribute */ true);
  1472. auto [outputFormat, outputFormatConfig] = formatBuilder.CreateFormat(
  1473. *mapper,
  1474. EIODirection::Output,
  1475. mapperOutput,
  1476. hints.OutputFormatHints_,
  1477. ENodeReaderFormat::Yson,
  1478. /* allowFormatFromTableAttribute */ false);
  1479. operationIo.MapperJobFiles = CreateFormatConfig(inputFormatConfig, outputFormatConfig);
  1480. operationIo.MapperInputFormat = inputFormat;
  1481. operationIo.MapperOutputFormat = outputFormat;
  1482. Y_ABORT_UNLESS(mapperInferenceResult.size() >= 1);
  1483. currentInferenceResult = TVector<TTableSchema>{mapperInferenceResult[0]};
  1484. // The first output as it corresponds to the intermediate data.
  1485. TVector<TTableSchema> additionalOutputsInferenceResult(mapperInferenceResult.begin() + 1, mapperInferenceResult.end());
  1486. operationIo.MapOutputs = GetPathList(
  1487. structuredMapOutputs,
  1488. additionalOutputsInferenceResult,
  1489. inferOutputSchema);
  1490. }
  1491. if (reduceCombiner) {
  1492. const bool isFirstStep = !mapper;
  1493. TStructuredJobTableList inputs;
  1494. if (isFirstStep) {
  1495. inputs = structuredInputs;
  1496. } else {
  1497. auto reduceCombinerIntermediateInput =
  1498. spec.GetIntermediateReduceCombinerInputDescription()
  1499. .GetOrElse(TUnspecifiedTableStructure());
  1500. inputs = {
  1501. TStructuredJobTable::Intermediate(reduceCombinerIntermediateInput),
  1502. };
  1503. }
  1504. auto reduceCombinerOutputDescription = spec.GetIntermediateReduceCombinerOutputDescription()
  1505. .GetOrElse(TUnspecifiedTableStructure());
  1506. TStructuredJobTableList outputs = {
  1507. TStructuredJobTable::Intermediate(reduceCombinerOutputDescription),
  1508. };
  1509. auto hints = spec.ReduceCombinerFormatHints_;
  1510. if (isFirstStep) {
  1511. currentInferenceResult = PrepareOperation<TStructuredJobTableList>(
  1512. *reduceCombiner,
  1513. TOperationPreparationContext(
  1514. inputs,
  1515. outputs,
  1516. preparer->GetContext(),
  1517. preparer->GetClientRetryPolicy(),
  1518. preparer->GetTransactionId()),
  1519. &inputs,
  1520. /* outputs */ nullptr,
  1521. hints);
  1522. } else {
  1523. currentInferenceResult = PrepareOperation<TStructuredJobTableList>(
  1524. *reduceCombiner,
  1525. TSpeculativeOperationPreparationContext(
  1526. currentInferenceResult,
  1527. inputs,
  1528. outputs),
  1529. /* inputs */ nullptr,
  1530. /* outputs */ nullptr,
  1531. hints);
  1532. }
  1533. auto [inputFormat, inputFormatConfig] = formatBuilder.CreateFormat(
  1534. *reduceCombiner,
  1535. EIODirection::Input,
  1536. inputs,
  1537. hints.InputFormatHints_,
  1538. ENodeReaderFormat::Yson,
  1539. /* allowFormatFromTableAttribute = */ isFirstStep);
  1540. auto [outputFormat, outputFormatConfig] = formatBuilder.CreateFormat(
  1541. *reduceCombiner,
  1542. EIODirection::Output,
  1543. outputs,
  1544. hints.OutputFormatHints_,
  1545. ENodeReaderFormat::Yson,
  1546. /* allowFormatFromTableAttribute = */ false);
  1547. operationIo.ReduceCombinerJobFiles = CreateFormatConfig(inputFormatConfig, outputFormatConfig);
  1548. operationIo.ReduceCombinerInputFormat = inputFormat;
  1549. operationIo.ReduceCombinerOutputFormat = outputFormat;
  1550. if (isFirstStep) {
  1551. fixSpec(*operationIo.ReduceCombinerInputFormat);
  1552. }
  1553. }
  1554. const bool isFirstStep = (!mapper && !reduceCombiner);
  1555. TStructuredJobTableList reducerInputs;
  1556. if (isFirstStep) {
  1557. reducerInputs = structuredInputs;
  1558. } else {
  1559. auto reducerInputDescription =
  1560. spec.GetIntermediateReducerInputDescription()
  1561. .GetOrElse(TUnspecifiedTableStructure());
  1562. reducerInputs = {
  1563. TStructuredJobTable::Intermediate(reducerInputDescription),
  1564. };
  1565. }
  1566. auto hints = spec.ReducerFormatHints_;
  1567. TVector<TTableSchema> reducerInferenceResult;
  1568. if (isFirstStep) {
  1569. reducerInferenceResult = PrepareOperation(
  1570. *reducer,
  1571. TOperationPreparationContext(
  1572. structuredInputs,
  1573. structuredOutputs,
  1574. preparer->GetContext(),
  1575. preparer->GetClientRetryPolicy(),
  1576. preparer->GetTransactionId()),
  1577. &structuredInputs,
  1578. &structuredOutputs,
  1579. hints);
  1580. } else {
  1581. reducerInferenceResult = PrepareOperation<TStructuredJobTableList>(
  1582. *reducer,
  1583. TSpeculativeOperationPreparationContext(
  1584. currentInferenceResult,
  1585. reducerInputs,
  1586. structuredOutputs),
  1587. /* inputs */ nullptr,
  1588. &structuredOutputs,
  1589. hints);
  1590. }
  1591. auto [inputFormat, inputFormatConfig] = formatBuilder.CreateFormat(
  1592. *reducer,
  1593. EIODirection::Input,
  1594. reducerInputs,
  1595. hints.InputFormatHints_,
  1596. ENodeReaderFormat::Yson,
  1597. /* allowFormatFromTableAttribute = */ isFirstStep);
  1598. auto [outputFormat, outputFormatConfig] = formatBuilder.CreateFormat(
  1599. *reducer,
  1600. EIODirection::Output,
  1601. ToStructuredJobTableList(spec.GetStructuredOutputs()),
  1602. hints.OutputFormatHints_,
  1603. ENodeReaderFormat::Yson,
  1604. /* allowFormatFromTableAttribute = */ false);
  1605. operationIo.ReducerJobFiles = CreateFormatConfig(inputFormatConfig, outputFormatConfig);
  1606. operationIo.ReducerInputFormat = inputFormat;
  1607. operationIo.ReducerOutputFormat = outputFormat;
  1608. if (isFirstStep) {
  1609. fixSpec(operationIo.ReducerInputFormat);
  1610. }
  1611. operationIo.Inputs = GetPathList(
  1612. ApplyProtobufColumnFilters(
  1613. structuredInputs,
  1614. *preparer,
  1615. GetColumnsUsedInOperation(spec),
  1616. options),
  1617. /* jobSchemaInferenceResult */ Nothing(),
  1618. /* inferSchema */ false);
  1619. operationIo.Outputs = GetPathList(
  1620. structuredOutputs,
  1621. reducerInferenceResult,
  1622. inferOutputSchema);
  1623. VerifyHasElements(operationIo.Outputs, "outputs");
  1624. return DoExecuteMapReduce(
  1625. operation,
  1626. preparer,
  1627. operationIo,
  1628. spec,
  1629. mapper,
  1630. reduceCombiner,
  1631. reducer,
  1632. options);
  1633. }
  1634. void ExecuteRawMapReduce(
  1635. const TOperationPtr& operation,
  1636. const TOperationPreparerPtr& preparer,
  1637. const TRawMapReduceOperationSpec& spec,
  1638. const ::TIntrusivePtr<IRawJob>& mapper,
  1639. const ::TIntrusivePtr<IRawJob>& reduceCombiner,
  1640. const ::TIntrusivePtr<IRawJob>& reducer,
  1641. const TOperationOptions& options)
  1642. {
  1643. YT_LOG_DEBUG("Starting raw map-reduce operation (PreparationId: %v)",
  1644. preparer->GetPreparationId());
  1645. TMapReduceOperationIo operationIo;
  1646. operationIo.Inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetInputs());
  1647. operationIo.MapOutputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetMapOutputs());
  1648. operationIo.Outputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetOutputs());
  1649. VerifyHasElements(operationIo.Inputs, "inputs");
  1650. VerifyHasElements(operationIo.Outputs, "outputs");
  1651. auto getFormatOrDefault = [&] (const TMaybe<TFormat>& maybeFormat, const TMaybe<TFormat> stageDefaultFormat, const char* formatName) {
  1652. if (maybeFormat) {
  1653. return *maybeFormat;
  1654. } else if (stageDefaultFormat) {
  1655. return *stageDefaultFormat;
  1656. } else {
  1657. ythrow TApiUsageError() << "Cannot derive " << formatName;
  1658. }
  1659. };
  1660. if (mapper) {
  1661. operationIo.MapperInputFormat = getFormatOrDefault(spec.MapperInputFormat_, spec.MapperFormat_, "mapper input format");
  1662. operationIo.MapperOutputFormat = getFormatOrDefault(spec.MapperOutputFormat_, spec.MapperFormat_, "mapper output format");
  1663. }
  1664. if (reduceCombiner) {
  1665. operationIo.ReduceCombinerInputFormat = getFormatOrDefault(spec.ReduceCombinerInputFormat_, spec.ReduceCombinerFormat_, "reduce combiner input format");
  1666. operationIo.ReduceCombinerOutputFormat = getFormatOrDefault(spec.ReduceCombinerOutputFormat_, spec.ReduceCombinerFormat_, "reduce combiner output format");
  1667. }
  1668. operationIo.ReducerInputFormat = getFormatOrDefault(spec.ReducerInputFormat_, spec.ReducerFormat_, "reducer input format");
  1669. operationIo.ReducerOutputFormat = getFormatOrDefault(spec.ReducerOutputFormat_, spec.ReducerFormat_, "reducer output format");
  1670. return DoExecuteMapReduce(
  1671. operation,
  1672. preparer,
  1673. operationIo,
  1674. spec,
  1675. mapper,
  1676. reduceCombiner,
  1677. reducer,
  1678. options);
  1679. }
  1680. void ExecuteSort(
  1681. const TOperationPtr& operation,
  1682. const TOperationPreparerPtr& preparer,
  1683. const TSortOperationSpec& spec,
  1684. const TOperationOptions& options)
  1685. {
  1686. YT_LOG_DEBUG("Starting sort operation (PreparationId: %v)",
  1687. preparer->GetPreparationId());
  1688. auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
  1689. auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
  1690. if (options.CreateOutputTables_) {
  1691. CheckInputTablesExist(*preparer, inputs);
  1692. CreateOutputTable(*preparer, output);
  1693. }
  1694. TNode specNode = BuildYsonNodeFluently()
  1695. .BeginMap().Item("spec").BeginMap()
  1696. .Item("input_table_paths").List(inputs)
  1697. .Item("output_table_path").Value(output)
  1698. .Item("sort_by").Value(spec.SortBy_)
  1699. .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
  1700. fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
  1701. })
  1702. .EndMap().EndMap();
  1703. BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
  1704. BuildPartitionCountOperationPart(spec, &specNode["spec"]);
  1705. BuildPartitionJobCountOperationPart(spec, &specNode["spec"]);
  1706. BuildIntermediateDataPart(spec, &specNode["spec"]);
  1707. auto startOperation = [
  1708. operation=operation.Get(),
  1709. spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
  1710. preparer,
  1711. inputs,
  1712. output
  1713. ] () {
  1714. auto operationId = preparer->StartOperation(operation, "sort", spec);
  1715. LogYPaths(operationId, inputs, "input");
  1716. LogYPath(operationId, output, "output");
  1717. return operationId;
  1718. };
  1719. operation->SetDelayedStartFunction(std::move(startOperation));
  1720. }
  1721. void ExecuteMerge(
  1722. const TOperationPtr& operation,
  1723. const TOperationPreparerPtr& preparer,
  1724. const TMergeOperationSpec& spec,
  1725. const TOperationOptions& options)
  1726. {
  1727. YT_LOG_DEBUG("Starting merge operation (PreparationId: %v)",
  1728. preparer->GetPreparationId());
  1729. auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
  1730. auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
  1731. if (options.CreateOutputTables_) {
  1732. CheckInputTablesExist(*preparer, inputs);
  1733. CreateOutputTable(*preparer, output);
  1734. }
  1735. TNode specNode = BuildYsonNodeFluently()
  1736. .BeginMap().Item("spec").BeginMap()
  1737. .Item("input_table_paths").List(inputs)
  1738. .Item("output_table_path").Value(output)
  1739. .Item("mode").Value(ToString(spec.Mode_))
  1740. .Item("combine_chunks").Value(spec.CombineChunks_)
  1741. .Item("force_transform").Value(spec.ForceTransform_)
  1742. .Item("merge_by").Value(spec.MergeBy_)
  1743. .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
  1744. fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
  1745. })
  1746. .EndMap().EndMap();
  1747. BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
  1748. BuildJobCountOperationPart(spec, &specNode["spec"]);
  1749. auto startOperation = [
  1750. operation=operation.Get(),
  1751. spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
  1752. preparer,
  1753. inputs,
  1754. output
  1755. ] () {
  1756. auto operationId = preparer->StartOperation(operation, "merge", spec);
  1757. LogYPaths(operationId, inputs, "input");
  1758. LogYPath(operationId, output, "output");
  1759. return operationId;
  1760. };
  1761. operation->SetDelayedStartFunction(std::move(startOperation));
  1762. }
  1763. void ExecuteErase(
  1764. const TOperationPtr& operation,
  1765. const TOperationPreparerPtr& preparer,
  1766. const TEraseOperationSpec& spec,
  1767. const TOperationOptions& options)
  1768. {
  1769. YT_LOG_DEBUG("Starting erase operation (PreparationId: %v)",
  1770. preparer->GetPreparationId());
  1771. auto tablePath = CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_);
  1772. TNode specNode = BuildYsonNodeFluently()
  1773. .BeginMap().Item("spec").BeginMap()
  1774. .Item("table_path").Value(tablePath)
  1775. .Item("combine_chunks").Value(spec.CombineChunks_)
  1776. .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
  1777. fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
  1778. })
  1779. .EndMap().EndMap();
  1780. BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
  1781. auto startOperation = [
  1782. operation=operation.Get(),
  1783. spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
  1784. preparer,
  1785. tablePath
  1786. ] () {
  1787. auto operationId = preparer->StartOperation(operation, "erase", spec);
  1788. LogYPath(operationId, tablePath, "table_path");
  1789. return operationId;
  1790. };
  1791. operation->SetDelayedStartFunction(std::move(startOperation));
  1792. }
  1793. void ExecuteRemoteCopy(
  1794. const TOperationPtr& operation,
  1795. const TOperationPreparerPtr& preparer,
  1796. const TRemoteCopyOperationSpec& spec,
  1797. const TOperationOptions& options)
  1798. {
  1799. YT_LOG_DEBUG("Starting remote copy operation (PreparationId: %v)",
  1800. preparer->GetPreparationId());
  1801. auto inputs = CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
  1802. auto output = CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
  1803. if (options.CreateOutputTables_) {
  1804. CreateOutputTable(*preparer, output);
  1805. }
  1806. Y_ENSURE_EX(!spec.ClusterName_.empty(), TApiUsageError() << "ClusterName parameter is required");
  1807. TNode specNode = BuildYsonNodeFluently()
  1808. .BeginMap().Item("spec").BeginMap()
  1809. .Item("cluster_name").Value(spec.ClusterName_)
  1810. .Item("input_table_paths").List(inputs)
  1811. .Item("output_table_path").Value(output)
  1812. .DoIf(spec.NetworkName_.Defined(), [&] (TFluentMap fluent) {
  1813. fluent.Item("network_name").Value(*spec.NetworkName_);
  1814. })
  1815. .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
  1816. fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
  1817. })
  1818. .Item("copy_attributes").Value(spec.CopyAttributes_)
  1819. .DoIf(!spec.AttributeKeys_.empty(), [&] (TFluentMap fluent) {
  1820. Y_ENSURE_EX(spec.CopyAttributes_, TApiUsageError() <<
  1821. "Specifying nonempty AttributeKeys in RemoteCopy "
  1822. "doesn't make sense without CopyAttributes == true");
  1823. fluent.Item("attribute_keys").List(spec.AttributeKeys_);
  1824. })
  1825. .EndMap().EndMap();
  1826. BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
  1827. auto startOperation = [
  1828. operation=operation.Get(),
  1829. spec=MergeSpec(specNode, preparer->GetContext().Config->Spec, options),
  1830. preparer,
  1831. inputs,
  1832. output
  1833. ] () {
  1834. auto operationId = preparer->StartOperation(operation, "remote_copy", spec);
  1835. LogYPaths(operationId, inputs, "input");
  1836. LogYPath(operationId, output, "output");
  1837. return operationId;
  1838. };
  1839. operation->SetDelayedStartFunction(std::move(startOperation));
  1840. }
  1841. void ExecuteVanilla(
  1842. const TOperationPtr& operation,
  1843. const TOperationPreparerPtr& preparer,
  1844. const TVanillaOperationSpec& spec,
  1845. const TOperationOptions& options)
  1846. {
  1847. YT_LOG_DEBUG("Starting vanilla operation (PreparationId: %v)",
  1848. preparer->GetPreparationId());
  1849. auto addTask = [&](TFluentMap fluent, const TVanillaTask& task) {
  1850. Y_ABORT_UNLESS(task.Job_.Get());
  1851. if (std::holds_alternative<TVoidStructuredRowStream>(task.Job_->GetOutputRowStreamDescription())) {
  1852. Y_ENSURE_EX(task.Outputs_.empty(),
  1853. TApiUsageError() << "Vanilla task with void IVanillaJob doesn't expect output tables");
  1854. TJobPreparer jobPreparer(
  1855. *preparer,
  1856. task.Spec_,
  1857. *task.Job_,
  1858. /* outputTableCount */ 0,
  1859. /* smallFileList */ {},
  1860. options);
  1861. fluent
  1862. .Item(task.Name_).BeginMap()
  1863. .Item("job_count").Value(task.JobCount_)
  1864. .DoIf(task.NetworkProject_.Defined(), [&](TFluentMap fluent) {
  1865. fluent.Item("network_project").Value(*task.NetworkProject_);
  1866. })
  1867. .Do([&] (TFluentMap fluent) {
  1868. BuildUserJobFluently(
  1869. std::cref(jobPreparer),
  1870. /* inputFormat */ Nothing(),
  1871. /* outputFormat */ Nothing(),
  1872. fluent);
  1873. })
  1874. .EndMap();
  1875. } else {
  1876. auto operationIo = CreateSimpleOperationIo(
  1877. *task.Job_,
  1878. *preparer,
  1879. task,
  1880. options,
  1881. false);
  1882. Y_ENSURE_EX(operationIo.Outputs.size() > 0,
  1883. TApiUsageError() << "Vanilla task with IVanillaJob that has table writer expects output tables");
  1884. if (options.CreateOutputTables_) {
  1885. CreateOutputTables(*preparer, operationIo.Outputs);
  1886. }
  1887. TJobPreparer jobPreparer(
  1888. *preparer,
  1889. task.Spec_,
  1890. *task.Job_,
  1891. operationIo.Outputs.size(),
  1892. operationIo.JobFiles,
  1893. options);
  1894. fluent
  1895. .Item(task.Name_).BeginMap()
  1896. .Item("job_count").Value(task.JobCount_)
  1897. .DoIf(task.NetworkProject_.Defined(), [&](TFluentMap fluent) {
  1898. fluent.Item("network_project").Value(*task.NetworkProject_);
  1899. })
  1900. .Do([&] (TFluentMap fluent) {
  1901. BuildUserJobFluently(
  1902. std::cref(jobPreparer),
  1903. /* inputFormat */ Nothing(),
  1904. operationIo.OutputFormat,
  1905. fluent);
  1906. })
  1907. .Item("output_table_paths").List(operationIo.Outputs)
  1908. .Item("job_io").BeginMap()
  1909. .DoIf(!preparer->GetContext().Config->TableWriter.Empty(), [&](TFluentMap fluent) {
  1910. fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
  1911. })
  1912. .Item("control_attributes").BeginMap()
  1913. .Item("enable_row_index").Value(TNode(true))
  1914. .Item("enable_range_index").Value(TNode(true))
  1915. .EndMap()
  1916. .EndMap()
  1917. .EndMap();
  1918. }
  1919. };
  1920. if (options.CreateDebugOutputTables_) {
  1921. CreateDebugOutputTables(spec, *preparer);
  1922. }
  1923. TNode specNode = BuildYsonNodeFluently()
  1924. .BeginMap().Item("spec").BeginMap()
  1925. .Item("tasks").DoMapFor(spec.Tasks_, addTask)
  1926. .EndMap().EndMap();
  1927. BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode["spec"]);
  1928. BuildCommonUserOperationPart(spec, &specNode["spec"]);
  1929. auto startOperation = [operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer] () {
  1930. auto operationId = preparer->StartOperation(operation, "vanilla", spec, /* useStartOperationRequest */ true);
  1931. return operationId;
  1932. };
  1933. operation->SetDelayedStartFunction(std::move(startOperation));
  1934. }
  1935. ////////////////////////////////////////////////////////////////////////////////
  1936. class TOperation::TOperationImpl
  1937. : public TThrRefBase
  1938. {
  1939. public:
  1940. TOperationImpl(
  1941. IClientRetryPolicyPtr clientRetryPolicy,
  1942. TClientContext context,
  1943. const TMaybe<TOperationId>& operationId = {})
  1944. : ClientRetryPolicy_(clientRetryPolicy)
  1945. , Context_(std::move(context))
  1946. , Id_(operationId)
  1947. , PreparedPromise_(::NThreading::NewPromise<void>())
  1948. , StartedPromise_(::NThreading::NewPromise<void>())
  1949. {
  1950. if (Id_) {
  1951. PreparedPromise_.SetValue();
  1952. StartedPromise_.SetValue();
  1953. } else {
  1954. PreparedPromise_.GetFuture().Subscribe([this_=::TIntrusivePtr(this)] (const ::NThreading::TFuture<void>& preparedResult) {
  1955. try {
  1956. preparedResult.GetValue();
  1957. } catch (...) {
  1958. this_->StartedPromise_.SetException(std::current_exception());
  1959. return;
  1960. }
  1961. });
  1962. }
  1963. }
  1964. const TOperationId& GetId() const;
  1965. TString GetWebInterfaceUrl() const;
  1966. void OnPrepared();
  1967. void SetDelayedStartFunction(std::function<TOperationId()> start);
  1968. void Start();
  1969. bool IsStarted() const;
  1970. void OnPreparationException(std::exception_ptr e);
  1971. TString GetStatus();
  1972. void OnStatusUpdated(const TString& newStatus);
  1973. ::NThreading::TFuture<void> GetPreparedFuture();
  1974. ::NThreading::TFuture<void> GetStartedFuture();
  1975. ::NThreading::TFuture<void> Watch(TClientPtr client);
  1976. EOperationBriefState GetBriefState();
  1977. TMaybe<TYtError> GetError();
  1978. TJobStatistics GetJobStatistics();
  1979. TMaybe<TOperationBriefProgress> GetBriefProgress();
  1980. void AbortOperation();
  1981. void CompleteOperation();
  1982. void SuspendOperation(const TSuspendOperationOptions& options);
  1983. void ResumeOperation(const TResumeOperationOptions& options);
  1984. TOperationAttributes GetAttributes(const TGetOperationOptions& options);
  1985. void UpdateParameters(const TUpdateOperationParametersOptions& options);
  1986. TJobAttributes GetJob(const TJobId& jobId, const TGetJobOptions& options);
  1987. TListJobsResult ListJobs(const TListJobsOptions& options);
  1988. void AsyncFinishOperation(TOperationAttributes operationAttributes);
  1989. void FinishWithException(std::exception_ptr exception);
  1990. void UpdateBriefProgress(TMaybe<TOperationBriefProgress> briefProgress);
  1991. void AnalyzeUnrecognizedSpec(TNode unrecognizedSpec);
  1992. const TClientContext& GetContext() const;
  1993. private:
  1994. void OnStarted(const TOperationId& operationId);
  1995. void UpdateAttributesAndCall(bool needJobStatistics, std::function<void(const TOperationAttributes&)> func);
  1996. void SyncFinishOperationImpl(const TOperationAttributes&);
  1997. static void* SyncFinishOperationProc(void* );
  1998. void ValidateOperationStarted() const;
  1999. private:
  2000. IClientRetryPolicyPtr ClientRetryPolicy_;
  2001. const TClientContext Context_;
  2002. TMaybe<TOperationId> Id_;
  2003. TMutex Lock_;
  2004. ::NThreading::TPromise<void> PreparedPromise_;
  2005. ::NThreading::TPromise<void> StartedPromise_;
  2006. TMaybe<::NThreading::TPromise<void>> CompletePromise_;
  2007. std::function<TOperationId()> DelayedStartFunction_;
  2008. TString Status_;
  2009. TOperationAttributes Attributes_;
  2010. };
  2011. ////////////////////////////////////////////////////////////////////////////////
  2012. class TOperationPollerItem
  2013. : public IYtPollerItem
  2014. {
  2015. public:
  2016. TOperationPollerItem(::TIntrusivePtr<TOperation::TOperationImpl> operationImpl)
  2017. : OperationImpl_(std::move(operationImpl))
  2018. { }
  2019. void PrepareRequest(TRawBatchRequest* batchRequest) override
  2020. {
  2021. auto filter = TOperationAttributeFilter()
  2022. .Add(EOperationAttribute::State)
  2023. .Add(EOperationAttribute::BriefProgress)
  2024. .Add(EOperationAttribute::Result);
  2025. if (!UnrecognizedSpecAnalyzed_) {
  2026. filter.Add(EOperationAttribute::UnrecognizedSpec);
  2027. }
  2028. OperationState_ = batchRequest->GetOperation(
  2029. OperationImpl_->GetId(),
  2030. TGetOperationOptions().AttributeFilter(filter));
  2031. }
  2032. EStatus OnRequestExecuted() override
  2033. {
  2034. try {
  2035. const auto& attributes = OperationState_.GetValue();
  2036. if (!UnrecognizedSpecAnalyzed_ && !attributes.UnrecognizedSpec.Empty()) {
  2037. OperationImpl_->AnalyzeUnrecognizedSpec(*attributes.UnrecognizedSpec);
  2038. UnrecognizedSpecAnalyzed_ = true;
  2039. }
  2040. Y_ABORT_UNLESS(attributes.BriefState,
  2041. "get_operation for operation %s has not returned \"state\" field",
  2042. GetGuidAsString(OperationImpl_->GetId()).data());
  2043. if (*attributes.BriefState != EOperationBriefState::InProgress) {
  2044. OperationImpl_->AsyncFinishOperation(attributes);
  2045. return PollBreak;
  2046. } else {
  2047. OperationImpl_->UpdateBriefProgress(attributes.BriefProgress);
  2048. }
  2049. } catch (const TErrorResponse& e) {
  2050. if (!IsRetriable(e)) {
  2051. OperationImpl_->FinishWithException(std::current_exception());
  2052. return PollBreak;
  2053. }
  2054. } catch (const std::exception& e) {
  2055. OperationImpl_->FinishWithException(std::current_exception());
  2056. return PollBreak;
  2057. }
  2058. return PollContinue;
  2059. }
  2060. void OnItemDiscarded() override {
  2061. OperationImpl_->FinishWithException(std::make_exception_ptr(yexception() << "Operation cancelled"));
  2062. }
  2063. private:
  2064. ::TIntrusivePtr<TOperation::TOperationImpl> OperationImpl_;
  2065. ::NThreading::TFuture<TOperationAttributes> OperationState_;
  2066. bool UnrecognizedSpecAnalyzed_ = false;
  2067. };
  2068. ////////////////////////////////////////////////////////////////////////////////
  2069. const TOperationId& TOperation::TOperationImpl::GetId() const
  2070. {
  2071. ValidateOperationStarted();
  2072. return *Id_;
  2073. }
  2074. TString TOperation::TOperationImpl::GetWebInterfaceUrl() const
  2075. {
  2076. ValidateOperationStarted();
  2077. return GetOperationWebInterfaceUrl(Context_.ServerName, *Id_);
  2078. }
  2079. void TOperation::TOperationImpl::OnPrepared()
  2080. {
  2081. Y_ABORT_UNLESS(!PreparedPromise_.HasException() && !PreparedPromise_.HasValue());
  2082. PreparedPromise_.SetValue();
  2083. }
  2084. void TOperation::TOperationImpl::SetDelayedStartFunction(std::function<TOperationId()> start)
  2085. {
  2086. DelayedStartFunction_ = std::move(start);
  2087. }
  2088. void TOperation::TOperationImpl::Start()
  2089. {
  2090. {
  2091. auto guard = Guard(Lock_);
  2092. if (Id_) {
  2093. ythrow TApiUsageError() << "Start() should not be called on running operations";
  2094. }
  2095. }
  2096. GetPreparedFuture().GetValueSync();
  2097. std::function<TOperationId()> startStuff;
  2098. {
  2099. auto guard = Guard(Lock_);
  2100. startStuff.swap(DelayedStartFunction_);
  2101. }
  2102. if (!startStuff) {
  2103. ythrow TApiUsageError() << "Seems that Start() was called multiple times. If not, contact yt@";
  2104. }
  2105. TOperationId operationId;
  2106. try {
  2107. operationId = startStuff();
  2108. } catch (...) {
  2109. auto exception = std::current_exception();
  2110. StartedPromise_.SetException(exception);
  2111. std::rethrow_exception(exception);
  2112. }
  2113. OnStarted(operationId);
  2114. }
  2115. bool TOperation::TOperationImpl::IsStarted() const {
  2116. auto guard = Guard(Lock_);
  2117. return bool(Id_);
  2118. }
  2119. void TOperation::TOperationImpl::OnPreparationException(std::exception_ptr e)
  2120. {
  2121. Y_ABORT_UNLESS(!PreparedPromise_.HasValue() && !PreparedPromise_.HasException());
  2122. PreparedPromise_.SetException(e);
  2123. }
  2124. TString TOperation::TOperationImpl::GetStatus()
  2125. {
  2126. {
  2127. auto guard = Guard(Lock_);
  2128. if (!Id_) {
  2129. return Status_;
  2130. }
  2131. }
  2132. TMaybe<TString> state;
  2133. UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) {
  2134. state = attributes.State;
  2135. });
  2136. return "On YT cluster: " + state.GetOrElse("undefined state");
  2137. }
  2138. void TOperation::TOperationImpl::OnStatusUpdated(const TString& newStatus)
  2139. {
  2140. auto guard = Guard(Lock_);
  2141. Status_ = newStatus;
  2142. }
  2143. ::NThreading::TFuture<void> TOperation::TOperationImpl::GetPreparedFuture()
  2144. {
  2145. return PreparedPromise_.GetFuture();
  2146. }
  2147. ::NThreading::TFuture<void> TOperation::TOperationImpl::GetStartedFuture()
  2148. {
  2149. return StartedPromise_.GetFuture();
  2150. }
  2151. ::NThreading::TFuture<void> TOperation::TOperationImpl::Watch(TClientPtr client)
  2152. {
  2153. {
  2154. auto guard = Guard(Lock_);
  2155. if (CompletePromise_) {
  2156. return *CompletePromise_;
  2157. }
  2158. CompletePromise_ = ::NThreading::NewPromise<void>();
  2159. }
  2160. GetStartedFuture().Subscribe([
  2161. this_=::TIntrusivePtr(this),
  2162. client=std::move(client)
  2163. ] (const ::NThreading::TFuture<void>& startedResult) {
  2164. try {
  2165. startedResult.GetValue();
  2166. } catch (...) {
  2167. this_->CompletePromise_->SetException(std::current_exception());
  2168. return;
  2169. }
  2170. client->GetYtPoller().Watch(::MakeIntrusive<TOperationPollerItem>(this_));
  2171. auto operationId = this_->GetId();
  2172. auto registry = TAbortableRegistry::Get();
  2173. registry->Add(
  2174. operationId,
  2175. ::MakeIntrusive<TOperationAbortable>(this_->ClientRetryPolicy_, this_->Context_, operationId));
  2176. // We have to own an IntrusivePtr to registry to prevent use-after-free
  2177. auto removeOperation = [registry, operationId] (const ::NThreading::TFuture<void>&) {
  2178. registry->Remove(operationId);
  2179. };
  2180. this_->CompletePromise_->GetFuture().Subscribe(removeOperation);
  2181. });
  2182. return *CompletePromise_;
  2183. }
  2184. EOperationBriefState TOperation::TOperationImpl::GetBriefState()
  2185. {
  2186. ValidateOperationStarted();
  2187. EOperationBriefState result = EOperationBriefState::InProgress;
  2188. UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) {
  2189. Y_ABORT_UNLESS(attributes.BriefState,
  2190. "get_operation for operation %s has not returned \"state\" field",
  2191. GetGuidAsString(*Id_).data());
  2192. result = *attributes.BriefState;
  2193. });
  2194. return result;
  2195. }
  2196. TMaybe<TYtError> TOperation::TOperationImpl::GetError()
  2197. {
  2198. ValidateOperationStarted();
  2199. TMaybe<TYtError> result;
  2200. UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) {
  2201. Y_ABORT_UNLESS(attributes.Result);
  2202. result = attributes.Result->Error;
  2203. });
  2204. return result;
  2205. }
  2206. TJobStatistics TOperation::TOperationImpl::GetJobStatistics()
  2207. {
  2208. ValidateOperationStarted();
  2209. TJobStatistics result;
  2210. UpdateAttributesAndCall(true, [&] (const TOperationAttributes& attributes) {
  2211. if (attributes.Progress) {
  2212. result = attributes.Progress->JobStatistics;
  2213. }
  2214. });
  2215. return result;
  2216. }
  2217. TMaybe<TOperationBriefProgress> TOperation::TOperationImpl::GetBriefProgress()
  2218. {
  2219. ValidateOperationStarted();
  2220. {
  2221. auto g = Guard(Lock_);
  2222. if (CompletePromise_.Defined()) {
  2223. // Poller do this job for us
  2224. return Attributes_.BriefProgress;
  2225. }
  2226. }
  2227. TMaybe<TOperationBriefProgress> result;
  2228. UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) {
  2229. result = attributes.BriefProgress;
  2230. });
  2231. return result;
  2232. }
  2233. void TOperation::TOperationImpl::UpdateBriefProgress(TMaybe<TOperationBriefProgress> briefProgress)
  2234. {
  2235. auto g = Guard(Lock_);
  2236. Attributes_.BriefProgress = std::move(briefProgress);
  2237. }
  2238. void TOperation::TOperationImpl::AnalyzeUnrecognizedSpec(TNode unrecognizedSpec)
  2239. {
  2240. static const TVector<TVector<TString>> knownUnrecognizedSpecFieldPaths = {
  2241. {"mapper", "class_name"},
  2242. {"reducer", "class_name"},
  2243. {"reduce_combiner", "class_name"},
  2244. };
  2245. auto removeByPath = [] (TNode& node, auto pathBegin, auto pathEnd, auto& removeByPath) {
  2246. if (pathBegin == pathEnd) {
  2247. return;
  2248. }
  2249. if (!node.IsMap()) {
  2250. return;
  2251. }
  2252. auto* child = node.AsMap().FindPtr(*pathBegin);
  2253. if (!child) {
  2254. return;
  2255. }
  2256. removeByPath(*child, std::next(pathBegin), pathEnd, removeByPath);
  2257. if (std::next(pathBegin) == pathEnd || (child->IsMap() && child->Empty())) {
  2258. node.AsMap().erase(*pathBegin);
  2259. }
  2260. };
  2261. Y_ABORT_UNLESS(unrecognizedSpec.IsMap());
  2262. for (const auto& knownFieldPath : knownUnrecognizedSpecFieldPaths) {
  2263. Y_ABORT_UNLESS(!knownFieldPath.empty());
  2264. removeByPath(unrecognizedSpec, knownFieldPath.cbegin(), knownFieldPath.cend(), removeByPath);
  2265. }
  2266. if (!unrecognizedSpec.Empty()) {
  2267. YT_LOG_INFO(
  2268. "WARNING! Unrecognized spec for operation %s is not empty "
  2269. "(fields added by the YT API library are excluded): %s",
  2270. GetGuidAsString(*Id_).data(),
  2271. NodeToYsonString(unrecognizedSpec).data());
  2272. }
  2273. }
  2274. void TOperation::TOperationImpl::OnStarted(const TOperationId& operationId)
  2275. {
  2276. auto guard = Guard(Lock_);
  2277. Y_ABORT_UNLESS(!Id_,
  2278. "OnStarted() called with operationId = %s for operation with id %s",
  2279. GetGuidAsString(operationId).data(),
  2280. GetGuidAsString(*Id_).data());
  2281. Id_ = operationId;
  2282. Y_ABORT_UNLESS(!StartedPromise_.HasValue() && !StartedPromise_.HasException());
  2283. StartedPromise_.SetValue();
  2284. }
  2285. void TOperation::TOperationImpl::UpdateAttributesAndCall(bool needJobStatistics, std::function<void(const TOperationAttributes&)> func)
  2286. {
  2287. {
  2288. auto g = Guard(Lock_);
  2289. if (Attributes_.BriefState
  2290. && *Attributes_.BriefState != EOperationBriefState::InProgress
  2291. && (!needJobStatistics || Attributes_.Progress))
  2292. {
  2293. func(Attributes_);
  2294. return;
  2295. }
  2296. }
  2297. TOperationAttributes attributes = NDetail::GetOperation(
  2298. ClientRetryPolicy_->CreatePolicyForGenericRequest(),
  2299. Context_,
  2300. *Id_,
  2301. TGetOperationOptions().AttributeFilter(TOperationAttributeFilter()
  2302. .Add(EOperationAttribute::Result)
  2303. .Add(EOperationAttribute::Progress)
  2304. .Add(EOperationAttribute::State)
  2305. .Add(EOperationAttribute::BriefProgress)));
  2306. func(attributes);
  2307. Y_ENSURE(attributes.BriefState);
  2308. if (*attributes.BriefState != EOperationBriefState::InProgress) {
  2309. auto g = Guard(Lock_);
  2310. Attributes_ = std::move(attributes);
  2311. }
  2312. }
  2313. void TOperation::TOperationImpl::FinishWithException(std::exception_ptr e)
  2314. {
  2315. CompletePromise_->SetException(std::move(e));
  2316. }
  2317. void TOperation::TOperationImpl::AbortOperation()
  2318. {
  2319. ValidateOperationStarted();
  2320. NYT::NDetail::AbortOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_);
  2321. }
  2322. void TOperation::TOperationImpl::CompleteOperation()
  2323. {
  2324. ValidateOperationStarted();
  2325. NYT::NDetail::CompleteOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_);
  2326. }
  2327. void TOperation::TOperationImpl::SuspendOperation(const TSuspendOperationOptions& options)
  2328. {
  2329. ValidateOperationStarted();
  2330. NYT::NDetail::SuspendOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
  2331. }
  2332. void TOperation::TOperationImpl::ResumeOperation(const TResumeOperationOptions& options)
  2333. {
  2334. ValidateOperationStarted();
  2335. NYT::NDetail::ResumeOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
  2336. }
  2337. TOperationAttributes TOperation::TOperationImpl::GetAttributes(const TGetOperationOptions& options)
  2338. {
  2339. ValidateOperationStarted();
  2340. return NYT::NDetail::GetOperation(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
  2341. }
  2342. void TOperation::TOperationImpl::UpdateParameters(const TUpdateOperationParametersOptions& options)
  2343. {
  2344. ValidateOperationStarted();
  2345. return NYT::NDetail::UpdateOperationParameters(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
  2346. }
  2347. TJobAttributes TOperation::TOperationImpl::GetJob(const TJobId& jobId, const TGetJobOptions& options)
  2348. {
  2349. ValidateOperationStarted();
  2350. return NYT::NDetail::GetJob(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, jobId, options);
  2351. }
  2352. TListJobsResult TOperation::TOperationImpl::ListJobs(const TListJobsOptions& options)
  2353. {
  2354. ValidateOperationStarted();
  2355. return NYT::NDetail::ListJobs(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, *Id_, options);
  2356. }
  2357. struct TAsyncFinishOperationsArgs
  2358. {
  2359. ::TIntrusivePtr<TOperation::TOperationImpl> OperationImpl;
  2360. TOperationAttributes OperationAttributes;
  2361. };
  2362. void TOperation::TOperationImpl::AsyncFinishOperation(TOperationAttributes operationAttributes)
  2363. {
  2364. auto args = new TAsyncFinishOperationsArgs;
  2365. args->OperationImpl = this;
  2366. args->OperationAttributes = std::move(operationAttributes);
  2367. TThread thread(TThread::TParams(&TOperation::TOperationImpl::SyncFinishOperationProc, args).SetName("finish operation"));
  2368. thread.Start();
  2369. thread.Detach();
  2370. }
  2371. void* TOperation::TOperationImpl::SyncFinishOperationProc(void* pArgs)
  2372. {
  2373. THolder<TAsyncFinishOperationsArgs> args(static_cast<TAsyncFinishOperationsArgs*>(pArgs));
  2374. args->OperationImpl->SyncFinishOperationImpl(args->OperationAttributes);
  2375. return nullptr;
  2376. }
  2377. void TOperation::TOperationImpl::SyncFinishOperationImpl(const TOperationAttributes& attributes)
  2378. {
  2379. {
  2380. auto guard = Guard(Lock_);
  2381. Y_ABORT_UNLESS(Id_);
  2382. }
  2383. Y_ABORT_UNLESS(attributes.BriefState,
  2384. "get_operation for operation %s has not returned \"state\" field",
  2385. GetGuidAsString(*Id_).data());
  2386. Y_ABORT_UNLESS(*attributes.BriefState != EOperationBriefState::InProgress);
  2387. {
  2388. try {
  2389. // `attributes' that came from poller don't have JobStatistics
  2390. // so we call `GetJobStatistics' in order to get it from server
  2391. // and cache inside object.
  2392. GetJobStatistics();
  2393. } catch (const TErrorResponse& ) {
  2394. // But if for any reason we failed to get attributes
  2395. // we complete operation using what we have.
  2396. auto g = Guard(Lock_);
  2397. Attributes_ = attributes;
  2398. }
  2399. }
  2400. if (*attributes.BriefState == EOperationBriefState::Completed) {
  2401. CompletePromise_->SetValue();
  2402. } else if (*attributes.BriefState == EOperationBriefState::Aborted || *attributes.BriefState == EOperationBriefState::Failed) {
  2403. Y_ABORT_UNLESS(attributes.Result && attributes.Result->Error);
  2404. const auto& error = *attributes.Result->Error;
  2405. YT_LOG_ERROR("Operation %v is `%v' with error: %v",
  2406. *Id_,
  2407. ToString(*attributes.BriefState),
  2408. error.FullDescription());
  2409. TString additionalExceptionText;
  2410. TVector<TFailedJobInfo> failedJobStderrInfo;
  2411. if (*attributes.BriefState == EOperationBriefState::Failed) {
  2412. try {
  2413. failedJobStderrInfo = NYT::NDetail::GetFailedJobInfo(ClientRetryPolicy_, Context_, *Id_, TGetFailedJobInfoOptions());
  2414. } catch (const std::exception& e) {
  2415. additionalExceptionText = "Cannot get job stderrs: ";
  2416. additionalExceptionText += e.what();
  2417. }
  2418. }
  2419. CompletePromise_->SetException(
  2420. std::make_exception_ptr(
  2421. TOperationFailedError(
  2422. *attributes.BriefState == EOperationBriefState::Failed
  2423. ? TOperationFailedError::Failed
  2424. : TOperationFailedError::Aborted,
  2425. *Id_,
  2426. error,
  2427. failedJobStderrInfo) << additionalExceptionText));
  2428. }
  2429. }
  2430. void TOperation::TOperationImpl::ValidateOperationStarted() const
  2431. {
  2432. auto guard = Guard(Lock_);
  2433. if (!Id_) {
  2434. ythrow TApiUsageError() << "Operation is not started";
  2435. }
  2436. }
  2437. const TClientContext& TOperation::TOperationImpl::GetContext() const
  2438. {
  2439. return Context_;
  2440. }
  2441. ////////////////////////////////////////////////////////////////////////////////
  2442. TOperation::TOperation(TClientPtr client)
  2443. : Client_(std::move(client))
  2444. , Impl_(::MakeIntrusive<TOperationImpl>(Client_->GetRetryPolicy(), Client_->GetContext()))
  2445. {
  2446. }
  2447. TOperation::TOperation(TOperationId id, TClientPtr client)
  2448. : Client_(std::move(client))
  2449. , Impl_(::MakeIntrusive<TOperationImpl>(Client_->GetRetryPolicy(), Client_->GetContext(), id))
  2450. {
  2451. }
  2452. const TOperationId& TOperation::GetId() const
  2453. {
  2454. return Impl_->GetId();
  2455. }
  2456. TString TOperation::GetWebInterfaceUrl() const
  2457. {
  2458. return Impl_->GetWebInterfaceUrl();
  2459. }
  2460. void TOperation::OnPrepared()
  2461. {
  2462. Impl_->OnPrepared();
  2463. }
  2464. void TOperation::SetDelayedStartFunction(std::function<TOperationId()> start)
  2465. {
  2466. Impl_->SetDelayedStartFunction(std::move(start));
  2467. }
  2468. void TOperation::Start()
  2469. {
  2470. Impl_->Start();
  2471. }
  2472. bool TOperation::IsStarted() const
  2473. {
  2474. return Impl_->IsStarted();
  2475. }
  2476. void TOperation::OnPreparationException(std::exception_ptr e)
  2477. {
  2478. Impl_->OnPreparationException(std::move(e));
  2479. }
  2480. TString TOperation::GetStatus() const
  2481. {
  2482. return Impl_->GetStatus();
  2483. }
  2484. void TOperation::OnStatusUpdated(const TString& newStatus)
  2485. {
  2486. Impl_->OnStatusUpdated(newStatus);
  2487. }
  2488. ::NThreading::TFuture<void> TOperation::GetPreparedFuture()
  2489. {
  2490. return Impl_->GetPreparedFuture();
  2491. }
  2492. ::NThreading::TFuture<void> TOperation::GetStartedFuture()
  2493. {
  2494. return Impl_->GetStartedFuture();
  2495. }
  2496. ::NThreading::TFuture<void> TOperation::Watch()
  2497. {
  2498. return Impl_->Watch(Client_);
  2499. }
  2500. TVector<TFailedJobInfo> TOperation::GetFailedJobInfo(const TGetFailedJobInfoOptions& options)
  2501. {
  2502. return NYT::NDetail::GetFailedJobInfo(Client_->GetRetryPolicy(), Client_->GetContext(), GetId(), options);
  2503. }
  2504. EOperationBriefState TOperation::GetBriefState()
  2505. {
  2506. return Impl_->GetBriefState();
  2507. }
  2508. TMaybe<TYtError> TOperation::GetError()
  2509. {
  2510. return Impl_->GetError();
  2511. }
  2512. TJobStatistics TOperation::GetJobStatistics()
  2513. {
  2514. return Impl_->GetJobStatistics();
  2515. }
  2516. TMaybe<TOperationBriefProgress> TOperation::GetBriefProgress()
  2517. {
  2518. return Impl_->GetBriefProgress();
  2519. }
  2520. void TOperation::AbortOperation()
  2521. {
  2522. Impl_->AbortOperation();
  2523. }
  2524. void TOperation::CompleteOperation()
  2525. {
  2526. Impl_->CompleteOperation();
  2527. }
  2528. void TOperation::SuspendOperation(const TSuspendOperationOptions& options)
  2529. {
  2530. Impl_->SuspendOperation(options);
  2531. }
  2532. void TOperation::ResumeOperation(const TResumeOperationOptions& options)
  2533. {
  2534. Impl_->ResumeOperation(options);
  2535. }
  2536. TOperationAttributes TOperation::GetAttributes(const TGetOperationOptions& options)
  2537. {
  2538. return Impl_->GetAttributes(options);
  2539. }
  2540. void TOperation::UpdateParameters(const TUpdateOperationParametersOptions& options)
  2541. {
  2542. Impl_->UpdateParameters(options);
  2543. }
  2544. TJobAttributes TOperation::GetJob(const TJobId& jobId, const TGetJobOptions& options)
  2545. {
  2546. return Impl_->GetJob(jobId, options);
  2547. }
  2548. TListJobsResult TOperation::ListJobs(const TListJobsOptions& options)
  2549. {
  2550. return Impl_->ListJobs(options);
  2551. }
  2552. ////////////////////////////////////////////////////////////////////////////////
  // Arguments handed over to the "prepare and start operation" thread.
  struct TAsyncPrepareAndStartOperationArgs
  {
      // Callable executed by the thread.
      std::function<void()> PrepareAndStart;
  };
  2557. void* SyncPrepareAndStartOperation(void* pArgs)
  2558. {
  2559. THolder<TAsyncPrepareAndStartOperationArgs> args(static_cast<TAsyncPrepareAndStartOperationArgs*>(pArgs));
  2560. args->PrepareAndStart();
  2561. return nullptr;
  2562. }
  2563. ::TIntrusivePtr<TOperation> ProcessOperation(
  2564. NYT::NDetail::TClientPtr client,
  2565. std::function<void()> prepare,
  2566. ::TIntrusivePtr<TOperation> operation,
  2567. const TOperationOptions& options)
  2568. {
  2569. auto prepareAndStart = [prepare = std::move(prepare), operation, mode = options.StartOperationMode_] () {
  2570. try {
  2571. prepare();
  2572. operation->OnPrepared();
  2573. } catch (...) {
  2574. operation->OnPreparationException(std::current_exception());
  2575. }
  2576. if (mode >= TOperationOptions::EStartOperationMode::AsyncStart) {
  2577. try {
  2578. operation->Start();
  2579. } catch (...) { }
  2580. }
  2581. };
  2582. if (options.StartOperationMode_ >= TOperationOptions::EStartOperationMode::SyncStart) {
  2583. prepareAndStart();
  2584. WaitIfRequired(operation, client, options);
  2585. } else {
  2586. auto args = new TAsyncPrepareAndStartOperationArgs;
  2587. args->PrepareAndStart = std::move(prepareAndStart);
  2588. TThread thread(TThread::TParams(SyncPrepareAndStartOperation, args).SetName("prepare and start operation"));
  2589. thread.Start();
  2590. thread.Detach();
  2591. }
  2592. return operation;
  2593. }
  2594. void WaitIfRequired(const TOperationPtr& operation, const TClientPtr& client, const TOperationOptions& options)
  2595. {
  2596. auto retryPolicy = client->GetRetryPolicy();
  2597. auto context = client->GetContext();
  2598. if (options.StartOperationMode_ >= TOperationOptions::EStartOperationMode::SyncStart) {
  2599. operation->GetStartedFuture().GetValueSync();
  2600. }
  2601. if (options.StartOperationMode_ == TOperationOptions::EStartOperationMode::SyncWait) {
  2602. auto finishedFuture = operation->Watch();
  2603. TWaitProxy::Get()->WaitFuture(finishedFuture);
  2604. finishedFuture.GetValue();
  2605. if (context.Config->WriteStderrSuccessfulJobs) {
  2606. auto stderrs = GetJobsStderr(retryPolicy, context, operation->GetId());
  2607. for (const auto& jobStderr : stderrs) {
  2608. if (!jobStderr.empty()) {
  2609. Cerr << jobStderr << '\n';
  2610. }
  2611. }
  2612. }
  2613. }
  2614. }
  2615. ////////////////////////////////////////////////////////////////////////////////
  2616. void ResetUseClientProtobuf(const char* methodName)
  2617. {
  2618. Cerr << "WARNING! OPTION `TConfig::UseClientProtobuf' IS RESET TO `true'; "
  2619. << "IT CAN DETERIORATE YOUR CODE PERFORMANCE!!! DON'T USE DEPRECATED METHOD `"
  2620. << "TOperationIOSpec::" << methodName << "' TO AVOID THIS RESET" << Endl;
  2621. // Give users some time to contemplate about usage of deprecated functions.
  2622. Cerr << "Sleeping for 5 seconds..." << Endl;
  2623. Sleep(TDuration::Seconds(5));
  2624. TConfig::Get()->UseClientProtobuf = true;
  2625. }
  2626. } // namespace NDetail
  2627. ////////////////////////////////////////////////////////////////////////////////
  2628. ::TIntrusivePtr<INodeReaderImpl> CreateJobNodeReader(TRawTableReaderPtr rawTableReader)
  2629. {
  2630. if (auto schema = NDetail::GetJobInputSkiffSchema()) {
  2631. return new NDetail::TSkiffTableReader(rawTableReader, schema);
  2632. } else {
  2633. return new TNodeTableReader(rawTableReader);
  2634. }
  2635. }
  2636. ::TIntrusivePtr<IYaMRReaderImpl> CreateJobYaMRReader(TRawTableReaderPtr rawTableReader)
  2637. {
  2638. return new TYaMRTableReader(rawTableReader);
  2639. }
  2640. ::TIntrusivePtr<IProtoReaderImpl> CreateJobProtoReader(TRawTableReaderPtr rawTableReader)
  2641. {
  2642. if (TConfig::Get()->UseClientProtobuf) {
  2643. return new TProtoTableReader(
  2644. rawTableReader,
  2645. GetJobInputDescriptors());
  2646. } else {
  2647. return new TLenvalProtoTableReader(
  2648. rawTableReader,
  2649. GetJobInputDescriptors());
  2650. }
  2651. }
  2652. ::TIntrusivePtr<INodeWriterImpl> CreateJobNodeWriter(THolder<IProxyOutput> rawJobWriter)
  2653. {
  2654. return new TNodeTableWriter(std::move(rawJobWriter));
  2655. }
  2656. ::TIntrusivePtr<IYaMRWriterImpl> CreateJobYaMRWriter(THolder<IProxyOutput> rawJobWriter)
  2657. {
  2658. return new TYaMRTableWriter(std::move(rawJobWriter));
  2659. }
  2660. ::TIntrusivePtr<IProtoWriterImpl> CreateJobProtoWriter(THolder<IProxyOutput> rawJobWriter)
  2661. {
  2662. if (TConfig::Get()->UseClientProtobuf) {
  2663. return new TProtoTableWriter(
  2664. std::move(rawJobWriter),
  2665. GetJobOutputDescriptors());
  2666. } else {
  2667. return new TLenvalProtoTableWriter(
  2668. std::move(rawJobWriter),
  2669. GetJobOutputDescriptors());
  2670. }
  2671. }
  2672. ////////////////////////////////////////////////////////////////////////////////
  2673. } // namespace NYT