#include "operation_preparer.h"

#include "init.h"
#include "file_writer.h"
#include "operation.h"
#include "operation_helpers.h"
#include "operation_tracker.h"
#include "transaction.h"
#include "transaction_pinger.h"
#include "yt_poller.h"

// NOTE(review): in this copy of the file the operands of the following
// `#include` directives are missing, and so are most template argument lists
// further down (e.g. `std::unique_ptr transaction`, `MakeIntrusive(...)`,
// `dynamic_cast(&job)`, `RequestWithRetry>(...)`). Presumably an extraction
// step dropped everything in angle brackets; restore them from the upstream
// file before building. The comments below describe only what is visible here.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace NYT::NDetail {

////////////////////////////////////////////////////////////////////////////////

// Poller item that asks for the `State` attribute of one operation and tells
// the poller to stop once the state is none of the early startup states
// listed in OnRequestExecuted().
//
// The item also takes ownership of the transaction passed to the constructor
// and keeps it for as long as the item itself lives.
// NOTE(review): what releasing that transaction does (abort/commit) depends on
// its destructor, which is not visible here.
class TWaitOperationStartPollerItem
    : public IYtPollerItem
{
public:
    TWaitOperationStartPollerItem(TOperationId operationId, std::unique_ptr transaction)
        : OperationId_(operationId)
        , Transaction_(std::move(transaction))
    { }

    // Adds a get-operation request to the batch, asking only for the `State`
    // attribute; the resulting future is stored for OnRequestExecuted().
    void PrepareRequest(IRawBatchRequest* batchRequest) override
    {
        Future_ = batchRequest->GetOperation(
            OperationId_,
            TGetOperationOptions().AttributeFilter(
                TOperationAttributeFilter().Add(EOperationAttribute::State)));
    }

    // Inspects the result of the request issued by PrepareRequest().
    //
    // Returns PollBreak when the reported state is not one of
    // "starting", "pending", "orphaned", "waiting_for_agent", "initializing",
    // and PollContinue otherwise.
    // On TErrorResponse the error is logged and polling continues only if
    // IsRetriable(e); any other std::exception is logged and stops polling.
    EStatus OnRequestExecuted() override
    {
        try {
            auto attributes = Future_.GetValue();
            Y_ENSURE(attributes.State.Defined());
            // Judging by the variable name, leaving these states is taken to
            // mean that the operation itself now holds its files — TODO confirm.
            bool operationHasLockedFiles =
                *attributes.State != "starting" &&
                *attributes.State != "pending" &&
                *attributes.State != "orphaned" &&
                *attributes.State != "waiting_for_agent" &&
                *attributes.State != "initializing";
            return operationHasLockedFiles ? EStatus::PollBreak : EStatus::PollContinue;
        } catch (const TErrorResponse& e) {
            YT_LOG_ERROR("get_operation request failed: %v (RequestId: %v)",
                e.GetError().GetMessage(),
                e.GetRequestId());
            return IsRetriable(e) ? PollContinue : PollBreak;
        } catch (const std::exception& e) {
            YT_LOG_ERROR("%v", e.what());
            return PollBreak;
        }
    }

    // Nothing to clean up explicitly when the poller drops the item.
    void OnItemDiscarded() override
    { }

private:
    TOperationId OperationId_;
    std::unique_ptr Transaction_;
    ::NThreading::TFuture Future_;
};

////////////////////////////////////////////////////////////////////////////////

// Retry policy decorator: every call is forwarded to the underlying policy.
// In addition, generic and retriable errors are reported to the operation via
// OnStatusUpdated() before being forwarded. Ignored errors are forwarded
// without a status update.
class TOperationForwardingRequestRetryPolicy
    : public IRequestRetryPolicy
{
public:
    TOperationForwardingRequestRetryPolicy(const IRequestRetryPolicyPtr& underlying, const TOperationPtr& operation)
        : Underlying_(underlying)
        , Operation_(operation)
    { }

    void NotifyNewAttempt() override
    {
        Underlying_->NotifyNewAttempt();
    }

    // Reports e.what() to the operation, then lets the underlying policy decide.
    TMaybe OnGenericError(const std::exception& e) override
    {
        UpdateOperationStatus(e.what());
        return Underlying_->OnGenericError(e);
    }

    // Reports the short error description to the operation, then lets the
    // underlying policy decide.
    TMaybe OnRetriableError(const TErrorResponse& e) override
    {
        auto msg = e.GetError().ShortDescription();
        UpdateOperationStatus(msg);
        return Underlying_->OnRetriableError(e);
    }

    void OnIgnoredError(const TErrorResponse& e) override
    {
        Underlying_->OnIgnoredError(e);
    }

    TString GetAttemptDescription() const override
    {
        return Underlying_->GetAttemptDescription();
    }

private:
    // Sends "Retriable error during operation start: <err>" to the operation.
    // Aborts the process if the operation pointer is null.
    void UpdateOperationStatus(TStringBuf err)
    {
        Y_ABORT_UNLESS(Operation_);
        Operation_->OnStatusUpdated(
            ::TStringBuilder() << "Retriable error during operation start: " << err);
    }

private:
    IRequestRetryPolicyPtr Underlying_;
    TOperationPtr Operation_;
};

