// operation_preparer.h
  1. #pragma once
  2. #include "client.h"
  3. #include "structured_table_formats.h"
  4. #include <yt/cpp/mapreduce/interface/operation.h>
  5. namespace NYT::NDetail {
  6. ////////////////////////////////////////////////////////////////////////////////
  7. class TOperation;
  8. class TOperationPreparer
  9. : public TThrRefBase
  10. {
  11. public:
  12. TOperationPreparer(TClientPtr client, TTransactionId transactionId);
  13. const TClientContext& GetContext() const;
  14. TTransactionId GetTransactionId() const;
  15. ITransactionPingerPtr GetTransactionPinger() const;
  16. TClientPtr GetClient() const;
  17. const TString& GetPreparationId() const;
  18. void LockFiles(TVector<TRichYPath>* paths);
  19. TOperationId StartOperation(
  20. TOperation* operation,
  21. const TString& operationType,
  22. const TNode& spec,
  23. bool useStartOperationRequest = false);
  24. const IClientRetryPolicyPtr& GetClientRetryPolicy() const;
  25. private:
  26. TClientPtr Client_;
  27. TTransactionId TransactionId_;
  28. THolder<TPingableTransaction> FileTransaction_;
  29. IClientRetryPolicyPtr ClientRetryPolicy_;
  30. const TString PreparationId_;
  31. private:
  32. void CheckValidity() const;
  33. };
  34. using TOperationPreparerPtr = ::TIntrusivePtr<TOperationPreparer>;
  35. ////////////////////////////////////////////////////////////////////////////////
  36. struct IItemToUpload
  37. {
  38. virtual ~IItemToUpload() = default;
  39. virtual TString CalculateMD5() const = 0;
  40. virtual THolder<IInputStream> CreateInputStream() const = 0;
  41. virtual TString GetDescription() const = 0;
  42. virtual ui64 GetDataSize() const = 0;
  43. };
  44. ////////////////////////////////////////////////////////////////////////////////
  45. class TJobPreparer
  46. : private TNonCopyable
  47. {
  48. public:
  49. TJobPreparer(
  50. TOperationPreparer& operationPreparer,
  51. const TUserJobSpec& spec,
  52. const IJob& job,
  53. size_t outputTableCount,
  54. const TVector<TSmallJobFile>& smallFileList,
  55. const TOperationOptions& options);
  56. TVector<TRichYPath> GetFiles() const;
  57. TVector<TYPath> GetLayers() const;
  58. const TString& GetClassName() const;
  59. const TString& GetCommand() const;
  60. const TUserJobSpec& GetSpec() const;
  61. bool ShouldMountSandbox() const;
  62. ui64 GetTotalFileSize() const;
  63. private:
  64. TOperationPreparer& OperationPreparer_;
  65. TUserJobSpec Spec_;
  66. TOperationOptions Options_;
  67. TVector<TRichYPath> CypressFiles_;
  68. TVector<TRichYPath> CachedFiles_;
  69. TVector<TYPath> Layers_;
  70. TString ClassName_;
  71. TString Command_;
  72. ui64 TotalFileSize_ = 0;
  73. private:
  74. TString GetFileStorage() const;
  75. TYPath GetCachePath() const;
  76. bool IsLocalMode() const;
  77. int GetFileCacheReplicationFactor() const;
  78. void CreateStorage() const;
  79. void CreateFileInCypress(const TString& path) const;
  80. TString PutFileToCypressCache(const TString& path, const TString& md5Signature, TTransactionId transactionId) const;
  81. TMaybe<TString> GetItemFromCypressCache(const TString& md5Signature, const TString& fileName) const;
  82. TDuration GetWaitForUploadTimeout(const IItemToUpload& itemToUpload) const;
  83. TString UploadToRandomPath(const IItemToUpload& itemToUpload) const;
  84. TString UploadToCacheUsingApi(const IItemToUpload& itemToUpload) const;
  85. TMaybe<TString> TryUploadWithDeduplication(const IItemToUpload& itemToUpload) const;
  86. TString UploadToCache(const IItemToUpload& itemToUpload) const;
  87. void UseFileInCypress(const TRichYPath& file);
  88. void UploadLocalFile(
  89. const TLocalFilePath& localPath,
  90. const TAddLocalFileOptions& options,
  91. bool isApiFile = false);
  92. void UploadBinary(const TJobBinaryConfig& jobBinary);
  93. void UploadSmallFile(const TSmallJobFile& smallFile);
  94. void PrepareJobBinary(const IJob& job, int outputTableCount, bool hasState);
  95. };
  96. ////////////////////////////////////////////////////////////////////////////////
  97. } // namespace NYT::NDetail