operation_preparer.h 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. #pragma once
  2. #include "client.h"
  3. #include "structured_table_formats.h"
  4. #include <yt/cpp/mapreduce/interface/operation.h>
  5. namespace NYT::NDetail {
  6. ////////////////////////////////////////////////////////////////////////////////
  7. class TOperation;
  8. class TOperationPreparer
  9. : public TThrRefBase
  10. {
  11. public:
  12. TOperationPreparer(TClientPtr client, TTransactionId transactionId);
  13. const TClientContext& GetContext() const;
  14. TTransactionId GetTransactionId() const;
  15. ITransactionPingerPtr GetTransactionPinger() const;
  16. TClientPtr GetClient() const;
  17. const TString& GetPreparationId() const;
  18. void LockFiles(TVector<TRichYPath>* paths);
  19. TOperationId StartOperation(
  20. TOperation* operation,
  21. EOperationType type,
  22. const TNode& spec);
  23. const IClientRetryPolicyPtr& GetClientRetryPolicy() const;
  24. private:
  25. TClientPtr Client_;
  26. TTransactionId TransactionId_;
  27. std::unique_ptr<TPingableTransaction> FileTransaction_;
  28. IClientRetryPolicyPtr ClientRetryPolicy_;
  29. const TString PreparationId_;
  30. private:
  31. void CheckValidity() const;
  32. };
  33. using TOperationPreparerPtr = ::TIntrusivePtr<TOperationPreparer>;
  34. ////////////////////////////////////////////////////////////////////////////////
  35. struct IItemToUpload
  36. {
  37. virtual ~IItemToUpload() = default;
  38. virtual TString CalculateMD5() const = 0;
  39. virtual std::unique_ptr<IInputStream> CreateInputStream() const = 0;
  40. virtual TString GetDescription() const = 0;
  41. virtual i64 GetDataSize() const = 0;
  42. };
  43. ////////////////////////////////////////////////////////////////////////////////
  44. class TJobPreparer
  45. : private TNonCopyable
  46. {
  47. public:
  48. TJobPreparer(
  49. TOperationPreparer& operationPreparer,
  50. const TUserJobSpec& spec,
  51. const IJob& job,
  52. size_t outputTableCount,
  53. const TVector<TSmallJobFile>& smallFileList,
  54. const TOperationOptions& options);
  55. TVector<TRichYPath> GetFiles() const;
  56. TVector<TYPath> GetLayers() const;
  57. const TString& GetClassName() const;
  58. const TString& GetCommand() const;
  59. const TUserJobSpec& GetSpec() const;
  60. bool ShouldMountSandbox() const;
  61. ui64 GetTotalFileSize() const;
  62. bool ShouldRedirectStdoutToStderr() const;
  63. private:
  64. const IRawClientPtr RawClient_;
  65. TOperationPreparer& OperationPreparer_;
  66. TUserJobSpec Spec_;
  67. TOperationOptions Options_;
  68. TVector<TRichYPath> CypressFiles_;
  69. TVector<TRichYPath> CachedFiles_;
  70. TVector<TYPath> Layers_;
  71. TString ClassName_;
  72. TString Command_;
  73. ui64 TotalFileSize_ = 0;
  74. bool IsCommandJob_ = false;
  75. private:
  76. TString GetFileStorage() const;
  77. TYPath GetCachePath() const;
  78. bool IsLocalMode() const;
  79. int GetFileCacheReplicationFactor() const;
  80. void CreateStorage() const;
  81. void CreateFileInCypress(const TString& path) const;
  82. TString PutFileToCypressCache(const TString& path, const TString& md5Signature, TTransactionId transactionId) const;
  83. TMaybe<TString> GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const;
  84. TDuration GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const;
  85. TString UploadToRandomPath(const IItemToUpload& itemToUpload) const;
  86. TString UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const;
  87. TMaybe<TString> TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const;
  88. TString UploadToCache(const IItemToUpload& itemToUpload) const;
  89. void UseFileInCypress(const TRichYPath& file);
  90. void UploadLocalFile(
  91. const TLocalFilePath& localPath,
  92. const TAddLocalFileOptions& options,
  93. bool isApiFile = false);
  94. void UploadBinary(const TJobBinaryConfig& jobBinary);
  95. void UploadSmallFile(const TSmallJobFile& smallFile);
  96. void PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState);
  97. };
  98. ////////////////////////////////////////////////////////////////////////////////
  99. } // namespace NYT::NDetail