////////////////////////////////////////////////////////////////////////////////

// Stores the client and the transaction id, creates the file transaction
// object (constructed with TransactionId_ among its arguments and a child
// transaction pinger), caches the client's retry policy and generates a fresh
// GUID string as the preparation id.
//
// NOTE(review): the FileTransaction_ and ClientRetryPolicy_ initializers read
// Client_ (and TransactionId_), so they rely on those members being declared
// earlier in the class definition in the header, which is not visible here —
// confirm. They intentionally use Client_ rather than the moved-from `client`.
TOperationPreparer::TOperationPreparer(TClientPtr client, TTransactionId transactionId)
    : Client_(std::move(client))
    , TransactionId_(transactionId)
    , FileTransaction_(std::make_unique(
        Client_->GetRawClient(),
        Client_->GetRetryPolicy(),
        Client_->GetContext(),
        TransactionId_,
        Client_->GetTransactionPinger()->GetChildTxPinger(),
        TStartTransactionOptions()))
    , ClientRetryPolicy_(Client_->GetRetryPolicy())
    , PreparationId_(CreateGuidAsString())
{ }

// Returns the context of the underlying client.
const TClientContext& TOperationPreparer::GetContext() const
{
    return Client_->GetContext();
}

// Returns the transaction id passed to the constructor.
TTransactionId TOperationPreparer::GetTransactionId() const
{
    return TransactionId_;
}

// Returns the client passed to the constructor.
TClientPtr TOperationPreparer::GetClient() const
{
    return Client_;
}

// Returns the GUID string generated in the constructor; it is included in log
// messages emitted during preparation.
const TString& TOperationPreparer::GetPreparationId() const
{
    return PreparationId_;
}

// Returns the retry policy obtained from the client in the constructor.
const IClientRetryPolicyPtr& TOperationPreparer::GetClientRetryPolicy() const
{
    return ClientRetryPolicy_;
}

// Starts the operation of the given type with the given spec under
// TransactionId_ and returns its id.
//
// The start request is retried with the start-operation policy wrapped into a
// policy object built from it and `operation` (template argument missing in
// this copy; presumably TOperationForwardingRequestRetryPolicy, so that retry
// errors are reported to `operation`).
//
// After a successful start the execution time tracker is started and a
// TWaitOperationStartPollerItem is registered with the client's poller.
// Throws (via CheckValidity) if the preparer was already used for an operation.
TOperationId TOperationPreparer::StartOperation(
    TOperation* operation,
    EOperationType type,
    const TNode& spec)
{
    CheckValidity();

    auto operationId = RequestWithRetry(
        ::MakeIntrusive(
            ClientRetryPolicy_->CreatePolicyForStartOperationRequest(),
            TOperationPtr(operation)),
        [this, &type, &spec] (TMutationId& mutationId) {
            return Client_->GetRawClient()->StartOperation(mutationId, TransactionId_, type, spec);
        });

    YT_LOG_DEBUG("Operation started (OperationId: %v; PreparationId: %v)",
        operationId,
        GetPreparationId());

    YT_LOG_INFO("Operation %v started (%v): %v",
        operationId,
        type,
        GetOperationWebInterfaceUrl(GetContext().ServerName, operationId));

    TOperationExecutionTimeTracker::Get()->Start(operationId);

    // Ownership of the file transaction moves to the poller item. From here on
    // FileTransaction_ is empty, so CheckValidity() fails on any further use
    // of this preparer.
    Client_->GetYtPoller().Watch(
        new TWaitOperationStartPollerItem(operationId, std::move(FileTransaction_)));

    return operationId;
}

// Takes snapshot locks on all `paths` under the file transaction and rewrites
// each path to refer to the locked node by id.
//
// Done in two batch requests: first one waitable LM_SNAPSHOT lock per path,
// then one get of `#<lock id>/@node_id` per obtained lock. Afterwards each
// element of `paths` gets its previous path stored as the original path and
// its path replaced with "#<node id>".
// Throws (via CheckValidity) if the file transaction was already handed over.
void TOperationPreparer::LockFiles(TVector* paths)
{
    CheckValidity();

    TVector<::NThreading::TFuture> lockIdFutures;
    lockIdFutures.reserve(paths->size());
    auto lockRequest = Client_->GetRawClient()->CreateRawBatchRequest();
    for (const auto& path : *paths) {
        lockIdFutures.push_back(lockRequest->Lock(
            FileTransaction_->GetId(),
            path.Path_,
            ELockMode::LM_SNAPSHOT,
            TLockOptions().Waitable(true)));
    }
    lockRequest->ExecuteBatch();

    TVector<::NThreading::TFuture> nodeIdFutures;
    nodeIdFutures.reserve(paths->size());
    auto getNodeIdRequest = Client_->GetRawClient()->CreateRawBatchRequest();
    for (const auto& lockIdFuture : lockIdFutures) {
        nodeIdFutures.push_back(getNodeIdRequest->Get(
            FileTransaction_->GetId(),
            ::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id",
            TGetOptions()));
    }
    getNodeIdRequest->ExecuteBatch();

    // nodeIdFutures[i] corresponds to (*paths)[i]: both vectors were filled in
    // the iteration order of `paths`.
    for (size_t i = 0; i != paths->size(); ++i) {
        auto& richPath = (*paths)[i];
        richPath.OriginalPath(richPath.Path_);
        richPath.Path("#" + nodeIdFutures[i].GetValue().AsString());
        YT_LOG_DEBUG("Locked file %v, new path is %v",
            *richPath.OriginalPath_,
            richPath.Path_);
    }
}

