yqlrun_lib.cpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. #include "yqlrun_lib.h"
  2. #include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
  3. #include <yt/yql/providers/yt/provider/yql_yt_provider.h>
  4. #include <yt/yql/providers/yt/gateway/file/yql_yt_file_services.h>
  5. #include <yt/yql/providers/yt/gateway/file/yql_yt_file.h>
  6. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  7. #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
  8. #include <yql/essentials/core/services/yql_transform_pipeline.h>
  9. #include <yql/essentials/core/cbo/simple/cbo_simple.h>
  10. #include <util/generic/yexception.h>
  11. #include <util/folder/iterator.h>
  12. #include <util/folder/dirut.h>
  13. #include <util/folder/path.h>
  14. #include <util/stream/output.h>
  15. namespace {
  16. class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator {
  17. public:
  18. TPeepHolePipelineConfigurator() = default;
  19. void AfterCreate(NYql::TTransformationPipeline* pipeline) const final {
  20. Y_UNUSED(pipeline);
  21. }
  22. void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final {
  23. Y_UNUSED(pipeline);
  24. }
  25. void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
  26. pipeline->Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow");
  27. pipeline->Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput");
  28. pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
  29. pipeline->Add(NYql::CreateYtBlockOutputTransformer(nullptr), "BlockOutput");
  30. }
  31. };
  32. TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE;
  33. } // unnamed
  34. namespace NYql {
  35. TYqlRunTool::TYqlRunTool()
  36. : TFacadeRunner("yqlrun")
  37. {
  38. GetRunOptions().UseRepeatableRandomAndTimeProviders = true;
  39. GetRunOptions().ResultsFormat = NYson::EYsonFormat::Pretty;
  40. GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
  41. opts.AddLongOption('t', "table", "Table mapping").RequiredArgument("table@file")
  42. .KVHandler([&](TString name, TString path) {
  43. if (name.empty() || path.empty()) {
  44. throw yexception() << "Incorrect table mapping, expected form table@file, e.g. yt.plato.Input@input.txt";
  45. }
  46. TablesMapping_[name] = path;
  47. }, '@');
  48. opts.AddLongOption("tables-dir", "Table dirs mapping").RequiredArgument("cluster@dir")
  49. .KVHandler([&](TString cluster, TString dir) {
  50. if (cluster.empty() || dir.empty()) {
  51. throw yexception() << "Incorrect table directory mapping, expected form cluster@dir, e.g. yt.plato@/tmp/tables";
  52. }
  53. TablesDirMapping_[cluster] = dir;
  54. for (const auto& entry: TDirIterator(TFsPath(dir))) {
  55. if (auto entryPath = TFsPath(entry.fts_path); entryPath.IsFile() && entryPath.GetExtension() == "txt") {
  56. auto tableName = TString(cluster).append('.').append(entryPath.RelativeTo(TFsPath(dir)).GetPath());
  57. tableName = tableName.substr(0, tableName.size() - 4); // remove .txt extension
  58. TablesMapping_[tableName] = entryPath.GetPath();
  59. }
  60. }
  61. }, '@');
  62. opts.AddLongOption('C', "cluster", "Cluster to service mapping").RequiredArgument("name@service")
  63. .KVHandler([&](TString cluster, TString provider) {
  64. if (cluster.empty() || provider.empty()) {
  65. throw yexception() << "Incorrect service mapping, expected form cluster@provider, e.g. plato@yt";
  66. }
  67. AddClusterMapping(std::move(cluster), std::move(provider));
  68. }, '@');
  69. opts.AddLongOption("ndebug", "Do not show debug info in error output").NoArgument().SetFlag(&GetRunOptions().NoDebug);
  70. opts.AddLongOption("keep-temp", "Keep temporary tables").NoArgument().SetFlag(&KeepTemp_);
  71. opts.AddLongOption("show-progress", "Report operation progress").NoArgument()
  72. .Handler0([&]() {
  73. SetOperationProgressWriter([](const TOperationProgress& progress) {
  74. Cerr << "Operation: [" << progress.Category << "] " << progress.Id << ", state: " << progress.State << "\n";
  75. });
  76. });
  77. opts.AddLongOption("tmp-dir", "Directory for temporary tables").RequiredArgument("DIR").StoreResult(&TmpDir_);
  78. opts.AddLongOption("test-format", "Compare formatted query's AST with the original query's AST (only syntaxVersion=1 is supported)").NoArgument().SetFlag(&GetRunOptions().TestSqlFormat);
  79. opts.AddLongOption("validate-result-format", "Check that result-format can parse Result").NoArgument().SetFlag(&GetRunOptions().ValidateResultFormat);
  80. });
  81. GetRunOptions().SetSupportedGateways({TString{YtProviderName}});
  82. GetRunOptions().GatewayTypes.emplace(YtProviderName);
  83. AddClusterMapping(TString{"plato"}, TString{YtProviderName});
  84. AddProviderFactory([this]() -> NYql::TDataProviderInitializer {
  85. auto yqlNativeServices = NFile::TYtFileServices::Make(GetFuncRegistry().Get(), TablesMapping_, GetFileStorage(), TmpDir_, KeepTemp_, TablesDirMapping_);
  86. auto ytNativeGateway = CreateYtFileGateway(yqlNativeServices);
  87. return GetYtNativeDataProviderInitializer(ytNativeGateway, MakeSimpleCBOOptimizerFactory(), {});
  88. });
  89. SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE);
  90. }
  91. } // NYql