operation_preparer.cpp 29 KB


  1. #include "operation_preparer.h"
  2. #include "init.h"
  3. #include "file_writer.h"
  4. #include "operation.h"
  5. #include "operation_helpers.h"
  6. #include "operation_tracker.h"
  7. #include "transaction.h"
  8. #include "transaction_pinger.h"
  9. #include "yt_poller.h"
  10. #include <yt/cpp/mapreduce/common/helpers.h>
  11. #include <yt/cpp/mapreduce/common/retry_lib.h>
  12. #include <yt/cpp/mapreduce/common/wait_proxy.h>
  13. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  14. #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
  15. #include <yt/cpp/mapreduce/interface/error_codes.h>
  16. #include <yt/cpp/mapreduce/interface/raw_client.h>
  17. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  18. #include <library/cpp/digest/md5/md5.h>
  19. #include <util/folder/path.h>
  20. #include <util/string/builder.h>
  21. #include <util/system/execpath.h>
  22. namespace NYT::NDetail {
  23. using namespace NRawClient;
  24. ////////////////////////////////////////////////////////////////////////////////
  25. class TWaitOperationStartPollerItem
  26. : public IYtPollerItem
  27. {
  28. public:
  29. TWaitOperationStartPollerItem(TOperationId operationId, THolder<TPingableTransaction> transaction)
  30. : OperationId_(operationId)
  31. , Transaction_(std::move(transaction))
  32. { }
  33. void PrepareRequest(TRawBatchRequest* batchRequest) override
  34. {
  35. Future_ = batchRequest->GetOperation(
  36. OperationId_,
  37. TGetOperationOptions().AttributeFilter(
  38. TOperationAttributeFilter().Add(EOperationAttribute::State)));
  39. }
  40. EStatus OnRequestExecuted() override
  41. {
  42. try {
  43. auto attributes = Future_.GetValue();
  44. Y_ENSURE(attributes.State.Defined());
  45. bool operationHasLockedFiles =
  46. *attributes.State != "starting" &&
  47. *attributes.State != "pending" &&
  48. *attributes.State != "orphaned" &&
  49. *attributes.State != "waiting_for_agent" &&
  50. *attributes.State != "initializing";
  51. return operationHasLockedFiles ? EStatus::PollBreak : EStatus::PollContinue;
  52. } catch (const TErrorResponse& e) {
  53. YT_LOG_ERROR("get_operation request failed: %v (RequestId: %v)",
  54. e.GetError().GetMessage(),
  55. e.GetRequestId());
  56. return IsRetriable(e) ? PollContinue : PollBreak;
  57. } catch (const std::exception& e) {
  58. YT_LOG_ERROR("%v", e.what());
  59. return PollBreak;
  60. }
  61. }
  62. void OnItemDiscarded() override {
  63. }
  64. private:
  65. TOperationId OperationId_;
  66. THolder<TPingableTransaction> Transaction_;
  67. ::NThreading::TFuture<TOperationAttributes> Future_;
  68. };
  69. ////////////////////////////////////////////////////////////////////////////////
  70. class TOperationForwardingRequestRetryPolicy
  71. : public IRequestRetryPolicy
  72. {
  73. public:
  74. TOperationForwardingRequestRetryPolicy(const IRequestRetryPolicyPtr& underlying, const TOperationPtr& operation)
  75. : Underlying_(underlying)
  76. , Operation_(operation)
  77. { }
  78. void NotifyNewAttempt() override
  79. {
  80. Underlying_->NotifyNewAttempt();
  81. }
  82. TMaybe<TDuration> OnGenericError(const std::exception& e) override
  83. {
  84. UpdateOperationStatus(e.what());
  85. return Underlying_->OnGenericError(e);
  86. }
  87. TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
  88. {
  89. auto msg = e.GetError().ShortDescription();
  90. UpdateOperationStatus(msg);
  91. return Underlying_->OnRetriableError(e);
  92. }
  93. void OnIgnoredError(const TErrorResponse& e) override
  94. {
  95. Underlying_->OnIgnoredError(e);
  96. }
  97. TString GetAttemptDescription() const override
  98. {
  99. return Underlying_->GetAttemptDescription();
  100. }
  101. private:
  102. void UpdateOperationStatus(TStringBuf err)
  103. {
  104. Y_ABORT_UNLESS(Operation_);
  105. Operation_->OnStatusUpdated(
  106. ::TStringBuilder() << "Retriable error during operation start: " << err);
  107. }
  108. private:
  109. IRequestRetryPolicyPtr Underlying_;
  110. TOperationPtr Operation_;
  111. };
  112. ////////////////////////////////////////////////////////////////////////////////
  113. TOperationPreparer::TOperationPreparer(TClientPtr client, TTransactionId transactionId)
  114. : Client_(std::move(client))
  115. , TransactionId_(transactionId)
  116. , FileTransaction_(MakeHolder<TPingableTransaction>(
  117. Client_->GetRawClient(),
  118. Client_->GetRetryPolicy(),
  119. Client_->GetContext(),
  120. TransactionId_,
  121. Client_->GetTransactionPinger()->GetChildTxPinger(),
  122. TStartTransactionOptions()))
  123. , ClientRetryPolicy_(Client_->GetRetryPolicy())
  124. , PreparationId_(CreateGuidAsString())
  125. { }
  126. const TClientContext& TOperationPreparer::GetContext() const
  127. {
  128. return Client_->GetContext();
  129. }
  130. TTransactionId TOperationPreparer::GetTransactionId() const
  131. {
  132. return TransactionId_;
  133. }
  134. TClientPtr TOperationPreparer::GetClient() const
  135. {
  136. return Client_;
  137. }
  138. const TString& TOperationPreparer::GetPreparationId() const
  139. {
  140. return PreparationId_;
  141. }
  142. const IClientRetryPolicyPtr& TOperationPreparer::GetClientRetryPolicy() const
  143. {
  144. return ClientRetryPolicy_;
  145. }
  146. TOperationId TOperationPreparer::StartOperation(
  147. TOperation* operation,
  148. const TString& operationType,
  149. const TNode& spec,
  150. bool useStartOperationRequest)
  151. {
  152. CheckValidity();
  153. THttpHeader header("POST", (useStartOperationRequest ? "start_op" : operationType));
  154. if (useStartOperationRequest) {
  155. header.AddParameter("operation_type", operationType);
  156. }
  157. header.AddTransactionId(TransactionId_);
  158. header.AddMutationId();
  159. auto ysonSpec = NodeToYsonString(spec);
  160. auto responseInfo = RetryRequestWithPolicy(
  161. ::MakeIntrusive<TOperationForwardingRequestRetryPolicy>(
  162. ClientRetryPolicy_->CreatePolicyForStartOperationRequest(),
  163. TOperationPtr(operation)),
  164. GetContext(),
  165. header,
  166. ysonSpec);
  167. TOperationId operationId = ParseGuidFromResponse(responseInfo.Response);
  168. YT_LOG_DEBUG("Operation started (OperationId: %v; PreparationId: %v)",
  169. operationId,
  170. GetPreparationId());
  171. YT_LOG_INFO("Operation %v started (%v): %v",
  172. operationId,
  173. operationType,
  174. GetOperationWebInterfaceUrl(GetContext().ServerName, operationId));
  175. TOperationExecutionTimeTracker::Get()->Start(operationId);
  176. Client_->GetYtPoller().Watch(
  177. new TWaitOperationStartPollerItem(operationId, std::move(FileTransaction_)));
  178. return operationId;
  179. }
  180. void TOperationPreparer::LockFiles(TVector<TRichYPath>* paths)
  181. {
  182. CheckValidity();
  183. TVector<::NThreading::TFuture<TLockId>> lockIdFutures;
  184. lockIdFutures.reserve(paths->size());
  185. TRawBatchRequest lockRequest(GetContext().Config);
  186. for (const auto& path : *paths) {
  187. lockIdFutures.push_back(lockRequest.Lock(
  188. FileTransaction_->GetId(),
  189. path.Path_,
  190. ELockMode::LM_SNAPSHOT,
  191. TLockOptions().Waitable(true)));
  192. }
  193. ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), lockRequest);
  194. TVector<::NThreading::TFuture<TNode>> nodeIdFutures;
  195. nodeIdFutures.reserve(paths->size());
  196. TRawBatchRequest getNodeIdRequest(GetContext().Config);
  197. for (const auto& lockIdFuture : lockIdFutures) {
  198. nodeIdFutures.push_back(getNodeIdRequest.Get(
  199. FileTransaction_->GetId(),
  200. ::TStringBuilder() << '#' << GetGuidAsString(lockIdFuture.GetValue()) << "/@node_id",
  201. TGetOptions()));
  202. }
  203. ExecuteBatch(ClientRetryPolicy_->CreatePolicyForGenericRequest(), GetContext(), getNodeIdRequest);
  204. for (size_t i = 0; i != paths->size(); ++i) {
  205. auto& richPath = (*paths)[i];
  206. richPath.OriginalPath(richPath.Path_);
  207. richPath.Path("#" + nodeIdFutures[i].GetValue().AsString());
  208. YT_LOG_DEBUG("Locked file %v, new path is %v",
  209. *richPath.OriginalPath_,
  210. richPath.Path_);
  211. }
  212. }
  213. void TOperationPreparer::CheckValidity() const
  214. {
  215. Y_ENSURE(
  216. FileTransaction_,
  217. "File transaction is already moved, are you trying to use preparer for more than one operation?");
  218. }
  219. ////////////////////////////////////////////////////////////////////////////////
  220. class TRetryPolicyIgnoringLockConflicts
  221. : public TAttemptLimitedRetryPolicy
  222. {
  223. public:
  224. using TAttemptLimitedRetryPolicy::TAttemptLimitedRetryPolicy;
  225. using TAttemptLimitedRetryPolicy::OnGenericError;
  226. TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
  227. {
  228. if (IsAttemptLimitExceeded()) {
  229. return Nothing();
  230. }
  231. if (e.IsConcurrentTransactionLockConflict()) {
  232. return GetBackoffDuration(Config_);
  233. }
  234. return TAttemptLimitedRetryPolicy::OnRetriableError(e);
  235. }
  236. };
  237. ////////////////////////////////////////////////////////////////////////////////
  238. class TFileToUpload
  239. : public IItemToUpload
  240. {
  241. public:
  242. TFileToUpload(TString fileName, TMaybe<TString> md5)
  243. : FileName_(std::move(fileName))
  244. , MD5_(std::move(md5))
  245. { }
  246. TString CalculateMD5() const override
  247. {
  248. if (MD5_) {
  249. return *MD5_;
  250. }
  251. constexpr size_t md5Size = 32;
  252. TString result;
  253. result.ReserveAndResize(md5Size);
  254. MD5::File(FileName_.data(), result.Detach());
  255. MD5_ = result;
  256. return result;
  257. }
  258. THolder<IInputStream> CreateInputStream() const override
  259. {
  260. return MakeHolder<TFileInput>(FileName_);
  261. }
  262. TString GetDescription() const override
  263. {
  264. return FileName_;
  265. }
  266. i64 GetDataSize() const override
  267. {
  268. return GetFileLength(FileName_);
  269. }
  270. private:
  271. TString FileName_;
  272. mutable TMaybe<TString> MD5_;
  273. };
  274. class TDataToUpload
  275. : public IItemToUpload
  276. {
  277. public:
  278. TDataToUpload(TString data, TString description)
  279. : Data_(std::move(data))
  280. , Description_(std::move(description))
  281. { }
  282. TString CalculateMD5() const override
  283. {
  284. constexpr size_t md5Size = 32;
  285. TString result;
  286. result.ReserveAndResize(md5Size);
  287. MD5::Data(reinterpret_cast<const unsigned char*>(Data_.data()), Data_.size(), result.Detach());
  288. return result;
  289. }
  290. THolder<IInputStream> CreateInputStream() const override
  291. {
  292. return MakeHolder<TMemoryInput>(Data_.data(), Data_.size());
  293. }
  294. TString GetDescription() const override
  295. {
  296. return Description_;
  297. }
  298. i64 GetDataSize() const override
  299. {
  300. return std::ssize(Data_);
  301. }
  302. private:
  303. TString Data_;
  304. TString Description_;
  305. };
  306. ////////////////////////////////////////////////////////////////////////////////
  307. static const TString& GetPersistentExecPathMd5()
  308. {
  309. static TString md5 = MD5::File(GetPersistentExecPath());
  310. return md5;
  311. }
  312. static TMaybe<TSmallJobFile> GetJobState(const IJob& job)
  313. {
  314. TString result;
  315. {
  316. TStringOutput output(result);
  317. job.Save(output);
  318. output.Finish();
  319. }
  320. if (result.empty()) {
  321. return Nothing();
  322. } else {
  323. return TSmallJobFile{"jobstate", result};
  324. }
  325. }
  326. ////////////////////////////////////////////////////////////////////////////////
  327. TJobPreparer::TJobPreparer(
  328. TOperationPreparer& operationPreparer,
  329. const TUserJobSpec& spec,
  330. const IJob& job,
  331. size_t outputTableCount,
  332. const TVector<TSmallJobFile>& smallFileList,
  333. const TOperationOptions& options)
  334. : RawClient_(operationPreparer.GetClient()->GetRawClient())
  335. , OperationPreparer_(operationPreparer)
  336. , Spec_(spec)
  337. , Options_(options)
  338. , Layers_(spec.Layers_)
  339. {
  340. CreateStorage();
  341. auto cypressFileList = CanonizeYPaths(/* retryPolicy */ nullptr, OperationPreparer_.GetContext(), spec.Files_);
  342. for (const auto& file : cypressFileList) {
  343. UseFileInCypress(file);
  344. }
  345. for (const auto& localFile : spec.GetLocalFiles()) {
  346. UploadLocalFile(std::get<0>(localFile), std::get<1>(localFile));
  347. }
  348. auto jobStateSmallFile = GetJobState(job);
  349. if (jobStateSmallFile) {
  350. UploadSmallFile(*jobStateSmallFile);
  351. }
  352. for (const auto& smallFile : smallFileList) {
  353. UploadSmallFile(smallFile);
  354. }
  355. if (auto commandJob = dynamic_cast<const ICommandJob*>(&job)) {
  356. IsCommandJob_ = true;
  357. ClassName_ = TJobFactory::Get()->GetJobName(&job);
  358. Command_ = commandJob->GetCommand();
  359. } else {
  360. PrepareJobBinary(job, outputTableCount, jobStateSmallFile.Defined());
  361. }
  362. operationPreparer.LockFiles(&CachedFiles_);
  363. }
  364. TVector<TRichYPath> TJobPreparer::GetFiles() const
  365. {
  366. TVector<TRichYPath> allFiles = CypressFiles_;
  367. allFiles.insert(allFiles.end(), CachedFiles_.begin(), CachedFiles_.end());
  368. return allFiles;
  369. }
  370. TVector<TYPath> TJobPreparer::GetLayers() const
  371. {
  372. return Layers_;
  373. }
  374. const TString& TJobPreparer::GetClassName() const
  375. {
  376. return ClassName_;
  377. }
  378. const TString& TJobPreparer::GetCommand() const
  379. {
  380. return Command_;
  381. }
  382. const TUserJobSpec& TJobPreparer::GetSpec() const
  383. {
  384. return Spec_;
  385. }
  386. bool TJobPreparer::ShouldMountSandbox() const
  387. {
  388. return OperationPreparer_.GetContext().Config->MountSandboxInTmpfs || Options_.MountSandboxInTmpfs_;
  389. }
  390. ui64 TJobPreparer::GetTotalFileSize() const
  391. {
  392. return TotalFileSize_;
  393. }
  394. bool TJobPreparer::ShouldRedirectStdoutToStderr() const
  395. {
  396. return !IsCommandJob_ && OperationPreparer_.GetContext().Config->RedirectStdoutToStderr;
  397. }
  398. TString TJobPreparer::GetFileStorage() const
  399. {
  400. return Options_.FileStorage_ ?
  401. *Options_.FileStorage_ :
  402. OperationPreparer_.GetContext().Config->RemoteTempFilesDirectory;
  403. }
  404. TYPath TJobPreparer::GetCachePath() const
  405. {
  406. return AddPathPrefix(
  407. ::TStringBuilder() << GetFileStorage() << "/new_cache",
  408. OperationPreparer_.GetContext().Config->Prefix);
  409. }
  410. void TJobPreparer::CreateStorage() const
  411. {
  412. RequestWithRetry<void>(
  413. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  414. [this] (TMutationId& mutationId) {
  415. RawClient_->Create(
  416. mutationId,
  417. Options_.FileStorageTransactionId_,
  418. GetCachePath(),
  419. NT_MAP,
  420. TCreateOptions()
  421. .IgnoreExisting(true)
  422. .Recursive(true));
  423. });
  424. }
  425. int TJobPreparer::GetFileCacheReplicationFactor() const
  426. {
  427. if (IsLocalMode()) {
  428. return 1;
  429. } else {
  430. return OperationPreparer_.GetContext().Config->FileCacheReplicationFactor;
  431. }
  432. }
  433. void TJobPreparer::CreateFileInCypress(const TString& path) const
  434. {
  435. auto attributes = TNode()("replication_factor", GetFileCacheReplicationFactor());
  436. if (Options_.FileExpirationTimeout_) {
  437. attributes["expiration_timeout"] = Options_.FileExpirationTimeout_->MilliSeconds();
  438. }
  439. RequestWithRetry<void>(
  440. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  441. [this, &path, &attributes] (TMutationId& mutationId) {
  442. RawClient_->Create(
  443. mutationId,
  444. Options_.FileStorageTransactionId_,
  445. path,
  446. NT_FILE,
  447. TCreateOptions()
  448. .IgnoreExisting(true)
  449. .Recursive(true)
  450. .Attributes(attributes));
  451. });
  452. }
  453. TString TJobPreparer::PutFileToCypressCache(
  454. const TString& path,
  455. const TString& md5Signature,
  456. TTransactionId transactionId) const
  457. {
  458. constexpr ui32 LockConflictRetryCount = 30;
  459. auto retryPolicy = MakeIntrusive<TRetryPolicyIgnoringLockConflicts>(
  460. LockConflictRetryCount,
  461. OperationPreparer_.GetContext().Config);
  462. auto putFileToCacheOptions = TPutFileToCacheOptions();
  463. if (Options_.FileExpirationTimeout_) {
  464. putFileToCacheOptions.PreserveExpirationTimeout(true);
  465. }
  466. auto cachePath = PutFileToCache(
  467. retryPolicy,
  468. OperationPreparer_.GetContext(),
  469. transactionId,
  470. path,
  471. md5Signature,
  472. GetCachePath(),
  473. putFileToCacheOptions);
  474. RequestWithRetry<void>(
  475. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  476. [this, &transactionId, &path] (TMutationId& mutationId) {
  477. RawClient_->Remove(mutationId, transactionId, path, TRemoveOptions().Force(true));
  478. });
  479. return cachePath;
  480. }
  481. TMaybe<TString> TJobPreparer::GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const
  482. {
  483. constexpr ui32 LockConflictRetryCount = 30;
  484. auto retryPolicy = MakeIntrusive<TRetryPolicyIgnoringLockConflicts>(
  485. LockConflictRetryCount,
  486. OperationPreparer_.GetContext().Config);
  487. auto maybePath = GetFileFromCache(
  488. retryPolicy,
  489. OperationPreparer_.GetContext(),
  490. TTransactionId(),
  491. md5Signature,
  492. GetCachePath(),
  493. TGetFileFromCacheOptions());
  494. if (maybePath) {
  495. YT_LOG_DEBUG(
  496. "File is already in cache (FileName: %v, FilePath: %v)",
  497. fileName,
  498. *maybePath);
  499. }
  500. return maybePath;
  501. }
  502. TDuration TJobPreparer::GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const
  503. {
  504. TDuration extraTime = OperationPreparer_.GetContext().Config->WaitLockPollInterval +
  505. TDuration::MilliSeconds(100);
  506. auto dataSizeGb = (itemToUpload.GetDataSize() + 1_GB - 1) / 1_GB;
  507. dataSizeGb = Max<ui64>(dataSizeGb, 1);
  508. return extraTime + dataSizeGb * OperationPreparer_.GetContext().Config->CacheLockTimeoutPerGb;
  509. }
  510. TString TJobPreparer::UploadToRandomPath(const IItemToUpload& itemToUpload) const
  511. {
  512. TString uniquePath = AddPathPrefix(
  513. ::TStringBuilder() << GetFileStorage() << "/cpp_" << CreateGuidAsString(),
  514. OperationPreparer_.GetContext().Config->Prefix);
  515. YT_LOG_INFO("Uploading file to random cypress path (FileName: %v; CypressPath: %v; PreparationId: %v)",
  516. itemToUpload.GetDescription(),
  517. uniquePath,
  518. OperationPreparer_.GetPreparationId());
  519. CreateFileInCypress(uniquePath);
  520. {
  521. TFileWriter writer(
  522. uniquePath,
  523. OperationPreparer_.GetClient()->GetRawClient(),
  524. OperationPreparer_.GetClientRetryPolicy(),
  525. OperationPreparer_.GetClient()->GetTransactionPinger(),
  526. OperationPreparer_.GetContext(),
  527. Options_.FileStorageTransactionId_,
  528. TFileWriterOptions().ComputeMD5(true));
  529. itemToUpload.CreateInputStream()->ReadAll(writer);
  530. writer.Finish();
  531. }
  532. return uniquePath;
  533. }
  534. TMaybe<TString> TJobPreparer::TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const
  535. {
  536. const auto md5Signature = itemToUpload.CalculateMD5();
  537. auto fileName = ::TStringBuilder() << GetFileStorage() << "/cpp_md5_" << md5Signature;
  538. if (OperationPreparer_.GetContext().Config->CacheUploadDeduplicationMode == EUploadDeduplicationMode::Host) {
  539. fileName << "_" << MD5::Data(TProcessState::Get()->FqdnHostName);
  540. }
  541. TString cypressPath = AddPathPrefix(fileName, OperationPreparer_.GetContext().Config->Prefix);
  542. CreateFileInCypress(cypressPath);
  543. auto uploadTx = MakeIntrusive<TTransaction>(
  544. OperationPreparer_.GetClient()->GetRawClient(),
  545. OperationPreparer_.GetClient(),
  546. OperationPreparer_.GetContext(),
  547. TTransactionId(),
  548. TStartTransactionOptions());
  549. ILockPtr lock;
  550. try {
  551. lock = uploadTx->Lock(cypressPath, ELockMode::LM_EXCLUSIVE, TLockOptions().Waitable(true));
  552. } catch (const TErrorResponse& e) {
  553. if (e.IsResolveError()) {
  554. // If the node doesn't exist, it must be removed by concurrent uploading process.
  555. // Let's try to find it in the cache.
  556. return GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription());
  557. }
  558. throw;
  559. }
  560. auto waitTimeout = GetWaitForUploadTimeout(itemToUpload);
  561. YT_LOG_DEBUG("Waiting for the lock on file (FileName: %v; CypressPath: %v; LockTimeout: %v)",
  562. itemToUpload.GetDescription(),
  563. cypressPath,
  564. waitTimeout);
  565. if (!TWaitProxy::Get()->WaitFuture(lock->GetAcquiredFuture(), waitTimeout)) {
  566. YT_LOG_DEBUG("Waiting for the lock timed out. Fallback to random path uploading (FileName: %v; CypressPath: %v)",
  567. itemToUpload.GetDescription(),
  568. cypressPath);
  569. return Nothing();
  570. }
  571. YT_LOG_DEBUG("Exclusive lock successfully acquired (FileName: %v; CypressPath: %v)",
  572. itemToUpload.GetDescription(),
  573. cypressPath);
  574. // Ensure that this process is the first to take a lock.
  575. if (auto cachedItemPath = GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription())) {
  576. return *cachedItemPath;
  577. }
  578. YT_LOG_INFO("Uploading file to cypress (FileName: %v; CypressPath: %v; PreparationId: %v)",
  579. itemToUpload.GetDescription(),
  580. cypressPath,
  581. OperationPreparer_.GetPreparationId());
  582. {
  583. auto writer = uploadTx->CreateFileWriter(cypressPath, TFileWriterOptions().ComputeMD5(true));
  584. YT_VERIFY(writer);
  585. itemToUpload.CreateInputStream()->ReadAll(*writer);
  586. writer->Finish();
  587. }
  588. auto path = PutFileToCypressCache(cypressPath, md5Signature, uploadTx->GetId());
  589. uploadTx->Commit();
  590. return path;
  591. }
  592. TString TJobPreparer::UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const
  593. {
  594. auto md5Signature = itemToUpload.CalculateMD5();
  595. Y_ABORT_UNLESS(md5Signature.size() == 32);
  596. if (auto cachedItemPath = GetItemFromCypressCache(md5Signature, itemToUpload.GetDescription())) {
  597. return *cachedItemPath;
  598. }
  599. YT_LOG_INFO("File not found in cache; uploading to cypress (FileName: %v; PreparationId: %v)",
  600. itemToUpload.GetDescription(),
  601. OperationPreparer_.GetPreparationId());
  602. const auto& config = OperationPreparer_.GetContext().Config;
  603. if (config->CacheUploadDeduplicationMode != EUploadDeduplicationMode::Disabled &&
  604. itemToUpload.GetDataSize() > config->CacheUploadDeduplicationThreshold) {
  605. if (auto path = TryUploadWithDeduplication(itemToUpload)) {
  606. return *path;
  607. }
  608. }
  609. auto path = UploadToRandomPath(itemToUpload);
  610. return PutFileToCypressCache(path, md5Signature, Options_.FileStorageTransactionId_);
  611. }
  612. TString TJobPreparer::UploadToCache(const IItemToUpload& itemToUpload) const
  613. {
  614. YT_LOG_INFO("Uploading file (FileName: %v; PreparationId: %v)",
  615. itemToUpload.GetDescription(),
  616. OperationPreparer_.GetPreparationId());
  617. TString result;
  618. switch (Options_.FileCacheMode_) {
  619. case TOperationOptions::EFileCacheMode::ApiCommandBased:
  620. Y_ENSURE_EX(Options_.FileStorageTransactionId_.IsEmpty(), TApiUsageError() <<
  621. "Default cache mode (API command-based) doesn't allow non-default 'FileStorageTransactionId_'");
  622. result = UploadToCacheUsingApi(itemToUpload);
  623. break;
  624. case TOperationOptions::EFileCacheMode::CachelessRandomPathUpload:
  625. result = UploadToRandomPath(itemToUpload);
  626. break;
  627. default:
  628. Y_ABORT("Unknown file cache mode: %d", static_cast<int>(Options_.FileCacheMode_));
  629. }
  630. YT_LOG_INFO("Complete uploading file (FileName: %v; PreparationId: %v)",
  631. itemToUpload.GetDescription(),
  632. OperationPreparer_.GetPreparationId());
  633. return result;
  634. }
  635. void TJobPreparer::UseFileInCypress(const TRichYPath& file)
  636. {
  637. auto exists = RequestWithRetry<bool>(
  638. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  639. [this, &file] (TMutationId /*mutationId*/) {
  640. return RawClient_->Exists(
  641. file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
  642. file.Path_);
  643. });
  644. if (!exists)
  645. {
  646. ythrow yexception() << "File " << file.Path_ << " does not exist";
  647. }
  648. if (ShouldMountSandbox()) {
  649. auto size = RequestWithRetry<i64>(
  650. OperationPreparer_.GetClientRetryPolicy()->CreatePolicyForGenericRequest(),
  651. [this, &file] (TMutationId /*mutationId*/) {
  652. return RawClient_->Get(
  653. file.TransactionId_.GetOrElse(OperationPreparer_.GetTransactionId()),
  654. file.Path_ + "/@uncompressed_data_size")
  655. .AsInt64();
  656. });
  657. TotalFileSize_ += RoundUpFileSize(static_cast<ui64>(size));
  658. }
  659. CypressFiles_.push_back(file);
  660. }
  661. void TJobPreparer::UploadLocalFile(
  662. const TLocalFilePath& localPath,
  663. const TAddLocalFileOptions& options,
  664. bool isApiFile)
  665. {
  666. TFsPath fsPath(localPath);
  667. fsPath.CheckExists();
  668. TFileStat stat;
  669. fsPath.Stat(stat);
  670. bool isExecutable = stat.Mode & (S_IXUSR | S_IXGRP | S_IXOTH);
  671. auto cachePath = UploadToCache(TFileToUpload(localPath, options.MD5CheckSum_));
  672. TRichYPath cypressPath;
  673. if (isApiFile) {
  674. cypressPath = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
  675. }
  676. cypressPath.Path(cachePath).FileName(options.PathInJob_.GetOrElse(fsPath.Basename()));
  677. if (isExecutable) {
  678. cypressPath.Executable(true);
  679. }
  680. if (options.BypassArtifactCache_) {
  681. cypressPath.BypassArtifactCache(*options.BypassArtifactCache_);
  682. }
  683. if (ShouldMountSandbox()) {
  684. TotalFileSize_ += RoundUpFileSize(stat.Size);
  685. }
  686. CachedFiles_.push_back(cypressPath);
  687. }
  688. void TJobPreparer::UploadBinary(const TJobBinaryConfig& jobBinary)
  689. {
  690. if (std::holds_alternative<TJobBinaryLocalPath>(jobBinary)) {
  691. auto binaryLocalPath = std::get<TJobBinaryLocalPath>(jobBinary);
  692. auto opts = TAddLocalFileOptions().PathInJob("cppbinary");
  693. if (binaryLocalPath.MD5CheckSum) {
  694. opts.MD5CheckSum(*binaryLocalPath.MD5CheckSum);
  695. }
  696. UploadLocalFile(binaryLocalPath.Path, opts, /* isApiFile */ true);
  697. } else if (std::holds_alternative<TJobBinaryCypressPath>(jobBinary)) {
  698. auto binaryCypressPath = std::get<TJobBinaryCypressPath>(jobBinary);
  699. TRichYPath ytPath = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
  700. ytPath.Path(binaryCypressPath.Path);
  701. if (binaryCypressPath.TransactionId) {
  702. ytPath.TransactionId(*binaryCypressPath.TransactionId);
  703. }
  704. UseFileInCypress(ytPath.FileName("cppbinary").Executable(true));
  705. } else {
  706. Y_ABORT("%s", (::TStringBuilder() << "Unexpected jobBinary tag: " << jobBinary.index()).data());
  707. }
  708. }
  709. void TJobPreparer::UploadSmallFile(const TSmallJobFile& smallFile)
  710. {
  711. auto cachePath = UploadToCache(TDataToUpload(smallFile.Data, smallFile.FileName + " [generated-file]"));
  712. auto path = OperationPreparer_.GetContext().Config->ApiFilePathOptions;
  713. CachedFiles_.push_back(path.Path(cachePath).FileName(smallFile.FileName));
  714. if (ShouldMountSandbox()) {
  715. TotalFileSize_ += RoundUpFileSize(smallFile.Data.size());
  716. }
  717. }
  718. bool TJobPreparer::IsLocalMode() const
  719. {
  720. return UseLocalModeOptimization(
  721. OperationPreparer_.GetClient()->GetRawClient(),
  722. OperationPreparer_.GetContext(),
  723. OperationPreparer_.GetClientRetryPolicy());
  724. }
  725. void TJobPreparer::PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState)
  726. {
  727. auto jobBinary = TJobBinaryConfig();
  728. if (!std::holds_alternative<TJobBinaryDefault>(Spec_.GetJobBinary())) {
  729. jobBinary = Spec_.GetJobBinary();
  730. }
  731. TString binaryPathInsideJob;
  732. if (std::holds_alternative<TJobBinaryDefault>(jobBinary)) {
  733. if (GetInitStatus() != EInitStatus::FullInitialization) {
  734. ythrow yexception() << "NYT::Initialize() must be called prior to any operation";
  735. }
  736. const bool isLocalMode = IsLocalMode();
  737. const TMaybe<TString> md5 = !isLocalMode ? MakeMaybe(GetPersistentExecPathMd5()) : Nothing();
  738. jobBinary = TJobBinaryLocalPath{GetPersistentExecPath(), md5};
  739. if (isLocalMode) {
  740. binaryPathInsideJob = GetExecPath();
  741. }
  742. } else if (std::holds_alternative<TJobBinaryLocalPath>(jobBinary)) {
  743. const bool isLocalMode = IsLocalMode();
  744. if (isLocalMode) {
  745. binaryPathInsideJob = TFsPath(std::get<TJobBinaryLocalPath>(jobBinary).Path).RealPath();
  746. }
  747. }
  748. Y_ASSERT(!std::holds_alternative<TJobBinaryDefault>(jobBinary));
  749. // binaryPathInsideJob is only set when LocalModeOptimization option is on, so upload is not needed
  750. if (!binaryPathInsideJob) {
  751. binaryPathInsideJob = "./cppbinary";
  752. UploadBinary(jobBinary);
  753. }
  754. TString jobCommandPrefix = Options_.JobCommandPrefix_;
  755. if (!Spec_.JobCommandPrefix_.empty()) {
  756. jobCommandPrefix = Spec_.JobCommandPrefix_;
  757. }
  758. TString jobCommandSuffix = Options_.JobCommandSuffix_;
  759. if (!Spec_.JobCommandSuffix_.empty()) {
  760. jobCommandSuffix = Spec_.JobCommandSuffix_;
  761. }
  762. ClassName_ = TJobFactory::Get()->GetJobName(&job);
  763. auto jobArguments = TNode::CreateMap();
  764. jobArguments["job_name"] = ClassName_;
  765. jobArguments["output_table_count"] = static_cast<i64>(outputTableCount);
  766. jobArguments["has_state"] = hasState;
  767. Spec_.AddEnvironment("YT_JOB_ARGUMENTS", NodeToYsonString(jobArguments));
  768. Command_ = ::TStringBuilder() <<
  769. jobCommandPrefix <<
  770. (OperationPreparer_.GetContext().Config->UseClientProtobuf ? "YT_USE_CLIENT_PROTOBUF=1" : "YT_USE_CLIENT_PROTOBUF=0") << " " <<
  771. binaryPathInsideJob <<
  772. jobCommandSuffix;
  773. }
  774. ////////////////////////////////////////////////////////////////////////////////
  775. } // namespace NYT::NDetail