// Ensures the file transaction is still owned by this preparer; it is moved
// away in StartOperation().
void TOperationPreparer::CheckValidity() const
{
    Y_ENSURE(
        FileTransaction_,
        "File transaction is already moved, are you trying to use preparer for more than one operation?");
}

////////////////////////////////////////////////////////////////////////////////

// Attempt-limited retry policy that additionally retries on concurrent
// transaction lock conflicts.
class TRetryPolicyIgnoringLockConflicts
    : public TAttemptLimitedRetryPolicy
{
public:
    using TAttemptLimitedRetryPolicy::TAttemptLimitedRetryPolicy;
    using TAttemptLimitedRetryPolicy::OnGenericError;

    // Returns Nothing() once the attempt limit is exceeded. Otherwise a lock
    // conflict yields the backoff duration computed from Config_, and every
    // other error is delegated to the base policy.
    TMaybe OnRetriableError(const TErrorResponse& e) override
    {
        if (IsAttemptLimitExceeded()) {
            return Nothing();
        }
        if (e.IsConcurrentTransactionLockConflict()) {
            return GetBackoffDuration(Config_);
        }
        return TAttemptLimitedRetryPolicy::OnRetriableError(e);
    }
};

////////////////////////////////////////////////////////////////////////////////

// Upload item backed by a file on the local file system.
class TFileToUpload
    : public IItemToUpload
{
public:
    // `md5` may carry a checksum known in advance; if set, CalculateMD5()
    // returns it without reading the file.
    TFileToUpload(TString fileName, TMaybe md5)
        : FileName_(std::move(fileName))
        , MD5_(std::move(md5))
    { }

    // Returns the stored checksum if there is one; otherwise computes it from
    // the file with MD5::File into a 32-character buffer, remembers it in the
    // mutable MD5_ member and returns it.
    TString CalculateMD5() const override
    {
        if (MD5_) {
            return *MD5_;
        }
        constexpr size_t md5Size = 32;
        TString result;
        result.ReserveAndResize(md5Size);
        MD5::File(FileName_.data(), result.Detach());
        MD5_ = result;
        return result;
    }

    // Creates a stream object constructed from the file name (the concrete
    // stream type is missing in this copy).
    std::unique_ptr CreateInputStream() const override
    {
        return std::make_unique(FileName_);
    }

    // The description of a local file is its name.
    TString GetDescription() const override
    {
        return FileName_;
    }

    // Returns the result of GetFileLength() for the file.
    i64 GetDataSize() const override
    {
        return GetFileLength(FileName_);
    }

private:
    TString FileName_;
    // Mutable so that the const CalculateMD5() can cache its result.
    mutable TMaybe MD5_;
};

// Upload item backed by an in-memory string.
class TDataToUpload
    : public IItemToUpload
{
public:
    TDataToUpload(TString data, TString description)
        : Data_(std::move(data))
        , Description_(std::move(description))
    { }

    // Computes the checksum of the data with MD5::Data into a 32-character
    // buffer. Unlike TFileToUpload, the result is not cached.
    TString CalculateMD5() const override
    {
        constexpr size_t md5Size = 32;
        TString result;
        result.ReserveAndResize(md5Size);
        MD5::Data(reinterpret_cast(Data_.data()), Data_.size(), result.Detach());
        return result;
    }

    // Creates a stream object over the data buffer (the concrete stream type
    // is missing in this copy). The stream is given a pointer into Data_, so
    // presumably it must not outlive this object — TODO confirm.
    std::unique_ptr CreateInputStream() const override
    {
        return std::make_unique(Data_.data(), Data_.size());
    }

    TString GetDescription() const override
    {
        return Description_;
    }

    // Size of the data in bytes, as a signed value.
    i64 GetDataSize() const override
    {
        return std::ssize(Data_);
    }

private:
    TString Data_;
    TString Description_;
};

////////////////////////////////////////////////////////////////////////////////

// Returns MD5::File() of the path returned by GetPersistentExecPath().
// The value is held in a function-local static, so it is computed on the
// first call only.
static const TString& GetPersistentExecPathMd5()
{
    static TString md5 = MD5::File(GetPersistentExecPath());
    return md5;
}

// Serializes the job with IJob::Save() into a string.
// Returns Nothing() if the job wrote nothing, otherwise a small file named
// "jobstate" holding the serialized bytes.
static TMaybe GetJobState(const IJob& job)
{
    TString result;
    {
        TStringOutput output(result);
        job.Save(output);
        output.Finish();
    }
    if (result.empty()) {
        return Nothing();
    } else {
        return TSmallJobFile{"jobstate", result};
    }
}

////////////////////////////////////////////////////////////////////////////////

