12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143 |
- #include "operation.h"
- #include "abortable_registry.h"
- #include "client.h"
- #include "operation_helpers.h"
- #include "operation_tracker.h"
- #include "prepare_operation.h"
- #include "skiff.h"
- #include "structured_table_formats.h"
- #include "yt_poller.h"
- #include <yt/cpp/mapreduce/common/helpers.h>
- #include <yt/cpp/mapreduce/common/retry_lib.h>
- #include <yt/cpp/mapreduce/common/wait_proxy.h>
- #include <yt/cpp/mapreduce/interface/config.h>
- #include <yt/cpp/mapreduce/interface/errors.h>
- #include <yt/cpp/mapreduce/interface/fluent.h>
- #include <yt/cpp/mapreduce/interface/format.h>
- #include <yt/cpp/mapreduce/interface/job_statistics.h>
- #include <yt/cpp/mapreduce/interface/protobuf_format.h>
- #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
- #include <yt/cpp/mapreduce/http/requests.h>
- #include <yt/cpp/mapreduce/http/retry_request.h>
- #include <yt/cpp/mapreduce/io/job_reader.h>
- #include <yt/cpp/mapreduce/io/job_writer.h>
- #include <yt/cpp/mapreduce/io/yamr_table_reader.h>
- #include <yt/cpp/mapreduce/io/yamr_table_writer.h>
- #include <yt/cpp/mapreduce/io/node_table_reader.h>
- #include <yt/cpp/mapreduce/io/node_table_writer.h>
- #include <yt/cpp/mapreduce/io/proto_table_reader.h>
- #include <yt/cpp/mapreduce/io/proto_table_writer.h>
- #include <yt/cpp/mapreduce/io/proto_helpers.h>
- #include <yt/cpp/mapreduce/io/skiff_table_reader.h>
- #include <yt/cpp/mapreduce/http_client/raw_requests.h>
- #include <library/cpp/yson/node/serialize.h>
- #include <util/generic/hash_set.h>
- #include <util/string/builder.h>
- #include <util/string/cast.h>
- #include <util/system/thread.h>
- #include <util/system/env.h>
- #include <util/system/fs.h>
- namespace NYT {
- namespace NDetail {
- using ::ToString;
- ////////////////////////////////////////////////////////////////////////////////
- static const ui64 DefaultExrtaTmpfsSize = 1024LL * 1024LL;
- ////////////////////////////////////////////////////////////////////////////////
- namespace {
- ////////////////////////////////////////////////////////////////////////////////
- struct TMapReduceOperationIo
- {
- TVector<TRichYPath> Inputs;
- TVector<TRichYPath> MapOutputs;
- TVector<TRichYPath> Outputs;
- TMaybe<TFormat> MapperInputFormat;
- TMaybe<TFormat> MapperOutputFormat;
- TMaybe<TFormat> ReduceCombinerInputFormat;
- TMaybe<TFormat> ReduceCombinerOutputFormat;
- TFormat ReducerInputFormat = TFormat::YsonBinary();
- TFormat ReducerOutputFormat = TFormat::YsonBinary();
- TVector<TSmallJobFile> MapperJobFiles;
- TVector<TSmallJobFile> ReduceCombinerJobFiles;
- TVector<TSmallJobFile> ReducerJobFiles;
- };
- template <typename T>
- void VerifyHasElements(const TVector<T>& paths, TStringBuf name)
- {
- if (paths.empty()) {
- ythrow TApiUsageError() << "no " << name << " table is specified";
- }
- }
- ////////////////////////////////////////////////////////////////////////////////
- TVector<TSmallJobFile> CreateFormatConfig(
- TMaybe<TSmallJobFile> inputConfig,
- const TMaybe<TSmallJobFile>& outputConfig)
- {
- TVector<TSmallJobFile> result;
- if (inputConfig) {
- result.push_back(std::move(*inputConfig));
- }
- if (outputConfig) {
- result.push_back(std::move(*outputConfig));
- }
- return result;
- }
- template <typename T>
- ENodeReaderFormat NodeReaderFormatFromHintAndGlobalConfig(const TUserJobFormatHintsBase<T>& formatHints)
- {
- auto result = TConfig::Get()->NodeReaderFormat;
- if (formatHints.InputFormatHints_ && formatHints.InputFormatHints_->SkipNullValuesForTNode_) {
- Y_ENSURE_EX(
- result != ENodeReaderFormat::Skiff,
- TApiUsageError() << "skiff format doesn't support SkipNullValuesForTNode format hint");
- result = ENodeReaderFormat::Yson;
- }
- return result;
- }
- template <class TSpec>
- const TVector<TStructuredTablePath>& GetStructuredInputs(const TSpec& spec)
- {
- if constexpr (std::is_same_v<TSpec, TVanillaTask>) {
- static const TVector<TStructuredTablePath> empty;
- return empty;
- } else {
- return spec.GetStructuredInputs();
- }
- }
- template <class TSpec>
- const TVector<TStructuredTablePath>& GetStructuredOutputs(const TSpec& spec)
- {
- return spec.GetStructuredOutputs();
- }
- template <class TSpec>
- const TMaybe<TFormatHints>& GetInputFormatHints(const TSpec& spec)
- {
- if constexpr (std::is_same_v<TSpec, TVanillaTask>) {
- static const TMaybe<TFormatHints> empty = Nothing();
- return empty;
- } else {
- return spec.InputFormatHints_;
- }
- }
- template <class TSpec>
- const TMaybe<TFormatHints>& GetOutputFormatHints(const TSpec& spec)
- {
- return spec.OutputFormatHints_;
- }
- template <class TSpec>
- ENodeReaderFormat GetNodeReaderFormat(const TSpec& spec, bool allowSkiff)
- {
- if constexpr (std::is_same<TSpec, TVanillaTask>::value) {
- return ENodeReaderFormat::Yson;
- } else {
- return allowSkiff
- ? NodeReaderFormatFromHintAndGlobalConfig(spec)
- : ENodeReaderFormat::Yson;
- }
- }
- static void SortColumnsToNames(const TSortColumns& sortColumns, THashSet<TString>* result)
- {
- auto names = sortColumns.GetNames();
- result->insert(names.begin(), names.end());
- }
- static THashSet<TString> SortColumnsToNames(const TSortColumns& sortColumns)
- {
- THashSet<TString> columnNames;
- SortColumnsToNames(sortColumns, &columnNames);
- return columnNames;
- }
- THashSet<TString> GetColumnsUsedInOperation(const TJoinReduceOperationSpec& spec)
- {
- return SortColumnsToNames(spec.JoinBy_);
- }
- THashSet<TString> GetColumnsUsedInOperation(const TReduceOperationSpec& spec) {
- auto result = SortColumnsToNames(spec.SortBy_);
- SortColumnsToNames(spec.ReduceBy_, &result);
- if (spec.JoinBy_) {
- SortColumnsToNames(*spec.JoinBy_, &result);
- }
- return result;
- }
- THashSet<TString> GetColumnsUsedInOperation(const TMapReduceOperationSpec& spec)
- {
- auto result = SortColumnsToNames(spec.SortBy_);
- SortColumnsToNames(spec.ReduceBy_, &result);
- return result;
- }
- THashSet<TString> GetColumnsUsedInOperation(const TMapOperationSpec&)
- {
- return THashSet<TString>();
- }
- THashSet<TString> GetColumnsUsedInOperation(const TVanillaTask&)
- {
- return THashSet<TString>();
- }
- TStructuredJobTableList ApplyProtobufColumnFilters(
- const TStructuredJobTableList& tableList,
- const TOperationPreparer& preparer,
- const THashSet<TString>& columnsUsedInOperations,
- const TOperationOptions& options)
- {
- bool hasInputQuery = options.Spec_.Defined() && options.Spec_->IsMap() && options.Spec_->HasKey("input_query");
- if (hasInputQuery) {
- return tableList;
- }
- auto isDynamic = NRawClient::BatchTransform(
- preparer.GetClient()->GetRawClient(),
- tableList,
- [&] (IRawBatchRequestPtr batch, const auto& table) {
- return batch->Get(preparer.GetTransactionId(), table.RichYPath->Path_ + "/@dynamic", TGetOptions());
- });
- auto newTableList = tableList;
- for (size_t tableIndex = 0; tableIndex < tableList.size(); ++tableIndex) {
- if (isDynamic[tableIndex].AsBool()) {
- continue;
- }
- auto& table = newTableList[tableIndex];
- Y_ABORT_UNLESS(table.RichYPath);
- if (table.RichYPath->Columns_) {
- continue;
- }
- if (!std::holds_alternative<TProtobufTableStructure>(table.Description)) {
- continue;
- }
- const auto& descriptor = std::get<TProtobufTableStructure>(table.Description).Descriptor;
- if (!descriptor) {
- continue;
- }
- auto fromDescriptor = NDetail::InferColumnFilter(*descriptor);
- if (!fromDescriptor) {
- continue;
- }
- THashSet<TString> columns(fromDescriptor->begin(), fromDescriptor->end());
- columns.insert(columnsUsedInOperations.begin(), columnsUsedInOperations.end());
- table.RichYPath->Columns(TVector<TString>(columns.begin(), columns.end()));
- }
- return newTableList;
- }
- template <class TSpec>
- TSimpleOperationIo CreateSimpleOperationIo(
- const IStructuredJob& structuredJob,
- const TOperationPreparer& preparer,
- const TSpec& spec,
- const TOperationOptions& options,
- bool allowSkiff)
- {
- if (!std::holds_alternative<TVoidStructuredRowStream>(structuredJob.GetInputRowStreamDescription())) {
- VerifyHasElements(GetStructuredInputs(spec), "input");
- }
- TUserJobFormatHints hints;
- hints.InputFormatHints_ = GetInputFormatHints(spec);
- hints.OutputFormatHints_ = GetOutputFormatHints(spec);
- ENodeReaderFormat nodeReaderFormat = GetNodeReaderFormat(spec, allowSkiff);
- return CreateSimpleOperationIoHelper(
- structuredJob,
- preparer,
- options,
- CanonizeStructuredTableList(preparer.GetClient()->GetRawClient(), GetStructuredInputs(spec)),
- CanonizeStructuredTableList(preparer.GetClient()->GetRawClient(), GetStructuredOutputs(spec)),
- hints,
- nodeReaderFormat,
- GetColumnsUsedInOperation(spec));
- }
- template <class T>
- TSimpleOperationIo CreateSimpleOperationIo(
- const IJob& job,
- const TOperationPreparer& preparer,
- const TSimpleRawOperationIoSpec<T>& spec)
- {
- auto getFormatOrDefault = [&] (const TMaybe<TFormat>& maybeFormat, const char* formatName) {
- if (maybeFormat) {
- return *maybeFormat;
- } else if (spec.Format_) {
- return *spec.Format_;
- } else {
- ythrow TApiUsageError() << "Neither " << formatName << "format nor default format is specified for raw operation";
- }
- };
- auto inputs = NRawClient::CanonizeYPaths(preparer.GetClient()->GetRawClient(), spec.GetInputs());
- auto outputs = NRawClient::CanonizeYPaths(preparer.GetClient()->GetRawClient(), spec.GetOutputs());
- VerifyHasElements(inputs, "input");
- VerifyHasElements(outputs, "output");
- TUserJobFormatHints hints;
- auto outputSchemas = PrepareOperation(
- job,
- TOperationPreparationContext(
- inputs,
- outputs,
- preparer.GetClient()->GetRawClient(),
- preparer.GetClientRetryPolicy(),
- preparer.GetTransactionId()),
- &inputs,
- &outputs,
- hints);
- Y_ABORT_UNLESS(outputs.size() == outputSchemas.size());
- for (int i = 0; i < static_cast<int>(outputs.size()); ++i) {
- if (!outputs[i].Schema_ && !outputSchemas[i].Columns().empty()) {
- outputs[i].Schema_ = outputSchemas[i];
- }
- }
- return TSimpleOperationIo {
- inputs,
- outputs,
- getFormatOrDefault(spec.InputFormat_, "input"),
- getFormatOrDefault(spec.OutputFormat_, "output"),
- TVector<TSmallJobFile>{},
- };
- }
- ////////////////////////////////////////////////////////////////////////////////
- TString GetJobStderrWithRetriesAndIgnoreErrors(
- const IRequestRetryPolicyPtr& retryPolicy,
- const IRawClientPtr& rawClient,
- const TOperationId& operationId,
- const TJobId& jobId,
- const size_t stderrTailSize,
- const TGetJobStderrOptions& options = {})
- {
- TString jobStderr;
- try {
- jobStderr = RequestWithRetry<TString>(
- retryPolicy,
- [&rawClient, &operationId, &jobId, &options] (TMutationId /*mutationId*/) {
- return rawClient->GetJobStderr(operationId, jobId, options)->ReadAll();
- });
- } catch (const TErrorResponse& e) {
- YT_LOG_ERROR("Cannot get job stderr (OperationId: %v, JobId: %v, Error: %v)",
- operationId,
- jobId,
- e.what());
- }
- if (jobStderr.size() > stderrTailSize) {
- jobStderr = jobStderr.substr(jobStderr.size() - stderrTailSize, stderrTailSize);
- }
- return jobStderr;
- }
- TVector<TFailedJobInfo> GetFailedJobInfo(
- const IClientRetryPolicyPtr& clientRetryPolicy,
- const IRawClientPtr& rawClient,
- const TOperationId& operationId,
- const TGetFailedJobInfoOptions& options)
- {
- const auto listJobsResult = RequestWithRetry<TListJobsResult>(
- clientRetryPolicy->CreatePolicyForGenericRequest(),
- [&rawClient, &operationId, &options] (TMutationId /*mutationId*/) {
- return rawClient->ListJobs(
- operationId,
- TListJobsOptions()
- .State(EJobState::Failed)
- .Limit(options.MaxJobCount_));
- });
- const auto stderrTailSize = options.StderrTailSize_;
- TVector<TFailedJobInfo> result;
- for (const auto& job : listJobsResult.Jobs) {
- auto& info = result.emplace_back();
- Y_ENSURE(job.Id);
- info.JobId = *job.Id;
- info.Error = job.Error.GetOrElse(TYtError(TString("unknown error")));
- if (job.StderrSize.GetOrElse(0) != 0) {
- // There are cases when due to bad luck we cannot read stderr even if
- // list_jobs reports that stderr_size > 0.
- //
- // Such errors don't have special error code
- // so we ignore all errors and try our luck on other jobs.
- info.Stderr = GetJobStderrWithRetriesAndIgnoreErrors(
- clientRetryPolicy->CreatePolicyForGenericRequest(),
- rawClient,
- operationId,
- *job.Id,
- stderrTailSize);
- }
- }
- return result;
- }
- struct TGetJobsStderrOptions
- {
- using TSelf = TGetJobsStderrOptions;
- // How many jobs to download. Which jobs will be chosen is undefined.
- FLUENT_FIELD_DEFAULT(ui64, MaxJobCount, 10);
- // How much of stderr should be downloaded.
- FLUENT_FIELD_DEFAULT(ui64, StderrTailSize, 64 * 1024);
- };
- static TVector<TString> GetJobsStderr(
- const IClientRetryPolicyPtr& clientRetryPolicy,
- const IRawClientPtr& rawClient,
- const TOperationId& operationId,
- const TGetJobsStderrOptions& options = {})
- {
- const auto listJobsResult = RequestWithRetry<TListJobsResult>(
- clientRetryPolicy->CreatePolicyForGenericRequest(),
- [&rawClient, &operationId, &options] (TMutationId /*mutationId*/) {
- return rawClient->ListJobs(
- operationId,
- TListJobsOptions()
- .Limit(options.MaxJobCount_)
- .WithStderr(true));
- });
- const auto stderrTailSize = options.StderrTailSize_;
- TVector<TString> result;
- for (const auto& job : listJobsResult.Jobs) {
- result.push_back(
- // There are cases when due to bad luck we cannot read stderr even if
- // list_jobs reports that stderr_size > 0.
- //
- // Such errors don't have special error code
- // so we ignore all errors and try our luck on other jobs.
- GetJobStderrWithRetriesAndIgnoreErrors(
- clientRetryPolicy->CreatePolicyForGenericRequest(),
- rawClient,
- operationId,
- *job.Id,
- stderrTailSize)
- );
- }
- return result;
- }
- int CountIntermediateTables(const TStructuredJobTableList& tables)
- {
- int result = 0;
- for (const auto& table : tables) {
- if (table.RichYPath) {
- break;
- }
- ++result;
- }
- return result;
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace
- ////////////////////////////////////////////////////////////////////////////////
- TSimpleOperationIo CreateSimpleOperationIoHelper(
- const IStructuredJob& structuredJob,
- const TOperationPreparer& preparer,
- const TOperationOptions& options,
- TStructuredJobTableList structuredInputs,
- TStructuredJobTableList structuredOutputs,
- TUserJobFormatHints hints,
- ENodeReaderFormat nodeReaderFormat,
- const THashSet<TString>& columnsUsedInOperations)
- {
- auto intermediateInputTableCount = CountIntermediateTables(structuredInputs);
- auto intermediateOutputTableCount = CountIntermediateTables(structuredOutputs);
- auto jobSchemaInferenceResult = PrepareOperation(
- structuredJob,
- TOperationPreparationContext(
- structuredInputs,
- structuredOutputs,
- preparer.GetClient()->GetRawClient(),
- preparer.GetClientRetryPolicy(),
- preparer.GetTransactionId()),
- &structuredInputs,
- &structuredOutputs,
- hints);
- TVector<TSmallJobFile> formatConfigList;
- TFormatBuilder formatBuilder(
- preparer.GetClient()->GetRawClient(),
- preparer.GetClientRetryPolicy(),
- preparer.GetContext(),
- preparer.GetTransactionId(),
- options);
- auto [inputFormat, inputFormatConfig] = formatBuilder.CreateFormat(
- structuredJob,
- EIODirection::Input,
- structuredInputs,
- hints.InputFormatHints_,
- nodeReaderFormat,
- /* allowFormatFromTableAttribute = */ true);
- auto [outputFormat, outputFormatConfig] = formatBuilder.CreateFormat(
- structuredJob,
- EIODirection::Output,
- structuredOutputs,
- hints.OutputFormatHints_,
- ENodeReaderFormat::Yson,
- /* allowFormatFromTableAttribute = */ false);
- const bool inferOutputSchema = options.InferOutputSchema_.GetOrElse(preparer.GetContext().Config->InferTableSchema);
- auto outputPaths = GetPathList(
- TStructuredJobTableList(structuredOutputs.begin() + intermediateOutputTableCount, structuredOutputs.end()),
- TVector<TTableSchema>(jobSchemaInferenceResult.begin() + intermediateOutputTableCount, jobSchemaInferenceResult.end()),
- inferOutputSchema);
- auto inputPaths = GetPathList(
- ApplyProtobufColumnFilters(
- TStructuredJobTableList(structuredInputs.begin() + intermediateInputTableCount, structuredInputs.end()),
- preparer,
- columnsUsedInOperations,
- options),
- /*schemaInferenceResult*/ Nothing(),
- /*inferSchema*/ false);
- return TSimpleOperationIo {
- inputPaths,
- outputPaths,
- inputFormat,
- outputFormat,
- CreateFormatConfig(inputFormatConfig, outputFormatConfig)
- };
- }
- EOperationBriefState CheckOperation(
- const IRawClientPtr& rawClient,
- const IClientRetryPolicyPtr& clientRetryPolicy,
- const TOperationId& operationId)
- {
- auto attributes = RequestWithRetry<TOperationAttributes>(
- clientRetryPolicy->CreatePolicyForGenericRequest(),
- [&rawClient, &operationId] (TMutationId /*mutationId*/) {
- return rawClient->GetOperation(
- operationId,
- TGetOperationOptions().AttributeFilter(TOperationAttributeFilter()
- .Add(EOperationAttribute::State)
- .Add(EOperationAttribute::Result)));
- });
- Y_ABORT_UNLESS(attributes.BriefState,
- "get_operation for operation %s has not returned \"state\" field",
- GetGuidAsString(operationId).data());
- if (*attributes.BriefState == EOperationBriefState::Completed) {
- return EOperationBriefState::Completed;
- } else if (*attributes.BriefState == EOperationBriefState::Aborted || *attributes.BriefState == EOperationBriefState::Failed) {
- YT_LOG_ERROR("Operation %v %v (%v)",
- operationId,
- ToString(*attributes.BriefState),
- ToString(TOperationExecutionTimeTracker::Get()->Finish(operationId)));
- auto failedJobInfoList = GetFailedJobInfo(
- clientRetryPolicy,
- rawClient,
- operationId,
- TGetFailedJobInfoOptions());
- Y_ABORT_UNLESS(attributes.Result && attributes.Result->Error);
- ythrow TOperationFailedError(
- *attributes.BriefState == EOperationBriefState::Aborted
- ? TOperationFailedError::Aborted
- : TOperationFailedError::Failed,
- operationId,
- *attributes.Result->Error,
- failedJobInfoList);
- }
- return EOperationBriefState::InProgress;
- }
- void WaitForOperation(
- const IClientRetryPolicyPtr& clientRetryPolicy,
- const IRawClientPtr& rawClient,
- const TClientContext& context,
- const TOperationId& operationId)
- {
- const TDuration checkOperationStateInterval =
- UseLocalModeOptimization(rawClient, context, clientRetryPolicy)
- ? Min(TDuration::MilliSeconds(100), context.Config->OperationTrackerPollPeriod)
- : context.Config->OperationTrackerPollPeriod;
- while (true) {
- auto status = CheckOperation(rawClient, clientRetryPolicy, operationId);
- if (status == EOperationBriefState::Completed) {
- YT_LOG_INFO("Operation %v completed (%v)",
- operationId,
- TOperationExecutionTimeTracker::Get()->Finish(operationId));
- break;
- }
- TWaitProxy::Get()->Sleep(checkOperationStateInterval);
- }
- }
- ////////////////////////////////////////////////////////////////////////////////
- namespace {
- TNode BuildAutoMergeSpec(const TAutoMergeSpec& options)
- {
- TNode result;
- if (options.Mode_) {
- result["mode"] = ToString(*options.Mode_);
- }
- if (options.MaxIntermediateChunkCount_) {
- result["max_intermediate_chunk_count"] = *options.MaxIntermediateChunkCount_;
- }
- if (options.ChunkCountPerMergeJob_) {
- result["chunk_count_per_merge_job"] = *options.ChunkCountPerMergeJob_;
- }
- if (options.ChunkSizeThreshold_) {
- result["chunk_size_threshold"] = *options.ChunkSizeThreshold_;
- }
- return result;
- }
- [[maybe_unused]] TNode BuildJobProfilerSpec(const TJobProfilerSpec& profilerSpec)
- {
- TNode result;
- if (profilerSpec.ProfilingBinary_) {
- result["binary"] = ToString(*profilerSpec.ProfilingBinary_);
- }
- if (profilerSpec.ProfilerType_) {
- result["type"] = ToString(*profilerSpec.ProfilerType_);
- }
- if (profilerSpec.ProfilingProbability_) {
- result["profiling_probability"] = *profilerSpec.ProfilingProbability_;
- }
- if (profilerSpec.SamplingFrequency_) {
- result["sampling_frequency"] = *profilerSpec.SamplingFrequency_;
- }
- return result;
- }
- // Returns undefined node if resources doesn't contain any meaningful field
- TNode BuildSchedulerResourcesSpec(const TSchedulerResources& resources)
- {
- TNode result;
- if (resources.UserSlots().Defined()) {
- result["user_slots"] = *resources.UserSlots();
- }
- if (resources.Cpu().Defined()) {
- result["cpu"] = *resources.Cpu();
- }
- if (resources.Memory().Defined()) {
- result["memory"] = *resources.Memory();
- }
- return result;
- }
- void BuildUserJobFluently(
- const TJobPreparer& preparer,
- const TMaybe<TFormat>& inputFormat,
- const TMaybe<TFormat>& outputFormat,
- TFluentMap fluent)
- {
- const auto& userJobSpec = preparer.GetSpec();
- TMaybe<i64> memoryLimit = userJobSpec.MemoryLimit_;
- TMaybe<double> cpuLimit = userJobSpec.CpuLimit_;
- TMaybe<ui16> portCount = userJobSpec.PortCount_;
- // Use 1MB extra tmpfs size by default, it helps to detect job sandbox as tmp directory
- // for standard python libraries. See YTADMINREQ-14505 for more details.
- auto tmpfsSize = preparer.GetSpec().ExtraTmpfsSize_.GetOrElse(DefaultExrtaTmpfsSize);
- if (preparer.ShouldMountSandbox()) {
- tmpfsSize += preparer.GetTotalFileSize();
- if (tmpfsSize == 0) {
- // This can be a case for example when it is local mode and we don't upload binary.
- // NOTE: YT doesn't like zero tmpfs size.
- tmpfsSize = RoundUpFileSize(1);
- }
- memoryLimit = memoryLimit.GetOrElse(512ll << 20) + tmpfsSize;
- }
- fluent
- .Item("file_paths").List(preparer.GetFiles())
- .DoIf(!preparer.GetLayers().empty(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("layer_paths").List(preparer.GetLayers());
- })
- .DoIf(userJobSpec.DockerImage_.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("docker_image").Value(*userJobSpec.DockerImage_);
- })
- .Item("command").Value(preparer.GetCommand())
- .Item("class_name").Value(preparer.GetClassName())
- .DoIf(!userJobSpec.Environment_.empty(), [&] (TFluentMap fluentMap) {
- TNode environment;
- for (const auto& item : userJobSpec.Environment_) {
- environment[item.first] = item.second;
- }
- fluentMap.Item("environment").Value(environment);
- })
- .DoIf(userJobSpec.DiskSpaceLimit_.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("disk_space_limit").Value(*userJobSpec.DiskSpaceLimit_);
- })
- .DoIf(inputFormat.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("input_format").Value(inputFormat->Config);
- })
- .DoIf(outputFormat.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("output_format").Value(outputFormat->Config);
- })
- .DoIf(memoryLimit.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("memory_limit").Value(*memoryLimit);
- })
- .DoIf(userJobSpec.MemoryReserveFactor_.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("memory_reserve_factor").Value(*userJobSpec.MemoryReserveFactor_);
- })
- .DoIf(cpuLimit.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("cpu_limit").Value(*cpuLimit);
- })
- .DoIf(portCount.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("port_count").Value(*portCount);
- })
- .DoIf(userJobSpec.JobTimeLimit_.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("job_time_limit").Value(userJobSpec.JobTimeLimit_->MilliSeconds());
- })
- .DoIf(userJobSpec.DiskRequest_.Defined(), [&] (TFluentMap fluentMap) {
- const auto& diskRequest = *userJobSpec.DiskRequest_;
- TNode diskRequestNode = TNode::CreateMap();
- if (diskRequest.DiskSpace_.Defined()) {
- diskRequestNode["disk_space"] = *diskRequest.DiskSpace_;
- }
- if (diskRequest.InodeCount_.Defined()) {
- diskRequestNode["inode_count"] = *diskRequest.InodeCount_;
- }
- if (diskRequest.Account_.Defined()) {
- diskRequestNode["account"] = *diskRequest.Account_;
- }
- if (diskRequest.MediumName_.Defined()) {
- diskRequestNode["medium_name"] = *diskRequest.MediumName_;
- }
- fluentMap.Item("disk_request").Value(diskRequestNode);
- })
- .DoIf(userJobSpec.NetworkProject_.Defined(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("network_project").Value(*userJobSpec.NetworkProject_);
- })
- .DoIf(preparer.ShouldMountSandbox(), [&] (TFluentMap fluentMap) {
- fluentMap.Item("tmpfs_path").Value(".");
- fluentMap.Item("tmpfs_size").Value(tmpfsSize);
- fluentMap.Item("copy_files").Value(true);
- })
- .Item("profilers")
- .BeginList()
- .DoFor(userJobSpec.JobProfilers_, [&] (TFluentList list, const auto& jobProfiler) {
- list.Item().Value(BuildJobProfilerSpec(jobProfiler));
- })
- .EndList()
- .Item("redirect_stdout_to_stderr").Value(preparer.ShouldRedirectStdoutToStderr());
- }
- struct TNirvanaContext
- {
- TNode BlockUrl;
- TNode Annotations;
- };
- // Try to detect if we are inside nirvana operation and reat nirvana job context.
- // Items of TNirvanaContext might be Undefined, if we are not inside nirvana context (or if nirvana context is unexpected)
- TNirvanaContext GetNirvanaContext()
- {
- static const auto filePath = TString("/slot/sandbox/j/job_context.json");
- auto nvYtOperationId = GetEnv("NV_YT_OPERATION_ID");
- if (nvYtOperationId.empty()) {
- return {};
- }
- if (!NFs::Exists(filePath)) {
- return {};
- }
- NJson::TJsonValue json;
- try {
- auto inf = TIFStream(filePath);
- json = NJson::ReadJsonTree(&inf, /*throwOnError*/ true);
- } catch (const std::exception& ex) {
- YT_LOG_ERROR("Failed to load nirvana job context: %v", ex.what());
- return {};
- }
- TNirvanaContext result;
- const auto* url = json.GetValueByPath("meta.blockURL");
- if (url && url->IsString()) {
- result.BlockUrl = url->GetString();
- result.BlockUrl.Attributes()["_type_tag"] = "url";
- }
- const auto* annotations = json.GetValueByPath("meta.annotations");
- if (annotations && annotations->IsMap()) {
- result.Annotations = NodeFromJsonValue(*annotations);
- }
- return result;
- }
- template <typename T>
- void BuildCommonOperationPart(
- const TConfigPtr& config,
- const TOperationSpecBase<T>& baseSpec,
- const TOperationOptions& options,
- TNode* specNode)
- {
- const TProcessState* properties = TProcessState::Get();
- auto& startedBySpec = (*specNode)["started_by"];
- startedBySpec["hostname"] = properties->FqdnHostName,
- startedBySpec["pid"] = properties->Pid;
- startedBySpec["user"] = properties->UserName;
- startedBySpec["wrapper_version"] = properties->ClientVersion;
- startedBySpec["binary"] = properties->BinaryPath;
- startedBySpec["binary_name"] = properties->BinaryName;
- auto nirvanaContext = GetNirvanaContext();
- if (!nirvanaContext.BlockUrl.IsUndefined()) {
- startedBySpec["nirvana_block_url"] = nirvanaContext.BlockUrl;
- }
- if (!nirvanaContext.Annotations.IsUndefined()) {
- MergeNodes((*specNode)["annotations"], nirvanaContext.Annotations);
- }
- TString pool;
- if (baseSpec.Pool_) {
- pool = *baseSpec.Pool_;
- } else {
- pool = config->Pool;
- }
- if (!pool.empty()) {
- (*specNode)["pool"] = pool;
- }
- if (baseSpec.Weight_.Defined()) {
- (*specNode)["weight"] = *baseSpec.Weight_;
- }
- if (baseSpec.TimeLimit_.Defined()) {
- (*specNode)["time_limit"] = baseSpec.TimeLimit_->MilliSeconds();
- }
- if (baseSpec.PoolTrees().Defined()) {
- TNode poolTreesSpec = TNode::CreateList();
- for (const auto& tree : *baseSpec.PoolTrees()) {
- poolTreesSpec.Add(tree);
- }
- (*specNode)["pool_trees"] = std::move(poolTreesSpec);
- }
- if (baseSpec.ResourceLimits().Defined()) {
- auto resourceLimitsSpec = BuildSchedulerResourcesSpec(*baseSpec.ResourceLimits());
- if (!resourceLimitsSpec.IsUndefined()) {
- (*specNode)["resource_limits"] = std::move(resourceLimitsSpec);
- }
- }
- if (options.SecureVault_.Defined()) {
- Y_ENSURE(options.SecureVault_->IsMap(),
- "SecureVault must be a map node, got " << options.SecureVault_->GetType());
- (*specNode)["secure_vault"] = *options.SecureVault_;
- }
- if (baseSpec.Title_.Defined()) {
- (*specNode)["title"] = *baseSpec.Title_;
- }
- if (baseSpec.MaxFailedJobCount_.Defined()) {
- (*specNode)["max_failed_job_count"] = *baseSpec.MaxFailedJobCount_;
- }
- }
- template <typename TSpec>
- void BuildCommonUserOperationPart(const TSpec& baseSpec, TNode* spec)
- {
- if (baseSpec.FailOnJobRestart_.Defined()) {
- (*spec)["fail_on_job_restart"] = *baseSpec.FailOnJobRestart_;
- }
- if (baseSpec.StderrTablePath_.Defined()) {
- (*spec)["stderr_table_path"] = *baseSpec.StderrTablePath_;
- }
- if (baseSpec.CoreTablePath_.Defined()) {
- (*spec)["core_table_path"] = *baseSpec.CoreTablePath_;
- }
- if (baseSpec.WaitingJobTimeout_.Defined()) {
- (*spec)["waiting_job_timeout"] = baseSpec.WaitingJobTimeout_->MilliSeconds();
- }
- }
- template <typename TSpec>
- void BuildJobCountOperationPart(const TSpec& spec, TNode* nodeSpec)
- {
- if (spec.JobCount_.Defined()) {
- (*nodeSpec)["job_count"] = *spec.JobCount_;
- }
- if (spec.DataSizePerJob_.Defined()) {
- (*nodeSpec)["data_size_per_job"] = *spec.DataSizePerJob_;
- }
- }
- template <typename TSpec>
- void BuildPartitionCountOperationPart(const TSpec& spec, TNode* nodeSpec)
- {
- if (spec.PartitionCount_.Defined()) {
- (*nodeSpec)["partition_count"] = *spec.PartitionCount_;
- }
- if (spec.PartitionDataSize_.Defined()) {
- (*nodeSpec)["partition_data_size"] = *spec.PartitionDataSize_;
- }
- }
- template <typename TSpec>
- void BuildDataSizePerSortJobPart(const TSpec& spec, TNode* nodeSpec)
- {
- if (spec.DataSizePerSortJob_.Defined()) {
- (*nodeSpec)["data_size_per_sort_job"] = *spec.DataSizePerSortJob_;
- }
- }
- template <typename TSpec>
- void BuildPartitionJobCountOperationPart(const TSpec& spec, TNode* nodeSpec)
- {
- if (spec.PartitionJobCount_.Defined()) {
- (*nodeSpec)["partition_job_count"] = *spec.PartitionJobCount_;
- }
- if (spec.DataSizePerPartitionJob_.Defined()) {
- (*nodeSpec)["data_size_per_partition_job"] = *spec.DataSizePerPartitionJob_;
- }
- }
- template <typename TSpec>
- void BuildMapJobCountOperationPart(const TSpec& spec, TNode* nodeSpec)
- {
- if (spec.MapJobCount_.Defined()) {
- (*nodeSpec)["map_job_count"] = *spec.MapJobCount_;
- }
- if (spec.DataSizePerMapJob_.Defined()) {
- (*nodeSpec)["data_size_per_map_job"] = *spec.DataSizePerMapJob_;
- }
- }
- template <typename TSpec>
- void BuildIntermediateDataPart(const TSpec& spec, TNode* nodeSpec)
- {
- if (spec.IntermediateDataAccount_.Defined()) {
- (*nodeSpec)["intermediate_data_account"] = *spec.IntermediateDataAccount_;
- }
- if (spec.IntermediateDataReplicationFactor_.Defined()) {
- (*nodeSpec)["intermediate_data_replication_factor"] = *spec.IntermediateDataReplicationFactor_;
- }
- }
- ////////////////////////////////////////////////////////////////////////////////
- TNode MergeSpec(TNode dst, TNode spec, const TOperationOptions& options)
- {
- MergeNodes(dst, spec);
- if (options.Spec_) {
- MergeNodes(dst, *options.Spec_);
- }
- return dst;
- }
- template <typename TSpec>
- void CreateDebugOutputTables(const TSpec& spec, const TOperationPreparer& preparer)
- {
- if (spec.StderrTablePath_.Defined()) {
- RequestWithRetry<void>(
- preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- [&spec, &preparer] (TMutationId& mutationId) {
- preparer.GetClient()->GetRawClient()->Create(
- mutationId,
- TTransactionId(),
- *spec.StderrTablePath_,
- NT_TABLE,
- TCreateOptions()
- .IgnoreExisting(true)
- .Recursive(true));
- });
- }
- if (spec.CoreTablePath_.Defined()) {
- RequestWithRetry<void>(
- preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- [&spec, &preparer] (TMutationId& mutationId) {
- preparer.GetClient()->GetRawClient()->Create(
- mutationId,
- TTransactionId(),
- *spec.CoreTablePath_,
- NT_TABLE,
- TCreateOptions()
- .IgnoreExisting(true)
- .Recursive(true));
- });
- }
- }
- void CreateOutputTable(
- const TOperationPreparer& preparer,
- const TRichYPath& path)
- {
- Y_ENSURE(path.Path_, "Output table is not set");
- if (!path.Create_.Defined()) {
- // If `create` attribute is defined
- RequestWithRetry<void>(
- preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- [&preparer, &path] (TMutationId& mutationId) {
- preparer.GetClient()->GetRawClient()->Create(
- mutationId,
- preparer.GetTransactionId(),
- path.Path_,
- NT_TABLE,
- TCreateOptions()
- .IgnoreExisting(true)
- .Recursive(true));
- });
- }
- }
- void CreateOutputTables(
- const TOperationPreparer& preparer,
- const TVector<TRichYPath>& paths)
- {
- for (auto& path : paths) {
- CreateOutputTable(preparer, path);
- }
- }
- void CheckInputTablesExist(
- const TOperationPreparer& preparer,
- const TVector<TRichYPath>& paths)
- {
- Y_ENSURE(!paths.empty(), "Input tables are not set");
- for (auto& path : paths) {
- auto curTransactionId = path.TransactionId_.GetOrElse(preparer.GetTransactionId());
- auto exists = RequestWithRetry<bool>(
- preparer.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
- [&preparer, &curTransactionId, &path] (TMutationId /*mutationId*/) {
- return preparer.GetClient()->GetRawClient()->Exists(
- curTransactionId,
- path.Path_);
- });
- Y_ENSURE_EX(
- path.Cluster_.Defined() || exists,
- TApiUsageError() << "Input table '" << path.Path_ << "' doesn't exist");
- }
- }
- void LogJob(const TOperationId& opId, const IJob* job, const char* type)
- {
- if (job) {
- YT_LOG_INFO("Operation %v; %v = %v",
- opId,
- type,
- TJobFactory::Get()->GetJobName(job));
- }
- }
- void LogYPaths(const TOperationId& opId, const TVector<TRichYPath>& paths, const char* type)
- {
- for (size_t i = 0; i < paths.size(); ++i) {
- YT_LOG_INFO("Operation %v; %v[%v] = %v",
- opId,
- type,
- i,
- paths[i].Path_);
- }
- }
- void LogYPath(const TOperationId& opId, const TRichYPath& path, const char* type)
- {
- YT_LOG_INFO("Operation %v; %v = %v",
- opId,
- type,
- path.Path_);
- }
- TString AddModeToTitleIfDebug(const TString& title) {
- #ifndef NDEBUG
- return title + " (debug build)";
- #else
- return title;
- #endif
- }
- } // namespace
- ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
- void DoExecuteMap(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TSimpleOperationIo& operationIo,
- TMapOperationSpecBase<T> spec,
- const IJobPtr& mapper,
- const TOperationOptions& options)
- {
- if (options.CreateDebugOutputTables_) {
- CreateDebugOutputTables(spec, *preparer);
- }
- if (options.CreateOutputTables_) {
- CheckInputTablesExist(*preparer, operationIo.Inputs);
- CreateOutputTables(*preparer, operationIo.Outputs);
- }
- TJobPreparer map(
- *preparer,
- spec.MapperSpec_,
- *mapper,
- operationIo.Outputs.size(),
- operationIo.JobFiles,
- options);
- spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(map.GetClassName()));
- TNode specNode = BuildYsonNodeFluently()
- .BeginMap()
- .Item("mapper").DoMap([&] (TFluentMap fluent) {
- BuildUserJobFluently(
- map,
- operationIo.InputFormat,
- operationIo.OutputFormat,
- fluent);
- })
- .DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) {
- auto autoMergeSpec = BuildAutoMergeSpec(*spec.AutoMerge_);
- if (!autoMergeSpec.IsUndefined()) {
- fluent.Item("auto_merge").Value(std::move(autoMergeSpec));
- }
- })
- .Item("input_table_paths").List(operationIo.Inputs)
- .Item("output_table_paths").List(operationIo.Outputs)
- .DoIf(spec.Ordered_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("ordered").Value(spec.Ordered_.GetRef());
- })
- .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
- specNode["job_io"]["control_attributes"]["enable_row_index"] = TNode(true);
- specNode["job_io"]["control_attributes"]["enable_range_index"] = TNode(true);
- if (!preparer->GetContext().Config->TableWriter.Empty()) {
- specNode["job_io"]["table_writer"] = preparer->GetContext().Config->TableWriter;
- }
- BuildCommonUserOperationPart(spec, &specNode);
- BuildJobCountOperationPart(spec, &specNode);
- auto startOperation = [
- operation=operation.Get(),
- spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
- preparer,
- operationIo,
- mapper
- ] () {
- auto operationId = preparer->StartOperation(operation, EOperationType::Map, spec);
- LogJob(operationId, mapper.Get(), "mapper");
- LogYPaths(operationId, operationIo.Inputs, "input");
- LogYPaths(operationId, operationIo.Outputs, "output");
- return operationId;
- };
- operation->SetDelayedStartFunction(std::move(startOperation));
- }
- void ExecuteMap(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TMapOperationSpec& spec,
- const ::TIntrusivePtr<IStructuredJob>& mapper,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting map operation (PreparationId: %v)",
- preparer->GetPreparationId());
- auto operationIo = CreateSimpleOperationIo(*mapper, *preparer, spec, options, /* allowSkiff = */ true);
- DoExecuteMap(
- operation,
- preparer,
- operationIo,
- spec,
- mapper,
- options);
- }
- void ExecuteRawMap(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TRawMapOperationSpec& spec,
- const ::TIntrusivePtr<IRawJob>& mapper,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting raw map operation (PreparationId: %v)",
- preparer->GetPreparationId());
- auto operationIo = CreateSimpleOperationIo(*mapper, *preparer, spec);
- DoExecuteMap(
- operation,
- preparer,
- operationIo,
- spec,
- mapper,
- options);
- }
- ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
- void DoExecuteReduce(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TSimpleOperationIo& operationIo,
- TReduceOperationSpecBase<T> spec,
- const IJobPtr& reducer,
- const TOperationOptions& options)
- {
- if (options.CreateDebugOutputTables_) {
- CreateDebugOutputTables(spec, *preparer);
- }
- if (options.CreateOutputTables_) {
- CheckInputTablesExist(*preparer, operationIo.Inputs);
- CreateOutputTables(*preparer, operationIo.Outputs);
- }
- TJobPreparer reduce(
- *preparer,
- spec.ReducerSpec_,
- *reducer,
- operationIo.Outputs.size(),
- operationIo.JobFiles,
- options);
- spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName()));
- TNode specNode = BuildYsonNodeFluently()
- .BeginMap()
- .Item("reducer").DoMap([&] (TFluentMap fluent) {
- BuildUserJobFluently(
- reduce,
- operationIo.InputFormat,
- operationIo.OutputFormat,
- fluent);
- })
- .DoIf(!spec.SortBy_.Parts_.empty(), [&] (TFluentMap fluent) {
- fluent.Item("sort_by").Value(spec.SortBy_);
- })
- .Item("reduce_by").Value(spec.ReduceBy_)
- .DoIf(spec.JoinBy_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("join_by").Value(spec.JoinBy_.GetRef());
- })
- .DoIf(spec.EnableKeyGuarantee_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("enable_key_guarantee").Value(spec.EnableKeyGuarantee_.GetRef());
- })
- .Item("input_table_paths").List(operationIo.Inputs)
- .Item("output_table_paths").List(operationIo.Outputs)
- .Item("job_io").BeginMap()
- .Item("control_attributes").BeginMap()
- .Item("enable_key_switch").Value(true)
- .Item("enable_row_index").Value(true)
- .Item("enable_range_index").Value(true)
- .EndMap()
- .DoIf(!preparer->GetContext().Config->TableWriter.Empty(), [&] (TFluentMap fluent) {
- fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
- })
- .EndMap()
- .DoIf(spec.AutoMerge_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("auto_merge").Value(BuildAutoMergeSpec(*spec.AutoMerge_));
- })
- .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
- BuildCommonUserOperationPart(spec, &specNode);
- BuildJobCountOperationPart(spec, &specNode);
- auto startOperation = [
- operation=operation.Get(),
- spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
- preparer,
- operationIo,
- reducer
- ] () {
- auto operationId = preparer->StartOperation(operation, EOperationType::Reduce, spec);
- LogJob(operationId, reducer.Get(), "reducer");
- LogYPaths(operationId, operationIo.Inputs, "input");
- LogYPaths(operationId, operationIo.Outputs, "output");
- return operationId;
- };
- operation->SetDelayedStartFunction(std::move(startOperation));
- }
- void ExecuteReduce(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TReduceOperationSpec& spec,
- const ::TIntrusivePtr<IStructuredJob>& reducer,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting reduce operation (PreparationId: %v)",
- preparer->GetPreparationId());
- auto operationIo = CreateSimpleOperationIo(*reducer, *preparer, spec, options, /* allowSkiff = */ false);
- DoExecuteReduce(
- operation,
- preparer,
- operationIo,
- spec,
- reducer,
- options);
- }
- void ExecuteRawReduce(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TRawReduceOperationSpec& spec,
- const ::TIntrusivePtr<IRawJob>& reducer,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting raw reduce operation (PreparationId: %v)",
- preparer->GetPreparationId());
- auto operationIo = CreateSimpleOperationIo(*reducer, *preparer, spec);
- DoExecuteReduce(
- operation,
- preparer,
- operationIo,
- spec,
- reducer,
- options);
- }
- ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
- void DoExecuteJoinReduce(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TSimpleOperationIo& operationIo,
- TJoinReduceOperationSpecBase<T> spec,
- const IJobPtr& reducer,
- const TOperationOptions& options)
- {
- if (options.CreateDebugOutputTables_) {
- CreateDebugOutputTables(spec, *preparer);
- }
- if (options.CreateOutputTables_) {
- CheckInputTablesExist(*preparer, operationIo.Inputs);
- CreateOutputTables(*preparer, operationIo.Outputs);
- }
- TJobPreparer reduce(
- *preparer,
- spec.ReducerSpec_,
- *reducer,
- operationIo.Outputs.size(),
- operationIo.JobFiles,
- options);
- spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(reduce.GetClassName()));
- TNode specNode = BuildYsonNodeFluently()
- .BeginMap()
- .Item("reducer").DoMap([&] (TFluentMap fluent) {
- BuildUserJobFluently(
- reduce,
- operationIo.InputFormat,
- operationIo.OutputFormat,
- fluent);
- })
- .Item("join_by").Value(spec.JoinBy_)
- .Item("input_table_paths").List(operationIo.Inputs)
- .Item("output_table_paths").List(operationIo.Outputs)
- .Item("job_io").BeginMap()
- .Item("control_attributes").BeginMap()
- .Item("enable_key_switch").Value(true)
- .Item("enable_row_index").Value(true)
- .Item("enable_range_index").Value(true)
- .EndMap()
- .DoIf(!preparer->GetContext().Config->TableWriter.Empty(), [&] (TFluentMap fluent) {
- fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
- })
- .EndMap()
- .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
- BuildCommonUserOperationPart(spec, &specNode);
- BuildJobCountOperationPart(spec, &specNode);
- auto startOperation = [
- operation=operation.Get(),
- spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
- preparer,
- reducer,
- operationIo
- ] () {
- auto operationId = preparer->StartOperation(operation, EOperationType::JoinReduce, spec);
- LogJob(operationId, reducer.Get(), "reducer");
- LogYPaths(operationId, operationIo.Inputs, "input");
- LogYPaths(operationId, operationIo.Outputs, "output");
- return operationId;
- };
- operation->SetDelayedStartFunction(std::move(startOperation));
- }
- void ExecuteJoinReduce(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TJoinReduceOperationSpec& spec,
- const ::TIntrusivePtr<IStructuredJob>& reducer,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting join reduce operation (PreparationId: %v)",
- preparer->GetPreparationId());
- auto operationIo = CreateSimpleOperationIo(*reducer, *preparer, spec, options, /* allowSkiff = */ false);
- return DoExecuteJoinReduce(
- operation,
- preparer,
- operationIo,
- spec,
- reducer,
- options);
- }
- void ExecuteRawJoinReduce(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TRawJoinReduceOperationSpec& spec,
- const ::TIntrusivePtr<IRawJob>& reducer,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting raw join reduce operation (PreparationId: %v)",
- preparer->GetPreparationId());
- auto operationIo = CreateSimpleOperationIo(*reducer, *preparer, spec);
- return DoExecuteJoinReduce(
- operation,
- preparer,
- operationIo,
- spec,
- reducer,
- options);
- }
- ////////////////////////////////////////////////////////////////////////////////
- template <typename T>
- void DoExecuteMapReduce(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TMapReduceOperationIo& operationIo,
- TMapReduceOperationSpecBase<T> spec,
- const IJobPtr& mapper,
- const IJobPtr& reduceCombiner,
- const IJobPtr& reducer,
- const TOperationOptions& options)
- {
- TVector<TRichYPath> allOutputs;
- allOutputs.insert(allOutputs.end(), operationIo.MapOutputs.begin(), operationIo.MapOutputs.end());
- allOutputs.insert(allOutputs.end(), operationIo.Outputs.begin(), operationIo.Outputs.end());
- if (options.CreateDebugOutputTables_) {
- CreateDebugOutputTables(spec, *preparer);
- }
- if (options.CreateOutputTables_) {
- CheckInputTablesExist(*preparer, operationIo.Inputs);
- CreateOutputTables(*preparer, allOutputs);
- }
- TSortColumns sortBy = spec.SortBy_;
- TSortColumns reduceBy = spec.ReduceBy_;
- if (sortBy.Parts_.empty()) {
- sortBy = reduceBy;
- }
- const bool hasMapper = mapper != nullptr;
- const bool hasCombiner = reduceCombiner != nullptr;
- TVector<TRichYPath> files;
- TJobPreparer reduce(
- *preparer,
- spec.ReducerSpec_,
- *reducer,
- operationIo.Outputs.size(),
- operationIo.ReducerJobFiles,
- options);
- TString title;
- TNode specNode = BuildYsonNodeFluently()
- .BeginMap()
- .DoIf(hasMapper, [&] (TFluentMap fluent) {
- TJobPreparer map(
- *preparer,
- spec.MapperSpec_,
- *mapper,
- 1 + operationIo.MapOutputs.size(),
- operationIo.MapperJobFiles,
- options);
- fluent.Item("mapper").DoMap([&] (TFluentMap fluent) {
- BuildUserJobFluently(
- std::cref(map),
- *operationIo.MapperInputFormat,
- *operationIo.MapperOutputFormat,
- fluent);
- });
- title = "mapper:" + map.GetClassName() + " ";
- })
- .DoIf(hasCombiner, [&] (TFluentMap fluent) {
- TJobPreparer combine(
- *preparer,
- spec.ReduceCombinerSpec_,
- *reduceCombiner,
- size_t(1),
- operationIo.ReduceCombinerJobFiles,
- options);
- fluent.Item("reduce_combiner").DoMap([&] (TFluentMap fluent) {
- BuildUserJobFluently(
- combine,
- *operationIo.ReduceCombinerInputFormat,
- *operationIo.ReduceCombinerOutputFormat,
- fluent);
- });
- title += "combiner:" + combine.GetClassName() + " ";
- })
- .Item("reducer").DoMap([&] (TFluentMap fluent) {
- BuildUserJobFluently(
- reduce,
- operationIo.ReducerInputFormat,
- operationIo.ReducerOutputFormat,
- fluent);
- })
- .Item("sort_by").Value(sortBy)
- .Item("reduce_by").Value(reduceBy)
- .Item("input_table_paths").List(operationIo.Inputs)
- .Item("output_table_paths").List(allOutputs)
- .Item("mapper_output_table_count").Value(operationIo.MapOutputs.size())
- .DoIf(spec.ForceReduceCombiners_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("force_reduce_combiners").Value(*spec.ForceReduceCombiners_);
- })
- .Item("map_job_io").BeginMap()
- .Item("control_attributes").BeginMap()
- .Item("enable_row_index").Value(true)
- .Item("enable_range_index").Value(true)
- .EndMap()
- .DoIf(!preparer->GetContext().Config->TableWriter.Empty(), [&] (TFluentMap fluent) {
- fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
- })
- .EndMap()
- .Item("sort_job_io").BeginMap()
- .Item("control_attributes").BeginMap()
- .Item("enable_key_switch").Value(true)
- .EndMap()
- .DoIf(!preparer->GetContext().Config->TableWriter.Empty(), [&] (TFluentMap fluent) {
- fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
- })
- .EndMap()
- .Item("reduce_job_io").BeginMap()
- .Item("control_attributes").BeginMap()
- .Item("enable_key_switch").Value(true)
- .EndMap()
- .DoIf(!preparer->GetContext().Config->TableWriter.Empty(), [&] (TFluentMap fluent) {
- fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
- })
- .EndMap()
- .Do([&] (TFluentMap) {
- spec.Title_ = spec.Title_.GetOrElse(AddModeToTitleIfDebug(title + "reducer:" + reduce.GetClassName()));
- })
- .EndMap();
- if (spec.Ordered_) {
- specNode["ordered"] = *spec.Ordered_;
- }
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
- BuildCommonUserOperationPart(spec, &specNode);
- BuildMapJobCountOperationPart(spec, &specNode);
- BuildPartitionCountOperationPart(spec, &specNode);
- BuildIntermediateDataPart(spec, &specNode);
- BuildDataSizePerSortJobPart(spec, &specNode);
- auto startOperation = [
- operation=operation.Get(),
- spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
- preparer,
- mapper,
- reduceCombiner,
- reducer,
- inputs=operationIo.Inputs,
- allOutputs
- ] () {
- auto operationId = preparer->StartOperation(operation, EOperationType::MapReduce, spec);
- LogJob(operationId, mapper.Get(), "mapper");
- LogJob(operationId, reduceCombiner.Get(), "reduce_combiner");
- LogJob(operationId, reducer.Get(), "reducer");
- LogYPaths(operationId, inputs, "input");
- LogYPaths(operationId, allOutputs, "output");
- return operationId;
- };
- operation->SetDelayedStartFunction(std::move(startOperation));
- }
- void ExecuteMapReduce(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TMapReduceOperationSpec& spec_,
- const ::TIntrusivePtr<IStructuredJob>& mapper,
- const ::TIntrusivePtr<IStructuredJob>& reduceCombiner,
- const ::TIntrusivePtr<IStructuredJob>& reducer,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting map-reduce operation (PreparationId: %v)",
- preparer->GetPreparationId());
- TMapReduceOperationSpec spec = spec_;
- TMapReduceOperationIo operationIo;
- auto structuredInputs = CanonizeStructuredTableList(preparer->GetClient()->GetRawClient(), spec.GetStructuredInputs());
- auto structuredMapOutputs = CanonizeStructuredTableList(preparer->GetClient()->GetRawClient(), spec.GetStructuredMapOutputs());
- auto structuredOutputs = CanonizeStructuredTableList(preparer->GetClient()->GetRawClient(), spec.GetStructuredOutputs());
- const bool inferOutputSchema = options.InferOutputSchema_.GetOrElse(preparer->GetContext().Config->InferTableSchema);
- TVector<TTableSchema> currentInferenceResult;
- auto fixSpec = [&] (const TFormat& format) {
- if (format.IsYamredDsv()) {
- spec.SortBy_.Parts_.clear();
- spec.ReduceBy_.Parts_.clear();
- const TYamredDsvAttributes attributes = format.GetYamredDsvAttributes();
- for (auto& column : attributes.KeyColumnNames) {
- spec.SortBy_.Parts_.push_back(column);
- spec.ReduceBy_.Parts_.push_back(column);
- }
- for (const auto& column : attributes.SubkeyColumnNames) {
- spec.SortBy_.Parts_.push_back(column);
- }
- }
- };
- VerifyHasElements(structuredInputs, "inputs");
- TFormatBuilder formatBuilder(
- preparer->GetClient()->GetRawClient(),
- preparer->GetClientRetryPolicy(),
- preparer->GetContext(),
- preparer->GetTransactionId(),
- options);
- if (mapper) {
- auto mapperOutputDescription =
- spec.GetIntermediateMapOutputDescription()
- .GetOrElse(TUnspecifiedTableStructure());
- TStructuredJobTableList mapperOutput = {
- TStructuredJobTable::Intermediate(mapperOutputDescription),
- };
- for (const auto& table : structuredMapOutputs) {
- mapperOutput.push_back(TStructuredJobTable{table.Description, table.RichYPath});
- }
- auto hints = spec.MapperFormatHints_;
- auto mapperInferenceResult = PrepareOperation<TStructuredJobTableList>(
- *mapper,
- TOperationPreparationContext(
- structuredInputs,
- mapperOutput,
- preparer->GetClient()->GetRawClient(),
- preparer->GetClientRetryPolicy(),
- preparer->GetTransactionId()),
- &structuredInputs,
- /* outputs */ nullptr,
- hints);
- auto nodeReaderFormat = NodeReaderFormatFromHintAndGlobalConfig(spec.MapperFormatHints_);
- auto [inputFormat, inputFormatConfig] = formatBuilder.CreateFormat(
- *mapper,
- EIODirection::Input,
- structuredInputs,
- hints.InputFormatHints_,
- nodeReaderFormat,
- /* allowFormatFromTableAttribute */ true);
- auto [outputFormat, outputFormatConfig] = formatBuilder.CreateFormat(
- *mapper,
- EIODirection::Output,
- mapperOutput,
- hints.OutputFormatHints_,
- ENodeReaderFormat::Yson,
- /* allowFormatFromTableAttribute */ false);
- operationIo.MapperJobFiles = CreateFormatConfig(inputFormatConfig, outputFormatConfig);
- operationIo.MapperInputFormat = inputFormat;
- operationIo.MapperOutputFormat = outputFormat;
- Y_ABORT_UNLESS(mapperInferenceResult.size() >= 1);
- currentInferenceResult = TVector<TTableSchema>{mapperInferenceResult[0]};
- // The first output as it corresponds to the intermediate data.
- TVector<TTableSchema> additionalOutputsInferenceResult(mapperInferenceResult.begin() + 1, mapperInferenceResult.end());
- operationIo.MapOutputs = GetPathList(
- structuredMapOutputs,
- additionalOutputsInferenceResult,
- inferOutputSchema);
- }
- if (reduceCombiner) {
- const bool isFirstStep = !mapper;
- TStructuredJobTableList inputs;
- if (isFirstStep) {
- inputs = structuredInputs;
- } else {
- auto reduceCombinerIntermediateInput =
- spec.GetIntermediateReduceCombinerInputDescription()
- .GetOrElse(TUnspecifiedTableStructure());
- inputs = {
- TStructuredJobTable::Intermediate(reduceCombinerIntermediateInput),
- };
- }
- auto reduceCombinerOutputDescription = spec.GetIntermediateReduceCombinerOutputDescription()
- .GetOrElse(TUnspecifiedTableStructure());
- TStructuredJobTableList outputs = {
- TStructuredJobTable::Intermediate(reduceCombinerOutputDescription),
- };
- auto hints = spec.ReduceCombinerFormatHints_;
- if (isFirstStep) {
- currentInferenceResult = PrepareOperation<TStructuredJobTableList>(
- *reduceCombiner,
- TOperationPreparationContext(
- inputs,
- outputs,
- preparer->GetClient()->GetRawClient(),
- preparer->GetClientRetryPolicy(),
- preparer->GetTransactionId()),
- &inputs,
- /* outputs */ nullptr,
- hints);
- } else {
- currentInferenceResult = PrepareOperation<TStructuredJobTableList>(
- *reduceCombiner,
- TSpeculativeOperationPreparationContext(
- currentInferenceResult,
- inputs,
- outputs),
- /* inputs */ nullptr,
- /* outputs */ nullptr,
- hints);
- }
- auto [inputFormat, inputFormatConfig] = formatBuilder.CreateFormat(
- *reduceCombiner,
- EIODirection::Input,
- inputs,
- hints.InputFormatHints_,
- ENodeReaderFormat::Yson,
- /* allowFormatFromTableAttribute = */ isFirstStep);
- auto [outputFormat, outputFormatConfig] = formatBuilder.CreateFormat(
- *reduceCombiner,
- EIODirection::Output,
- outputs,
- hints.OutputFormatHints_,
- ENodeReaderFormat::Yson,
- /* allowFormatFromTableAttribute = */ false);
- operationIo.ReduceCombinerJobFiles = CreateFormatConfig(inputFormatConfig, outputFormatConfig);
- operationIo.ReduceCombinerInputFormat = inputFormat;
- operationIo.ReduceCombinerOutputFormat = outputFormat;
- if (isFirstStep) {
- fixSpec(*operationIo.ReduceCombinerInputFormat);
- }
- }
- const bool isFirstStep = (!mapper && !reduceCombiner);
- TStructuredJobTableList reducerInputs;
- if (isFirstStep) {
- reducerInputs = structuredInputs;
- } else {
- auto reducerInputDescription =
- spec.GetIntermediateReducerInputDescription()
- .GetOrElse(TUnspecifiedTableStructure());
- reducerInputs = {
- TStructuredJobTable::Intermediate(reducerInputDescription),
- };
- }
- auto hints = spec.ReducerFormatHints_;
- TVector<TTableSchema> reducerInferenceResult;
- if (isFirstStep) {
- reducerInferenceResult = PrepareOperation(
- *reducer,
- TOperationPreparationContext(
- structuredInputs,
- structuredOutputs,
- preparer->GetClient()->GetRawClient(),
- preparer->GetClientRetryPolicy(),
- preparer->GetTransactionId()),
- &structuredInputs,
- &structuredOutputs,
- hints);
- } else {
- reducerInferenceResult = PrepareOperation<TStructuredJobTableList>(
- *reducer,
- TSpeculativeOperationPreparationContext(
- currentInferenceResult,
- reducerInputs,
- structuredOutputs),
- /* inputs */ nullptr,
- &structuredOutputs,
- hints);
- }
- auto [inputFormat, inputFormatConfig] = formatBuilder.CreateFormat(
- *reducer,
- EIODirection::Input,
- reducerInputs,
- hints.InputFormatHints_,
- ENodeReaderFormat::Yson,
- /* allowFormatFromTableAttribute = */ isFirstStep);
- auto [outputFormat, outputFormatConfig] = formatBuilder.CreateFormat(
- *reducer,
- EIODirection::Output,
- ToStructuredJobTableList(spec.GetStructuredOutputs()),
- hints.OutputFormatHints_,
- ENodeReaderFormat::Yson,
- /* allowFormatFromTableAttribute = */ false);
- operationIo.ReducerJobFiles = CreateFormatConfig(inputFormatConfig, outputFormatConfig);
- operationIo.ReducerInputFormat = inputFormat;
- operationIo.ReducerOutputFormat = outputFormat;
- if (isFirstStep) {
- fixSpec(operationIo.ReducerInputFormat);
- }
- operationIo.Inputs = GetPathList(
- ApplyProtobufColumnFilters(
- structuredInputs,
- *preparer,
- GetColumnsUsedInOperation(spec),
- options),
- /* jobSchemaInferenceResult */ Nothing(),
- /* inferSchema */ false);
- operationIo.Outputs = GetPathList(
- structuredOutputs,
- reducerInferenceResult,
- inferOutputSchema);
- VerifyHasElements(operationIo.Outputs, "outputs");
- return DoExecuteMapReduce(
- operation,
- preparer,
- operationIo,
- spec,
- mapper,
- reduceCombiner,
- reducer,
- options);
- }
- void ExecuteRawMapReduce(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TRawMapReduceOperationSpec& spec,
- const ::TIntrusivePtr<IRawJob>& mapper,
- const ::TIntrusivePtr<IRawJob>& reduceCombiner,
- const ::TIntrusivePtr<IRawJob>& reducer,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting raw map-reduce operation (PreparationId: %v)",
- preparer->GetPreparationId());
- TMapReduceOperationIo operationIo;
- operationIo.Inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.GetInputs());
- operationIo.MapOutputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.GetMapOutputs());
- operationIo.Outputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.GetOutputs());
- VerifyHasElements(operationIo.Inputs, "inputs");
- VerifyHasElements(operationIo.Outputs, "outputs");
- auto getFormatOrDefault = [&] (const TMaybe<TFormat>& maybeFormat, const TMaybe<TFormat> stageDefaultFormat, const char* formatName) {
- if (maybeFormat) {
- return *maybeFormat;
- } else if (stageDefaultFormat) {
- return *stageDefaultFormat;
- } else {
- ythrow TApiUsageError() << "Cannot derive " << formatName;
- }
- };
- if (mapper) {
- operationIo.MapperInputFormat = getFormatOrDefault(spec.MapperInputFormat_, spec.MapperFormat_, "mapper input format");
- operationIo.MapperOutputFormat = getFormatOrDefault(spec.MapperOutputFormat_, spec.MapperFormat_, "mapper output format");
- }
- if (reduceCombiner) {
- operationIo.ReduceCombinerInputFormat = getFormatOrDefault(spec.ReduceCombinerInputFormat_, spec.ReduceCombinerFormat_, "reduce combiner input format");
- operationIo.ReduceCombinerOutputFormat = getFormatOrDefault(spec.ReduceCombinerOutputFormat_, spec.ReduceCombinerFormat_, "reduce combiner output format");
- }
- operationIo.ReducerInputFormat = getFormatOrDefault(spec.ReducerInputFormat_, spec.ReducerFormat_, "reducer input format");
- operationIo.ReducerOutputFormat = getFormatOrDefault(spec.ReducerOutputFormat_, spec.ReducerFormat_, "reducer output format");
- return DoExecuteMapReduce(
- operation,
- preparer,
- operationIo,
- spec,
- mapper,
- reduceCombiner,
- reducer,
- options);
- }
- void ExecuteSort(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TSortOperationSpec& spec,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting sort operation (PreparationId: %v)",
- preparer->GetPreparationId());
- auto inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.Inputs_);
- auto output = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.Output_);
- if (options.CreateOutputTables_) {
- CheckInputTablesExist(*preparer, inputs);
- CreateOutputTable(*preparer, output);
- }
- TNode specNode = BuildYsonNodeFluently()
- .BeginMap()
- .Item("input_table_paths").List(inputs)
- .Item("output_table_path").Value(output)
- .Item("sort_by").Value(spec.SortBy_)
- .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
- })
- .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
- BuildPartitionCountOperationPart(spec, &specNode);
- BuildPartitionJobCountOperationPart(spec, &specNode);
- BuildIntermediateDataPart(spec, &specNode);
- auto startOperation = [
- operation=operation.Get(),
- spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
- preparer,
- inputs,
- output
- ] () {
- auto operationId = preparer->StartOperation(operation, EOperationType::Sort, spec);
- LogYPaths(operationId, inputs, "input");
- LogYPath(operationId, output, "output");
- return operationId;
- };
- operation->SetDelayedStartFunction(std::move(startOperation));
- }
- void ExecuteMerge(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TMergeOperationSpec& spec,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting merge operation (PreparationId: %v)",
- preparer->GetPreparationId());
- auto inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.Inputs_);
- auto output = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.Output_);
- if (options.CreateOutputTables_) {
- CheckInputTablesExist(*preparer, inputs);
- CreateOutputTable(*preparer, output);
- }
- TNode specNode = BuildYsonNodeFluently()
- .BeginMap()
- .Item("input_table_paths").List(inputs)
- .Item("output_table_path").Value(output)
- .Item("mode").Value(ToString(spec.Mode_))
- .Item("combine_chunks").Value(spec.CombineChunks_)
- .Item("force_transform").Value(spec.ForceTransform_)
- .Item("merge_by").Value(spec.MergeBy_)
- .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
- })
- .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
- BuildJobCountOperationPart(spec, &specNode);
- auto startOperation = [
- operation=operation.Get(),
- spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
- preparer,
- inputs,
- output
- ] () {
- auto operationId = preparer->StartOperation(operation, EOperationType::Merge, spec);
- LogYPaths(operationId, inputs, "input");
- LogYPath(operationId, output, "output");
- return operationId;
- };
- operation->SetDelayedStartFunction(std::move(startOperation));
- }
- void ExecuteErase(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TEraseOperationSpec& spec,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting erase operation (PreparationId: %v)",
- preparer->GetPreparationId());
- auto tablePath = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.TablePath_);
- TNode specNode = BuildYsonNodeFluently()
- .BeginMap()
- .Item("table_path").Value(tablePath)
- .Item("combine_chunks").Value(spec.CombineChunks_)
- .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
- })
- .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
- auto startOperation = [
- operation=operation.Get(),
- spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options),
- preparer,
- tablePath
- ] () {
- auto operationId = preparer->StartOperation(operation, EOperationType::Erase, spec);
- LogYPath(operationId, tablePath, "table_path");
- return operationId;
- };
- operation->SetDelayedStartFunction(std::move(startOperation));
- }
- void ExecuteRemoteCopy(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TRemoteCopyOperationSpec& spec,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting remote copy operation (PreparationId: %v)",
- preparer->GetPreparationId());
- auto inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.Inputs_);
- auto output = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.Output_);
- if (options.CreateOutputTables_) {
- CreateOutputTable(*preparer, output);
- }
- Y_ENSURE_EX(!spec.ClusterName_.empty(), TApiUsageError() << "ClusterName parameter is required");
- TNode specNode = BuildYsonNodeFluently()
- .BeginMap()
- .Item("cluster_name").Value(spec.ClusterName_)
- .Item("input_table_paths").List(inputs)
- .Item("output_table_path").Value(output)
- .DoIf(spec.NetworkName_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("network_name").Value(*spec.NetworkName_);
- })
- .DoIf(spec.SchemaInferenceMode_.Defined(), [&] (TFluentMap fluent) {
- fluent.Item("schema_inference_mode").Value(ToString(*spec.SchemaInferenceMode_));
- })
- .Item("copy_attributes").Value(spec.CopyAttributes_)
- .DoIf(!spec.AttributeKeys_.empty(), [&] (TFluentMap fluent) {
- Y_ENSURE_EX(spec.CopyAttributes_, TApiUsageError() <<
- "Specifying nonempty AttributeKeys in RemoteCopy "
- "doesn't make sense without CopyAttributes == true");
- fluent.Item("attribute_keys").List(spec.AttributeKeys_);
- })
- .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
- auto startOperation = [
- operation=operation.Get(),
- spec=MergeSpec(specNode, preparer->GetContext().Config->Spec, options),
- preparer,
- inputs,
- output
- ] () {
- auto operationId = preparer->StartOperation(operation, EOperationType::RemoteCopy, spec);
- LogYPaths(operationId, inputs, "input");
- LogYPath(operationId, output, "output");
- return operationId;
- };
- operation->SetDelayedStartFunction(std::move(startOperation));
- }
- void ExecuteVanilla(
- const TOperationPtr& operation,
- const TOperationPreparerPtr& preparer,
- const TVanillaOperationSpec& spec,
- const TOperationOptions& options)
- {
- YT_LOG_DEBUG("Starting vanilla operation (PreparationId: %v)",
- preparer->GetPreparationId());
- auto addTask = [&](TFluentMap fluent, const TVanillaTask& task) {
- Y_ABORT_UNLESS(task.Job_.Get());
- if (std::holds_alternative<TVoidStructuredRowStream>(task.Job_->GetOutputRowStreamDescription())) {
- Y_ENSURE_EX(task.Outputs_.empty(),
- TApiUsageError() << "Vanilla task with void IVanillaJob doesn't expect output tables");
- TJobPreparer jobPreparer(
- *preparer,
- task.Spec_,
- *task.Job_,
- /* outputTableCount */ 0,
- /* smallFileList */ {},
- options);
- fluent
- .Item(task.Name_).BeginMap()
- .Item("job_count").Value(task.JobCount_)
- .DoIf(task.NetworkProject_.Defined(), [&](TFluentMap fluent) {
- fluent.Item("network_project").Value(*task.NetworkProject_);
- })
- .Do([&] (TFluentMap fluent) {
- BuildUserJobFluently(
- std::cref(jobPreparer),
- /* inputFormat */ Nothing(),
- /* outputFormat */ Nothing(),
- fluent);
- })
- .EndMap();
- } else {
- auto operationIo = CreateSimpleOperationIo(
- *task.Job_,
- *preparer,
- task,
- options,
- false);
- Y_ENSURE_EX(operationIo.Outputs.size() > 0,
- TApiUsageError() << "Vanilla task with IVanillaJob that has table writer expects output tables");
- if (options.CreateOutputTables_) {
- CreateOutputTables(*preparer, operationIo.Outputs);
- }
- TJobPreparer jobPreparer(
- *preparer,
- task.Spec_,
- *task.Job_,
- operationIo.Outputs.size(),
- operationIo.JobFiles,
- options);
- fluent
- .Item(task.Name_).BeginMap()
- .Item("job_count").Value(task.JobCount_)
- .DoIf(task.NetworkProject_.Defined(), [&](TFluentMap fluent) {
- fluent.Item("network_project").Value(*task.NetworkProject_);
- })
- .Do([&] (TFluentMap fluent) {
- BuildUserJobFluently(
- std::cref(jobPreparer),
- /* inputFormat */ Nothing(),
- operationIo.OutputFormat,
- fluent);
- })
- .Item("output_table_paths").List(operationIo.Outputs)
- .Item("job_io").BeginMap()
- .DoIf(!preparer->GetContext().Config->TableWriter.Empty(), [&](TFluentMap fluent) {
- fluent.Item("table_writer").Value(preparer->GetContext().Config->TableWriter);
- })
- .Item("control_attributes").BeginMap()
- .Item("enable_row_index").Value(TNode(true))
- .Item("enable_range_index").Value(TNode(true))
- .EndMap()
- .EndMap()
- .EndMap();
- }
- };
- if (options.CreateDebugOutputTables_) {
- CreateDebugOutputTables(spec, *preparer);
- }
- TNode specNode = BuildYsonNodeFluently()
- .BeginMap()
- .Item("tasks").DoMapFor(spec.Tasks_, addTask)
- .EndMap();
- BuildCommonOperationPart(preparer->GetContext().Config, spec, options, &specNode);
- BuildCommonUserOperationPart(spec, &specNode);
- auto startOperation = [operation=operation.Get(), spec=MergeSpec(std::move(specNode), preparer->GetContext().Config->Spec, options), preparer] () {
- auto operationId = preparer->StartOperation(operation, EOperationType::Vanilla, spec);
- return operationId;
- };
- operation->SetDelayedStartFunction(std::move(startOperation));
- }
- ////////////////////////////////////////////////////////////////////////////////
- class TOperation::TOperationImpl
- : public TThrRefBase
- {
- public:
- TOperationImpl(
- IRawClientPtr rawClient,
- IClientRetryPolicyPtr clientRetryPolicy,
- TClientContext context,
- const TMaybe<TOperationId>& operationId = {})
- : RawClient_(std::move(rawClient))
- , Context_(std::move(context))
- , ClientRetryPolicy_(clientRetryPolicy)
- , Id_(operationId)
- , PreparedPromise_(::NThreading::NewPromise<void>())
- , StartedPromise_(::NThreading::NewPromise<void>())
- {
- if (Id_) {
- PreparedPromise_.SetValue();
- StartedPromise_.SetValue();
- } else {
- PreparedPromise_.GetFuture().Subscribe([this_=::TIntrusivePtr(this)] (const ::NThreading::TFuture<void>& preparedResult) {
- try {
- preparedResult.GetValue();
- } catch (...) {
- this_->StartedPromise_.SetException(std::current_exception());
- return;
- }
- });
- }
- }
- const TOperationId& GetId() const;
- TString GetWebInterfaceUrl() const;
- void OnPrepared();
- void SetDelayedStartFunction(std::function<TOperationId()> start);
- void Start();
- bool IsStarted() const;
- void OnPreparationException(std::exception_ptr e);
- TString GetStatus();
- void OnStatusUpdated(const TString& newStatus);
- ::NThreading::TFuture<void> GetPreparedFuture();
- ::NThreading::TFuture<void> GetStartedFuture();
- ::NThreading::TFuture<void> Watch(TClientPtr client);
- EOperationBriefState GetBriefState();
- TMaybe<TYtError> GetError();
- TJobStatistics GetJobStatistics();
- TMaybe<TOperationBriefProgress> GetBriefProgress();
- void AbortOperation();
- void CompleteOperation();
- void SuspendOperation(const TSuspendOperationOptions& options);
- void ResumeOperation(const TResumeOperationOptions& options);
- TOperationAttributes GetAttributes(const TGetOperationOptions& options);
- void UpdateParameters(const TUpdateOperationParametersOptions& options);
- TJobAttributes GetJob(const TJobId& jobId, const TGetJobOptions& options);
- TListJobsResult ListJobs(const TListJobsOptions& options);
- void AsyncFinishOperation(TOperationAttributes operationAttributes);
- void FinishWithException(std::exception_ptr exception);
- void UpdateBriefProgress(TMaybe<TOperationBriefProgress> briefProgress);
- void AnalyzeUnrecognizedSpec(TNode unrecognizedSpec);
- const TClientContext& GetContext() const;
- private:
- void OnStarted(const TOperationId& operationId);
- void UpdateAttributesAndCall(bool needJobStatistics, std::function<void(const TOperationAttributes&)> func);
- void SyncFinishOperationImpl(const TOperationAttributes&);
- static void* SyncFinishOperationProc(void* );
- void ValidateOperationStarted() const;
- private:
- const IRawClientPtr RawClient_;
- const TClientContext Context_;
- IClientRetryPolicyPtr ClientRetryPolicy_;
- TMaybe<TOperationId> Id_;
- TMutex Lock_;
- ::NThreading::TPromise<void> PreparedPromise_;
- ::NThreading::TPromise<void> StartedPromise_;
- TMaybe<::NThreading::TPromise<void>> CompletePromise_;
- std::function<TOperationId()> DelayedStartFunction_;
- TString Status_;
- TOperationAttributes Attributes_;
- };
- ////////////////////////////////////////////////////////////////////////////////
- class TOperationPollerItem
- : public IYtPollerItem
- {
- public:
- TOperationPollerItem(::TIntrusivePtr<TOperation::TOperationImpl> operationImpl)
- : OperationImpl_(std::move(operationImpl))
- { }
- void PrepareRequest(IRawBatchRequest* batchRequest) override
- {
- auto filter = TOperationAttributeFilter()
- .Add(EOperationAttribute::State)
- .Add(EOperationAttribute::BriefProgress)
- .Add(EOperationAttribute::Result);
- if (!UnrecognizedSpecAnalyzed_) {
- filter.Add(EOperationAttribute::UnrecognizedSpec);
- }
- OperationState_ = batchRequest->GetOperation(
- OperationImpl_->GetId(),
- TGetOperationOptions().AttributeFilter(filter));
- }
- EStatus OnRequestExecuted() override
- {
- try {
- const auto& attributes = OperationState_.GetValue();
- if (!UnrecognizedSpecAnalyzed_ && !attributes.UnrecognizedSpec.Empty()) {
- OperationImpl_->AnalyzeUnrecognizedSpec(*attributes.UnrecognizedSpec);
- UnrecognizedSpecAnalyzed_ = true;
- }
- Y_ABORT_UNLESS(attributes.BriefState,
- "get_operation for operation %s has not returned \"state\" field",
- GetGuidAsString(OperationImpl_->GetId()).data());
- if (*attributes.BriefState != EOperationBriefState::InProgress) {
- OperationImpl_->AsyncFinishOperation(attributes);
- return PollBreak;
- } else {
- OperationImpl_->UpdateBriefProgress(attributes.BriefProgress);
- }
- } catch (const TErrorResponse& e) {
- if (!IsRetriable(e)) {
- OperationImpl_->FinishWithException(std::current_exception());
- return PollBreak;
- }
- } catch (const std::exception& e) {
- OperationImpl_->FinishWithException(std::current_exception());
- return PollBreak;
- }
- return PollContinue;
- }
- void OnItemDiscarded() override {
- OperationImpl_->FinishWithException(std::make_exception_ptr(yexception() << "Operation cancelled"));
- }
- private:
- ::TIntrusivePtr<TOperation::TOperationImpl> OperationImpl_;
- ::NThreading::TFuture<TOperationAttributes> OperationState_;
- bool UnrecognizedSpecAnalyzed_ = false;
- };
- ////////////////////////////////////////////////////////////////////////////////
- const TOperationId& TOperation::TOperationImpl::GetId() const
- {
- ValidateOperationStarted();
- return *Id_;
- }
- TString TOperation::TOperationImpl::GetWebInterfaceUrl() const
- {
- ValidateOperationStarted();
- return GetOperationWebInterfaceUrl(Context_.ServerName, *Id_);
- }
- void TOperation::TOperationImpl::OnPrepared()
- {
- Y_ABORT_IF(PreparedPromise_.HasException());
- Y_ABORT_IF(PreparedPromise_.HasValue());
- PreparedPromise_.SetValue();
- }
- void TOperation::TOperationImpl::SetDelayedStartFunction(std::function<TOperationId()> start)
- {
- DelayedStartFunction_ = std::move(start);
- }
- void TOperation::TOperationImpl::Start()
- {
- {
- auto guard = Guard(Lock_);
- if (Id_) {
- ythrow TApiUsageError() << "Start() should not be called on running operations";
- }
- }
- GetPreparedFuture().GetValueSync();
- std::function<TOperationId()> startStuff;
- {
- auto guard = Guard(Lock_);
- startStuff.swap(DelayedStartFunction_);
- }
- if (!startStuff) {
- ythrow TApiUsageError() << "Seems that Start() was called multiple times. If not, contact yt@";
- }
- TOperationId operationId;
- try {
- operationId = startStuff();
- } catch (...) {
- auto exception = std::current_exception();
- StartedPromise_.SetException(exception);
- std::rethrow_exception(exception);
- }
- OnStarted(operationId);
- }
- bool TOperation::TOperationImpl::IsStarted() const {
- auto guard = Guard(Lock_);
- return bool(Id_);
- }
- void TOperation::TOperationImpl::OnPreparationException(std::exception_ptr e)
- {
- Y_ABORT_IF(PreparedPromise_.HasValue());
- Y_ABORT_IF(PreparedPromise_.HasException());
- PreparedPromise_.SetException(e);
- }
- TString TOperation::TOperationImpl::GetStatus()
- {
- {
- auto guard = Guard(Lock_);
- if (!Id_) {
- return Status_;
- }
- }
- TMaybe<TString> state;
- UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) {
- state = attributes.State;
- });
- return "On YT cluster: " + state.GetOrElse("undefined state");
- }
- void TOperation::TOperationImpl::OnStatusUpdated(const TString& newStatus)
- {
- auto guard = Guard(Lock_);
- Status_ = newStatus;
- }
- ::NThreading::TFuture<void> TOperation::TOperationImpl::GetPreparedFuture()
- {
- return PreparedPromise_.GetFuture();
- }
- ::NThreading::TFuture<void> TOperation::TOperationImpl::GetStartedFuture()
- {
- return StartedPromise_.GetFuture();
- }
- ::NThreading::TFuture<void> TOperation::TOperationImpl::Watch(TClientPtr client)
- {
- {
- auto guard = Guard(Lock_);
- if (CompletePromise_) {
- return *CompletePromise_;
- }
- CompletePromise_ = ::NThreading::NewPromise<void>();
- }
- GetStartedFuture().Subscribe([
- this_=::TIntrusivePtr(this),
- client=std::move(client)
- ] (const ::NThreading::TFuture<void>& startedResult) {
- try {
- startedResult.GetValue();
- } catch (...) {
- this_->CompletePromise_->SetException(std::current_exception());
- return;
- }
- client->GetYtPoller().Watch(::MakeIntrusive<TOperationPollerItem>(this_));
- auto operationId = this_->GetId();
- auto registry = TAbortableRegistry::Get();
- registry->Add(
- operationId,
- ::MakeIntrusive<TOperationAbortable>(this_->RawClient_, this_->ClientRetryPolicy_, operationId));
- // We have to own an IntrusivePtr to registry to prevent use-after-free
- auto removeOperation = [registry, operationId] (const ::NThreading::TFuture<void>&) {
- registry->Remove(operationId);
- };
- this_->CompletePromise_->GetFuture().Subscribe(removeOperation);
- });
- return *CompletePromise_;
- }
- EOperationBriefState TOperation::TOperationImpl::GetBriefState()
- {
- ValidateOperationStarted();
- EOperationBriefState result = EOperationBriefState::InProgress;
- UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) {
- Y_ABORT_UNLESS(attributes.BriefState,
- "get_operation for operation %s has not returned \"state\" field",
- GetGuidAsString(*Id_).data());
- result = *attributes.BriefState;
- });
- return result;
- }
- TMaybe<TYtError> TOperation::TOperationImpl::GetError()
- {
- ValidateOperationStarted();
- TMaybe<TYtError> result;
- UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) {
- Y_ABORT_UNLESS(attributes.Result);
- result = attributes.Result->Error;
- });
- return result;
- }
- TJobStatistics TOperation::TOperationImpl::GetJobStatistics()
- {
- ValidateOperationStarted();
- TJobStatistics result;
- UpdateAttributesAndCall(true, [&] (const TOperationAttributes& attributes) {
- if (attributes.Progress) {
- result = attributes.Progress->JobStatistics;
- }
- });
- return result;
- }
- TMaybe<TOperationBriefProgress> TOperation::TOperationImpl::GetBriefProgress()
- {
- ValidateOperationStarted();
- {
- auto g = Guard(Lock_);
- if (CompletePromise_.Defined()) {
- // Poller do this job for us
- return Attributes_.BriefProgress;
- }
- }
- TMaybe<TOperationBriefProgress> result;
- UpdateAttributesAndCall(false, [&] (const TOperationAttributes& attributes) {
- result = attributes.BriefProgress;
- });
- return result;
- }
- void TOperation::TOperationImpl::UpdateBriefProgress(TMaybe<TOperationBriefProgress> briefProgress)
- {
- auto g = Guard(Lock_);
- Attributes_.BriefProgress = std::move(briefProgress);
- }
- void TOperation::TOperationImpl::AnalyzeUnrecognizedSpec(TNode unrecognizedSpec)
- {
- static const TVector<TVector<TString>> knownUnrecognizedSpecFieldPaths = {
- {"mapper", "class_name"},
- {"reducer", "class_name"},
- {"reduce_combiner", "class_name"},
- };
- auto removeByPath = [] (TNode& node, auto pathBegin, auto pathEnd, auto& removeByPath) {
- if (pathBegin == pathEnd) {
- return;
- }
- if (!node.IsMap()) {
- return;
- }
- auto* child = node.AsMap().FindPtr(*pathBegin);
- if (!child) {
- return;
- }
- removeByPath(*child, std::next(pathBegin), pathEnd, removeByPath);
- if (std::next(pathBegin) == pathEnd || (child->IsMap() && child->Empty())) {
- node.AsMap().erase(*pathBegin);
- }
- };
- Y_ABORT_UNLESS(unrecognizedSpec.IsMap());
- for (const auto& knownFieldPath : knownUnrecognizedSpecFieldPaths) {
- Y_ABORT_UNLESS(!knownFieldPath.empty());
- removeByPath(unrecognizedSpec, knownFieldPath.cbegin(), knownFieldPath.cend(), removeByPath);
- }
- if (!unrecognizedSpec.Empty()) {
- YT_LOG_INFO(
- "WARNING! Unrecognized spec for operation %s is not empty "
- "(fields added by the YT API library are excluded): %s",
- GetGuidAsString(*Id_).data(),
- NodeToYsonString(unrecognizedSpec).data());
- }
- }
- void TOperation::TOperationImpl::OnStarted(const TOperationId& operationId)
- {
- auto guard = Guard(Lock_);
- Y_ABORT_UNLESS(!Id_,
- "OnStarted() called with operationId = %s for operation with id %s",
- GetGuidAsString(operationId).data(),
- GetGuidAsString(*Id_).data());
- Id_ = operationId;
- Y_ABORT_UNLESS(!StartedPromise_.HasValue() && !StartedPromise_.HasException());
- StartedPromise_.SetValue();
- }
- void TOperation::TOperationImpl::UpdateAttributesAndCall(
- bool needJobStatistics,
- std::function<void(const TOperationAttributes&)> func)
- {
- {
- auto g = Guard(Lock_);
- if (Attributes_.BriefState
- && *Attributes_.BriefState != EOperationBriefState::InProgress
- && (!needJobStatistics || Attributes_.Progress))
- {
- func(Attributes_);
- return;
- }
- }
- auto attributes = RequestWithRetry<TOperationAttributes>(
- ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- [this] (TMutationId /*mutationId*/) {
- return RawClient_->GetOperation(
- *Id_,
- TGetOperationOptions().AttributeFilter(TOperationAttributeFilter()
- .Add(EOperationAttribute::Result)
- .Add(EOperationAttribute::Progress)
- .Add(EOperationAttribute::State)
- .Add(EOperationAttribute::BriefProgress)));
- });
- func(attributes);
- Y_ENSURE(attributes.BriefState);
- if (*attributes.BriefState != EOperationBriefState::InProgress) {
- auto g = Guard(Lock_);
- Attributes_ = std::move(attributes);
- }
- }
- void TOperation::TOperationImpl::FinishWithException(std::exception_ptr e)
- {
- CompletePromise_->SetException(std::move(e));
- }
- void TOperation::TOperationImpl::AbortOperation()
- {
- ValidateOperationStarted();
- RequestWithRetry<void>(
- ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- [this] (TMutationId& mutationId) {
- RawClient_->AbortOperation(mutationId, *Id_);
- });
- }
- void TOperation::TOperationImpl::CompleteOperation()
- {
- ValidateOperationStarted();
- RequestWithRetry<void>(
- ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- [this] (TMutationId& mutationId) {
- RawClient_->CompleteOperation(mutationId, *Id_);
- });
- }
- void TOperation::TOperationImpl::SuspendOperation(const TSuspendOperationOptions& options)
- {
- ValidateOperationStarted();
- RequestWithRetry<void>(
- ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- [this, &options] (TMutationId& mutationId) {
- RawClient_->SuspendOperation(mutationId, *Id_, options);
- });
- }
- void TOperation::TOperationImpl::ResumeOperation(const TResumeOperationOptions& options)
- {
- ValidateOperationStarted();
- RequestWithRetry<void>(
- ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- [this, &options] (TMutationId& mutationId) {
- RawClient_->ResumeOperation(mutationId, *Id_, options);
- });
- }
- TOperationAttributes TOperation::TOperationImpl::GetAttributes(const TGetOperationOptions& options)
- {
- ValidateOperationStarted();
- return RequestWithRetry<TOperationAttributes>(
- ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- [this, &options] (TMutationId /*mutationId*/) {
- return RawClient_->GetOperation(*Id_, options);
- });
- }
- void TOperation::TOperationImpl::UpdateParameters(const TUpdateOperationParametersOptions& options)
- {
- ValidateOperationStarted();
- RequestWithRetry<void>(
- ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- [this, &options] (TMutationId /*mutationId*/) {
- RawClient_->UpdateOperationParameters(*Id_, options);
- });
- }
- TJobAttributes TOperation::TOperationImpl::GetJob(const TJobId& jobId, const TGetJobOptions& options)
- {
- ValidateOperationStarted();
- auto result = RequestWithRetry<NYson::TYsonString>(
- ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- [this, &jobId, &options] (TMutationId /*mutationId*/) {
- return RawClient_->GetJob(*Id_, jobId, options);
- });
- return NRawClient::ParseJobAttributes(NodeFromYsonString(result.AsStringBuf()));
- }
- TListJobsResult TOperation::TOperationImpl::ListJobs(const TListJobsOptions& options)
- {
- ValidateOperationStarted();
- return RequestWithRetry<TListJobsResult>(
- ClientRetryPolicy_->CreatePolicyForGenericRequest(),
- [this, &options] (TMutationId /*mutationId*/) {
- return RawClient_->ListJobs(*Id_, options);
- });
- }
- struct TAsyncFinishOperationsArgs
- {
- ::TIntrusivePtr<TOperation::TOperationImpl> OperationImpl;
- TOperationAttributes OperationAttributes;
- };
- void TOperation::TOperationImpl::AsyncFinishOperation(TOperationAttributes operationAttributes)
- {
- auto args = new TAsyncFinishOperationsArgs;
- args->OperationImpl = this;
- args->OperationAttributes = std::move(operationAttributes);
- TThread thread(TThread::TParams(&TOperation::TOperationImpl::SyncFinishOperationProc, args).SetName("finish operation"));
- thread.Start();
- thread.Detach();
- }
- void* TOperation::TOperationImpl::SyncFinishOperationProc(void* pArgs)
- {
- std::unique_ptr<TAsyncFinishOperationsArgs> args(static_cast<TAsyncFinishOperationsArgs*>(pArgs));
- args->OperationImpl->SyncFinishOperationImpl(args->OperationAttributes);
- return nullptr;
- }
- void TOperation::TOperationImpl::SyncFinishOperationImpl(const TOperationAttributes& attributes)
- {
- {
- auto guard = Guard(Lock_);
- Y_ABORT_UNLESS(Id_);
- }
- Y_ABORT_UNLESS(attributes.BriefState,
- "get_operation for operation %s has not returned \"state\" field",
- GetGuidAsString(*Id_).data());
- Y_ABORT_UNLESS(*attributes.BriefState != EOperationBriefState::InProgress);
- {
- try {
- // `attributes' that came from poller don't have JobStatistics
- // so we call `GetJobStatistics' in order to get it from server
- // and cache inside object.
- GetJobStatistics();
- } catch (const TErrorResponse& ) {
- // But if for any reason we failed to get attributes
- // we complete operation using what we have.
- auto g = Guard(Lock_);
- Attributes_ = attributes;
- }
- }
- if (*attributes.BriefState == EOperationBriefState::Completed) {
- CompletePromise_->SetValue();
- } else if (*attributes.BriefState == EOperationBriefState::Aborted || *attributes.BriefState == EOperationBriefState::Failed) {
- Y_ABORT_UNLESS(attributes.Result && attributes.Result->Error);
- const auto& error = *attributes.Result->Error;
- YT_LOG_ERROR("Operation %v is `%v' with error: %v",
- *Id_,
- ToString(*attributes.BriefState),
- error.FullDescription());
- TString additionalExceptionText;
- TVector<TFailedJobInfo> failedJobStderrInfo;
- if (*attributes.BriefState == EOperationBriefState::Failed) {
- try {
- failedJobStderrInfo = NYT::NDetail::GetFailedJobInfo(ClientRetryPolicy_, RawClient_, *Id_, TGetFailedJobInfoOptions());
- } catch (const std::exception& e) {
- additionalExceptionText = "Cannot get job stderrs: ";
- additionalExceptionText += e.what();
- }
- }
- CompletePromise_->SetException(
- std::make_exception_ptr(
- TOperationFailedError(
- *attributes.BriefState == EOperationBriefState::Failed
- ? TOperationFailedError::Failed
- : TOperationFailedError::Aborted,
- *Id_,
- error,
- failedJobStderrInfo) << additionalExceptionText));
- }
- }
- void TOperation::TOperationImpl::ValidateOperationStarted() const
- {
- auto guard = Guard(Lock_);
- if (!Id_) {
- ythrow TApiUsageError() << "Operation is not started";
- }
- }
- const TClientContext& TOperation::TOperationImpl::GetContext() const
- {
- return Context_;
- }
- ////////////////////////////////////////////////////////////////////////////////
- TOperation::TOperation(TClientPtr client)
- : Client_(std::move(client))
- , Impl_(::MakeIntrusive<TOperationImpl>(
- Client_->GetRawClient(),
- Client_->GetRetryPolicy(),
- Client_->GetContext()))
- {
- }
- TOperation::TOperation(TOperationId id, TClientPtr client)
- : Client_(std::move(client))
- , Impl_(::MakeIntrusive<TOperationImpl>(
- Client_->GetRawClient(),
- Client_->GetRetryPolicy(),
- Client_->GetContext(), id))
- {
- }
- const TOperationId& TOperation::GetId() const
- {
- return Impl_->GetId();
- }
- TString TOperation::GetWebInterfaceUrl() const
- {
- return Impl_->GetWebInterfaceUrl();
- }
- void TOperation::OnPrepared()
- {
- Impl_->OnPrepared();
- }
- void TOperation::SetDelayedStartFunction(std::function<TOperationId()> start)
- {
- Impl_->SetDelayedStartFunction(std::move(start));
- }
- void TOperation::Start()
- {
- Impl_->Start();
- }
- bool TOperation::IsStarted() const
- {
- return Impl_->IsStarted();
- }
- void TOperation::OnPreparationException(std::exception_ptr e)
- {
- Impl_->OnPreparationException(std::move(e));
- }
- TString TOperation::GetStatus() const
- {
- return Impl_->GetStatus();
- }
- void TOperation::OnStatusUpdated(const TString& newStatus)
- {
- Impl_->OnStatusUpdated(newStatus);
- }
- ::NThreading::TFuture<void> TOperation::GetPreparedFuture()
- {
- return Impl_->GetPreparedFuture();
- }
- ::NThreading::TFuture<void> TOperation::GetStartedFuture()
- {
- return Impl_->GetStartedFuture();
- }
- ::NThreading::TFuture<void> TOperation::Watch()
- {
- return Impl_->Watch(Client_);
- }
- TVector<TFailedJobInfo> TOperation::GetFailedJobInfo(const TGetFailedJobInfoOptions& options)
- {
- return NYT::NDetail::GetFailedJobInfo(Client_->GetRetryPolicy(), Client_->GetRawClient(), GetId(), options);
- }
- EOperationBriefState TOperation::GetBriefState()
- {
- return Impl_->GetBriefState();
- }
- TMaybe<TYtError> TOperation::GetError()
- {
- return Impl_->GetError();
- }
- TJobStatistics TOperation::GetJobStatistics()
- {
- return Impl_->GetJobStatistics();
- }
- TMaybe<TOperationBriefProgress> TOperation::GetBriefProgress()
- {
- return Impl_->GetBriefProgress();
- }
- void TOperation::AbortOperation()
- {
- Impl_->AbortOperation();
- }
- void TOperation::CompleteOperation()
- {
- Impl_->CompleteOperation();
- }
- void TOperation::SuspendOperation(const TSuspendOperationOptions& options)
- {
- Impl_->SuspendOperation(options);
- }
- void TOperation::ResumeOperation(const TResumeOperationOptions& options)
- {
- Impl_->ResumeOperation(options);
- }
- TOperationAttributes TOperation::GetAttributes(const TGetOperationOptions& options)
- {
- return Impl_->GetAttributes(options);
- }
- void TOperation::UpdateParameters(const TUpdateOperationParametersOptions& options)
- {
- Impl_->UpdateParameters(options);
- }
- TJobAttributes TOperation::GetJob(const TJobId& jobId, const TGetJobOptions& options)
- {
- return Impl_->GetJob(jobId, options);
- }
- TListJobsResult TOperation::ListJobs(const TListJobsOptions& options)
- {
- return Impl_->ListJobs(options);
- }
- ////////////////////////////////////////////////////////////////////////////////
// Argument pack handed (as void*) to the "prepare and start operation" thread procedure.
struct TAsyncPrepareAndStartOperationArgs
{
    // Callback executed by SyncPrepareAndStartOperation on the background thread.
    std::function<void()> PrepareAndStart;
};
- void* SyncPrepareAndStartOperation(void* pArgs)
- {
- std::unique_ptr<TAsyncPrepareAndStartOperationArgs> args(static_cast<TAsyncPrepareAndStartOperationArgs*>(pArgs));
- args->PrepareAndStart();
- return nullptr;
- }
- ::TIntrusivePtr<TOperation> ProcessOperation(
- NYT::NDetail::TClientPtr client,
- std::function<void()> prepare,
- ::TIntrusivePtr<TOperation> operation,
- const TOperationOptions& options)
- {
- auto prepareAndStart = [prepare = std::move(prepare), operation, mode = options.StartOperationMode_] () {
- try {
- prepare();
- operation->OnPrepared();
- } catch (const std::exception& ex) {
- YT_LOG_INFO("Operation preparation failed: %v", ex.what());
- operation->OnPreparationException(std::current_exception());
- }
- if (mode >= TOperationOptions::EStartOperationMode::AsyncStart) {
- try {
- operation->Start();
- } catch (...) { }
- }
- };
- if (options.StartOperationMode_ >= TOperationOptions::EStartOperationMode::SyncStart) {
- prepareAndStart();
- WaitIfRequired(operation, client, options);
- } else {
- auto args = new TAsyncPrepareAndStartOperationArgs;
- args->PrepareAndStart = std::move(prepareAndStart);
- TThread thread(TThread::TParams(SyncPrepareAndStartOperation, args).SetName("prepare and start operation"));
- thread.Start();
- thread.Detach();
- }
- return operation;
- }
- void WaitIfRequired(const TOperationPtr& operation, const TClientPtr& client, const TOperationOptions& options)
- {
- auto retryPolicy = client->GetRetryPolicy();
- auto context = client->GetContext();
- if (options.StartOperationMode_ >= TOperationOptions::EStartOperationMode::SyncStart) {
- operation->GetStartedFuture().GetValueSync();
- }
- if (options.StartOperationMode_ == TOperationOptions::EStartOperationMode::SyncWait) {
- auto finishedFuture = operation->Watch();
- TWaitProxy::Get()->WaitFuture(finishedFuture);
- finishedFuture.GetValue();
- if (context.Config->WriteStderrSuccessfulJobs) {
- auto stderrs = GetJobsStderr(retryPolicy, client->GetRawClient(), operation->GetId());
- for (const auto& jobStderr : stderrs) {
- if (!jobStderr.empty()) {
- Cerr << jobStderr << '\n';
- }
- }
- }
- }
- }
- ////////////////////////////////////////////////////////////////////////////////
- void ResetUseClientProtobuf(const char* methodName)
- {
- Cerr << "WARNING! OPTION `TConfig::UseClientProtobuf' IS RESET TO `true'; "
- << "IT CAN DETERIORATE YOUR CODE PERFORMANCE!!! DON'T USE DEPRECATED METHOD `"
- << "TOperationIOSpec::" << methodName << "' TO AVOID THIS RESET" << Endl;
- // Give users some time to contemplate about usage of deprecated functions.
- Cerr << "Sleeping for 5 seconds..." << Endl;
- Sleep(TDuration::Seconds(5));
- TConfig::Get()->UseClientProtobuf = true;
- }
- } // namespace NDetail
- ////////////////////////////////////////////////////////////////////////////////
- ::TIntrusivePtr<INodeReaderImpl> CreateJobNodeReader(TRawTableReaderPtr rawTableReader)
- {
- if (auto schema = NDetail::GetJobInputSkiffSchema()) {
- return new NDetail::TSkiffTableReader(rawTableReader, schema);
- } else {
- return new TNodeTableReader(rawTableReader);
- }
- }
- ::TIntrusivePtr<IYaMRReaderImpl> CreateJobYaMRReader(TRawTableReaderPtr rawTableReader)
- {
- return new TYaMRTableReader(rawTableReader);
- }
- ::TIntrusivePtr<IProtoReaderImpl> CreateJobProtoReader(TRawTableReaderPtr rawTableReader)
- {
- if (TConfig::Get()->UseClientProtobuf) {
- return new TProtoTableReader(
- rawTableReader,
- GetJobInputDescriptors());
- } else {
- return new TLenvalProtoTableReader(
- rawTableReader,
- GetJobInputDescriptors());
- }
- }
- ::TIntrusivePtr<INodeWriterImpl> CreateJobNodeWriter(THolder<IProxyOutput> rawJobWriter)
- {
- return new TNodeTableWriter(std::move(rawJobWriter));
- }
- ::TIntrusivePtr<IYaMRWriterImpl> CreateJobYaMRWriter(THolder<IProxyOutput> rawJobWriter)
- {
- return new TYaMRTableWriter(std::move(rawJobWriter));
- }
- ::TIntrusivePtr<IProtoWriterImpl> CreateJobProtoWriter(THolder<IProxyOutput> rawJobWriter)
- {
- if (TConfig::Get()->UseClientProtobuf) {
- return new TProtoTableWriter(
- std::move(rawJobWriter),
- GetJobOutputDescriptors());
- } else {
- return new TLenvalProtoTableWriter(
- std::move(rawJobWriter),
- GetJobOutputDescriptors());
- }
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NYT
|