// ytrun_lib.cpp
  1. #include "ytrun_lib.h"
  2. #include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
  3. #include <yt/yql/providers/yt/provider/yql_yt_provider.h>
  4. #include <yt/yql/providers/yt/lib/config_clusters/config_clusters.h>
  5. #include <yt/yql/providers/yt/lib/yt_download/yt_download.h>
  6. #include <yt/yql/providers/yt/lib/yt_url_lister/yt_url_lister.h>
  7. #include <yt/yql/providers/yt/lib/log/yt_logger.h>
  8. #include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
  9. #include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h>
  10. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  11. #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
  12. #include <yql/essentials/core/services/yql_transform_pipeline.h>
  13. #include <yql/essentials/core/cbo/simple/cbo_simple.h>
  14. #include <yql/essentials/utils/backtrace/backtrace.h>
  15. #include <yt/cpp/mapreduce/client/init.h>
  16. #include <yt/cpp/mapreduce/interface/config.h>
  17. #include <library/cpp/digest/md5/md5.h>
  18. #include <library/cpp/malloc/api/malloc.h>
  19. #include <library/cpp/sighandler/async_signals_handler.h>
  20. #include <util/folder/path.h>
  21. #include <util/stream/file.h>
  22. namespace {
  23. class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator {
  24. public:
  25. TPeepHolePipelineConfigurator() = default;
  26. void AfterCreate(NYql::TTransformationPipeline* pipeline) const final {
  27. Y_UNUSED(pipeline);
  28. }
  29. void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final {
  30. Y_UNUSED(pipeline);
  31. }
  32. void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
  33. pipeline->Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow");
  34. pipeline->Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput");
  35. pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
  36. pipeline->Add(NYql::CreateYtBlockOutputTransformer(nullptr), "BlockOutput");
  37. }
  38. };
  39. TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE;
  40. void FlushYtDebugLogOnSignal() {
  41. if (!NMalloc::IsAllocatorCorrupted) {
  42. NYql::FlushYtDebugLog();
  43. }
  44. }
  45. } // unnamed
  46. namespace NYql {
  47. TYtRunTool::TYtRunTool(TString name)
  48. : TFacadeRunner(std::move(name))
  49. {
  50. GetRunOptions().EnableResultPosition = true;
  51. GetRunOptions().EnableCredentials = true;
  52. GetRunOptions().EnableQPlayer = true;
  53. GetRunOptions().ResultStream = &Cout;
  54. GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
  55. opts.AddLongOption("user", "MR user")
  56. .Optional()
  57. .RequiredArgument("USER")
  58. .StoreResult(&GetRunOptions().User);
  59. opts.AddLongOption("mrjob-bin", "Path to mrjob binary")
  60. .Optional()
  61. .StoreResult(&MrJobBin_);
  62. opts.AddLongOption("mrjob-udfsdir", "Path to udfs for mr jobs")
  63. .Optional()
  64. .StoreResult(&MrJobUdfsDir_);
  65. opts.AddLongOption("show-progress", "Report operation progress").NoArgument()
  66. .Handler0([&]() {
  67. SetOperationProgressWriter([](const TOperationProgress& progress) {
  68. TStringBuilder remoteId;
  69. if (progress.RemoteId) {
  70. remoteId << ", remoteId: " << progress.RemoteId;
  71. }
  72. TStringBuilder counters;
  73. if (progress.Counters) {
  74. if (progress.Counters->Running) {
  75. counters << ' ' << progress.Counters->Running;
  76. }
  77. if (progress.Counters->Total) {
  78. counters << TStringBuf(" (") << (100ul * progress.Counters->Completed / progress.Counters->Total) << TStringBuf("%)");
  79. }
  80. }
  81. Cerr << "Operation: [" << progress.Category << "] " << progress.Id
  82. << ", state: " << progress.State << remoteId << counters
  83. << ", current stage: " << progress.Stage.first << Endl;
  84. });
  85. });
  86. opts.AddLongOption("threads", "gateway threads")
  87. .Optional()
  88. .RequiredArgument("COUNT")
  89. .StoreResult(&NumThreads_);
  90. opts.AddLongOption("keep-temp", "keep temporary tables")
  91. .Optional()
  92. .NoArgument()
  93. .SetFlag(&KeepTemp_);
  94. opts.AddLongOption("use-graph-meta", "Use tables metadata from graph")
  95. .Optional()
  96. .NoArgument()
  97. .SetFlag(&GetRunOptions().UseMetaFromGrpah);
  98. });
  99. GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) {
  100. Y_UNUSED(res);
  101. if (!GetRunOptions().GatewaysConfig) {
  102. GetRunOptions().GatewaysConfig = MakeHolder<TGatewaysConfig>();
  103. }
  104. auto ytConfig = GetRunOptions().GatewaysConfig->MutableYt();
  105. ytConfig->SetGatewayThreads(NumThreads_);
  106. if (MrJobBin_.empty()) {
  107. ytConfig->ClearMrJobBin();
  108. } else {
  109. ytConfig->SetMrJobBin(MrJobBin_);
  110. ytConfig->SetMrJobBinMd5(MD5::File(MrJobBin_));
  111. }
  112. if (MrJobUdfsDir_.empty()) {
  113. ytConfig->ClearMrJobUdfsDir();
  114. } else {
  115. ytConfig->SetMrJobUdfsDir(MrJobUdfsDir_);
  116. }
  117. auto attr = ytConfig->MutableDefaultSettings()->Add();
  118. attr->SetName("KeepTempTables");
  119. attr->SetValue(KeepTemp_ ? "yes" : "no");
  120. FillClusterMapping(*ytConfig, TString{YtProviderName});
  121. DefYtServer_ = NYql::TConfigClusters::GetDefaultYtServer(*ytConfig);
  122. });
  123. GetRunOptions().SetSupportedGateways({TString{YtProviderName}});
  124. GetRunOptions().GatewayTypes.emplace(YtProviderName);
  125. AddFsDownloadFactory([this]() -> NFS::IDownloaderPtr {
  126. return MakeYtDownloader(*GetRunOptions().FsConfig, DefYtServer_);
  127. });
  128. AddUrlListerFactory([]() -> IUrlListerPtr {
  129. return MakeYtUrlLister();
  130. });
  131. AddProviderFactory([this]() -> NYql::TDataProviderInitializer {
  132. if (GetRunOptions().GatewayTypes.contains(YtProviderName) && GetRunOptions().GatewaysConfig->HasYt()) {
  133. return GetYtNativeDataProviderInitializer(CreateYtGateway(), CreateCboFactory(), CreateDqHelper());
  134. }
  135. return {};
  136. });
  137. SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE);
  138. }
  139. IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
  140. TYtNativeServices services;
  141. services.FunctionRegistry = GetFuncRegistry().Get();
  142. services.FileStorage = GetFileStorage();
  143. services.Config = std::make_shared<TYtGatewayConfig>(GetRunOptions().GatewaysConfig->GetYt());
  144. auto ytGateway = CreateYtNativeGateway(services);
  145. return GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName) ? CreateYtFmrGateway(ytGateway): ytGateway;
  146. }
  147. IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() {
  148. return MakeSimpleCBOOptimizerFactory();
  149. }
  150. IDqHelper::TPtr TYtRunTool::CreateDqHelper() {
  151. return {};
  152. }
  153. int TYtRunTool::DoMain(int argc, const char *argv[]) {
  154. // Init MR/YT for proper work of embedded agent
  155. NYT::Initialize(argc, argv);
  156. NYql::NBacktrace::AddAfterFatalCallback([](int){ FlushYtDebugLogOnSignal(); });
  157. NYql::SetYtLoggerGlobalBackend(LOG_DEF_PRIORITY);
  158. if (NYT::TConfig::Get()->Prefix.empty()) {
  159. NYT::TConfig::Get()->Prefix = "//";
  160. }
  161. int res = TFacadeRunner::DoMain(argc, argv);
  162. if (0 == res) {
  163. NYql::DropYtDebugLog();
  164. }
  165. return res;
  166. }
  167. TProgram::TStatus TYtRunTool::DoRunProgram(TProgramPtr program) {
  168. auto sigHandler = [program](int) {
  169. Cerr << "Aborting..." << Endl;
  170. try {
  171. program->Abort().GetValueSync();
  172. } catch (...) {
  173. Cerr << CurrentExceptionMessage();
  174. }
  175. };
  176. SetAsyncSignalFunction(SIGINT, sigHandler);
  177. SetAsyncSignalFunction(SIGTERM, sigHandler);
  178. TProgram::TStatus status = TFacadeRunner::DoRunProgram(program);
  179. auto dummySigHandler = [](int) { };
  180. SetAsyncSignalFunction(SIGINT, dummySigHandler);
  181. SetAsyncSignalFunction(SIGTERM, dummySigHandler);
  182. return status;
  183. }
  184. } // NYql