// Prepares everything a user job needs, in this order:
//   1. creates the cache directory (CreateStorage);
//   2. registers the Cypress files from the spec (UseFileInCypress);
//   3. uploads the local files from the spec;
//   4. uploads the serialized job state, if the job has any;
//   5. uploads the extra small files passed by the caller;
//   6. for a command job (detected via dynamic_cast; the target type is
//      missing in this copy) takes the command from the job, otherwise
//      prepares the job binary and command line (PrepareJobBinary);
//   7. locks all uploaded/cached files through the operation preparer.
//
// NOTE(review): UploadLocalFile is called with two arguments here while its
// definition below takes three, so `isApiFile` presumably has a default value
// in the header — confirm.
TJobPreparer::TJobPreparer(
    TOperationPreparer& operationPreparer,
    const TUserJobSpec& spec,
    const IJob& job,
    size_t outputTableCount,
    const TVector& smallFileList,
    const TOperationOptions& options)
    : RawClient_(operationPreparer.GetClient()->GetRawClient())
    , OperationPreparer_(operationPreparer)
    , Spec_(spec)
    , Options_(options)
    , Layers_(spec.Layers_)
{
    CreateStorage();
    auto cypressFileList = NRawClient::CanonizeYPaths(RawClient_, spec.Files_);

    for (const auto& file : cypressFileList) {
        UseFileInCypress(file);
    }
    for (const auto& localFile : spec.GetLocalFiles()) {
        UploadLocalFile(std::get<0>(localFile), std::get<1>(localFile));
    }
    auto jobStateSmallFile = GetJobState(job);
    if (jobStateSmallFile) {
        UploadSmallFile(*jobStateSmallFile);
    }
    for (const auto& smallFile : smallFileList) {
        UploadSmallFile(smallFile);
    }

    if (auto commandJob = dynamic_cast(&job)) {
        IsCommandJob_ = true;
        ClassName_ = TJobFactory::Get()->GetJobName(&job);
        Command_ = commandJob->GetCommand();
    } else {
        PrepareJobBinary(job, outputTableCount, jobStateSmallFile.Defined());
    }

    // Only CachedFiles_ are locked here; CypressFiles_ are not passed.
    operationPreparer.LockFiles(&CachedFiles_);
}

// Returns all files of the job: CypressFiles_ first, then CachedFiles_.
TVector TJobPreparer::GetFiles() const
{
    TVector allFiles = CypressFiles_;
    allFiles.insert(allFiles.end(), CachedFiles_.begin(), CachedFiles_.end());
    return allFiles;
}

// Returns a copy of the layers taken from the spec in the constructor.
TVector TJobPreparer::GetLayers() const
{
    return Layers_;
}

// Returns the job name obtained from TJobFactory.
const TString& TJobPreparer::GetClassName() const
{
    return ClassName_;
}

// Returns the command line: either the command job's own command or the one
// built by PrepareJobBinary().
const TString& TJobPreparer::GetCommand() const
{
    return Command_;
}

// Returns the preparer's copy of the spec (PrepareJobBinary() adds the
// YT_JOB_ARGUMENTS environment variable to it).
const TUserJobSpec& TJobPreparer::GetSpec() const
{
    return Spec_;
}

// True if tmpfs sandbox mounting is enabled either in the client config or in
// the operation options.
bool TJobPreparer::ShouldMountSandbox() const
{
    return OperationPreparer_.GetContext().Config->MountSandboxInTmpfs || Options_.MountSandboxInTmpfs_;
}

// Returns the accumulated (rounded-up) size of the job's files. The sum is
// only updated while ShouldMountSandbox() is true.
ui64 TJobPreparer::GetTotalFileSize() const
{
    return TotalFileSize_;
}

// True for non-command jobs when the config option RedirectStdoutToStderr is set.
bool TJobPreparer::ShouldRedirectStdoutToStderr() const
{
    return !IsCommandJob_ && OperationPreparer_.GetContext().Config->RedirectStdoutToStderr;
}

// Returns the file storage directory: the one from the operation options if
// set, otherwise RemoteTempFilesDirectory from the config.
TString TJobPreparer::GetFileStorage() const
{
    return Options_.FileStorage_ ?
        *Options_.FileStorage_ :
        OperationPreparer_.GetContext().Config->RemoteTempFilesDirectory;
}

// Returns "<file storage>/new_cache" with the configured path prefix applied.
TYPath TJobPreparer::GetCachePath() const
{
    return AddPathPrefix(
        ::TStringBuilder() << GetFileStorage() << "/new_cache",
        OperationPreparer_.GetContext().Config->Prefix);
}

// Creates the cache directory (a map node at GetCachePath()) under the file
// storage transaction. Missing parents are created and an already existing
// node is not an error.
void TJobPreparer::CreateStorage() const
{
    RequestWithRetry(
        OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
        [this] (TMutationId& mutationId) {
            RawClient_->Create(
                mutationId,
                Options_.FileStorageTransactionId_,
                GetCachePath(),
                NT_MAP,
                TCreateOptions()
                    .IgnoreExisting(true)
                    .Recursive(true));
        });
}

// Returns 1 in local mode, otherwise FileCacheReplicationFactor from the config.
int TJobPreparer::GetFileCacheReplicationFactor() const
{
    if (IsLocalMode()) {
        return 1;
    } else {
        return OperationPreparer_.GetContext().Config->FileCacheReplicationFactor;
    }
}

