operation_preparer.cpp 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898
  1. #include "operation_preparer.h"
  2. #include "init.h"
  3. #include "file_writer.h"
  4. #include "operation.h"
  5. #include "operation_helpers.h"
  6. #include "operation_tracker.h"
  7. #include "transaction.h"
  8. #include "transaction_pinger.h"
  9. #include "yt_poller.h"
  10. #include <yt/cpp/mapreduce/common/helpers.h>
  11. #include <yt/cpp/mapreduce/common/retry_lib.h>
  12. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  13. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  14. #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
  15. #include <yt/cpp/mapreduce/interface/error_codes.h>
  16. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  17. #include <library/cpp/digest/md5/md5.h>
  18. #include <util/folder/path.h>
  19. #include <util/string/builder.h>
  20. #include <util/system/execpath.h>
  21. namespace NYT::NDetail {
  22. using namespace NRawClient;
  23. ////////////////////////////////////////////////////////////////////////////////
  24. class TWaitOperationStartPollerItem
  25. : public IYtPollerItem
  26. {
  27. public:
  28. TWaitOperationStartPollerItem(TOperationId operationId, THolder<TPingableTransaction> transaction)
  29. : OperationId_(operationId)
  30. , Transaction_(std::move(transaction))
  31. { }
  32. void PrepareRequest(TRawBatchRequest* batchRequest) override
  33. {
  34. Future_ = batchRequest->GetOperation(
  35. OperationId_,
  36. TGetOperationOptions().AttributeFilter(
  37. TOperationAttributeFilter().Add(EOperationAttribute::State)));
  38. }
  39. EStatus OnRequestExecuted() override
  40. {
  41. try {
  42. auto attributes = Future_.GetValue();
  43. Y_ENSURE(attributes.State.Defined());
  44. bool operationHasLockedFiles =
  45. *attributes.State != "starting" &&
  46. *attributes.State != "pending" &&
  47. *attributes.State != "orphaned" &&
  48. *attributes.State != "waiting_for_agent" &&
  49. *attributes.State != "initializing";
  50. return operationHasLockedFiles ? EStatus::PollBreak : EStatus::PollContinue;
  51. } catch (const TErrorResponse& e) {
  52. YT_LOG_ERROR("get_operation request failed: %v (RequestId: %v)",
  53. e.GetError().GetMessage(),
  54. e.GetRequestId());
  55. return IsRetriable(e) ? PollContinue : PollBreak;
  56. } catch (const std::exception& e) {
  57. YT_LOG_ERROR("%v", e.what());
  58. return PollBreak;
  59. }
  60. }
  61. void OnItemDiscarded() override {
  62. }
  63. private:
  64. TOperationId OperationId_;
  65. THolder<TPingableTransaction> Transaction_;
  66. ::NThreading::TFuture<TOperationAttributes> Future_;
  67. };
  68. ////////////////////////////////////////////////////////////////////////////////
  69. class TOperationForwardingRequestRetryPolicy
  70. : public IRequestRetryPolicy
  71. {
  72. public:
  73. TOperationForwardingRequestRetryPolicy(const IRequestRetryPolicyPtr& underlying, const TOperationPtr& operation)
  74. : Underlying_(underlying)
  75. , Operation_(operation)
  76. { }
  77. void NotifyNewAttempt() override
  78. {
  79. Underlying_->NotifyNewAttempt();
  80. }
  81. TMaybe<TDuration> OnGenericError(const std::exception& e) override
  82. {
  83. UpdateOperationStatus(e.what());
  84. return Underlying_->OnGenericError(e);
  85. }
  86. TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
  87. {
  88. auto msg = e.GetError().ShortDescription();
  89. UpdateOperationStatus(msg);
  90. return Underlying_->OnRetriableError(e);
  91. }
  92. void OnIgnoredError(const TErrorResponse& e) override
  93. {
  94. Underlying_->OnIgnoredError(e);
  95. }
  96. TString GetAttemptDescription() const override
  97. {
  98. return Underlying_->GetAttemptDescription();
  99. }
  100. private:
  101. void UpdateOperationStatus(TStringBuf err)
  102. {
  103. Y_ABORT_UNLESS(Operation_);
  104. Operation_->OnStatusUpdated(
  105. ::TStringBuilder() << "Retriable error during operation start: " << err);
  106. }
  107. private:
  108. IRequestRetryPolicyPtr Underlying_;
  109. TOperationPtr Operation_;
  110. };
  111. ////////////////////////////////////////////////////////////////////////////////
  112. TOperationPreparer::TOperationPreparer(TClientPtr client, TTransactionId transactionId)
  113. : Client_(std::move(client))
  114. , TransactionId_(transactionId)
  115. , FileTransaction_(MakeHolder<TPingableTransaction>(
  116. Client_->GetRetryPolicy(),
  117. Client_->GetContext(),
  118. TransactionId_,
  119. Client_->GetTransactionPinger()->GetChildTxPinger(),
  120. TStartTransactionOptions()))
  121. , ClientRetryPolicy_(Client_->GetRetryPolicy())
  122. , PreparationId_(CreateGuidAsString())
  123. { }
  124. const TClientContext& TOperationPreparer::GetContext() const
  125. {
  126. return Client_->GetContext();
  127. }
  128. TTransactionId TOperationPreparer::GetTransactionId() const
  129. {
  130. return TransactionId_;
  131. }
  132. TClientPtr TOperationPreparer::GetClient() const
  133. {
  134. return Client_;
  135. }
  136. const TString& TOperationPreparer::GetPreparationId() const
  137. {
  138. return PreparationId_;
  139. }
  140. const IClientRetryPolicyPtr& TOperationPreparer::GetClientRetryPolicy() const
  141. {
  142. return ClientRetryPolicy_;
  143. }
  144. TOperationId TOperationPreparer::StartOperation(
  145. TOperation* operation,
  146. const TString& operationType,
  147. const TNode& spec,
  148. bool useStartOperationRequest)
  149. {
  150. CheckValidity();
  151. THttpHeader header("POST", (useStartOperationRequest ? "start_op" : operationType));
  152. if (useStartOperationRequest) {
  153. header.AddParameter("operation_type", operationType);
  154. }
  155. header.AddTransactionId(TransactionId_);
  156. header.AddMutationId();
  157. auto ysonSpec = NodeToYsonString(spec);
  158. auto responseInfo = RetryRequestWithPolicy(
  159. ::MakeIntrusive<TOperationForwardingRequestRetryPolicy>(
  160. ClientRetryPolicy_->CreatePolicyForStartOperationRequest(),
  161. TOperationPtr(operation)),
  162. GetContext(),
  163. header,
  164. ysonSpec);
  165. TOperationId operationId = ParseGuidFromResponse(responseInfo.Response);
  166. YT_LOG_DEBUG("Operation started (OperationId: %v; PreparationId: %v)",
  167. operationId,
  168. GetPreparationId());
  169. YT_LOG_INFO("Operation %v started (%v): %v",
  170. operationId,
  171. operationType,
  172. GetOperationWebInterfaceUrl(GetContext().ServerName, operationId));
  173. TOperationExecutionTimeTracker::Get()->Start(operationId);
  174. Client_->GetYtPoller().Watch(
  175. new TWaitOperationStartPollerItem(operationId, std::move(FileTransaction_)));
  176. return operationId;
  177. }
  178. void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths)
  179. {
  180. CheckValidity();
  181. TVector<::NThreading::TFuture<TLockId>> lockIdFutures;
  182. lockIdFutures.reserve(paths->size());
  183. TRawBatchRequest lockRequest(GetContext().Config);
  184. for (const auto& path : *paths) {
  185. lockIdFutures.push_back(lockRequest.Lock(
  186. FileTransaction_->GetId(),
  187. path.Path_,
  188. ELockMode::LM_SNAPSHOT,
  189. TLockOptions().Waitable(true)));
  190. }
  191. ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), lockRequest);
  192. TVector<::NThreading::TFuture<TNode>> nodeIdFutures;
  193. nodeIdFutures.reserve(paths->size());
  194. TRawBatchRequest getNodeIdRequest(GetContext().Config);
  195. for (const auto& lockIdFuture : lockIdFutures) {
  196. nodeIdFutures.push_back(getNodeIdRequest.Get(
  197. FileTransaction_->GetId(),
  198. ::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id",
  199. TGetOptions()));
  200. }
  201. ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), getNodeIdRequest);
  202. for (size_t i = 0; i != paths->size(); ++i) {
  203. auto& richPath = (*paths)[i];
  204. richPath.OriginalPath(richPath.Path_);
  205. richPath.Path("#" + nodeIdFutures[i].GetValue().AsString());
  206. YT_LOG_DEBUG("Locked file %v, new path is %v",
  207. *richPath.OriginalPath_,
  208. richPath.Path_);
  209. }
  210. }
  211. void TOperationPreparer::CheckValidity() const
  212. {
  213. Y_ENSURE(
  214. FileTransaction_,
  215. "File transaction is already moved, are you trying to use preparer for more than one operation?");
  216. }
  217. ////////////////////////////////////////////////////////////////////////////////
  218. class TRetryPolicyIgnoringLockConflicts
  219. : public TAttemptLimitedRetryPolicy
  220. {
  221. public:
  222. using TAttemptLimitedRetryPolicy::TAttemptLimitedRetryPolicy;
  223. using TAttemptLimitedRetryPolicy::OnGenericError;
  224. TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
  225. {
  226. if (IsAttemptLimitExceeded()) {
  227. return Nothing();
  228. }
  229. if (e.IsConcurrentTransactionLockConflict()) {
  230. return GetBackoffDuration(Config_);
  231. }
  232. return TAttemptLimitedRetryPolicy::OnRetriableError(e);
  233. }
  234. };
  235. ////////////////////////////////////////////////////////////////////////////////
  236. class TFileToUpload
  237. : public IItemToUpload
  238. {
  239. public:
  240. TFileToUpload(TString fileName, TMaybe<TString> md5)
  241. : FileName_(std::move(fileName))
  242. , MD5_(std::move(md5))
  243. { }
  244. TString CalculateMD5() const override
  245. {
  246. if (MD5_) {
  247. return *MD5_;
  248. }
  249. constexpr size_t md5Size = 32;
  250. TString result;
  251. result.ReserveAndResize(md5Size);
  252. MD5::File(FileName_.data(), result.Detach());
  253. MD5_ = result;
  254. return result;
  255. }
  256. THolder<IInputStream> CreateInputStream() const override
  257. {
  258. return MakeHolder<TFileInput>(FileName_);
  259. }
  260. TString GetDescription() const override
  261. {
  262. return FileName_;
  263. }
  264. i64 GetDataSize() const override
  265. {
  266. return GetFileLength(FileName_);
  267. }
  268. private:
  269. TString FileName_;
  270. mutable TMaybe<TString> MD5_;
  271. };
  272. class TDataToUpload
  273. : public IItemToUpload
  274. {
  275. public:
  276. TDataToUpload(TString data, TString description)
  277. : Data_(std::move(data))
  278. , Description_(std::move(description))
  279. { }
  280. TString CalculateMD5() const override
  281. {
  282. constexpr size_t md5Size = 32;
  283. TString result;
  284. result.ReserveAndResize(md5Size);
  285. MD5::Data(reinterpret_cast<const unsigned char*>(Data_.data()), Data_.size(), result.Detach());
  286. return result;
  287. }
  288. THolder<IInputStream> CreateInputStream() const override
  289. {
  290. return MakeHolder<TMemoryInput>(Data_.data(), Data_.size());
  291. }
  292. TString GetDescription() const override
  293. {
  294. return Description_;
  295. }
  296. i64 GetDataSize() const override
  297. {
  298. return std::ssize(Data_);
  299. }
  300. private:
  301. TString Data_;
  302. TString Description_;
  303. };
  304. ////////////////////////////////////////////////////////////////////////////////
  305. static const TString& GetPersistentExecPathMd5()
  306. {
  307. static TString md5 = MD5::File(GetPersistentExecPath());
  308. return md5;
  309. }
  310. static TMaybe<TSmallJobFile> GetJobState(const IJob& job)
  311. {
  312. TString result;
  313. {
  314. TStringOutput output(result);
  315. job.Save(output);
  316. output.Finish();
  317. }
  318. if (result.empty()) {
  319. return Nothing();
  320. } else {
  321. return TSmallJobFile{"jobstate", result};
  322. }
  323. }
  324. ////////////////////////////////////////////////////////////////////////////////
  325. TJobPreparer::TJobPreparer(
  326. TOperationPreparer& operationPreparer,
  327. const TUserJobSpec& spec,
  328. const IJob& job,
  329. size_t outputTableCount,
  330. const TVector<TSmallJobFile>& smallFileList,
  331. const TOperationOptions& options)
  332. : OperationPreparer_(operationPreparer)
  333. , Spec_(spec)
  334. , Options_(options)
  335. , Layers_(spec.Layers_)
  336. {
  337. CreateStorage();
  338. auto cypressFileList = CanonizeYPaths(/* retryPolicy */ nullptr, OperationPreparer_.GetContext(), spec.Files_);
  339. for (const auto& file : cypressFileList) {
  340. UseFileInCypress(file);
  341. }
  342. for (const auto& localFile : spec.GetLocalFiles()) {
  343. UploadLocalFile(std::get<0>(localFile), std::get<1>(localFile));
  344. }
  345. auto jobStateSmallFile = GetJobState(job);
  346. if (jobStateSmallFile) {
  347. UploadSmallFile(*jobStateSmallFile);
  348. }
  349. for (const auto& smallFile : smallFileList) {
  350. UploadSmallFile(smallFile);
  351. }
  352. if (auto commandJob = dynamic_cast<const ICommandJob*>(&job)) {
  353. IsCommandJob_ = true;
  354. ClassName_ = TJobFactory::Get()->GetJobName(&job);
  355. Command_ = commandJob->GetCommand();
  356. } else {
  357. PrepareJobBinary(job, outputTableCount, jobStateSmallFile.Defined());
  358. }
  359. operationPreparer.LockFiles(&CachedFiles_);
  360. }
  361. TVector<TRichYPath> TJobPreparer::GetFiles() const
  362. {
  363. TVector<TRichYPath> allFiles = CypressFiles_;
  364. allFiles.insert(allFiles.end(), CachedFiles_.begin(), CachedFiles_.end());
  365. return allFiles;
  366. }
  367. TVector<TYPath> TJobPreparer::GetLayers() const
  368. {
  369. return Layers_;
  370. }
  371. const TString& TJobPreparer::GetClassName() const
  372. {
  373. return ClassName_;
  374. }
  375. const TString& TJobPreparer::GetCommand() const
  376. {
  377. return Command_;
  378. }
  379. const TUserJobSpec& TJobPreparer::GetSpec() const
  380. {
  381. return Spec_;
  382. }
  383. bool TJobPreparer::ShouldMountSandbox() const
  384. {
  385. return OperationPreparer_.GetContext().Config->MountSandboxInTmpfs || Options_.MountSandboxInTmpfs_;
  386. }
  387. ui64 TJobPreparer::GetTotalFileSize() const
  388. {
  389. return TotalFileSize_;
  390. }
  391. bool TJobPreparer::ShouldRedirectStdoutToStderr() const
  392. {
  393. return !IsCommandJob_ && OperationPreparer_.GetContext().Config->RedirectStdoutToStderr;
  394. }
  395. TString TJobPreparer::GetFileStorage() const
  396. {
  397. return Options_.FileStorage_ ?
  398. *Options_.FileStorage_ :
  399. OperationPreparer_.GetContext().Config->RemoteTempFilesDirectory;
  400. }
  401. TYPath TJobPreparer::GetCachePath() const
  402. {
  403. return AddPathPrefix(
  404. ::TStringBuilder() << GetFileStorage() << "/new_cache",
  405. OperationPreparer_.GetContext().Config->Prefix);
  406. }
  407. void TJobPreparer::CreateStorage() const
  408. {
  409. Create(
  410. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  411. OperationPreparer_.GetContext(),
  412. Options_.FileStorageTransactionId_,
  413. GetCachePath(),
  414. NT_MAP,
  415. TCreateOptions()
  416. .IgnoreExisting(true)
  417. .Recursive(true));
  418. }
  419. int TJobPreparer::GetFileCacheReplicationFactor() const
  420. {
  421. if (IsLocalMode()) {
  422. return 1;
  423. } else {
  424. return OperationPreparer_.GetContext().Config->FileCacheReplicationFactor;
  425. }
  426. }
  427. void TJobPreparer::CreateFileInCypress(const TString& path) const
  428. {
  429. auto attributes = TNode()("replication_factor", GetFileCacheReplicationFactor());
  430. if (Options_.FileExpirationTimeout_) {
  431. attributes["expiration_timeout"] = Options_.FileExpirationTimeout_->MilliSeconds();
  432. }
  433. Create(
  434. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  435. OperationPreparer_.GetContext(),
  436. Options_.FileStorageTransactionId_,
  437. path,
  438. NT_FILE,
  439. TCreateOptions()
  440. .IgnoreExisting(true)
  441. .Recursive(true)
  442. .Attributes(attributes)
  443. );
  444. }
  445. TString TJobPreparer::PutFileToCypressCache(
  446. const TString& path,
  447. const TString& md5Signature,
  448. TTransactionId transactionId) const
  449. {
  450. constexpr ui32 LockConflictRetryCount = 30;
  451. auto retryPolicy = MakeIntrusive<TRetryPolicyIgnoringLockConflicts>(
  452. LockConflictRetryCount,
  453. OperationPreparer_.GetContext().Config);
  454. auto putFileToCacheOptions = TPutFileToCacheOptions();
  455. if (Options_.FileExpirationTimeout_) {
  456. putFileToCacheOptions.PreserveExpirationTimeout(true);
  457. }
  458. auto cachePath = PutFileToCache(
  459. retryPolicy,
  460. OperationPreparer_.GetContext(),
  461. transactionId,
  462. path,
  463. md5Signature,
  464. GetCachePath(),
  465. putFileToCacheOptions);
  466. Remove(
  467. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  468. OperationPreparer_.GetContext(),
  469. transactionId,
  470. path,
  471. TRemoveOptions().Force(true));
  472. return cachePath;
  473. }
  474. TMaybe<TString> TJobPreparer::GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const
  475. {
  476. constexpr ui32 LockConflictRetryCount = 30;
  477. auto retryPolicy = MakeIntrusive<TRetryPolicyIgnoringLockConflicts>(
  478. LockConflictRetryCount,
  479. OperationPreparer_.GetContext().Config);
  480. auto maybePath = GetFileFromCache(
  481. retryPolicy,
  482. OperationPreparer_.GetContext(),
  483. TTransactionId(),
  484. md5Signature,
  485. GetCachePath(),
  486. TGetFileFromCacheOptions());
  487. if (maybePath) {
  488. YT_LOG_DEBUG(
  489. "File is already in cache (FileName: %v, FilePath: %v)",
  490. fileName,
  491. *maybePath);
  492. }
  493. return maybePath;
  494. }
  495. TDuration TJobPreparer::GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const
  496. {
  497. TDuration extraTime = OperationPreparer_.GetContext().Config->WaitLockPollInterval +
  498. TDuration::MilliSeconds(100);
  499. auto dataSizeGb = (itemToUpload.GetDataSize() + 1_GB - 1) / 1_GB;
  500. dataSizeGb = Max<ui64>(dataSizeGb, 1);
  501. return extraTime + dataSizeGb * OperationPreparer_.GetContext().Config->CacheLockTimeoutPerGb;
  502. }
  503. TString TJobPreparer::UploadToRandomPath(const IItemToUpload& itemToUpload) const
  504. {
  505. TString uniquePath = AddPathPrefix(
  506. ::TStringBuilder() << GetFileStorage() << "/cpp_" << CreateGuidAsString(),
  507. OperationPreparer_.GetContext().Config->Prefix);
  508. YT_LOG_INFO("Uploading file to random cypress path (FileName: %v; CypressPath: %v; PreparationId: %v)",
  509. itemToUpload.GetDescription(),
  510. uniquePath,
  511. OperationPreparer_.GetPreparationId());
  512. CreateFileInCypress(uniquePath);
  513. {
  514. TFileWriter writer(
  515. uniquePath,
  516. OperationPreparer_.GetClientRetryPolicy(),
  517. OperationPreparer_.GetClient()->GetTransactionPinger(),
  518. OperationPreparer_.GetContext(),
  519. Options_.FileStorageTransactionId_,
  520. TFileWriterOptions().ComputeMD5(true));
  521. itemToUpload.CreateInputStream()->ReadAll(writer);
  522. writer.Finish();
  523. }
  524. return uniquePath;
  525. }
  526. TMaybe<TString> TJobPreparer::TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const
  527. {
  528. const auto md5Signature = itemToUpload.CalculateMD5();
  529. auto fileName = ::TStringBuilder() << GetFileStorage() << "/cpp_md5_" << md5Signature;
  530. if (OperationPreparer_.GetContext().Config->CacheUploadDeduplicationMode == EUploadDeduplicationMode::Host) {
  531. fileName << "_" << MD5::Data(TProcessState::Get()->FqdnHostName);
  532. }
  533. TString cypressPath = AddPathPrefix(fileName, OperationPreparer_.GetContext().Config->Prefix);
  534. CreateFileInCypress(cypressPath);
  535. auto uploadTx = MakeIntrusive<TTransaction>(
  536. OperationPreparer_.GetClient(),
  537. OperationPreparer_.GetContext(),
  538. TTransactionId(),
  539. TStartTransactionOptions());
  540. ILockPtr lock;
  541. try {
  542. lock = uploadTx->Lock(cypressPath, ELockMode::LM_EXCLUSIVE, TLockOptions().Waitable(true));
  543. } catch (const TErrorResponse& e) {
  544. if (e.IsResolveError()) {
  545. // If the node doesn't exist, it must be removed by concurrent uploading process.
  546. // Let's try to find it in the cache.
  547. return GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription());
  548. }
  549. throw;
  550. }
  551. auto waitTimeout = GetWaitForUploadTimeout(itemToUpload);
  552. YT_LOG_DEBUG("Waiting for the lock on file (FileName: %v; CypressPath: %v; LockTimeout: %v)",
  553. itemToUpload.GetDescription(),
  554. cypressPath,
  555. waitTimeout);
  556. if (!TWaitProxy::Get()->WaitFuture(lock->GetAcquiredFuture(), waitTimeout)) {
  557. YT_LOG_DEBUG("Waiting for the lock timed out. Fallback to random path uploading (FileName: %v; CypressPath: %v)",
  558. itemToUpload.GetDescription(),
  559. cypressPath);
  560. return Nothing();
  561. }
  562. YT_LOG_DEBUG("Exclusive lock successfully acquired (FileName: %v; CypressPath: %v)",
  563. itemToUpload.GetDescription(),
  564. cypressPath);
  565. // Ensure that this process is the first to take a lock.
  566. if (auto cachedItemPath = GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription())) {
  567. return *cachedItemPath;
  568. }
  569. YT_LOG_INFO("Uploading file to cypress (FileName: %v; CypressPath: %v; PreparationId: %v)",
  570. itemToUpload.GetDescription(),
  571. cypressPath,
  572. OperationPreparer_.GetPreparationId());
  573. {
  574. auto writer = uploadTx->CreateFileWriter(cypressPath, TFileWriterOptions().ComputeMD5(true));
  575. YT_VERIFY(writer);
  576. itemToUpload.CreateInputStream()->ReadAll(*writer);
  577. writer->Finish();
  578. }
  579. auto path = PutFileToCypressCache(cypressPath, md5Signature, uploadTx->GetId());
  580. uploadTx->Commit();
  581. return path;
  582. }
  583. TString TJobPreparer::UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const
  584. {
  585. auto md5Signature = itemToUpload.CalculateMD5();
  586. Y_ABORT_UNLESS(md5Signature.size() == 32);
  587. if (auto cachedItemPath = GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription())) {
  588. return *cachedItemPath;
  589. }
  590. YT_LOG_INFO("File not found in cache; uploading to cypress (FileName: %v; PreparationId: %v)",
  591. itemToUpload.GetDescription(),
  592. OperationPreparer_.GetPreparationId());
  593. const auto& config = OperationPreparer_.GetContext().Config;
  594. if (config->CacheUploadDeduplicationMode != EUploadDeduplicationMode::Disabled &&
  595. itemToUpload.GetDataSize() > config->CacheUploadDeduplicationThreshold) {
  596. if (auto path = TryUploadWithDeduplication(itemToUpload)) {
  597. return *path;
  598. }
  599. }
  600. auto path = UploadToRandomPath(itemToUpload);
  601. return PutFileToCypressCache(path, md5Signature, Options_.FileStorageTransactionId_);
  602. }
  603. TString TJobPreparer::UploadToCache(const IItemToUpload& itemToUpload) const
  604. {
  605. YT_LOG_INFO("Uploading file (FileName: %v; PreparationId: %v)",
  606. itemToUpload.GetDescription(),
  607. OperationPreparer_.GetPreparationId());
  608. TString result;
  609. switch (Options_.FileCacheMode_) {
  610. case TOperationOptions::EFileCacheMode::ApiCommandBased:
  611. Y_ENSURE_EX(Options_.FileStorageTransactionId_.IsEmpty(), TApiUsageError() <<
  612. "Default cache mode (API command-based) doesn't allow non-default 'FileStorageTransactionId_'");
  613. result = UploadToCacheUsingApi(itemToUpload);
  614. break;
  615. case TOperationOptions::EFileCacheMode::CachelessRandomPathUpload:
  616. result = UploadToRandomPath(itemToUpload);
  617. break;
  618. default:
  619. Y_ABORT("Unknown file cache mode: %d", static_cast<int>(Options_.FileCacheMode_));
  620. }
  621. YT_LOG_INFO("Complete uploading file (FileName: %v; PreparationId: %v)",
  622. itemToUpload.GetDescription(),
  623. OperationPreparer_.GetPreparationId());
  624. return result;
  625. }
  626. void TJobPreparer::UseFileInCypress(const TRichYPath& file)
  627. {
  628. if (!Exists(
  629. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  630. OperationPreparer_.GetContext(),
  631. file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
  632. file.Path_))
  633. {
  634. ythrow yexception() << "File " << file.Path_ << " does not exist";
  635. }
  636. if (ShouldMountSandbox()) {
  637. auto size = Get(
  638. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  639. OperationPreparer_.GetContext(),
  640. file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
  641. file.Path_ + "/@uncompressed_data_size")
  642. .AsInt64();
  643. TotalFileSize_ += RoundUpFileSize(static_cast<ui64>(size));
  644. }
  645. CypressFiles_.push_back(file);
  646. }
  647. void TJobPreparer::UploadLocalFile(
  648. const TLocalFilePath& localPath,
  649. const TAddLocalFileOptions& options,
  650. bool isApiFile)
  651. {
  652. TFsPath fsPath(localPath);
  653. fsPath.CheckExists();
  654. TFileStat stat;
  655. fsPath.Stat(stat);
  656. bool isExecutable = stat.Mode & (S_IXUSR | S_IXGRP | S_IXOTH);
  657. auto cachePath = UploadToCache(TFileToUpload(localPath, options.MD5CheckSum_));
  658. TRichYPath cypressPath;
  659. if (isApiFile) {
  660. cypressPath = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
  661. }
  662. cypressPath.Path(cachePath).FileName(options.PathInJob_.GetOrElse(fsPath.Basename()));
  663. if (isExecutable) {
  664. cypressPath.Executable(true);
  665. }
  666. if (options.BypassArtifactCache_) {
  667. cypressPath.BypassArtifactCache(*options.BypassArtifactCache_);
  668. }
  669. if (ShouldMountSandbox()) {
  670. TotalFileSize_ += RoundUpFileSize(stat.Size);
  671. }
  672. CachedFiles_.push_back(cypressPath);
  673. }
  674. void TJobPreparer::UploadBinary(const TJobBinaryConfig& jobBinary)
  675. {
  676. if (std::holds_alternative<TJobBinaryLocalPath>(jobBinary)) {
  677. auto binaryLocalPath = std::get<TJobBinaryLocalPath>(jobBinary);
  678. auto opts = TAddLocalFileOptions().PathInJob("cppbinary");
  679. if (binaryLocalPath.MD5CheckSum) {
  680. opts.MD5CheckSum(*binaryLocalPath.MD5CheckSum);
  681. }
  682. UploadLocalFile(binaryLocalPath.Path, opts, /* isApiFile */ true);
  683. } else if (std::holds_alternative<TJobBinaryCypressPath>(jobBinary)) {
  684. auto binaryCypressPath = std::get<TJobBinaryCypressPath>(jobBinary);
  685. TRichYPath ytPath = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
  686. ytPath.Path(binaryCypressPath.Path);
  687. if (binaryCypressPath.TransactionId) {
  688. ytPath.TransactionId(*binaryCypressPath.TransactionId);
  689. }
  690. UseFileInCypress(ytPath.FileName("cppbinary").Executable(true));
  691. } else {
  692. Y_ABORT("%s", (::TStringBuilder() << "Unexpected jobBinary tag: " << jobBinary.index()).data());
  693. }
  694. }
  695. void TJobPreparer::UploadSmallFile(const TSmallJobFile& smallFile)
  696. {
  697. auto cachePath = UploadToCache(TDataToUpload(smallFile.Data, smallFile.FileName + " [generated-file]"));
  698. auto path = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
  699. CachedFiles_.push_back(path.Path(cachePath).FileName(smallFile.FileName));
  700. if (ShouldMountSandbox()) {
  701. TotalFileSize_ += RoundUpFileSize(smallFile.Data.size());
  702. }
  703. }
  704. bool TJobPreparer::IsLocalMode() const
  705. {
  706. return UseLocalModeOptimization(OperationPreparer_.GetContext(), OperationPreparer_.GetClientRetryPolicy());
  707. }
  708. void TJobPreparer::PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState)
  709. {
  710. auto jobBinary = TJobBinaryConfig();
  711. if (!std::holds_alternative<TJobBinaryDefault>(Spec_.GetJobBinary())) {
  712. jobBinary = Spec_.GetJobBinary();
  713. }
  714. TString binaryPathInsideJob;
  715. if (std::holds_alternative<TJobBinaryDefault>(jobBinary)) {
  716. if (GetInitStatus() != EInitStatus::FullInitialization) {
  717. ythrow yexception() << "NYT::Initialize() must be called prior to any operation";
  718. }
  719. const bool isLocalMode = IsLocalMode();
  720. const TMaybe<TString> md5 = !isLocalMode ? MakeMaybe(GetPersistentExecPathMd5()) : Nothing();
  721. jobBinary = TJobBinaryLocalPath{GetPersistentExecPath(), md5};
  722. if (isLocalMode) {
  723. binaryPathInsideJob = GetExecPath();
  724. }
  725. } else if (std::holds_alternative<TJobBinaryLocalPath>(jobBinary)) {
  726. const bool isLocalMode = IsLocalMode();
  727. if (isLocalMode) {
  728. binaryPathInsideJob = TFsPath(std::get<TJobBinaryLocalPath>(jobBinary).Path).RealPath();
  729. }
  730. }
  731. Y_ASSERT(!std::holds_alternative<TJobBinaryDefault>(jobBinary));
  732. // binaryPathInsideJob is only set when LocalModeOptimization option is on, so upload is not needed
  733. if (!binaryPathInsideJob) {
  734. binaryPathInsideJob = "./cppbinary";
  735. UploadBinary(jobBinary);
  736. }
  737. TString jobCommandPrefix = Options_.JobCommandPrefix_;
  738. if (!Spec_.JobCommandPrefix_.empty()) {
  739. jobCommandPrefix = Spec_.JobCommandPrefix_;
  740. }
  741. TString jobCommandSuffix = Options_.JobCommandSuffix_;
  742. if (!Spec_.JobCommandSuffix_.empty()) {
  743. jobCommandSuffix = Spec_.JobCommandSuffix_;
  744. }
  745. ClassName_ = TJobFactory::Get()->GetJobName(&job);
  746. auto jobArguments = TNode::CreateMap();
  747. jobArguments["job_name"] = ClassName_;
  748. jobArguments["output_table_count"] = static_cast<i64>(outputTableCount);
  749. jobArguments["has_state"] = hasState;
  750. Spec_.AddEnvironment("YT_JOB_ARGUMENTS", NodeToYsonString(jobArguments));
  751. Command_ = ::TStringBuilder() <<
  752. jobCommandPrefix <<
  753. (OperationPreparer_.GetContext().Config->UseClientProtobuf ? "YT_USE_CLIENT_PROTOBUF=1" : "YT_USE_CLIENT_PROTOBUF=0") << " " <<
  754. binaryPathInsideJob <<
  755. jobCommandSuffix;
  756. }
  757. ////////////////////////////////////////////////////////////////////////////////
  758. } // namespace NYT::NDetail