// operation.h
  1. #pragma once
  2. #include "fwd.h"
  3. #include "structured_table_formats.h"
  4. #include "operation_preparer.h"
  5. #include <yt/cpp/mapreduce/http/fwd.h>
  6. #include <yt/cpp/mapreduce/interface/client.h>
  7. #include <yt/cpp/mapreduce/interface/operation.h>
  8. #include <yt/cpp/mapreduce/interface/retry_policy.h>
  9. #include <util/generic/ptr.h>
  10. #include <util/generic/vector.h>
  11. namespace NYT::NDetail {
  12. ////////////////////////////////////////////////////////////////////////////////
  13. class TOperation
  14. : public IOperation
  15. {
  16. public:
  17. class TOperationImpl;
  18. public:
  19. explicit TOperation(TClientPtr client);
  20. TOperation(TOperationId id, TClientPtr client);
  21. virtual const TOperationId& GetId() const override;
  22. virtual TString GetWebInterfaceUrl() const override;
  23. void OnPrepared();
  24. void SetDelayedStartFunction(std::function<TOperationId()> start);
  25. virtual void Start() override;
  26. void OnPreparationException(std::exception_ptr e);
  27. virtual bool IsStarted() const override;
  28. virtual TString GetStatus() const override;
  29. void OnStatusUpdated(const TString& newStatus);
  30. virtual ::NThreading::TFuture<void> GetPreparedFuture() override;
  31. virtual ::NThreading::TFuture<void> GetStartedFuture() override;
  32. virtual ::NThreading::TFuture<void> Watch() override;
  33. virtual TVector<TFailedJobInfo> GetFailedJobInfo(const TGetFailedJobInfoOptions& options = TGetFailedJobInfoOptions()) override;
  34. virtual EOperationBriefState GetBriefState() override;
  35. virtual TMaybe<TYtError> GetError() override;
  36. virtual TJobStatistics GetJobStatistics() override;
  37. virtual TMaybe<TOperationBriefProgress> GetBriefProgress() override;
  38. virtual void AbortOperation() override;
  39. virtual void CompleteOperation() override;
  40. virtual void SuspendOperation(const TSuspendOperationOptions& options) override;
  41. virtual void ResumeOperation(const TResumeOperationOptions& options) override;
  42. virtual TOperationAttributes GetAttributes(const TGetOperationOptions& options) override;
  43. virtual void UpdateParameters(const TUpdateOperationParametersOptions& options) override;
  44. virtual TJobAttributes GetJob(const TJobId& jobId, const TGetJobOptions& options) override;
  45. virtual TListJobsResult ListJobs(const TListJobsOptions& options) override;
  46. private:
  47. TClientPtr Client_;
  48. ::TIntrusivePtr<TOperationImpl> Impl_;
  49. };
  50. using TOperationPtr = ::TIntrusivePtr<TOperation>;
  51. ////////////////////////////////////////////////////////////////////////////////
  52. struct TSimpleOperationIo
  53. {
  54. TVector<TRichYPath> Inputs;
  55. TVector<TRichYPath> Outputs;
  56. TFormat InputFormat;
  57. TFormat OutputFormat;
  58. TVector<TSmallJobFile> JobFiles;
  59. };
  60. TSimpleOperationIo CreateSimpleOperationIoHelper(
  61. const IStructuredJob& structuredJob,
  62. const TOperationPreparer& preparer,
  63. const TOperationOptions& options,
  64. TStructuredJobTableList structuredInputs,
  65. TStructuredJobTableList structuredOutputs,
  66. TUserJobFormatHints hints,
  67. ENodeReaderFormat nodeReaderFormat,
  68. const THashSet<TString>& columnsUsedInOperations);
  69. ////////////////////////////////////////////////////////////////////////////////
  70. void ExecuteMap(
  71. const TOperationPtr& operation,
  72. const TOperationPreparerPtr& preparer,
  73. const TMapOperationSpec& spec,
  74. const ::TIntrusivePtr<IStructuredJob>& mapper,
  75. const TOperationOptions& options);
  76. void ExecuteRawMap(
  77. const TOperationPtr& operation,
  78. const TOperationPreparerPtr& preparer,
  79. const TRawMapOperationSpec& spec,
  80. const ::TIntrusivePtr<IRawJob>& mapper,
  81. const TOperationOptions& options);
  82. void ExecuteReduce(
  83. const TOperationPtr& operation,
  84. const TOperationPreparerPtr& preparer,
  85. const TReduceOperationSpec& spec,
  86. const ::TIntrusivePtr<IStructuredJob>& reducer,
  87. const TOperationOptions& options);
  88. void ExecuteRawReduce(
  89. const TOperationPtr& operation,
  90. const TOperationPreparerPtr& preparer,
  91. const TRawReduceOperationSpec& spec,
  92. const ::TIntrusivePtr<IRawJob>& reducer,
  93. const TOperationOptions& options);
  94. void ExecuteJoinReduce(
  95. const TOperationPtr& operation,
  96. const TOperationPreparerPtr& preparer,
  97. const TJoinReduceOperationSpec& spec,
  98. const ::TIntrusivePtr<IStructuredJob>& reducer,
  99. const TOperationOptions& options);
  100. void ExecuteRawJoinReduce(
  101. const TOperationPtr& operation,
  102. const TOperationPreparerPtr& preparer,
  103. const TRawJoinReduceOperationSpec& spec,
  104. const ::TIntrusivePtr<IRawJob>& reducer,
  105. const TOperationOptions& options);
  106. void ExecuteMapReduce(
  107. const TOperationPtr& operation,
  108. const TOperationPreparerPtr& preparer,
  109. const TMapReduceOperationSpec& spec,
  110. const ::TIntrusivePtr<IStructuredJob>& mapper,
  111. const ::TIntrusivePtr<IStructuredJob>& reduceCombiner,
  112. const ::TIntrusivePtr<IStructuredJob>& reducer,
  113. const TOperationOptions& options);
  114. void ExecuteRawMapReduce(
  115. const TOperationPtr& operation,
  116. const TOperationPreparerPtr& preparer,
  117. const TRawMapReduceOperationSpec& spec,
  118. const ::TIntrusivePtr<IRawJob>& mapper,
  119. const ::TIntrusivePtr<IRawJob>& reduceCombiner,
  120. const ::TIntrusivePtr<IRawJob>& reducer,
  121. const TOperationOptions& options);
  122. void ExecuteSort(
  123. const TOperationPtr& operation,
  124. const TOperationPreparerPtr& preparer,
  125. const TSortOperationSpec& spec,
  126. const TOperationOptions& options);
  127. void ExecuteMerge(
  128. const TOperationPtr& operation,
  129. const TOperationPreparerPtr& preparer,
  130. const TMergeOperationSpec& spec,
  131. const TOperationOptions& options);
  132. void ExecuteErase(
  133. const TOperationPtr& operation,
  134. const TOperationPreparerPtr& preparer,
  135. const TEraseOperationSpec& spec,
  136. const TOperationOptions& options);
  137. void ExecuteRemoteCopy(
  138. const TOperationPtr& operation,
  139. const TOperationPreparerPtr& preparer,
  140. const TRemoteCopyOperationSpec& spec,
  141. const TOperationOptions& options);
  142. void ExecuteVanilla(
  143. const TOperationPtr& operation,
  144. const TOperationPreparerPtr& preparer,
  145. const TVanillaOperationSpec& spec,
  146. const TOperationOptions& options);
  147. EOperationBriefState CheckOperation(
  148. const IClientRetryPolicyPtr& clientRetryPolicy,
  149. const TClientContext& context,
  150. const TOperationId& operationId);
  151. void WaitForOperation(
  152. const IClientRetryPolicyPtr& clientRetryPolicy,
  153. const TClientContext& context,
  154. const TOperationId& operationId);
  155. ////////////////////////////////////////////////////////////////////////////////
  156. ::TIntrusivePtr<TOperation> ProcessOperation(
  157. NYT::NDetail::TClientPtr client,
  158. std::function<void()> prepare,
  159. ::TIntrusivePtr<TOperation> operation,
  160. const TOperationOptions& options);
  161. void WaitIfRequired(const TOperationPtr& operation, const TClientPtr& client, const TOperationOptions& options);
  162. ////////////////////////////////////////////////////////////////////////////////
  163. } // namespace NYT::NDetail