// Creates a file node at `path` under the file storage transaction.
// The node gets a "replication_factor" attribute and, if a file expiration
// timeout is set in the options, an "expiration_timeout" attribute in
// milliseconds. Missing parents are created and an existing node is not an error.
void TJobPreparer::CreateFileInCypress(const TString& path) const
{
    auto attributes = TNode()("replication_factor", GetFileCacheReplicationFactor());
    if (Options_.FileExpirationTimeout_) {
        attributes["expiration_timeout"] = Options_.FileExpirationTimeout_->MilliSeconds();
    }

    RequestWithRetry(
        OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
        [this, &path, &attributes] (TMutationId& mutationId) {
            RawClient_->Create(
                mutationId,
                Options_.FileStorageTransactionId_,
                path,
                NT_FILE,
                TCreateOptions()
                    .IgnoreExisting(true)
                    .Recursive(true)
                    .Attributes(attributes));
        });
}

// Puts the file at `path` into the file cache at GetCachePath() under the
// given transaction, then force-removes `path` under the same transaction.
// Returns the path reported by PutFileToCache.
//
// The put request uses a retry policy created with 30 attempts (policy type
// missing in this copy; presumably TRetryPolicyIgnoringLockConflicts) and does
// not use the mutation id handed to the callback.
TString TJobPreparer::PutFileToCypressCache(
    const TString& path,
    const TString& md5Signature,
    TTransactionId transactionId) const
{
    constexpr ui32 LockConflictRetryCount = 30;
    auto retryPolicy = MakeIntrusive(
        LockConflictRetryCount,
        OperationPreparer_.GetContext().Config);

    auto options = TPutFileToCacheOptions();
    if (Options_.FileExpirationTimeout_) {
        options.PreserveExpirationTimeout(true);
    }

    auto cachePath = RequestWithRetry(
        retryPolicy,
        [this, &path, &md5Signature, &transactionId, &options] (TMutationId /*mutationId*/) {
            return RawClient_->PutFileToCache(transactionId, path, md5Signature, GetCachePath(), options);
        });

    RequestWithRetry(
        OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
        [this, &transactionId, &path] (TMutationId& mutationId) {
            RawClient_->Remove(mutationId, transactionId, path, TRemoveOptions().Force(true));
        });

    return cachePath;
}

// Looks up a file with the given MD5 signature in the cache at GetCachePath(),
// using a default-constructed transaction id.
// Returns the cached path if found (and logs it together with `fileName`),
// otherwise an empty value. `fileName` is used for logging only.
TMaybe TJobPreparer::GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const
{
    constexpr ui32 LockConflictRetryCount = 30;
    auto retryPolicy = MakeIntrusive(
        LockConflictRetryCount,
        OperationPreparer_.GetContext().Config);

    auto maybePath = RequestWithRetry>(
        retryPolicy,
        [this, &md5Signature] (TMutationId /*mutationId*/) {
            return RawClient_->GetFileFromCache(TTransactionId(), md5Signature, GetCachePath());
        });
    if (maybePath) {
        YT_LOG_DEBUG(
            "File is already in cache (FileName: %v, FilePath: %v)",
            fileName,
            *maybePath);
    }
    return maybePath;
}

// Computes how long to wait for the deduplication lock on an item:
// (WaitLockPollInterval + 100 ms) + <size in GB> * CacheLockTimeoutPerGb,
// where the size is rounded up to whole gigabytes and is at least 1.
TDuration TJobPreparer::GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const
{
    TDuration extraTime = OperationPreparer_.GetContext().Config->WaitLockPollInterval + TDuration::MilliSeconds(100);
    // Ceiling division by 1_GB.
    auto dataSizeGb = (itemToUpload.GetDataSize() + 1_GB - 1) / 1_GB;
    dataSizeGb = Max(dataSizeGb, 1);
    return extraTime + dataSizeGb * OperationPreparer_.GetContext().Config->CacheLockTimeoutPerGb;
}

// Uploads the item to a fresh path "<file storage>/cpp_<new GUID>" (with the
// configured prefix applied) and returns that path.
// The file node is created first, then the item's stream is copied through a
// TFileWriter running under the file storage transaction.
TString TJobPreparer::UploadToRandomPath(const IItemToUpload& itemToUpload) const
{
    TString uniquePath = AddPathPrefix(
        ::TStringBuilder() << GetFileStorage() << "/cpp_" << CreateGuidAsString(),
        OperationPreparer_.GetContext().Config->Prefix);
    YT_LOG_INFO("Uploading file to random cypress path (FileName: %v; CypressPath: %v; PreparationId: %v)",
        itemToUpload.GetDescription(),
        uniquePath,
        OperationPreparer_.GetPreparationId());

    CreateFileInCypress(uniquePath);

    {
        // Scoped so that the writer is finished and destroyed before returning.
        TFileWriter writer(
            uniquePath,
            OperationPreparer_.GetClient()->GetRawClient(),
            OperationPreparer_.GetClientRetryPolicy(),
            OperationPreparer_.GetClient()->GetTransactionPinger(),
            OperationPreparer_.GetContext(),
            Options_.FileStorageTransactionId_,
            TFileWriterOptions().ComputeMD5(true));
        itemToUpload.CreateInputStream()->ReadAll(writer);
        writer.Finish();
    }
    return uniquePath;
}

