py_helpers.cpp 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. #include "py_helpers.h"
  2. #include "client.h"
  3. #include "operation.h"
  4. #include "transaction.h"
  5. #include <yt/cpp/mapreduce/common/retry_lib.h>
  6. #include <yt/cpp/mapreduce/common/helpers.h>
  7. #include <yt/cpp/mapreduce/interface/client.h>
  8. #include <yt/cpp/mapreduce/interface/fluent.h>
  9. #include <yt/cpp/mapreduce/http_client/raw_requests.h>
  10. #include <library/cpp/yson/node/node_io.h>
  11. #include <util/generic/hash_set.h>
  12. namespace NYT {
  13. using namespace NDetail;
  14. ////////////////////////////////////////////////////////////////////////////////
  15. IStructuredJobPtr ConstructJob(const TString& jobName, const TString& state)
  16. {
  17. auto node = TNode();
  18. if (!state.empty()) {
  19. node = NodeFromYsonString(state);
  20. }
  21. return TJobFactory::Get()->GetConstructingFunction(jobName.data())(node);
  22. }
  23. TString GetJobStateString(const IStructuredJob& job)
  24. {
  25. TString result;
  26. {
  27. TStringOutput output(result);
  28. job.Save(output);
  29. output.Finish();
  30. }
  31. return result;
  32. }
  33. TStructuredJobTableList NodeToStructuredTablePaths(const TNode& node, const TOperationPreparer& preparer)
  34. {
  35. int intermediateTableCount = 0;
  36. TVector<TRichYPath> paths;
  37. for (const auto& inputNode : node.AsList()) {
  38. if (inputNode.IsNull()) {
  39. ++intermediateTableCount;
  40. } else {
  41. paths.emplace_back(inputNode.AsString());
  42. }
  43. }
  44. paths = NRawClient::CanonizeYPaths(preparer.GetClient()->GetRawClient(), paths);
  45. TStructuredJobTableList result(intermediateTableCount, TStructuredJobTable::Intermediate(TUnspecifiedTableStructure()));
  46. for (const auto& path : paths) {
  47. result.emplace_back(TStructuredJobTable{TUnspecifiedTableStructure(), path});
  48. }
  49. return result;
  50. }
  51. TString GetIOInfo(
  52. const IStructuredJob& job,
  53. const TCreateClientOptions& options,
  54. const TString& cluster,
  55. const TString& transactionId,
  56. const TString& inputPaths,
  57. const TString& outputPaths,
  58. const TString& neededColumns)
  59. {
  60. auto client = NDetail::CreateClientImpl(cluster, options);
  61. TOperationPreparer preparer(client, GetGuid(transactionId));
  62. auto structuredInputs = NodeToStructuredTablePaths(NodeFromYsonString(inputPaths), preparer);
  63. auto structuredOutputs = NodeToStructuredTablePaths(NodeFromYsonString(outputPaths), preparer);
  64. auto neededColumnsNode = NodeFromYsonString(neededColumns);
  65. THashSet<TString> columnsUsedInOperations;
  66. for (const auto& columnNode : neededColumnsNode.AsList()) {
  67. columnsUsedInOperations.insert(columnNode.AsString());
  68. }
  69. auto operationIo = CreateSimpleOperationIoHelper(
  70. job,
  71. preparer,
  72. TOperationOptions(),
  73. std::move(structuredInputs),
  74. std::move(structuredOutputs),
  75. TUserJobFormatHints(),
  76. ENodeReaderFormat::Yson,
  77. columnsUsedInOperations);
  78. return BuildYsonStringFluently().BeginMap()
  79. .Item("input_format").Value(operationIo.InputFormat.Config)
  80. .Item("output_format").Value(operationIo.OutputFormat.Config)
  81. .Item("input_table_paths").List(operationIo.Inputs)
  82. .Item("output_table_paths").List(operationIo.Outputs)
  83. .Item("small_files").DoListFor(
  84. operationIo.JobFiles.begin(),
  85. operationIo.JobFiles.end(),
  86. [] (TFluentList fluent, auto fileIt) {
  87. fluent.Item().BeginMap()
  88. .Item("file_name").Value(fileIt->FileName)
  89. .Item("data").Value(fileIt->Data)
  90. .EndMap();
  91. })
  92. .EndMap();
  93. }
  94. ////////////////////////////////////////////////////////////////////////////////
  95. } // namespace NYT