operation.h 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. #pragma once
  2. #include "structured_table_formats.h"
  3. #include "operation_preparer.h"
  4. #include <yt/cpp/mapreduce/http/fwd.h>
  5. #include <yt/cpp/mapreduce/interface/client.h>
  6. #include <yt/cpp/mapreduce/interface/operation.h>
  7. #include <yt/cpp/mapreduce/interface/retry_policy.h>
  8. #include <util/generic/ptr.h>
  9. #include <util/generic/vector.h>
  10. namespace NYT::NDetail {
  11. ////////////////////////////////////////////////////////////////////////////////
  12. class TOperation
  13. : public IOperation
  14. {
  15. public:
  16. class TOperationImpl;
  17. public:
  18. explicit TOperation(TClientPtr client);
  19. TOperation(TOperationId id, TClientPtr client);
  20. virtual const TOperationId& GetId() const override;
  21. virtual TString GetWebInterfaceUrl() const override;
  22. void OnPrepared();
  23. void SetDelayedStartFunction(std::function<TOperationId()> start);
  24. virtual void Start() override;
  25. void OnPreparationException(std::exception_ptr e);
  26. virtual bool IsStarted() const override;
  27. virtual TString GetStatus() const override;
  28. void OnStatusUpdated(const TString& newStatus);
  29. virtual ::NThreading::TFuture<void> GetPreparedFuture() override;
  30. virtual ::NThreading::TFuture<void> GetStartedFuture() override;
  31. virtual ::NThreading::TFuture<void> Watch() override;
  32. virtual TVector<TFailedJobInfo> GetFailedJobInfo(const TGetFailedJobInfoOptions& options = TGetFailedJobInfoOptions()) override;
  33. virtual EOperationBriefState GetBriefState() override;
  34. virtual TMaybe<TYtError> GetError() override;
  35. virtual TJobStatistics GetJobStatistics() override;
  36. virtual TMaybe<TOperationBriefProgress> GetBriefProgress() override;
  37. virtual void AbortOperation() override;
  38. virtual void CompleteOperation() override;
  39. virtual void SuspendOperation(const TSuspendOperationOptions& options) override;
  40. virtual void ResumeOperation(const TResumeOperationOptions& options) override;
  41. virtual TOperationAttributes GetAttributes(const TGetOperationOptions& options) override;
  42. virtual void UpdateParameters(const TUpdateOperationParametersOptions& options) override;
  43. virtual TJobAttributes GetJob(const TJobId& jobId, const TGetJobOptions& options) override;
  44. virtual TListJobsResult ListJobs(const TListJobsOptions& options) override;
  45. private:
  46. TClientPtr Client_;
  47. ::TIntrusivePtr<TOperationImpl> Impl_;
  48. };
  49. using TOperationPtr = ::TIntrusivePtr<TOperation>;
  50. ////////////////////////////////////////////////////////////////////////////////
  51. struct TSimpleOperationIo
  52. {
  53. TVector<TRichYPath> Inputs;
  54. TVector<TRichYPath> Outputs;
  55. TFormat InputFormat;
  56. TFormat OutputFormat;
  57. TVector<TSmallJobFile> JobFiles;
  58. };
  59. TSimpleOperationIo CreateSimpleOperationIoHelper(
  60. const IStructuredJob& structuredJob,
  61. const TOperationPreparer& preparer,
  62. const TOperationOptions& options,
  63. TStructuredJobTableList structuredInputs,
  64. TStructuredJobTableList structuredOutputs,
  65. TUserJobFormatHints hints,
  66. ENodeReaderFormat nodeReaderFormat,
  67. const THashSet<TString>& columnsUsedInOperations);
  68. ////////////////////////////////////////////////////////////////////////////////
  69. void ExecuteMap(
  70. const TOperationPtr& operation,
  71. const TOperationPreparerPtr& preparer,
  72. const TMapOperationSpec& spec,
  73. const ::TIntrusivePtr<IStructuredJob>& mapper,
  74. const TOperationOptions& options);
  75. void ExecuteRawMap(
  76. const TOperationPtr& operation,
  77. const TOperationPreparerPtr& preparer,
  78. const TRawMapOperationSpec& spec,
  79. const ::TIntrusivePtr<IRawJob>& mapper,
  80. const TOperationOptions& options);
  81. void ExecuteReduce(
  82. const TOperationPtr& operation,
  83. const TOperationPreparerPtr& preparer,
  84. const TReduceOperationSpec& spec,
  85. const ::TIntrusivePtr<IStructuredJob>& reducer,
  86. const TOperationOptions& options);
  87. void ExecuteRawReduce(
  88. const TOperationPtr& operation,
  89. const TOperationPreparerPtr& preparer,
  90. const TRawReduceOperationSpec& spec,
  91. const ::TIntrusivePtr<IRawJob>& reducer,
  92. const TOperationOptions& options);
  93. void ExecuteJoinReduce(
  94. const TOperationPtr& operation,
  95. const TOperationPreparerPtr& preparer,
  96. const TJoinReduceOperationSpec& spec,
  97. const ::TIntrusivePtr<IStructuredJob>& reducer,
  98. const TOperationOptions& options);
  99. void ExecuteRawJoinReduce(
  100. const TOperationPtr& operation,
  101. const TOperationPreparerPtr& preparer,
  102. const TRawJoinReduceOperationSpec& spec,
  103. const ::TIntrusivePtr<IRawJob>& reducer,
  104. const TOperationOptions& options);
  105. void ExecuteMapReduce(
  106. const TOperationPtr& operation,
  107. const TOperationPreparerPtr& preparer,
  108. const TMapReduceOperationSpec& spec,
  109. const ::TIntrusivePtr<IStructuredJob>& mapper,
  110. const ::TIntrusivePtr<IStructuredJob>& reduceCombiner,
  111. const ::TIntrusivePtr<IStructuredJob>& reducer,
  112. const TOperationOptions& options);
  113. void ExecuteRawMapReduce(
  114. const TOperationPtr& operation,
  115. const TOperationPreparerPtr& preparer,
  116. const TRawMapReduceOperationSpec& spec,
  117. const ::TIntrusivePtr<IRawJob>& mapper,
  118. const ::TIntrusivePtr<IRawJob>& reduceCombiner,
  119. const ::TIntrusivePtr<IRawJob>& reducer,
  120. const TOperationOptions& options);
  121. void ExecuteSort(
  122. const TOperationPtr& operation,
  123. const TOperationPreparerPtr& preparer,
  124. const TSortOperationSpec& spec,
  125. const TOperationOptions& options);
  126. void ExecuteMerge(
  127. const TOperationPtr& operation,
  128. const TOperationPreparerPtr& preparer,
  129. const TMergeOperationSpec& spec,
  130. const TOperationOptions& options);
  131. void ExecuteErase(
  132. const TOperationPtr& operation,
  133. const TOperationPreparerPtr& preparer,
  134. const TEraseOperationSpec& spec,
  135. const TOperationOptions& options);
  136. void ExecuteRemoteCopy(
  137. const TOperationPtr& operation,
  138. const TOperationPreparerPtr& preparer,
  139. const TRemoteCopyOperationSpec& spec,
  140. const TOperationOptions& options);
  141. void ExecuteVanilla(
  142. const TOperationPtr& operation,
  143. const TOperationPreparerPtr& preparer,
  144. const TVanillaOperationSpec& spec,
  145. const TOperationOptions& options);
  146. EOperationBriefState CheckOperation(
  147. const IRawClientPtr& rawClient,
  148. const IClientRetryPolicyPtr& clientRetryPolicy,
  149. const TOperationId& operationId);
  150. void WaitForOperation(
  151. const IClientRetryPolicyPtr& clientRetryPolicy,
  152. const IRawClientPtr& rawClient,
  153. const TClientContext& context,
  154. const TOperationId& operationId);
  155. ////////////////////////////////////////////////////////////////////////////////
  156. ::TIntrusivePtr<TOperation> ProcessOperation(
  157. NYT::NDetail::TClientPtr client,
  158. std::function<void()> prepare,
  159. ::TIntrusivePtr<TOperation> operation,
  160. const TOperationOptions& options);
  161. void WaitIfRequired(const TOperationPtr& operation, const TClientPtr& client, const TOperationOptions& options);
  162. ////////////////////////////////////////////////////////////////////////////////
  163. } // namespace NYT::NDetail