// Uploads the item through a path derived from its MD5
// ("<file storage>/cpp_md5_<md5>", plus "_<MD5 of host FQDN>" in Host
// deduplication mode), guarded by an exclusive waitable lock on that path.
//
// Returns:
//   - the cache path, if the item turned out to be cached already or was
//     uploaded and put into the cache by this call;
//   - an empty value, if waiting for the lock timed out (the caller then
//     falls back to a random-path upload), or if the lock request failed with
//     a resolve error and the item is not in the cache either.
// Any other lock error is rethrown.
//
// NOTE(review): on the early-return paths `uploadTx` is released without
// Commit(); what happens to the transaction then depends on its destructor,
// which is not visible here.
TMaybe TJobPreparer::TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const
{
    const auto md5Signature = itemToUpload.CalculateMD5();

    auto fileName = ::TStringBuilder() << GetFileStorage() << "/cpp_md5_" << md5Signature;
    if (OperationPreparer_.GetContext().Config->CacheUploadDeduplicationMode == EUploadDeduplicationMode::Host) {
        fileName << "_" << MD5::Data(TProcessState::Get()->FqdnHostName);
    }
    TString cypressPath = AddPathPrefix(fileName, OperationPreparer_.GetContext().Config->Prefix);

    CreateFileInCypress(cypressPath);

    // Transaction object for the upload (its type is missing in this copy),
    // created with a default-constructed transaction id as an argument.
    auto uploadTx = MakeIntrusive(
        OperationPreparer_.GetClient()->GetRawClient(),
        OperationPreparer_.GetClient(),
        OperationPreparer_.GetContext(),
        TTransactionId(),
        TStartTransactionOptions());

    ILockPtr lock;
    try {
        lock = uploadTx->Lock(cypressPath, ELockMode::LM_EXCLUSIVE, TLockOptions().Waitable(true));
    } catch (const TErrorResponse& e) {
        if (e.IsResolveError()) {
            // If the node doesn't exist, it must be removed by concurrent uploading process.
            // Let's try to find it in the cache.
            return GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription());
        }
        throw;
    }

    auto waitTimeout = GetWaitForUploadTimeout(itemToUpload);
    YT_LOG_DEBUG("Waiting for the lock on file (FileName: %v; CypressPath: %v; LockTimeout: %v)",
        itemToUpload.GetDescription(),
        cypressPath,
        waitTimeout);

    if (!TWaitProxy::Get()->WaitFuture(lock->GetAcquiredFuture(), waitTimeout)) {
        YT_LOG_DEBUG("Waiting for the lock timed out. Fallback to random path uploading (FileName: %v; CypressPath: %v)",
            itemToUpload.GetDescription(),
            cypressPath);
        return Nothing();
    }

    YT_LOG_DEBUG("Exclusive lock successfully acquired (FileName: %v; CypressPath: %v)",
        itemToUpload.GetDescription(),
        cypressPath);

    // Ensure that this process is the first to take a lock.
    // (If another uploader held the lock before us, the item is in the cache
    // by now and there is nothing left to upload.)
    if (auto cachedItemPath = GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription())) {
        return *cachedItemPath;
    }

    YT_LOG_INFO("Uploading file to cypress (FileName: %v; CypressPath: %v; PreparationId: %v)",
        itemToUpload.GetDescription(),
        cypressPath,
        OperationPreparer_.GetPreparationId());

    {
        auto writer = uploadTx->CreateFileWriter(cypressPath, TFileWriterOptions().ComputeMD5(true));
        YT_VERIFY(writer);
        itemToUpload.CreateInputStream()->ReadAll(*writer);
        writer->Finish();
    }

    // The cache put (and the removal of cypressPath inside it) run under the
    // upload transaction, which is committed right after.
    auto path = PutFileToCypressCache(cypressPath, md5Signature, uploadTx->GetId());

    uploadTx->Commit();
    return path;
}

// Uploads the item via the file cache and returns its cache path.
//
// Steps: return the cached path if the MD5 is already in the cache; otherwise,
// when deduplication is enabled and the item is larger than the configured
// threshold, try TryUploadWithDeduplication(); if that is not applicable or
// yields nothing, upload to a random path and put the result into the cache.
// Aborts the process if the computed MD5 signature is not 32 characters long.
TString TJobPreparer::UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const
{
    auto md5Signature = itemToUpload.CalculateMD5();
    Y_ABORT_UNLESS(md5Signature.size() == 32);

    if (auto cachedItemPath = GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription())) {
        return *cachedItemPath;
    }

    YT_LOG_INFO("File not found in cache; uploading to cypress (FileName: %v; PreparationId: %v)",
        itemToUpload.GetDescription(),
        OperationPreparer_.GetPreparationId());

    const auto& config = OperationPreparer_.GetContext().Config;

    if (config->CacheUploadDeduplicationMode != EUploadDeduplicationMode::Disabled &&
        itemToUpload.GetDataSize() > config->CacheUploadDeduplicationThreshold) {
        if (auto path = TryUploadWithDeduplication(itemToUpload)) {
            return *path;
        }
    }

    auto path = UploadToRandomPath(itemToUpload);
    return PutFileToCypressCache(path, md5Signature, Options_.FileStorageTransactionId_);
}

