operation_preparer.cpp 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887
  1. #include "operation_preparer.h"
  2. #include "init.h"
  3. #include "file_writer.h"
  4. #include "operation.h"
  5. #include "operation_helpers.h"
  6. #include "operation_tracker.h"
  7. #include "transaction.h"
  8. #include "transaction_pinger.h"
  9. #include "yt_poller.h"
  10. #include <yt/cpp/mapreduce/common/helpers.h>
  11. #include <yt/cpp/mapreduce/common/retry_lib.h>
  12. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  13. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  14. #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
  15. #include <yt/cpp/mapreduce/interface/error_codes.h>
  16. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  17. #include <library/cpp/digest/md5/md5.h>
  18. #include <util/folder/path.h>
  19. #include <util/string/builder.h>
  20. #include <util/system/execpath.h>
  21. namespace NYT::NDetail {
  22. using namespace NRawClient;
  23. ////////////////////////////////////////////////////////////////////////////////
  24. class TWaitOperationStartPollerItem
  25. : public IYtPollerItem
  26. {
  27. public:
  28. TWaitOperationStartPollerItem(TOperationId operationId, THolder<TPingableTransaction> transaction)
  29. : OperationId_(operationId)
  30. , Transaction_(std::move(transaction))
  31. { }
  32. void PrepareRequest(TRawBatchRequest* batchRequest) override
  33. {
  34. Future_ = batchRequest->GetOperation(
  35. OperationId_,
  36. TGetOperationOptions().AttributeFilter(
  37. TOperationAttributeFilter().Add(EOperationAttribute::State)));
  38. }
  39. EStatus OnRequestExecuted() override
  40. {
  41. try {
  42. auto attributes = Future_.GetValue();
  43. Y_ENSURE(attributes.State.Defined());
  44. bool operationHasLockedFiles =
  45. *attributes.State != "starting" &&
  46. *attributes.State != "pending" &&
  47. *attributes.State != "orphaned" &&
  48. *attributes.State != "waiting_for_agent" &&
  49. *attributes.State != "initializing";
  50. return operationHasLockedFiles ? EStatus::PollBreak : EStatus::PollContinue;
  51. } catch (const TErrorResponse& e) {
  52. YT_LOG_ERROR("get_operation request failed: %v (RequestId: %v)",
  53. e.GetError().GetMessage(),
  54. e.GetRequestId());
  55. return IsRetriable(e) ? PollContinue : PollBreak;
  56. } catch (const std::exception& e) {
  57. YT_LOG_ERROR("%v", e.what());
  58. return PollBreak;
  59. }
  60. }
  61. void OnItemDiscarded() override {
  62. }
  63. private:
  64. TOperationId OperationId_;
  65. THolder<TPingableTransaction> Transaction_;
  66. ::NThreading::TFuture<TOperationAttributes> Future_;
  67. };
  68. ////////////////////////////////////////////////////////////////////////////////
  69. class TOperationForwardingRequestRetryPolicy
  70. : public IRequestRetryPolicy
  71. {
  72. public:
  73. TOperationForwardingRequestRetryPolicy(const IRequestRetryPolicyPtr& underlying, const TOperationPtr& operation)
  74. : Underlying_(underlying)
  75. , Operation_(operation)
  76. { }
  77. void NotifyNewAttempt() override
  78. {
  79. Underlying_->NotifyNewAttempt();
  80. }
  81. TMaybe<TDuration> OnGenericError(const std::exception& e) override
  82. {
  83. UpdateOperationStatus(e.what());
  84. return Underlying_->OnGenericError(e);
  85. }
  86. TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
  87. {
  88. auto msg = e.GetError().ShortDescription();
  89. UpdateOperationStatus(msg);
  90. return Underlying_->OnRetriableError(e);
  91. }
  92. void OnIgnoredError(const TErrorResponse& e) override
  93. {
  94. Underlying_->OnIgnoredError(e);
  95. }
  96. TString GetAttemptDescription() const override
  97. {
  98. return Underlying_->GetAttemptDescription();
  99. }
  100. private:
  101. void UpdateOperationStatus(TStringBuf err)
  102. {
  103. Y_VERIFY(Operation_);
  104. Operation_->OnStatusUpdated(
  105. ::TStringBuilder() << "Retriable error during operation start: " << err);
  106. }
  107. private:
  108. IRequestRetryPolicyPtr Underlying_;
  109. TOperationPtr Operation_;
  110. };
  111. ////////////////////////////////////////////////////////////////////////////////
  112. TOperationPreparer::TOperationPreparer(TClientPtr client, TTransactionId transactionId)
  113. : Client_(std::move(client))
  114. , TransactionId_(transactionId)
  115. , FileTransaction_(MakeHolder<TPingableTransaction>(
  116. Client_->GetRetryPolicy(),
  117. Client_->GetContext(),
  118. TransactionId_,
  119. Client_->GetTransactionPinger()->GetChildTxPinger(),
  120. TStartTransactionOptions()))
  121. , ClientRetryPolicy_(Client_->GetRetryPolicy())
  122. , PreparationId_(CreateGuidAsString())
  123. { }
  124. const TClientContext& TOperationPreparer::GetContext() const
  125. {
  126. return Client_->GetContext();
  127. }
  128. TTransactionId TOperationPreparer::GetTransactionId() const
  129. {
  130. return TransactionId_;
  131. }
  132. TClientPtr TOperationPreparer::GetClient() const
  133. {
  134. return Client_;
  135. }
  136. const TString& TOperationPreparer::GetPreparationId() const
  137. {
  138. return PreparationId_;
  139. }
  140. const IClientRetryPolicyPtr& TOperationPreparer::GetClientRetryPolicy() const
  141. {
  142. return ClientRetryPolicy_;
  143. }
  144. TOperationId TOperationPreparer::StartOperation(
  145. TOperation* operation,
  146. const TString& operationType,
  147. const TNode& spec,
  148. bool useStartOperationRequest)
  149. {
  150. CheckValidity();
  151. THttpHeader header("POST", (useStartOperationRequest ? "start_op" : operationType));
  152. if (useStartOperationRequest) {
  153. header.AddParameter("operation_type", operationType);
  154. }
  155. header.AddTransactionId(TransactionId_);
  156. header.AddMutationId();
  157. auto ysonSpec = NodeToYsonString(spec);
  158. auto responseInfo = RetryRequestWithPolicy(
  159. ::MakeIntrusive<TOperationForwardingRequestRetryPolicy>(
  160. ClientRetryPolicy_->CreatePolicyForStartOperationRequest(),
  161. TOperationPtr(operation)),
  162. GetContext(),
  163. header,
  164. ysonSpec);
  165. TOperationId operationId = ParseGuidFromResponse(responseInfo.Response);
  166. YT_LOG_DEBUG("Operation started (OperationId: %v; PreparationId: %v)",
  167. operationId,
  168. GetPreparationId());
  169. YT_LOG_INFO("Operation %v started (%v): %v",
  170. operationId,
  171. operationType,
  172. GetOperationWebInterfaceUrl(GetContext().ServerName, operationId));
  173. TOperationExecutionTimeTracker::Get()->Start(operationId);
  174. Client_->GetYtPoller().Watch(
  175. new TWaitOperationStartPollerItem(operationId, std::move(FileTransaction_)));
  176. return operationId;
  177. }
  178. void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths)
  179. {
  180. CheckValidity();
  181. TVector<::NThreading::TFuture<TLockId>> lockIdFutures;
  182. lockIdFutures.reserve(paths->size());
  183. TRawBatchRequest lockRequest(GetContext().Config);
  184. for (const auto& path : *paths) {
  185. lockIdFutures.push_back(lockRequest.Lock(
  186. FileTransaction_->GetId(),
  187. path.Path_,
  188. ELockMode::LM_SNAPSHOT,
  189. TLockOptions().Waitable(true)));
  190. }
  191. ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), lockRequest);
  192. TVector<::NThreading::TFuture<TNode>> nodeIdFutures;
  193. nodeIdFutures.reserve(paths->size());
  194. TRawBatchRequest getNodeIdRequest(GetContext().Config);
  195. for (const auto& lockIdFuture : lockIdFutures) {
  196. nodeIdFutures.push_back(getNodeIdRequest.Get(
  197. FileTransaction_->GetId(),
  198. ::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id",
  199. TGetOptions()));
  200. }
  201. ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), getNodeIdRequest);
  202. for (size_t i = 0; i != paths->size(); ++i) {
  203. auto& richPath = (*paths)[i];
  204. richPath.OriginalPath(richPath.Path_);
  205. richPath.Path("#" + nodeIdFutures[i].GetValue().AsString());
  206. YT_LOG_DEBUG("Locked file %v, new path is %v",
  207. *richPath.OriginalPath_,
  208. richPath.Path_);
  209. }
  210. }
  211. void TOperationPreparer::CheckValidity() const
  212. {
  213. Y_ENSURE(
  214. FileTransaction_,
  215. "File transaction is already moved, are you trying to use preparer for more than one operation?");
  216. }
  217. ////////////////////////////////////////////////////////////////////////////////
  218. class TRetryPolicyIgnoringLockConflicts
  219. : public TAttemptLimitedRetryPolicy
  220. {
  221. public:
  222. using TAttemptLimitedRetryPolicy::TAttemptLimitedRetryPolicy;
  223. using TAttemptLimitedRetryPolicy::OnGenericError;
  224. TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
  225. {
  226. if (IsAttemptLimitExceeded()) {
  227. return Nothing();
  228. }
  229. if (e.IsConcurrentTransactionLockConflict()) {
  230. return GetBackoffDuration(Config_);
  231. }
  232. return TAttemptLimitedRetryPolicy::OnRetriableError(e);
  233. }
  234. };
  235. ////////////////////////////////////////////////////////////////////////////////
  236. class TFileToUpload
  237. : public IItemToUpload
  238. {
  239. public:
  240. TFileToUpload(TString fileName, TMaybe<TString> md5)
  241. : FileName_(std::move(fileName))
  242. , MD5_(std::move(md5))
  243. { }
  244. TString CalculateMD5() const override
  245. {
  246. if (MD5_) {
  247. return *MD5_;
  248. }
  249. constexpr size_t md5Size = 32;
  250. TString result;
  251. result.ReserveAndResize(md5Size);
  252. MD5::File(FileName_.data(), result.Detach());
  253. MD5_ = result;
  254. return result;
  255. }
  256. THolder<IInputStream> CreateInputStream() const override
  257. {
  258. return MakeHolder<TFileInput>(FileName_);
  259. }
  260. TString GetDescription() const override
  261. {
  262. return FileName_;
  263. }
  264. ui64 GetDataSize() const override
  265. {
  266. return GetFileLength(FileName_);
  267. }
  268. private:
  269. TString FileName_;
  270. mutable TMaybe<TString> MD5_;
  271. };
  272. class TDataToUpload
  273. : public IItemToUpload
  274. {
  275. public:
  276. TDataToUpload(TString data, TString description)
  277. : Data_(std::move(data))
  278. , Description_(std::move(description))
  279. { }
  280. TString CalculateMD5() const override
  281. {
  282. constexpr size_t md5Size = 32;
  283. TString result;
  284. result.ReserveAndResize(md5Size);
  285. MD5::Data(reinterpret_cast<const unsigned char*>(Data_.data()), Data_.size(), result.Detach());
  286. return result;
  287. }
  288. THolder<IInputStream> CreateInputStream() const override
  289. {
  290. return MakeHolder<TMemoryInput>(Data_.data(), Data_.size());
  291. }
  292. TString GetDescription() const override
  293. {
  294. return Description_;
  295. }
  296. ui64 GetDataSize() const override
  297. {
  298. return Data_.size();
  299. }
  300. private:
  301. TString Data_;
  302. TString Description_;
  303. };
  304. ////////////////////////////////////////////////////////////////////////////////
  305. static const TString& GetPersistentExecPathMd5()
  306. {
  307. static TString md5 = MD5::File(GetPersistentExecPath());
  308. return md5;
  309. }
  310. static TMaybe<TSmallJobFile> GetJobState(const IJob& job)
  311. {
  312. TString result;
  313. {
  314. TStringOutput output(result);
  315. job.Save(output);
  316. output.Finish();
  317. }
  318. if (result.empty()) {
  319. return Nothing();
  320. } else {
  321. return TSmallJobFile{"jobstate", result};
  322. }
  323. }
  324. ////////////////////////////////////////////////////////////////////////////////
  325. TJobPreparer::TJobPreparer(
  326. TOperationPreparer& operationPreparer,
  327. const TUserJobSpec& spec,
  328. const IJob& job,
  329. size_t outputTableCount,
  330. const TVector<TSmallJobFile>& smallFileList,
  331. const TOperationOptions& options)
  332. : OperationPreparer_(operationPreparer)
  333. , Spec_(spec)
  334. , Options_(options)
  335. , Layers_(spec.Layers_)
  336. {
  337. CreateStorage();
  338. auto cypressFileList = CanonizeYPaths(/* retryPolicy */ nullptr, OperationPreparer_.GetContext(), spec.Files_);
  339. for (const auto& file : cypressFileList) {
  340. UseFileInCypress(file);
  341. }
  342. for (const auto& localFile : spec.GetLocalFiles()) {
  343. UploadLocalFile(std::get<0>(localFile), std::get<1>(localFile));
  344. }
  345. auto jobStateSmallFile = GetJobState(job);
  346. if (jobStateSmallFile) {
  347. UploadSmallFile(*jobStateSmallFile);
  348. }
  349. for (const auto& smallFile : smallFileList) {
  350. UploadSmallFile(smallFile);
  351. }
  352. if (auto commandJob = dynamic_cast<const ICommandJob*>(&job)) {
  353. ClassName_ = TJobFactory::Get()->GetJobName(&job);
  354. Command_ = commandJob->GetCommand();
  355. } else {
  356. PrepareJobBinary(job, outputTableCount, jobStateSmallFile.Defined());
  357. }
  358. operationPreparer.LockFiles(&CachedFiles_);
  359. }
  360. TVector<TRichYPath> TJobPreparer::GetFiles() const
  361. {
  362. TVector<TRichYPath> allFiles = CypressFiles_;
  363. allFiles.insert(allFiles.end(), CachedFiles_.begin(), CachedFiles_.end());
  364. return allFiles;
  365. }
  366. TVector<TYPath> TJobPreparer::GetLayers() const
  367. {
  368. return Layers_;
  369. }
  370. const TString& TJobPreparer::GetClassName() const
  371. {
  372. return ClassName_;
  373. }
  374. const TString& TJobPreparer::GetCommand() const
  375. {
  376. return Command_;
  377. }
  378. const TUserJobSpec& TJobPreparer::GetSpec() const
  379. {
  380. return Spec_;
  381. }
  382. bool TJobPreparer::ShouldMountSandbox() const
  383. {
  384. return OperationPreparer_.GetContext().Config->MountSandboxInTmpfs || Options_.MountSandboxInTmpfs_;
  385. }
  386. ui64 TJobPreparer::GetTotalFileSize() const
  387. {
  388. return TotalFileSize_;
  389. }
  390. TString TJobPreparer::GetFileStorage() const
  391. {
  392. return Options_.FileStorage_ ?
  393. *Options_.FileStorage_ :
  394. OperationPreparer_.GetContext().Config->RemoteTempFilesDirectory;
  395. }
  396. TYPath TJobPreparer::GetCachePath() const
  397. {
  398. return AddPathPrefix(
  399. ::TStringBuilder() << GetFileStorage() << "/new_cache",
  400. OperationPreparer_.GetContext().Config->Prefix);
  401. }
  402. void TJobPreparer::CreateStorage() const
  403. {
  404. Create(
  405. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  406. OperationPreparer_.GetContext(),
  407. Options_.FileStorageTransactionId_,
  408. GetCachePath(),
  409. NT_MAP,
  410. TCreateOptions()
  411. .IgnoreExisting(true)
  412. .Recursive(true));
  413. }
  414. int TJobPreparer::GetFileCacheReplicationFactor() const
  415. {
  416. if (IsLocalMode()) {
  417. return 1;
  418. } else {
  419. return OperationPreparer_.GetContext().Config->FileCacheReplicationFactor;
  420. }
  421. }
  422. void TJobPreparer::CreateFileInCypress(const TString& path) const
  423. {
  424. auto attributes = TNode()("replication_factor", GetFileCacheReplicationFactor());
  425. if (Options_.FileExpirationTimeout_) {
  426. attributes["expiration_timeout"] = Options_.FileExpirationTimeout_->MilliSeconds();
  427. }
  428. Create(
  429. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  430. OperationPreparer_.GetContext(),
  431. Options_.FileStorageTransactionId_,
  432. path,
  433. NT_FILE,
  434. TCreateOptions()
  435. .IgnoreExisting(true)
  436. .Recursive(true)
  437. .Attributes(attributes)
  438. );
  439. }
  440. TString TJobPreparer::PutFileToCypressCache(
  441. const TString& path,
  442. const TString& md5Signature,
  443. TTransactionId transactionId) const
  444. {
  445. constexpr ui32 LockConflictRetryCount = 30;
  446. auto retryPolicy = MakeIntrusive<TRetryPolicyIgnoringLockConflicts>(
  447. LockConflictRetryCount,
  448. OperationPreparer_.GetContext().Config);
  449. auto putFileToCacheOptions = TPutFileToCacheOptions();
  450. if (Options_.FileExpirationTimeout_) {
  451. putFileToCacheOptions.PreserveExpirationTimeout(true);
  452. }
  453. auto cachePath = PutFileToCache(
  454. retryPolicy,
  455. OperationPreparer_.GetContext(),
  456. transactionId,
  457. path,
  458. md5Signature,
  459. GetCachePath(),
  460. putFileToCacheOptions);
  461. Remove(
  462. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  463. OperationPreparer_.GetContext(),
  464. transactionId,
  465. path,
  466. TRemoveOptions().Force(true));
  467. return cachePath;
  468. }
  469. TMaybe<TString> TJobPreparer::GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const
  470. {
  471. constexpr ui32 LockConflictRetryCount = 30;
  472. auto retryPolicy = MakeIntrusive<TRetryPolicyIgnoringLockConflicts>(
  473. LockConflictRetryCount,
  474. OperationPreparer_.GetContext().Config);
  475. auto maybePath = GetFileFromCache(
  476. retryPolicy,
  477. OperationPreparer_.GetContext(),
  478. TTransactionId(),
  479. md5Signature,
  480. GetCachePath(),
  481. TGetFileFromCacheOptions());
  482. if (maybePath) {
  483. YT_LOG_DEBUG("File is already in cache (FileName: %v)",
  484. fileName,
  485. *maybePath);
  486. }
  487. return maybePath;
  488. }
  489. TDuration TJobPreparer::GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const
  490. {
  491. const TDuration extraTime = OperationPreparer_.GetContext().Config->WaitLockPollInterval +
  492. TDuration::MilliSeconds(100);
  493. const double dataSizeGb = static_cast<double>(itemToUpload.GetDataSize()) / 1_GB;
  494. return extraTime + dataSizeGb * OperationPreparer_.GetContext().Config->CacheLockTimeoutPerGb;
  495. }
  496. TString TJobPreparer::UploadToRandomPath(const IItemToUpload& itemToUpload) const
  497. {
  498. TString uniquePath = AddPathPrefix(
  499. ::TStringBuilder() << GetFileStorage() << "/cpp_" << CreateGuidAsString(),
  500. OperationPreparer_.GetContext().Config->Prefix);
  501. YT_LOG_INFO("Uploading file to random cypress path (FileName: %v; CypressPath: %v; PreparationId: %v)",
  502. itemToUpload.GetDescription(),
  503. uniquePath,
  504. OperationPreparer_.GetPreparationId());
  505. CreateFileInCypress(uniquePath);
  506. {
  507. TFileWriter writer(
  508. uniquePath,
  509. OperationPreparer_.GetClientRetryPolicy(),
  510. OperationPreparer_.GetClient()->GetTransactionPinger(),
  511. OperationPreparer_.GetContext(),
  512. Options_.FileStorageTransactionId_,
  513. TFileWriterOptions().ComputeMD5(true));
  514. itemToUpload.CreateInputStream()->ReadAll(writer);
  515. writer.Finish();
  516. }
  517. return uniquePath;
  518. }
  519. TMaybe<TString> TJobPreparer::TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const
  520. {
  521. const auto md5Signature = itemToUpload.CalculateMD5();
  522. auto fileName = ::TStringBuilder() << GetFileStorage() << "/cpp_md5_" << md5Signature;
  523. if (OperationPreparer_.GetContext().Config->CacheUploadDeduplicationMode == EUploadDeduplicationMode::Host) {
  524. fileName << "_" << MD5::Data(TProcessState::Get()->FqdnHostName);
  525. }
  526. TString cypressPath = AddPathPrefix(fileName, OperationPreparer_.GetContext().Config->Prefix);
  527. CreateFileInCypress(cypressPath);
  528. auto uploadTx = MakeIntrusive<TTransaction>(
  529. OperationPreparer_.GetClient(),
  530. OperationPreparer_.GetContext(),
  531. TTransactionId(),
  532. TStartTransactionOptions());
  533. ILockPtr lock;
  534. try {
  535. lock = uploadTx->Lock(cypressPath, ELockMode::LM_EXCLUSIVE, TLockOptions().Waitable(true));
  536. } catch (const TErrorResponse& e) {
  537. if (e.IsResolveError()) {
  538. // If the node doesn't exist, it must be removed by concurrent uploading process.
  539. // Let's try to find it in the cache.
  540. return GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription());
  541. }
  542. throw;
  543. }
  544. auto waitTimeout = GetWaitForUploadTimeout(itemToUpload);
  545. YT_LOG_DEBUG("Waiting for the lock on file (FileName: %v; CypressPath: %v; LockTimeout: %v)",
  546. itemToUpload.GetDescription(),
  547. cypressPath,
  548. waitTimeout);
  549. if (!TWaitProxy::Get()->WaitFuture(lock->GetAcquiredFuture(), waitTimeout)) {
  550. YT_LOG_DEBUG("Waiting for the lock timed out. Fallback to random path uploading (FileName: %v; CypressPath: %v)",
  551. itemToUpload.GetDescription(),
  552. cypressPath);
  553. return Nothing();
  554. }
  555. YT_LOG_DEBUG("Exclusive lock successfully acquired (FileName: %v; CypressPath: %v)",
  556. itemToUpload.GetDescription(),
  557. cypressPath);
  558. // Ensure that this process is the first to take a lock.
  559. if (auto cachedItemPath = GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription())) {
  560. return *cachedItemPath;
  561. }
  562. YT_LOG_INFO("Uploading file to cypress (FileName: %v; CypressPath: %v; PreparationId: %v)",
  563. itemToUpload.GetDescription(),
  564. cypressPath,
  565. OperationPreparer_.GetPreparationId());
  566. {
  567. auto writer = uploadTx->CreateFileWriter(cypressPath, TFileWriterOptions().ComputeMD5(true));
  568. YT_VERIFY(writer);
  569. itemToUpload.CreateInputStream()->ReadAll(*writer);
  570. writer->Finish();
  571. }
  572. auto path = PutFileToCypressCache(cypressPath, md5Signature, uploadTx->GetId());
  573. uploadTx->Commit();
  574. return path;
  575. }
  576. TString TJobPreparer::UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const
  577. {
  578. auto md5Signature = itemToUpload.CalculateMD5();
  579. Y_VERIFY(md5Signature.size() == 32);
  580. if (auto cachedItemPath = GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription())) {
  581. return *cachedItemPath;
  582. }
  583. YT_LOG_INFO("File not found in cache; uploading to cypress (FileName: %v; PreparationId: %v)",
  584. itemToUpload.GetDescription(),
  585. OperationPreparer_.GetPreparationId());
  586. if (OperationPreparer_.GetContext().Config->CacheUploadDeduplicationMode != EUploadDeduplicationMode::Disabled) {
  587. if (auto path = TryUploadWithDeduplication(itemToUpload)) {
  588. return *path;
  589. }
  590. }
  591. auto path = UploadToRandomPath(itemToUpload);
  592. return PutFileToCypressCache(path, md5Signature, Options_.FileStorageTransactionId_);
  593. }
  594. TString TJobPreparer::UploadToCache(const IItemToUpload& itemToUpload) const
  595. {
  596. YT_LOG_INFO("Uploading file (FileName: %v; PreparationId: %v)",
  597. itemToUpload.GetDescription(),
  598. OperationPreparer_.GetPreparationId());
  599. TString result;
  600. switch (Options_.FileCacheMode_) {
  601. case TOperationOptions::EFileCacheMode::ApiCommandBased:
  602. Y_ENSURE_EX(Options_.FileStorageTransactionId_.IsEmpty(), TApiUsageError() <<
  603. "Default cache mode (API command-based) doesn't allow non-default 'FileStorageTransactionId_'");
  604. result = UploadToCacheUsingApi(itemToUpload);
  605. break;
  606. case TOperationOptions::EFileCacheMode::CachelessRandomPathUpload:
  607. result = UploadToRandomPath(itemToUpload);
  608. break;
  609. default:
  610. Y_FAIL("Unknown file cache mode: %d", static_cast<int>(Options_.FileCacheMode_));
  611. }
  612. YT_LOG_INFO("Complete uploading file (FileName: %v; PreparationId: %v)",
  613. itemToUpload.GetDescription(),
  614. OperationPreparer_.GetPreparationId());
  615. return result;
  616. }
  617. void TJobPreparer::UseFileInCypress(const TRichYPath& file)
  618. {
  619. if (!Exists(
  620. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  621. OperationPreparer_.GetContext(),
  622. file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
  623. file.Path_))
  624. {
  625. ythrow yexception() << "File " << file.Path_ << " does not exist";
  626. }
  627. if (ShouldMountSandbox()) {
  628. auto size = Get(
  629. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  630. OperationPreparer_.GetContext(),
  631. file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
  632. file.Path_ + "/@uncompressed_data_size")
  633. .AsInt64();
  634. TotalFileSize_ += RoundUpFileSize(static_cast<ui64>(size));
  635. }
  636. CypressFiles_.push_back(file);
  637. }
  638. void TJobPreparer::UploadLocalFile(
  639. const TLocalFilePath& localPath,
  640. const TAddLocalFileOptions& options,
  641. bool isApiFile)
  642. {
  643. TFsPath fsPath(localPath);
  644. fsPath.CheckExists();
  645. TFileStat stat;
  646. fsPath.Stat(stat);
  647. bool isExecutable = stat.Mode & (S_IXUSR | S_IXGRP | S_IXOTH);
  648. auto cachePath = UploadToCache(TFileToUpload(localPath, options.MD5CheckSum_));
  649. TRichYPath cypressPath;
  650. if (isApiFile) {
  651. cypressPath = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
  652. }
  653. cypressPath.Path(cachePath).FileName(options.PathInJob_.GetOrElse(fsPath.Basename()));
  654. if (isExecutable) {
  655. cypressPath.Executable(true);
  656. }
  657. if (options.BypassArtifactCache_) {
  658. cypressPath.BypassArtifactCache(*options.BypassArtifactCache_);
  659. }
  660. if (ShouldMountSandbox()) {
  661. TotalFileSize_ += RoundUpFileSize(stat.Size);
  662. }
  663. CachedFiles_.push_back(cypressPath);
  664. }
  665. void TJobPreparer::UploadBinary(const TJobBinaryConfig& jobBinary)
  666. {
  667. if (std::holds_alternative<TJobBinaryLocalPath>(jobBinary)) {
  668. auto binaryLocalPath = std::get<TJobBinaryLocalPath>(jobBinary);
  669. auto opts = TAddLocalFileOptions().PathInJob("cppbinary");
  670. if (binaryLocalPath.MD5CheckSum) {
  671. opts.MD5CheckSum(*binaryLocalPath.MD5CheckSum);
  672. }
  673. UploadLocalFile(binaryLocalPath.Path, opts, /* isApiFile */ true);
  674. } else if (std::holds_alternative<TJobBinaryCypressPath>(jobBinary)) {
  675. auto binaryCypressPath = std::get<TJobBinaryCypressPath>(jobBinary);
  676. TRichYPath ytPath = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
  677. ytPath.Path(binaryCypressPath.Path);
  678. if (binaryCypressPath.TransactionId) {
  679. ytPath.TransactionId(*binaryCypressPath.TransactionId);
  680. }
  681. UseFileInCypress(ytPath.FileName("cppbinary").Executable(true));
  682. } else {
  683. Y_FAIL("%s", (::TStringBuilder() << "Unexpected jobBinary tag: " << jobBinary.index()).data());
  684. }
  685. }
  686. void TJobPreparer::UploadSmallFile(const TSmallJobFile& smallFile)
  687. {
  688. auto cachePath = UploadToCache(TDataToUpload(smallFile.Data, smallFile.FileName + " [generated-file]"));
  689. auto path = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
  690. CachedFiles_.push_back(path.Path(cachePath).FileName(smallFile.FileName));
  691. if (ShouldMountSandbox()) {
  692. TotalFileSize_ += RoundUpFileSize(smallFile.Data.size());
  693. }
  694. }
  695. bool TJobPreparer::IsLocalMode() const
  696. {
  697. return UseLocalModeOptimization(OperationPreparer_.GetContext(), OperationPreparer_.GetClientRetryPolicy());
  698. }
  699. void TJobPreparer::PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState)
  700. {
  701. auto jobBinary = TJobBinaryConfig();
  702. if (!std::holds_alternative<TJobBinaryDefault>(Spec_.GetJobBinary())) {
  703. jobBinary = Spec_.GetJobBinary();
  704. }
  705. TString binaryPathInsideJob;
  706. if (std::holds_alternative<TJobBinaryDefault>(jobBinary)) {
  707. if (GetInitStatus() != EInitStatus::FullInitialization) {
  708. ythrow yexception() << "NYT::Initialize() must be called prior to any operation";
  709. }
  710. const bool isLocalMode = IsLocalMode();
  711. const TMaybe<TString> md5 = !isLocalMode ? MakeMaybe(GetPersistentExecPathMd5()) : Nothing();
  712. jobBinary = TJobBinaryLocalPath{GetPersistentExecPath(), md5};
  713. if (isLocalMode) {
  714. binaryPathInsideJob = GetExecPath();
  715. }
  716. } else if (std::holds_alternative<TJobBinaryLocalPath>(jobBinary)) {
  717. const bool isLocalMode = IsLocalMode();
  718. if (isLocalMode) {
  719. binaryPathInsideJob = TFsPath(std::get<TJobBinaryLocalPath>(jobBinary).Path).RealPath();
  720. }
  721. }
  722. Y_ASSERT(!std::holds_alternative<TJobBinaryDefault>(jobBinary));
  723. // binaryPathInsideJob is only set when LocalModeOptimization option is on, so upload is not needed
  724. if (!binaryPathInsideJob) {
  725. binaryPathInsideJob = "./cppbinary";
  726. UploadBinary(jobBinary);
  727. }
  728. TString jobCommandPrefix = Options_.JobCommandPrefix_;
  729. if (!Spec_.JobCommandPrefix_.empty()) {
  730. jobCommandPrefix = Spec_.JobCommandPrefix_;
  731. }
  732. TString jobCommandSuffix = Options_.JobCommandSuffix_;
  733. if (!Spec_.JobCommandSuffix_.empty()) {
  734. jobCommandSuffix = Spec_.JobCommandSuffix_;
  735. }
  736. ClassName_ = TJobFactory::Get()->GetJobName(&job);
  737. auto jobArguments = TNode::CreateMap();
  738. jobArguments["job_name"] = ClassName_;
  739. jobArguments["output_table_count"] = static_cast<i64>(outputTableCount);
  740. jobArguments["has_state"] = hasState;
  741. Spec_.AddEnvironment("YT_JOB_ARGUMENTS", NodeToYsonString(jobArguments));
  742. Command_ = ::TStringBuilder() <<
  743. jobCommandPrefix <<
  744. (OperationPreparer_.GetContext().Config->UseClientProtobuf ? "YT_USE_CLIENT_PROTOBUF=1" : "YT_USE_CLIENT_PROTOBUF=0") << " " <<
  745. binaryPathInsideJob <<
  746. jobCommandSuffix;
  747. }
  748. ////////////////////////////////////////////////////////////////////////////////
  749. } // namespace NYT::NDetail