py_helpers.cpp 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. #include "py_helpers.h"
  2. #include "client.h"
  3. #include "operation.h"
  4. #include "transaction.h"
  5. #include <yt/cpp/mapreduce/interface/client.h>
  6. #include <yt/cpp/mapreduce/interface/fluent.h>
  7. #include <yt/cpp/mapreduce/common/retry_lib.h>
  8. #include <yt/cpp/mapreduce/common/helpers.h>
  9. #include <library/cpp/yson/node/node_io.h>
  10. #include <util/generic/hash_set.h>
  11. namespace NYT {
  12. using namespace NDetail;
  13. ////////////////////////////////////////////////////////////////////////////////
  14. IStructuredJobPtr ConstructJob(const TString& jobName, const TString& state)
  15. {
  16. auto node = TNode();
  17. if (!state.empty()) {
  18. node = NodeFromYsonString(state);
  19. }
  20. return TJobFactory::Get()->GetConstructingFunction(jobName.data())(node);
  21. }
  22. TString GetJobStateString(const IStructuredJob& job)
  23. {
  24. TString result;
  25. {
  26. TStringOutput output(result);
  27. job.Save(output);
  28. output.Finish();
  29. }
  30. return result;
  31. }
  32. TStructuredJobTableList NodeToStructuredTablePaths(const TNode& node, const TOperationPreparer& preparer)
  33. {
  34. int intermediateTableCount = 0;
  35. TVector<TRichYPath> paths;
  36. for (const auto& inputNode : node.AsList()) {
  37. if (inputNode.IsNull()) {
  38. ++intermediateTableCount;
  39. } else {
  40. paths.emplace_back(inputNode.AsString());
  41. }
  42. }
  43. paths = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), paths);
  44. TStructuredJobTableList result(intermediateTableCount, TStructuredJobTable::Intermediate(TUnspecifiedTableStructure()));
  45. for (const auto& path : paths) {
  46. result.emplace_back(TStructuredJobTable{TUnspecifiedTableStructure(), path});
  47. }
  48. return result;
  49. }
  50. TString GetIOInfo(
  51. const IStructuredJob& job,
  52. const TCreateClientOptions& options,
  53. const TString& cluster,
  54. const TString& transactionId,
  55. const TString& inputPaths,
  56. const TString& outputPaths,
  57. const TString& neededColumns)
  58. {
  59. auto client = NDetail::CreateClientImpl(cluster, options);
  60. TOperationPreparer preparer(client, GetGuid(transactionId));
  61. auto structuredInputs = NodeToStructuredTablePaths(NodeFromYsonString(inputPaths), preparer);
  62. auto structuredOutputs = NodeToStructuredTablePaths(NodeFromYsonString(outputPaths), preparer);
  63. auto neededColumnsNode = NodeFromYsonString(neededColumns);
  64. THashSet<TString> columnsUsedInOperations;
  65. for (const auto& columnNode : neededColumnsNode.AsList()) {
  66. columnsUsedInOperations.insert(columnNode.AsString());
  67. }
  68. auto operationIo = CreateSimpleOperationIoHelper(
  69. job,
  70. preparer,
  71. TOperationOptions(),
  72. std::move(structuredInputs),
  73. std::move(structuredOutputs),
  74. TUserJobFormatHints(),
  75. ENodeReaderFormat::Yson,
  76. columnsUsedInOperations);
  77. return BuildYsonStringFluently().BeginMap()
  78. .Item("input_format").Value(operationIo.InputFormat.Config)
  79. .Item("output_format").Value(operationIo.OutputFormat.Config)
  80. .Item("input_table_paths").List(operationIo.Inputs)
  81. .Item("output_table_paths").List(operationIo.Outputs)
  82. .Item("small_files").DoListFor(
  83. operationIo.JobFiles.begin(),
  84. operationIo.JobFiles.end(),
  85. [] (TFluentList fluent, auto fileIt) {
  86. fluent.Item().BeginMap()
  87. .Item("file_name").Value(fileIt->FileName)
  88. .Item("data").Value(fileIt->Data)
  89. .EndMap();
  90. })
  91. .EndMap();
  92. }
  93. ////////////////////////////////////////////////////////////////////////////////
  94. } // namespace NYT