// Uploads the item according to Options_.FileCacheMode_ and returns the
// resulting Cypress path:
//   - ApiCommandBased: via the file cache; requires an empty
//     FileStorageTransactionId_, otherwise TApiUsageError is thrown;
//   - CachelessRandomPathUpload: straight to a random path, no cache;
//   - anything else aborts the process.
TString TJobPreparer::UploadToCache(const IItemToUpload& itemToUpload) const
{
    YT_LOG_INFO("Uploading file (FileName: %v; PreparationId: %v)",
        itemToUpload.GetDescription(),
        OperationPreparer_.GetPreparationId());

    TString result;
    switch (Options_.FileCacheMode_) {
        case TOperationOptions::EFileCacheMode::ApiCommandBased:
            Y_ENSURE_EX(Options_.FileStorageTransactionId_.IsEmpty(), TApiUsageError() << "Default cache mode (API command-based) doesn't allow non-default 'FileStorageTransactionId_'");
            result = UploadToCacheUsingApi(itemToUpload);
            break;
        case TOperationOptions::EFileCacheMode::CachelessRandomPathUpload:
            result = UploadToRandomPath(itemToUpload);
            break;
        default:
            Y_ABORT("Unknown file cache mode: %d", static_cast(Options_.FileCacheMode_));
    }

    YT_LOG_INFO("Complete uploading file (FileName: %v; PreparationId: %v)",
        itemToUpload.GetDescription(),
        OperationPreparer_.GetPreparationId());

    return result;
}

// Registers a file that already lives in Cypress.
//
// Checks that the file exists (under the file's own transaction if it has one,
// otherwise under the operation transaction) and throws yexception if not.
// When the sandbox is mounted, the file's `uncompressed_data_size` attribute,
// rounded up by RoundUpFileSize(), is added to TotalFileSize_.
// Finally the path is appended to CypressFiles_.
void TJobPreparer::UseFileInCypress(const TRichYPath& file)
{
    auto exists = RequestWithRetry(
        OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
        [this, &file] (TMutationId /*mutationId*/) {
            return RawClient_->Exists(
                file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
                file.Path_);
        });
    if (!exists) {
        ythrow yexception() << "File " << file.Path_ << " does not exist";
    }

    if (ShouldMountSandbox()) {
        auto size = RequestWithRetry(
            OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
            [this, &file] (TMutationId /*mutationId*/) {
                return RawClient_->Get(
                    file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
                    file.Path_ + "/@uncompressed_data_size")
                    .AsInt64();
            });
        TotalFileSize_ += RoundUpFileSize(static_cast(size));
    }
    CypressFiles_.push_back(file);
}

// Uploads a local file and appends the resulting rich path to CachedFiles_.
//
// The local path must exist (CheckExists). The uploaded file keeps the
// executable flag if any execute permission bit is set locally. Its name
// inside the job is options.PathInJob_ or, if unset, the local base name.
// If `isApiFile` is true, the rich path starts from the configured
// ApiFilePathOptions instead of a default-constructed path.
// When the sandbox is mounted, the rounded-up local file size is added to
// TotalFileSize_.
void TJobPreparer::UploadLocalFile(
    const TLocalFilePath& localPath,
    const TAddLocalFileOptions& options,
    bool isApiFile)
{
    TFsPath fsPath(localPath);
    fsPath.CheckExists();

    TFileStat stat;
    fsPath.Stat(stat);

    // Executable for user, group or others.
    bool isExecutable = stat.Mode & (S_IXUSR | S_IXGRP | S_IXOTH);
    auto cachePath = UploadToCache(TFileToUpload(localPath, options.MD5CheckSum_));

    TRichYPath cypressPath;
    if (isApiFile) {
        cypressPath = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
    }
    cypressPath.Path(cachePath).FileName(options.PathInJob_.GetOrElse(fsPath.Basename()));
    if (isExecutable) {
        cypressPath.Executable(true);
    }
    if (options.BypassArtifactCache_) {
        cypressPath.BypassArtifactCache(*options.BypassArtifactCache_);
    }

    if (ShouldMountSandbox()) {
        TotalFileSize_ += RoundUpFileSize(stat.Size);
    }

    CachedFiles_.push_back(cypressPath);
}

// Makes the job binary available to the job under the name "cppbinary".
//
// The variant alternatives tested below are missing in this copy. Judging by
// the fields used, the first branch handles a binary on the local file system
// (uploaded as an API file, with its known MD5 if present) and the second a
// binary already in Cypress (registered as executable, optionally under its
// own transaction) — TODO confirm. Any other alternative aborts the process.
void TJobPreparer::UploadBinary(const TJobBinaryConfig& jobBinary)
{
    if (std::holds_alternative(jobBinary)) {
        auto binaryLocalPath = std::get(jobBinary);
        auto opts = TAddLocalFileOptions().PathInJob("cppbinary");
        if (binaryLocalPath.MD5CheckSum) {
            opts.MD5CheckSum(*binaryLocalPath.MD5CheckSum);
        }
        UploadLocalFile(binaryLocalPath.Path, opts, /* isApiFile */ true);
    } else if (std::holds_alternative(jobBinary)) {
        auto binaryCypressPath = std::get(jobBinary);
        TRichYPath ytPath = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
        ytPath.Path(binaryCypressPath.Path);
        if (binaryCypressPath.TransactionId) {
            ytPath.TransactionId(*binaryCypressPath.TransactionId);
        }
        UseFileInCypress(ytPath.FileName("cppbinary").Executable(true));
    } else {
        Y_ABORT("%s", (::TStringBuilder() << "Unexpected jobBinary tag: " << jobBinary.index()).data());
    }
}

