// operation_preparer.h (3.9 KB)

  1. #pragma once
  2. #include "client.h"
  3. #include "structured_table_formats.h"
  4. #include <yt/cpp/mapreduce/interface/operation.h>
  5. namespace NYT::NDetail {
  6. ////////////////////////////////////////////////////////////////////////////////
  7. class TOperation;
  8. class TOperationPreparer
  9. : public TThrRefBase
  10. {
  11. public:
  12. TOperationPreparer(TClientPtr client, TTransactionId transactionId);
  13. const TClientContext& GetContext() const;
  14. TTransactionId GetTransactionId() const;
  15. ITransactionPingerPtr GetTransactionPinger() const;
  16. TClientPtr GetClient() const;
  17. const TString& GetPreparationId() const;
  18. void LockFiles(TVector<TRichYPath>* paths);
  19. TOperationId StartOperation(
  20. TOperation* operation,
  21. const TString& operationType,
  22. const TNode& spec,
  23. bool useStartOperationRequest = false);
  24. const IClientRetryPolicyPtr& GetClientRetryPolicy() const;
  25. private:
  26. TClientPtr Client_;
  27. TTransactionId TransactionId_;
  28. THolder<TPingableTransaction> FileTransaction_;
  29. IClientRetryPolicyPtr ClientRetryPolicy_;
  30. const TString PreparationId_;
  31. private:
  32. void CheckValidity() const;
  33. };
  34. using TOperationPreparerPtr = ::TIntrusivePtr<TOperationPreparer>;
  35. ////////////////////////////////////////////////////////////////////////////////
  36. struct IItemToUpload
  37. {
  38. virtual ~IItemToUpload() = default;
  39. virtual TString CalculateMD5() const = 0;
  40. virtual THolder<IInputStream> CreateInputStream() const = 0;
  41. virtual TString GetDescription() const = 0;
  42. virtual i64 GetDataSize() const = 0;
  43. };
  44. ////////////////////////////////////////////////////////////////////////////////
  45. class TJobPreparer
  46. : private TNonCopyable
  47. {
  48. public:
  49. TJobPreparer(
  50. TOperationPreparer& operationPreparer,
  51. const TUserJobSpec& spec,
  52. const IJob& job,
  53. size_t outputTableCount,
  54. const TVector<TSmallJobFile>& smallFileList,
  55. const TOperationOptions& options);
  56. TVector<TRichYPath> GetFiles() const;
  57. TVector<TYPath> GetLayers() const;
  58. const TString& GetClassName() const;
  59. const TString& GetCommand() const;
  60. const TUserJobSpec& GetSpec() const;
  61. bool ShouldMountSandbox() const;
  62. ui64 GetTotalFileSize() const;
  63. bool ShouldRedirectStdoutToStderr() const;
  64. private:
  65. TOperationPreparer& OperationPreparer_;
  66. TUserJobSpec Spec_;
  67. TOperationOptions Options_;
  68. TVector<TRichYPath> CypressFiles_;
  69. TVector<TRichYPath> CachedFiles_;
  70. TVector<TYPath> Layers_;
  71. TString ClassName_;
  72. TString Command_;
  73. ui64 TotalFileSize_ = 0;
  74. bool IsCommandJob_ = false;
  75. private:
  76. TString GetFileStorage() const;
  77. TYPath GetCachePath() const;
  78. bool IsLocalMode() const;
  79. int GetFileCacheReplicationFactor() const;
  80. void CreateStorage() const;
  81. void CreateFileInCypress(const TString& path) const;
  82. TString PutFileToCypressCache(const TString& path, const TString& md5Signature, TTransactionId transactionId) const;
  83. TMaybe<TString> GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const;
  84. TDuration GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const;
  85. TString UploadToRandomPath(const IItemToUpload& itemToUpload) const;
  86. TString UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const;
  87. TMaybe<TString> TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const;
  88. TString UploadToCache(const IItemToUpload& itemToUpload) const;
  89. void UseFileInCypress(const TRichYPath& file);
  90. void UploadLocalFile(
  91. const TLocalFilePath& localPath,
  92. const TAddLocalFileOptions& options,
  93. bool isApiFile = false);
  94. void UploadBinary(const TJobBinaryConfig& jobBinary);
  95. void UploadSmallFile(const TSmallJobFile& smallFile);
  96. void PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState);
  97. };
  98. ////////////////////////////////////////////////////////////////////////////////
  99. } // namespace NYT::NDetail