prepare_operation.cpp 8.7 KB


  1. #include "prepare_operation.h"
  2. #include <yt/cpp/mapreduce/common/retry_request.h>
  3. #include <yt/cpp/mapreduce/interface/raw_batch_request.h>
  4. #include <yt/cpp/mapreduce/interface/raw_client.h>
  5. #include <yt/cpp/mapreduce/interface/serialize.h>
  6. #include <library/cpp/iterator/functools.h>
  7. namespace NYT::NDetail {
  8. ////////////////////////////////////////////////////////////////////////////////
  9. TOperationPreparationContext::TOperationPreparationContext(
  10. const TStructuredJobTableList& structuredInputs,
  11. const TStructuredJobTableList& structuredOutputs,
  12. const IRawClientPtr& rawClient,
  13. const IClientRetryPolicyPtr& retryPolicy,
  14. TTransactionId transactionId)
  15. : RawClient_(rawClient)
  16. , RetryPolicy_(retryPolicy)
  17. , TransactionId_(transactionId)
  18. , InputSchemas_(structuredInputs.size())
  19. , InputSchemasLoaded_(structuredInputs.size(), false)
  20. {
  21. Inputs_.reserve(structuredInputs.size());
  22. for (const auto& input : structuredInputs) {
  23. Inputs_.push_back(input.RichYPath);
  24. }
  25. Outputs_.reserve(structuredOutputs.size());
  26. for (const auto& output : structuredOutputs) {
  27. Outputs_.push_back(output.RichYPath);
  28. }
  29. }
  30. TOperationPreparationContext::TOperationPreparationContext(
  31. TVector<TRichYPath> inputs,
  32. TVector<TRichYPath> outputs,
  33. const IRawClientPtr& rawClient,
  34. const IClientRetryPolicyPtr& retryPolicy,
  35. TTransactionId transactionId)
  36. : RawClient_(rawClient)
  37. , RetryPolicy_(retryPolicy)
  38. , TransactionId_(transactionId)
  39. , InputSchemas_(inputs.size())
  40. , InputSchemasLoaded_(inputs.size(), false)
  41. {
  42. Inputs_.reserve(inputs.size());
  43. for (auto& input : inputs) {
  44. Inputs_.push_back(std::move(input));
  45. }
  46. Outputs_.reserve(outputs.size());
  47. for (const auto& output : outputs) {
  48. Outputs_.push_back(std::move(output));
  49. }
  50. }
  51. int TOperationPreparationContext::GetInputCount() const
  52. {
  53. return static_cast<int>(Inputs_.size());
  54. }
  55. int TOperationPreparationContext::GetOutputCount() const
  56. {
  57. return static_cast<int>(Outputs_.size());
  58. }
  59. const TVector<TTableSchema>& TOperationPreparationContext::GetInputSchemas() const
  60. {
  61. TVector<::NThreading::TFuture<TNode>> schemaFutures;
  62. auto batch = RawClient_->CreateRawBatchRequest();
  63. for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) {
  64. if (InputSchemasLoaded_[tableIndex]) {
  65. schemaFutures.emplace_back();
  66. continue;
  67. }
  68. Y_ABORT_UNLESS(Inputs_[tableIndex]);
  69. schemaFutures.push_back(batch->Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{}));
  70. }
  71. batch->ExecuteBatch();
  72. for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) {
  73. if (schemaFutures[tableIndex].Initialized()) {
  74. Deserialize(InputSchemas_[tableIndex], schemaFutures[tableIndex].ExtractValueSync());
  75. }
  76. }
  77. return InputSchemas_;
  78. }
  79. const TTableSchema& TOperationPreparationContext::GetInputSchema(int index) const
  80. {
  81. auto& schema = InputSchemas_[index];
  82. if (!InputSchemasLoaded_[index]) {
  83. Y_ABORT_UNLESS(Inputs_[index]);
  84. auto schemaNode = RequestWithRetry<TNode>(
  85. RetryPolicy_->CreatePolicyForGenericRequest(),
  86. [this, &index] (TMutationId /*mutationId*/) {
  87. return RawClient_->Get(TransactionId_, Inputs_[index]->Path_ + "/@schema");
  88. });
  89. Deserialize(schema, schemaNode);
  90. }
  91. return schema;
  92. }
  93. TMaybe<TYPath> TOperationPreparationContext::GetInputPath(int index) const
  94. {
  95. Y_ABORT_UNLESS(index < static_cast<int>(Inputs_.size()));
  96. if (Inputs_[index]) {
  97. return Inputs_[index]->Path_;
  98. }
  99. return Nothing();
  100. }
  101. TMaybe<TYPath> TOperationPreparationContext::GetOutputPath(int index) const
  102. {
  103. Y_ABORT_UNLESS(index < static_cast<int>(Outputs_.size()));
  104. if (Outputs_[index]) {
  105. return Outputs_[index]->Path_;
  106. }
  107. return Nothing();
  108. }
  109. ////////////////////////////////////////////////////////////////////////////////
  110. TSpeculativeOperationPreparationContext::TSpeculativeOperationPreparationContext(
  111. const TVector<TTableSchema>& previousResult,
  112. TStructuredJobTableList inputs,
  113. TStructuredJobTableList outputs)
  114. : InputSchemas_(previousResult)
  115. , Inputs_(std::move(inputs))
  116. , Outputs_(std::move(outputs))
  117. {
  118. Y_ABORT_UNLESS(Inputs_.size() == previousResult.size());
  119. }
  120. int TSpeculativeOperationPreparationContext::GetInputCount() const
  121. {
  122. return static_cast<int>(Inputs_.size());
  123. }
  124. int TSpeculativeOperationPreparationContext::GetOutputCount() const
  125. {
  126. return static_cast<int>(Outputs_.size());
  127. }
  128. const TVector<TTableSchema>& TSpeculativeOperationPreparationContext::GetInputSchemas() const
  129. {
  130. return InputSchemas_;
  131. }
  132. const TTableSchema& TSpeculativeOperationPreparationContext::GetInputSchema(int index) const
  133. {
  134. Y_ABORT_UNLESS(index < static_cast<int>(InputSchemas_.size()));
  135. return InputSchemas_[index];
  136. }
  137. TMaybe<TYPath> TSpeculativeOperationPreparationContext::GetInputPath(int index) const
  138. {
  139. Y_ABORT_UNLESS(index < static_cast<int>(Inputs_.size()));
  140. if (Inputs_[index].RichYPath) {
  141. return Inputs_[index].RichYPath->Path_;
  142. }
  143. return Nothing();
  144. }
  145. TMaybe<TYPath> TSpeculativeOperationPreparationContext::GetOutputPath(int index) const
  146. {
  147. Y_ABORT_UNLESS(index < static_cast<int>(Outputs_.size()));
  148. if (Outputs_[index].RichYPath) {
  149. return Outputs_[index].RichYPath->Path_;
  150. }
  151. return Nothing();
  152. }
  153. ////////////////////////////////////////////////////////////////////////////////
  154. static void FixInputTable(TRichYPath& table, int index, const TJobOperationPreparer& preparer)
  155. {
  156. const auto& columnRenamings = preparer.GetInputColumnRenamings();
  157. const auto& columnFilters = preparer.GetInputColumnFilters();
  158. if (!columnRenamings[index].empty()) {
  159. table.RenameColumns(columnRenamings[index]);
  160. }
  161. if (columnFilters[index]) {
  162. table.Columns(*columnFilters[index]);
  163. }
  164. }
  165. static void FixInputTable(TStructuredJobTable& table, int index, const TJobOperationPreparer& preparer)
  166. {
  167. const auto& inputDescriptions = preparer.GetInputDescriptions();
  168. if (inputDescriptions[index] && std::holds_alternative<TUnspecifiedTableStructure>(table.Description)) {
  169. table.Description = *inputDescriptions[index];
  170. }
  171. if (table.RichYPath) {
  172. FixInputTable(*table.RichYPath, index, preparer);
  173. }
  174. }
  175. static void FixOutputTable(TRichYPath& /* table */, int /* index */, const TJobOperationPreparer& /* preparer */)
  176. { }
  177. static void FixOutputTable(TStructuredJobTable& table, int index, const TJobOperationPreparer& preparer)
  178. {
  179. const auto& outputDescriptions = preparer.GetOutputDescriptions();
  180. if (outputDescriptions[index] && std::holds_alternative<TUnspecifiedTableStructure>(table.Description)) {
  181. table.Description = *outputDescriptions[index];
  182. }
  183. if (table.RichYPath) {
  184. FixOutputTable(*table.RichYPath, index, preparer);
  185. }
  186. }
  187. template <typename TTables>
  188. TVector<TTableSchema> PrepareOperation(
  189. const IJob& job,
  190. const IOperationPreparationContext& context,
  191. TTables* inputsPtr,
  192. TTables* outputsPtr,
  193. TUserJobFormatHints& hints)
  194. {
  195. TJobOperationPreparer preparer(context);
  196. job.PrepareOperation(context, preparer);
  197. preparer.Finish();
  198. if (inputsPtr) {
  199. auto& inputs = *inputsPtr;
  200. for (int i = 0; i < static_cast<int>(inputs.size()); ++i) {
  201. FixInputTable(inputs[i], i, preparer);
  202. }
  203. }
  204. if (outputsPtr) {
  205. auto& outputs = *outputsPtr;
  206. for (int i = 0; i < static_cast<int>(outputs.size()); ++i) {
  207. FixOutputTable(outputs[i], i, preparer);
  208. }
  209. }
  210. auto applyPatch = [](TMaybe<TFormatHints>& origin, const TMaybe<TFormatHints>& patch) {
  211. if (origin) {
  212. if (patch) {
  213. origin->Merge(*patch);
  214. }
  215. } else {
  216. origin = patch;
  217. }
  218. };
  219. auto preparerHints = preparer.GetFormatHints();
  220. applyPatch(preparerHints.InputFormatHints_, hints.InputFormatHints_);
  221. applyPatch(preparerHints.OutputFormatHints_, hints.OutputFormatHints_);
  222. hints = std::move(preparerHints);
  223. return preparer.GetOutputSchemas();
  224. }
  225. template
  226. TVector<TTableSchema> PrepareOperation<TStructuredJobTableList>(
  227. const IJob& job,
  228. const IOperationPreparationContext& context,
  229. TStructuredJobTableList* inputsPtr,
  230. TStructuredJobTableList* outputsPtr,
  231. TUserJobFormatHints& hints);
  232. template
  233. TVector<TTableSchema> PrepareOperation<TVector<TRichYPath>>(
  234. const IJob& job,
  235. const IOperationPreparationContext& context,
  236. TVector<TRichYPath>* inputsPtr,
  237. TVector<TRichYPath>* outputsPtr,
  238. TUserJobFormatHints& hints);
  239. ////////////////////////////////////////////////////////////////////////////////
  240. } // namespace NYT::NDetail