// Uploads an in-memory file and appends it to CachedFiles_ under its own file
// name, starting from the configured ApiFilePathOptions.
// The upload description is "<file name> [generated-file]".
// When the sandbox is mounted, the rounded-up data size is added to
// TotalFileSize_.
void TJobPreparer::UploadSmallFile(const TSmallJobFile& smallFile)
{
    auto cachePath = UploadToCache(TDataToUpload(smallFile.Data, smallFile.FileName + " [generated-file]"));
    auto path = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
    CachedFiles_.push_back(path.Path(cachePath).FileName(smallFile.FileName));
    if (ShouldMountSandbox()) {
        TotalFileSize_ += RoundUpFileSize(smallFile.Data.size());
    }
}

// Returns the result of UseLocalModeOptimization() for this client, context
// and retry policy.
bool TJobPreparer::IsLocalMode() const
{
    return UseLocalModeOptimization(
        OperationPreparer_.GetClient()->GetRawClient(),
        OperationPreparer_.GetContext(),
        OperationPreparer_.GetClientRetryPolicy());
}

// Chooses the job binary, uploads it if needed, and builds the job command
// line and the YT_JOB_ARGUMENTS environment variable.
//
// The variant alternatives tested by the `std::holds_alternative` calls are
// missing in this copy, so the exact conditions cannot be read from here.
// What is visible:
//   - the binary config from the spec replaces the default-constructed one
//     unless the spec holds a particular alternative;
//   - in the first branch NYT::Initialize() must have completed fully
//     (otherwise yexception is thrown) and the binary becomes the local path
//     from GetPersistentExecPath(), with its MD5 attached only outside local mode;
//   - in local mode the binary is referenced by its local path (GetExecPath()
//     or the real path of the configured binary) and is not uploaded;
//   - otherwise it is uploaded and referenced as "./cppbinary".
//
// The command is: <prefix> YT_USE_CLIENT_PROTOBUF=<0|1> <binary path> <suffix>,
// where a non-empty prefix/suffix from the spec overrides the one from the
// operation options.
void TJobPreparer::PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState)
{
    auto jobBinary = TJobBinaryConfig();
    if (!std::holds_alternative(Spec_.GetJobBinary())) {
        jobBinary = Spec_.GetJobBinary();
    }
    TString binaryPathInsideJob;
    if (std::holds_alternative(jobBinary)) {
        if (GetInitStatus() != EInitStatus::FullInitialization) {
            ythrow yexception() << "NYT::Initialize() must be called prior to any operation";
        }

        const bool isLocalMode = IsLocalMode();
        // No MD5 is computed in local mode: the binary is not uploaded there.
        const TMaybe md5 = !isLocalMode ? MakeMaybe(GetPersistentExecPathMd5()) : Nothing();
        jobBinary = TJobBinaryLocalPath{GetPersistentExecPath(), md5};

        if (isLocalMode) {
            binaryPathInsideJob = GetExecPath();
        }
    } else if (std::holds_alternative(jobBinary)) {
        const bool isLocalMode = IsLocalMode();
        if (isLocalMode) {
            binaryPathInsideJob = TFsPath(std::get(jobBinary).Path).RealPath();
        }
    }
    Y_ASSERT(!std::holds_alternative(jobBinary));

    // binaryPathInsideJob is only set when LocalModeOptimization option is on, so upload is not needed
    if (!binaryPathInsideJob) {
        binaryPathInsideJob = "./cppbinary";
        UploadBinary(jobBinary);
    }

    TString jobCommandPrefix = Options_.JobCommandPrefix_;
    if (!Spec_.JobCommandPrefix_.empty()) {
        jobCommandPrefix = Spec_.JobCommandPrefix_;
    }

    TString jobCommandSuffix = Options_.JobCommandSuffix_;
    if (!Spec_.JobCommandSuffix_.empty()) {
        jobCommandSuffix = Spec_.JobCommandSuffix_;
    }

    ClassName_ = TJobFactory::Get()->GetJobName(&job);

    // Job name, output table count and the state flag are passed to the job
    // process as a YSON map in the YT_JOB_ARGUMENTS environment variable.
    auto jobArguments = TNode::CreateMap();
    jobArguments["job_name"] = ClassName_;
    jobArguments["output_table_count"] = static_cast(outputTableCount);
    jobArguments["has_state"] = hasState;
    Spec_.AddEnvironment("YT_JOB_ARGUMENTS", NodeToYsonString(jobArguments));

    // No separator is inserted after the prefix or before the suffix; they
    // are concatenated as given.
    Command_ = ::TStringBuilder() <<
        jobCommandPrefix <<
        (OperationPreparer_.GetContext().Config->UseClientProtobuf ?
          "YT_USE_CLIENT_PROTOBUF=1" :
          "YT_USE_CLIENT_PROTOBUF=0") <<
        " " <<
        binaryPathInsideJob <<
        jobCommandSuffix;
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NDetail