operation_preparer.cpp 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891
  1. #include "operation_preparer.h"
  2. #include "init.h"
  3. #include "file_writer.h"
  4. #include "operation.h"
  5. #include "operation_helpers.h"
  6. #include "operation_tracker.h"
  7. #include "transaction.h"
  8. #include "transaction_pinger.h"
  9. #include "yt_poller.h"
  10. #include <yt/cpp/mapreduce/common/helpers.h>
  11. #include <yt/cpp/mapreduce/common/retry_lib.h>
  12. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  13. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  14. #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
  15. #include <yt/cpp/mapreduce/interface/error_codes.h>
  16. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  17. #include <library/cpp/digest/md5/md5.h>
  18. #include <util/folder/path.h>
  19. #include <util/string/builder.h>
  20. #include <util/system/execpath.h>
  21. namespace NYT::NDetail {
  22. using namespace NRawClient;
  23. ////////////////////////////////////////////////////////////////////////////////
  24. class TWaitOperationStartPollerItem
  25. : public IYtPollerItem
  26. {
  27. public:
  28. TWaitOperationStartPollerItem(TOperationId operationId, THolder<TPingableTransaction> transaction)
  29. : OperationId_(operationId)
  30. , Transaction_(std::move(transaction))
  31. { }
  32. void PrepareRequest(TRawBatchRequest* batchRequest) override
  33. {
  34. Future_ = batchRequest->GetOperation(
  35. OperationId_,
  36. TGetOperationOptions().AttributeFilter(
  37. TOperationAttributeFilter().Add(EOperationAttribute::State)));
  38. }
  39. EStatus OnRequestExecuted() override
  40. {
  41. try {
  42. auto attributes = Future_.GetValue();
  43. Y_ENSURE(attributes.State.Defined());
  44. bool operationHasLockedFiles =
  45. *attributes.State != "starting" &&
  46. *attributes.State != "pending" &&
  47. *attributes.State != "orphaned" &&
  48. *attributes.State != "waiting_for_agent" &&
  49. *attributes.State != "initializing";
  50. return operationHasLockedFiles ? EStatus::PollBreak : EStatus::PollContinue;
  51. } catch (const TErrorResponse& e) {
  52. YT_LOG_ERROR("get_operation request failed: %v (RequestId: %v)",
  53. e.GetError().GetMessage(),
  54. e.GetRequestId());
  55. return IsRetriable(e) ? PollContinue : PollBreak;
  56. } catch (const std::exception& e) {
  57. YT_LOG_ERROR("%v", e.what());
  58. return PollBreak;
  59. }
  60. }
  61. void OnItemDiscarded() override {
  62. }
  63. private:
  64. TOperationId OperationId_;
  65. THolder<TPingableTransaction> Transaction_;
  66. ::NThreading::TFuture<TOperationAttributes> Future_;
  67. };
  68. ////////////////////////////////////////////////////////////////////////////////
  69. class TOperationForwardingRequestRetryPolicy
  70. : public IRequestRetryPolicy
  71. {
  72. public:
  73. TOperationForwardingRequestRetryPolicy(const IRequestRetryPolicyPtr& underlying, const TOperationPtr& operation)
  74. : Underlying_(underlying)
  75. , Operation_(operation)
  76. { }
  77. void NotifyNewAttempt() override
  78. {
  79. Underlying_->NotifyNewAttempt();
  80. }
  81. TMaybe<TDuration> OnGenericError(const std::exception& e) override
  82. {
  83. UpdateOperationStatus(e.what());
  84. return Underlying_->OnGenericError(e);
  85. }
  86. TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
  87. {
  88. auto msg = e.GetError().ShortDescription();
  89. UpdateOperationStatus(msg);
  90. return Underlying_->OnRetriableError(e);
  91. }
  92. void OnIgnoredError(const TErrorResponse& e) override
  93. {
  94. Underlying_->OnIgnoredError(e);
  95. }
  96. TString GetAttemptDescription() const override
  97. {
  98. return Underlying_->GetAttemptDescription();
  99. }
  100. private:
  101. void UpdateOperationStatus(TStringBuf err)
  102. {
  103. Y_ABORT_UNLESS(Operation_);
  104. Operation_->OnStatusUpdated(
  105. ::TStringBuilder() << "Retriable error during operation start: " << err);
  106. }
  107. private:
  108. IRequestRetryPolicyPtr Underlying_;
  109. TOperationPtr Operation_;
  110. };
  111. ////////////////////////////////////////////////////////////////////////////////
  112. TOperationPreparer::TOperationPreparer(TClientPtr client, TTransactionId transactionId)
  113. : Client_(std::move(client))
  114. , TransactionId_(transactionId)
  115. , FileTransaction_(MakeHolder<TPingableTransaction>(
  116. Client_->GetRetryPolicy(),
  117. Client_->GetContext(),
  118. TransactionId_,
  119. Client_->GetTransactionPinger()->GetChildTxPinger(),
  120. TStartTransactionOptions()))
  121. , ClientRetryPolicy_(Client_->GetRetryPolicy())
  122. , PreparationId_(CreateGuidAsString())
  123. { }
  124. const TClientContext& TOperationPreparer::GetContext() const
  125. {
  126. return Client_->GetContext();
  127. }
  128. TTransactionId TOperationPreparer::GetTransactionId() const
  129. {
  130. return TransactionId_;
  131. }
  132. TClientPtr TOperationPreparer::GetClient() const
  133. {
  134. return Client_;
  135. }
  136. const TString& TOperationPreparer::GetPreparationId() const
  137. {
  138. return PreparationId_;
  139. }
  140. const IClientRetryPolicyPtr& TOperationPreparer::GetClientRetryPolicy() const
  141. {
  142. return ClientRetryPolicy_;
  143. }
  144. TOperationId TOperationPreparer::StartOperation(
  145. TOperation* operation,
  146. const TString& operationType,
  147. const TNode& spec,
  148. bool useStartOperationRequest)
  149. {
  150. CheckValidity();
  151. THttpHeader header("POST", (useStartOperationRequest ? "start_op" : operationType));
  152. if (useStartOperationRequest) {
  153. header.AddParameter("operation_type", operationType);
  154. }
  155. header.AddTransactionId(TransactionId_);
  156. header.AddMutationId();
  157. auto ysonSpec = NodeToYsonString(spec);
  158. auto responseInfo = RetryRequestWithPolicy(
  159. ::MakeIntrusive<TOperationForwardingRequestRetryPolicy>(
  160. ClientRetryPolicy_->CreatePolicyForStartOperationRequest(),
  161. TOperationPtr(operation)),
  162. GetContext(),
  163. header,
  164. ysonSpec);
  165. TOperationId operationId = ParseGuidFromResponse(responseInfo.Response);
  166. YT_LOG_DEBUG("Operation started (OperationId: %v; PreparationId: %v)",
  167. operationId,
  168. GetPreparationId());
  169. YT_LOG_INFO("Operation %v started (%v): %v",
  170. operationId,
  171. operationType,
  172. GetOperationWebInterfaceUrl(GetContext().ServerName, operationId));
  173. TOperationExecutionTimeTracker::Get()->Start(operationId);
  174. Client_->GetYtPoller().Watch(
  175. new TWaitOperationStartPollerItem(operationId, std::move(FileTransaction_)));
  176. return operationId;
  177. }
  178. void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths)
  179. {
  180. CheckValidity();
  181. TVector<::NThreading::TFuture<TLockId>> lockIdFutures;
  182. lockIdFutures.reserve(paths->size());
  183. TRawBatchRequest lockRequest(GetContext().Config);
  184. for (const auto& path : *paths) {
  185. lockIdFutures.push_back(lockRequest.Lock(
  186. FileTransaction_->GetId(),
  187. path.Path_,
  188. ELockMode::LM_SNAPSHOT,
  189. TLockOptions().Waitable(true)));
  190. }
  191. ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), lockRequest);
  192. TVector<::NThreading::TFuture<TNode>> nodeIdFutures;
  193. nodeIdFutures.reserve(paths->size());
  194. TRawBatchRequest getNodeIdRequest(GetContext().Config);
  195. for (const auto& lockIdFuture : lockIdFutures) {
  196. nodeIdFutures.push_back(getNodeIdRequest.Get(
  197. FileTransaction_->GetId(),
  198. ::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id",
  199. TGetOptions()));
  200. }
  201. ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), getNodeIdRequest);
  202. for (size_t i = 0; i != paths->size(); ++i) {
  203. auto& richPath = (*paths)[i];
  204. richPath.OriginalPath(richPath.Path_);
  205. richPath.Path("#" + nodeIdFutures[i].GetValue().AsString());
  206. YT_LOG_DEBUG("Locked file %v, new path is %v",
  207. *richPath.OriginalPath_,
  208. richPath.Path_);
  209. }
  210. }
  211. void TOperationPreparer::CheckValidity() const
  212. {
  213. Y_ENSURE(
  214. FileTransaction_,
  215. "File transaction is already moved, are you trying to use preparer for more than one operation?");
  216. }
  217. ////////////////////////////////////////////////////////////////////////////////
  218. class TRetryPolicyIgnoringLockConflicts
  219. : public TAttemptLimitedRetryPolicy
  220. {
  221. public:
  222. using TAttemptLimitedRetryPolicy::TAttemptLimitedRetryPolicy;
  223. using TAttemptLimitedRetryPolicy::OnGenericError;
  224. TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
  225. {
  226. if (IsAttemptLimitExceeded()) {
  227. return Nothing();
  228. }
  229. if (e.IsConcurrentTransactionLockConflict()) {
  230. return GetBackoffDuration(Config_);
  231. }
  232. return TAttemptLimitedRetryPolicy::OnRetriableError(e);
  233. }
  234. };
  235. ////////////////////////////////////////////////////////////////////////////////
  236. class TFileToUpload
  237. : public IItemToUpload
  238. {
  239. public:
  240. TFileToUpload(TString fileName, TMaybe<TString> md5)
  241. : FileName_(std::move(fileName))
  242. , MD5_(std::move(md5))
  243. { }
  244. TString CalculateMD5() const override
  245. {
  246. if (MD5_) {
  247. return *MD5_;
  248. }
  249. constexpr size_t md5Size = 32;
  250. TString result;
  251. result.ReserveAndResize(md5Size);
  252. MD5::File(FileName_.data(), result.Detach());
  253. MD5_ = result;
  254. return result;
  255. }
  256. THolder<IInputStream> CreateInputStream() const override
  257. {
  258. return MakeHolder<TFileInput>(FileName_);
  259. }
  260. TString GetDescription() const override
  261. {
  262. return FileName_;
  263. }
  264. i64 GetDataSize() const override
  265. {
  266. return GetFileLength(FileName_);
  267. }
  268. private:
  269. TString FileName_;
  270. mutable TMaybe<TString> MD5_;
  271. };
  272. class TDataToUpload
  273. : public IItemToUpload
  274. {
  275. public:
  276. TDataToUpload(TString data, TString description)
  277. : Data_(std::move(data))
  278. , Description_(std::move(description))
  279. { }
  280. TString CalculateMD5() const override
  281. {
  282. constexpr size_t md5Size = 32;
  283. TString result;
  284. result.ReserveAndResize(md5Size);
  285. MD5::Data(reinterpret_cast<const unsigned char*>(Data_.data()), Data_.size(), result.Detach());
  286. return result;
  287. }
  288. THolder<IInputStream> CreateInputStream() const override
  289. {
  290. return MakeHolder<TMemoryInput>(Data_.data(), Data_.size());
  291. }
  292. TString GetDescription() const override
  293. {
  294. return Description_;
  295. }
  296. i64 GetDataSize() const override
  297. {
  298. return std::ssize(Data_);
  299. }
  300. private:
  301. TString Data_;
  302. TString Description_;
  303. };
  304. ////////////////////////////////////////////////////////////////////////////////
  305. static const TString& GetPersistentExecPathMd5()
  306. {
  307. static TString md5 = MD5::File(GetPersistentExecPath());
  308. return md5;
  309. }
  310. static TMaybe<TSmallJobFile> GetJobState(const IJob& job)
  311. {
  312. TString result;
  313. {
  314. TStringOutput output(result);
  315. job.Save(output);
  316. output.Finish();
  317. }
  318. if (result.empty()) {
  319. return Nothing();
  320. } else {
  321. return TSmallJobFile{"jobstate", result};
  322. }
  323. }
  324. ////////////////////////////////////////////////////////////////////////////////
  325. TJobPreparer::TJobPreparer(
  326. TOperationPreparer& operationPreparer,
  327. const TUserJobSpec& spec,
  328. const IJob& job,
  329. size_t outputTableCount,
  330. const TVector<TSmallJobFile>& smallFileList,
  331. const TOperationOptions& options)
  332. : OperationPreparer_(operationPreparer)
  333. , Spec_(spec)
  334. , Options_(options)
  335. , Layers_(spec.Layers_)
  336. {
  337. CreateStorage();
  338. auto cypressFileList = CanonizeYPaths(/* retryPolicy */ nullptr, OperationPreparer_.GetContext(), spec.Files_);
  339. for (const auto& file : cypressFileList) {
  340. UseFileInCypress(file);
  341. }
  342. for (const auto& localFile : spec.GetLocalFiles()) {
  343. UploadLocalFile(std::get<0>(localFile), std::get<1>(localFile));
  344. }
  345. auto jobStateSmallFile = GetJobState(job);
  346. if (jobStateSmallFile) {
  347. UploadSmallFile(*jobStateSmallFile);
  348. }
  349. for (const auto& smallFile : smallFileList) {
  350. UploadSmallFile(smallFile);
  351. }
  352. if (auto commandJob = dynamic_cast<const ICommandJob*>(&job)) {
  353. ClassName_ = TJobFactory::Get()->GetJobName(&job);
  354. Command_ = commandJob->GetCommand();
  355. } else {
  356. PrepareJobBinary(job, outputTableCount, jobStateSmallFile.Defined());
  357. }
  358. operationPreparer.LockFiles(&CachedFiles_);
  359. }
  360. TVector<TRichYPath> TJobPreparer::GetFiles() const
  361. {
  362. TVector<TRichYPath> allFiles = CypressFiles_;
  363. allFiles.insert(allFiles.end(), CachedFiles_.begin(), CachedFiles_.end());
  364. return allFiles;
  365. }
  366. TVector<TYPath> TJobPreparer::GetLayers() const
  367. {
  368. return Layers_;
  369. }
  370. const TString& TJobPreparer::GetClassName() const
  371. {
  372. return ClassName_;
  373. }
  374. const TString& TJobPreparer::GetCommand() const
  375. {
  376. return Command_;
  377. }
  378. const TUserJobSpec& TJobPreparer::GetSpec() const
  379. {
  380. return Spec_;
  381. }
  382. bool TJobPreparer::ShouldMountSandbox() const
  383. {
  384. return OperationPreparer_.GetContext().Config->MountSandboxInTmpfs || Options_.MountSandboxInTmpfs_;
  385. }
  386. ui64 TJobPreparer::GetTotalFileSize() const
  387. {
  388. return TotalFileSize_;
  389. }
  390. TString TJobPreparer::GetFileStorage() const
  391. {
  392. return Options_.FileStorage_ ?
  393. *Options_.FileStorage_ :
  394. OperationPreparer_.GetContext().Config->RemoteTempFilesDirectory;
  395. }
  396. TYPath TJobPreparer::GetCachePath() const
  397. {
  398. return AddPathPrefix(
  399. ::TStringBuilder() << GetFileStorage() << "/new_cache",
  400. OperationPreparer_.GetContext().Config->Prefix);
  401. }
  402. void TJobPreparer::CreateStorage() const
  403. {
  404. Create(
  405. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  406. OperationPreparer_.GetContext(),
  407. Options_.FileStorageTransactionId_,
  408. GetCachePath(),
  409. NT_MAP,
  410. TCreateOptions()
  411. .IgnoreExisting(true)
  412. .Recursive(true));
  413. }
  414. int TJobPreparer::GetFileCacheReplicationFactor() const
  415. {
  416. if (IsLocalMode()) {
  417. return 1;
  418. } else {
  419. return OperationPreparer_.GetContext().Config->FileCacheReplicationFactor;
  420. }
  421. }
  422. void TJobPreparer::CreateFileInCypress(const TString& path) const
  423. {
  424. auto attributes = TNode()("replication_factor", GetFileCacheReplicationFactor());
  425. if (Options_.FileExpirationTimeout_) {
  426. attributes["expiration_timeout"] = Options_.FileExpirationTimeout_->MilliSeconds();
  427. }
  428. Create(
  429. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  430. OperationPreparer_.GetContext(),
  431. Options_.FileStorageTransactionId_,
  432. path,
  433. NT_FILE,
  434. TCreateOptions()
  435. .IgnoreExisting(true)
  436. .Recursive(true)
  437. .Attributes(attributes)
  438. );
  439. }
  440. TString TJobPreparer::PutFileToCypressCache(
  441. const TString& path,
  442. const TString& md5Signature,
  443. TTransactionId transactionId) const
  444. {
  445. constexpr ui32 LockConflictRetryCount = 30;
  446. auto retryPolicy = MakeIntrusive<TRetryPolicyIgnoringLockConflicts>(
  447. LockConflictRetryCount,
  448. OperationPreparer_.GetContext().Config);
  449. auto putFileToCacheOptions = TPutFileToCacheOptions();
  450. if (Options_.FileExpirationTimeout_) {
  451. putFileToCacheOptions.PreserveExpirationTimeout(true);
  452. }
  453. auto cachePath = PutFileToCache(
  454. retryPolicy,
  455. OperationPreparer_.GetContext(),
  456. transactionId,
  457. path,
  458. md5Signature,
  459. GetCachePath(),
  460. putFileToCacheOptions);
  461. Remove(
  462. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  463. OperationPreparer_.GetContext(),
  464. transactionId,
  465. path,
  466. TRemoveOptions().Force(true));
  467. return cachePath;
  468. }
  469. TMaybe<TString> TJobPreparer::GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const
  470. {
  471. constexpr ui32 LockConflictRetryCount = 30;
  472. auto retryPolicy = MakeIntrusive<TRetryPolicyIgnoringLockConflicts>(
  473. LockConflictRetryCount,
  474. OperationPreparer_.GetContext().Config);
  475. auto maybePath = GetFileFromCache(
  476. retryPolicy,
  477. OperationPreparer_.GetContext(),
  478. TTransactionId(),
  479. md5Signature,
  480. GetCachePath(),
  481. TGetFileFromCacheOptions());
  482. if (maybePath) {
  483. YT_LOG_DEBUG("File is already in cache (FileName: %v)",
  484. fileName,
  485. *maybePath);
  486. }
  487. return maybePath;
  488. }
  489. TDuration TJobPreparer::GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const
  490. {
  491. TDuration extraTime = OperationPreparer_.GetContext().Config->WaitLockPollInterval +
  492. TDuration::MilliSeconds(100);
  493. auto dataSizeGb = (itemToUpload.GetDataSize() + 1_GB - 1) / 1_GB;
  494. dataSizeGb = Max<ui64>(dataSizeGb, 1);
  495. return extraTime + dataSizeGb * OperationPreparer_.GetContext().Config->CacheLockTimeoutPerGb;
  496. }
  497. TString TJobPreparer::UploadToRandomPath(const IItemToUpload& itemToUpload) const
  498. {
  499. TString uniquePath = AddPathPrefix(
  500. ::TStringBuilder() << GetFileStorage() << "/cpp_" << CreateGuidAsString(),
  501. OperationPreparer_.GetContext().Config->Prefix);
  502. YT_LOG_INFO("Uploading file to random cypress path (FileName: %v; CypressPath: %v; PreparationId: %v)",
  503. itemToUpload.GetDescription(),
  504. uniquePath,
  505. OperationPreparer_.GetPreparationId());
  506. CreateFileInCypress(uniquePath);
  507. {
  508. TFileWriter writer(
  509. uniquePath,
  510. OperationPreparer_.GetClientRetryPolicy(),
  511. OperationPreparer_.GetClient()->GetTransactionPinger(),
  512. OperationPreparer_.GetContext(),
  513. Options_.FileStorageTransactionId_,
  514. TFileWriterOptions().ComputeMD5(true));
  515. itemToUpload.CreateInputStream()->ReadAll(writer);
  516. writer.Finish();
  517. }
  518. return uniquePath;
  519. }
  520. TMaybe<TString> TJobPreparer::TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const
  521. {
  522. const auto md5Signature = itemToUpload.CalculateMD5();
  523. auto fileName = ::TStringBuilder() << GetFileStorage() << "/cpp_md5_" << md5Signature;
  524. if (OperationPreparer_.GetContext().Config->CacheUploadDeduplicationMode == EUploadDeduplicationMode::Host) {
  525. fileName << "_" << MD5::Data(TProcessState::Get()->FqdnHostName);
  526. }
  527. TString cypressPath = AddPathPrefix(fileName, OperationPreparer_.GetContext().Config->Prefix);
  528. CreateFileInCypress(cypressPath);
  529. auto uploadTx = MakeIntrusive<TTransaction>(
  530. OperationPreparer_.GetClient(),
  531. OperationPreparer_.GetContext(),
  532. TTransactionId(),
  533. TStartTransactionOptions());
  534. ILockPtr lock;
  535. try {
  536. lock = uploadTx->Lock(cypressPath, ELockMode::LM_EXCLUSIVE, TLockOptions().Waitable(true));
  537. } catch (const TErrorResponse& e) {
  538. if (e.IsResolveError()) {
  539. // If the node doesn't exist, it must be removed by concurrent uploading process.
  540. // Let's try to find it in the cache.
  541. return GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription());
  542. }
  543. throw;
  544. }
  545. auto waitTimeout = GetWaitForUploadTimeout(itemToUpload);
  546. YT_LOG_DEBUG("Waiting for the lock on file (FileName: %v; CypressPath: %v; LockTimeout: %v)",
  547. itemToUpload.GetDescription(),
  548. cypressPath,
  549. waitTimeout);
  550. if (!TWaitProxy::Get()->WaitFuture(lock->GetAcquiredFuture(), waitTimeout)) {
  551. YT_LOG_DEBUG("Waiting for the lock timed out. Fallback to random path uploading (FileName: %v; CypressPath: %v)",
  552. itemToUpload.GetDescription(),
  553. cypressPath);
  554. return Nothing();
  555. }
  556. YT_LOG_DEBUG("Exclusive lock successfully acquired (FileName: %v; CypressPath: %v)",
  557. itemToUpload.GetDescription(),
  558. cypressPath);
  559. // Ensure that this process is the first to take a lock.
  560. if (auto cachedItemPath = GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription())) {
  561. return *cachedItemPath;
  562. }
  563. YT_LOG_INFO("Uploading file to cypress (FileName: %v; CypressPath: %v; PreparationId: %v)",
  564. itemToUpload.GetDescription(),
  565. cypressPath,
  566. OperationPreparer_.GetPreparationId());
  567. {
  568. auto writer = uploadTx->CreateFileWriter(cypressPath, TFileWriterOptions().ComputeMD5(true));
  569. YT_VERIFY(writer);
  570. itemToUpload.CreateInputStream()->ReadAll(*writer);
  571. writer->Finish();
  572. }
  573. auto path = PutFileToCypressCache(cypressPath, md5Signature, uploadTx->GetId());
  574. uploadTx->Commit();
  575. return path;
  576. }
  577. TString TJobPreparer::UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const
  578. {
  579. auto md5Signature = itemToUpload.CalculateMD5();
  580. Y_ABORT_UNLESS(md5Signature.size() == 32);
  581. if (auto cachedItemPath = GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription())) {
  582. return *cachedItemPath;
  583. }
  584. YT_LOG_INFO("File not found in cache; uploading to cypress (FileName: %v; PreparationId: %v)",
  585. itemToUpload.GetDescription(),
  586. OperationPreparer_.GetPreparationId());
  587. const auto& config = OperationPreparer_.GetContext().Config;
  588. if (config->CacheUploadDeduplicationMode != EUploadDeduplicationMode::Disabled &&
  589. itemToUpload.GetDataSize() > config->CacheUploadDeduplicationThreshold) {
  590. if (auto path = TryUploadWithDeduplication(itemToUpload)) {
  591. return *path;
  592. }
  593. }
  594. auto path = UploadToRandomPath(itemToUpload);
  595. return PutFileToCypressCache(path, md5Signature, Options_.FileStorageTransactionId_);
  596. }
  597. TString TJobPreparer::UploadToCache(const IItemToUpload& itemToUpload) const
  598. {
  599. YT_LOG_INFO("Uploading file (FileName: %v; PreparationId: %v)",
  600. itemToUpload.GetDescription(),
  601. OperationPreparer_.GetPreparationId());
  602. TString result;
  603. switch (Options_.FileCacheMode_) {
  604. case TOperationOptions::EFileCacheMode::ApiCommandBased:
  605. Y_ENSURE_EX(Options_.FileStorageTransactionId_.IsEmpty(), TApiUsageError() <<
  606. "Default cache mode (API command-based) doesn't allow non-default 'FileStorageTransactionId_'");
  607. result = UploadToCacheUsingApi(itemToUpload);
  608. break;
  609. case TOperationOptions::EFileCacheMode::CachelessRandomPathUpload:
  610. result = UploadToRandomPath(itemToUpload);
  611. break;
  612. default:
  613. Y_ABORT("Unknown file cache mode: %d", static_cast<int>(Options_.FileCacheMode_));
  614. }
  615. YT_LOG_INFO("Complete uploading file (FileName: %v; PreparationId: %v)",
  616. itemToUpload.GetDescription(),
  617. OperationPreparer_.GetPreparationId());
  618. return result;
  619. }
  620. void TJobPreparer::UseFileInCypress(const TRichYPath& file)
  621. {
  622. if (!Exists(
  623. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  624. OperationPreparer_.GetContext(),
  625. file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
  626. file.Path_))
  627. {
  628. ythrow yexception() << "File " << file.Path_ << " does not exist";
  629. }
  630. if (ShouldMountSandbox()) {
  631. auto size = Get(
  632. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  633. OperationPreparer_.GetContext(),
  634. file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
  635. file.Path_ + "/@uncompressed_data_size")
  636. .AsInt64();
  637. TotalFileSize_ += RoundUpFileSize(static_cast<ui64>(size));
  638. }
  639. CypressFiles_.push_back(file);
  640. }
  641. void TJobPreparer::UploadLocalFile(
  642. const TLocalFilePath& localPath,
  643. const TAddLocalFileOptions& options,
  644. bool isApiFile)
  645. {
  646. TFsPath fsPath(localPath);
  647. fsPath.CheckExists();
  648. TFileStat stat;
  649. fsPath.Stat(stat);
  650. bool isExecutable = stat.Mode & (S_IXUSR | S_IXGRP | S_IXOTH);
  651. auto cachePath = UploadToCache(TFileToUpload(localPath, options.MD5CheckSum_));
  652. TRichYPath cypressPath;
  653. if (isApiFile) {
  654. cypressPath = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
  655. }
  656. cypressPath.Path(cachePath).FileName(options.PathInJob_.GetOrElse(fsPath.Basename()));
  657. if (isExecutable) {
  658. cypressPath.Executable(true);
  659. }
  660. if (options.BypassArtifactCache_) {
  661. cypressPath.BypassArtifactCache(*options.BypassArtifactCache_);
  662. }
  663. if (ShouldMountSandbox()) {
  664. TotalFileSize_ += RoundUpFileSize(stat.Size);
  665. }
  666. CachedFiles_.push_back(cypressPath);
  667. }
  668. void TJobPreparer::UploadBinary(const TJobBinaryConfig& jobBinary)
  669. {
  670. if (std::holds_alternative<TJobBinaryLocalPath>(jobBinary)) {
  671. auto binaryLocalPath = std::get<TJobBinaryLocalPath>(jobBinary);
  672. auto opts = TAddLocalFileOptions().PathInJob("cppbinary");
  673. if (binaryLocalPath.MD5CheckSum) {
  674. opts.MD5CheckSum(*binaryLocalPath.MD5CheckSum);
  675. }
  676. UploadLocalFile(binaryLocalPath.Path, opts, /* isApiFile */ true);
  677. } else if (std::holds_alternative<TJobBinaryCypressPath>(jobBinary)) {
  678. auto binaryCypressPath = std::get<TJobBinaryCypressPath>(jobBinary);
  679. TRichYPath ytPath = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
  680. ytPath.Path(binaryCypressPath.Path);
  681. if (binaryCypressPath.TransactionId) {
  682. ytPath.TransactionId(*binaryCypressPath.TransactionId);
  683. }
  684. UseFileInCypress(ytPath.FileName("cppbinary").Executable(true));
  685. } else {
  686. Y_ABORT("%s", (::TStringBuilder() << "Unexpected jobBinary tag: " << jobBinary.index()).data());
  687. }
  688. }
  689. void TJobPreparer::UploadSmallFile(const TSmallJobFile& smallFile)
  690. {
  691. auto cachePath = UploadToCache(TDataToUpload(smallFile.Data, smallFile.FileName + " [generated-file]"));
  692. auto path = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
  693. CachedFiles_.push_back(path.Path(cachePath).FileName(smallFile.FileName));
  694. if (ShouldMountSandbox()) {
  695. TotalFileSize_ += RoundUpFileSize(smallFile.Data.size());
  696. }
  697. }
  698. bool TJobPreparer::IsLocalMode() const
  699. {
  700. return UseLocalModeOptimization(OperationPreparer_.GetContext(), OperationPreparer_.GetClientRetryPolicy());
  701. }
  702. void TJobPreparer::PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState)
  703. {
  704. auto jobBinary = TJobBinaryConfig();
  705. if (!std::holds_alternative<TJobBinaryDefault>(Spec_.GetJobBinary())) {
  706. jobBinary = Spec_.GetJobBinary();
  707. }
  708. TString binaryPathInsideJob;
  709. if (std::holds_alternative<TJobBinaryDefault>(jobBinary)) {
  710. if (GetInitStatus() != EInitStatus::FullInitialization) {
  711. ythrow yexception() << "NYT::Initialize() must be called prior to any operation";
  712. }
  713. const bool isLocalMode = IsLocalMode();
  714. const TMaybe<TString> md5 = !isLocalMode ? MakeMaybe(GetPersistentExecPathMd5()) : Nothing();
  715. jobBinary = TJobBinaryLocalPath{GetPersistentExecPath(), md5};
  716. if (isLocalMode) {
  717. binaryPathInsideJob = GetExecPath();
  718. }
  719. } else if (std::holds_alternative<TJobBinaryLocalPath>(jobBinary)) {
  720. const bool isLocalMode = IsLocalMode();
  721. if (isLocalMode) {
  722. binaryPathInsideJob = TFsPath(std::get<TJobBinaryLocalPath>(jobBinary).Path).RealPath();
  723. }
  724. }
  725. Y_ASSERT(!std::holds_alternative<TJobBinaryDefault>(jobBinary));
  726. // binaryPathInsideJob is only set when LocalModeOptimization option is on, so upload is not needed
  727. if (!binaryPathInsideJob) {
  728. binaryPathInsideJob = "./cppbinary";
  729. UploadBinary(jobBinary);
  730. }
  731. TString jobCommandPrefix = Options_.JobCommandPrefix_;
  732. if (!Spec_.JobCommandPrefix_.empty()) {
  733. jobCommandPrefix = Spec_.JobCommandPrefix_;
  734. }
  735. TString jobCommandSuffix = Options_.JobCommandSuffix_;
  736. if (!Spec_.JobCommandSuffix_.empty()) {
  737. jobCommandSuffix = Spec_.JobCommandSuffix_;
  738. }
  739. ClassName_ = TJobFactory::Get()->GetJobName(&job);
  740. auto jobArguments = TNode::CreateMap();
  741. jobArguments["job_name"] = ClassName_;
  742. jobArguments["output_table_count"] = static_cast<i64>(outputTableCount);
  743. jobArguments["has_state"] = hasState;
  744. Spec_.AddEnvironment("YT_JOB_ARGUMENTS", NodeToYsonString(jobArguments));
  745. Command_ = ::TStringBuilder() <<
  746. jobCommandPrefix <<
  747. (OperationPreparer_.GetContext().Config->UseClientProtobuf ? "YT_USE_CLIENT_PROTOBUF=1" : "YT_USE_CLIENT_PROTOBUF=0") << " " <<
  748. binaryPathInsideJob <<
  749. jobCommandSuffix;
  750. }
  751. ////////////////////////////////////////////////////////////////////////////////
  752. } // namespace NYT::NDetail