prepare_operation.cpp 8.7 KB


  1. #include "prepare_operation.h"
  2. #include <yt/cpp/mapreduce/common/retry_lib.h>
  3. #include <yt/cpp/mapreduce/interface/serialize.h>
  4. #include <yt/cpp/mapreduce/raw_client/raw_requests.h>
  5. #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
  6. #include <library/cpp/iterator/functools.h>
  7. namespace NYT::NDetail {
  8. ////////////////////////////////////////////////////////////////////////////////
  9. TOperationPreparationContext::TOperationPreparationContext(
  10. const TStructuredJobTableList& structuredInputs,
  11. const TStructuredJobTableList& structuredOutputs,
  12. const TClientContext& context,
  13. const IClientRetryPolicyPtr& retryPolicy,
  14. TTransactionId transactionId)
  15. : Context_(context)
  16. , RetryPolicy_(retryPolicy)
  17. , TransactionId_(transactionId)
  18. , InputSchemas_(structuredInputs.size())
  19. , InputSchemasLoaded_(structuredInputs.size(), false)
  20. {
  21. Inputs_.reserve(structuredInputs.size());
  22. for (const auto& input : structuredInputs) {
  23. Inputs_.push_back(input.RichYPath);
  24. }
  25. Outputs_.reserve(structuredOutputs.size());
  26. for (const auto& output : structuredOutputs) {
  27. Outputs_.push_back(output.RichYPath);
  28. }
  29. }
  30. TOperationPreparationContext::TOperationPreparationContext(
  31. TVector<TRichYPath> inputs,
  32. TVector<TRichYPath> outputs,
  33. const TClientContext& context,
  34. const IClientRetryPolicyPtr& retryPolicy,
  35. TTransactionId transactionId)
  36. : Context_(context)
  37. , RetryPolicy_(retryPolicy)
  38. , TransactionId_(transactionId)
  39. , InputSchemas_(inputs.size())
  40. , InputSchemasLoaded_(inputs.size(), false)
  41. {
  42. Inputs_.reserve(inputs.size());
  43. for (auto& input : inputs) {
  44. Inputs_.push_back(std::move(input));
  45. }
  46. Outputs_.reserve(outputs.size());
  47. for (const auto& output : outputs) {
  48. Outputs_.push_back(std::move(output));
  49. }
  50. }
  51. int TOperationPreparationContext::GetInputCount() const
  52. {
  53. return static_cast<int>(Inputs_.size());
  54. }
  55. int TOperationPreparationContext::GetOutputCount() const
  56. {
  57. return static_cast<int>(Outputs_.size());
  58. }
  59. const TVector<TTableSchema>& TOperationPreparationContext::GetInputSchemas() const
  60. {
  61. TVector<::NThreading::TFuture<TNode>> schemaFutures;
  62. NRawClient::TRawBatchRequest batch(Context_.Config);
  63. for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) {
  64. if (InputSchemasLoaded_[tableIndex]) {
  65. schemaFutures.emplace_back();
  66. continue;
  67. }
  68. Y_ABORT_UNLESS(Inputs_[tableIndex]);
  69. schemaFutures.push_back(batch.Get(TransactionId_, Inputs_[tableIndex]->Path_ + "/@schema", TGetOptions{}));
  70. }
  71. NRawClient::ExecuteBatch(
  72. RetryPolicy_->CreatePolicyForGenericRequest(),
  73. Context_,
  74. batch);
  75. for (int tableIndex = 0; tableIndex < static_cast<int>(InputSchemas_.size()); ++tableIndex) {
  76. if (schemaFutures[tableIndex].Initialized()) {
  77. Deserialize(InputSchemas_[tableIndex], schemaFutures[tableIndex].ExtractValueSync());
  78. }
  79. }
  80. return InputSchemas_;
  81. }
  82. const TTableSchema& TOperationPreparationContext::GetInputSchema(int index) const
  83. {
  84. auto& schema = InputSchemas_[index];
  85. if (!InputSchemasLoaded_[index]) {
  86. Y_ABORT_UNLESS(Inputs_[index]);
  87. auto schemaNode = NRawClient::Get(
  88. RetryPolicy_->CreatePolicyForGenericRequest(),
  89. Context_,
  90. TransactionId_,
  91. Inputs_[index]->Path_ + "/@schema");
  92. Deserialize(schema, schemaNode);
  93. }
  94. return schema;
  95. }
  96. TMaybe<TYPath> TOperationPreparationContext::GetInputPath(int index) const
  97. {
  98. Y_ABORT_UNLESS(index < static_cast<int>(Inputs_.size()));
  99. if (Inputs_[index]) {
  100. return Inputs_[index]->Path_;
  101. }
  102. return Nothing();
  103. }
  104. TMaybe<TYPath> TOperationPreparationContext::GetOutputPath(int index) const
  105. {
  106. Y_ABORT_UNLESS(index < static_cast<int>(Outputs_.size()));
  107. if (Outputs_[index]) {
  108. return Outputs_[index]->Path_;
  109. }
  110. return Nothing();
  111. }
  112. ////////////////////////////////////////////////////////////////////////////////
  113. TSpeculativeOperationPreparationContext::TSpeculativeOperationPreparationContext(
  114. const TVector<TTableSchema>& previousResult,
  115. TStructuredJobTableList inputs,
  116. TStructuredJobTableList outputs)
  117. : InputSchemas_(previousResult)
  118. , Inputs_(std::move(inputs))
  119. , Outputs_(std::move(outputs))
  120. {
  121. Y_ABORT_UNLESS(Inputs_.size() == previousResult.size());
  122. }
  123. int TSpeculativeOperationPreparationContext::GetInputCount() const
  124. {
  125. return static_cast<int>(Inputs_.size());
  126. }
  127. int TSpeculativeOperationPreparationContext::GetOutputCount() const
  128. {
  129. return static_cast<int>(Outputs_.size());
  130. }
  131. const TVector<TTableSchema>& TSpeculativeOperationPreparationContext::GetInputSchemas() const
  132. {
  133. return InputSchemas_;
  134. }
  135. const TTableSchema& TSpeculativeOperationPreparationContext::GetInputSchema(int index) const
  136. {
  137. Y_ABORT_UNLESS(index < static_cast<int>(InputSchemas_.size()));
  138. return InputSchemas_[index];
  139. }
  140. TMaybe<TYPath> TSpeculativeOperationPreparationContext::GetInputPath(int index) const
  141. {
  142. Y_ABORT_UNLESS(index < static_cast<int>(Inputs_.size()));
  143. if (Inputs_[index].RichYPath) {
  144. return Inputs_[index].RichYPath->Path_;
  145. }
  146. return Nothing();
  147. }
  148. TMaybe<TYPath> TSpeculativeOperationPreparationContext::GetOutputPath(int index) const
  149. {
  150. Y_ABORT_UNLESS(index < static_cast<int>(Outputs_.size()));
  151. if (Outputs_[index].RichYPath) {
  152. return Outputs_[index].RichYPath->Path_;
  153. }
  154. return Nothing();
  155. }
  156. ////////////////////////////////////////////////////////////////////////////////
  157. static void FixInputTable(TRichYPath& table, int index, const TJobOperationPreparer& preparer)
  158. {
  159. const auto& columnRenamings = preparer.GetInputColumnRenamings();
  160. const auto& columnFilters = preparer.GetInputColumnFilters();
  161. if (!columnRenamings[index].empty()) {
  162. table.RenameColumns(columnRenamings[index]);
  163. }
  164. if (columnFilters[index]) {
  165. table.Columns(*columnFilters[index]);
  166. }
  167. }
  168. static void FixInputTable(TStructuredJobTable& table, int index, const TJobOperationPreparer& preparer)
  169. {
  170. const auto& inputDescriptions = preparer.GetInputDescriptions();
  171. if (inputDescriptions[index] && std::holds_alternative<TUnspecifiedTableStructure>(table.Description)) {
  172. table.Description = *inputDescriptions[index];
  173. }
  174. if (table.RichYPath) {
  175. FixInputTable(*table.RichYPath, index, preparer);
  176. }
  177. }
  178. static void FixOutputTable(TRichYPath& /* table */, int /* index */, const TJobOperationPreparer& /* preparer */)
  179. { }
  180. static void FixOutputTable(TStructuredJobTable& table, int index, const TJobOperationPreparer& preparer)
  181. {
  182. const auto& outputDescriptions = preparer.GetOutputDescriptions();
  183. if (outputDescriptions[index] && std::holds_alternative<TUnspecifiedTableStructure>(table.Description)) {
  184. table.Description = *outputDescriptions[index];
  185. }
  186. if (table.RichYPath) {
  187. FixOutputTable(*table.RichYPath, index, preparer);
  188. }
  189. }
  190. template <typename TTables>
  191. TVector<TTableSchema> PrepareOperation(
  192. const IJob& job,
  193. const IOperationPreparationContext& context,
  194. TTables* inputsPtr,
  195. TTables* outputsPtr,
  196. TUserJobFormatHints& hints)
  197. {
  198. TJobOperationPreparer preparer(context);
  199. job.PrepareOperation(context, preparer);
  200. preparer.Finish();
  201. if (inputsPtr) {
  202. auto& inputs = *inputsPtr;
  203. for (int i = 0; i < static_cast<int>(inputs.size()); ++i) {
  204. FixInputTable(inputs[i], i, preparer);
  205. }
  206. }
  207. if (outputsPtr) {
  208. auto& outputs = *outputsPtr;
  209. for (int i = 0; i < static_cast<int>(outputs.size()); ++i) {
  210. FixOutputTable(outputs[i], i, preparer);
  211. }
  212. }
  213. auto applyPatch = [](TMaybe<TFormatHints>& origin, const TMaybe<TFormatHints>& patch) {
  214. if (origin) {
  215. if (patch) {
  216. origin->Merge(*patch);
  217. }
  218. } else {
  219. origin = patch;
  220. }
  221. };
  222. auto preparerHints = preparer.GetFormatHints();
  223. applyPatch(preparerHints.InputFormatHints_, hints.InputFormatHints_);
  224. applyPatch(preparerHints.OutputFormatHints_, hints.OutputFormatHints_);
  225. hints = std::move(preparerHints);
  226. return preparer.GetOutputSchemas();
  227. }
  228. template
  229. TVector<TTableSchema> PrepareOperation<TStructuredJobTableList>(
  230. const IJob& job,
  231. const IOperationPreparationContext& context,
  232. TStructuredJobTableList* inputsPtr,
  233. TStructuredJobTableList* outputsPtr,
  234. TUserJobFormatHints& hints);
  235. template
  236. TVector<TTableSchema> PrepareOperation<TVector<TRichYPath>>(
  237. const IJob& job,
  238. const IOperationPreparationContext& context,
  239. TVector<TRichYPath>* inputsPtr,
  240. TVector<TRichYPath>* outputsPtr,
  241. TUserJobFormatHints& hints);
  242. ////////////////////////////////////////////////////////////////////////////////
  243. } // namespace NYT::NDetail