ytrun_lib.cpp 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. #include "ytrun_lib.h"
  2. #include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
  3. #include <yt/yql/providers/yt/provider/yql_yt_provider.h>
  4. #include <yt/yql/providers/yt/lib/config_clusters/config_clusters.h>
  5. #include <yt/yql/providers/yt/lib/yt_download/yt_download.h>
  6. #include <yt/yql/providers/yt/lib/yt_url_lister/yt_url_lister.h>
  7. #include <yt/yql/providers/yt/lib/log/yt_logger.h>
  8. #include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
  9. #include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h>
  10. #include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
  11. #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
  12. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  13. #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
  14. #include <yql/essentials/core/services/yql_transform_pipeline.h>
  15. #include <yql/essentials/core/cbo/simple/cbo_simple.h>
  16. #include <yql/essentials/utils/backtrace/backtrace.h>
  17. #include <yt/cpp/mapreduce/client/init.h>
  18. #include <yt/cpp/mapreduce/interface/config.h>
  19. #include <library/cpp/digest/md5/md5.h>
  20. #include <library/cpp/malloc/api/malloc.h>
  21. #include <library/cpp/sighandler/async_signals_handler.h>
  22. #include <util/folder/path.h>
  23. #include <util/stream/file.h>
  24. namespace {
  25. class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator {
  26. public:
  27. TPeepHolePipelineConfigurator() = default;
  28. void AfterCreate(NYql::TTransformationPipeline* pipeline) const final {
  29. Y_UNUSED(pipeline);
  30. }
  31. void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final {
  32. Y_UNUSED(pipeline);
  33. }
  34. void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
  35. pipeline->Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow");
  36. pipeline->Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput");
  37. pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
  38. pipeline->Add(NYql::CreateYtBlockOutputTransformer(nullptr), "BlockOutput");
  39. }
  40. };
  41. TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE;
  42. void FlushYtDebugLogOnSignal() {
  43. if (!NMalloc::IsAllocatorCorrupted) {
  44. NYql::FlushYtDebugLog();
  45. }
  46. }
  47. } // unnamed
  48. namespace NYql {
  49. TYtRunTool::TYtRunTool(TString name)
  50. : TFacadeRunner(std::move(name))
  51. {
  52. GetRunOptions().EnableResultPosition = true;
  53. GetRunOptions().EnableCredentials = true;
  54. GetRunOptions().EnableQPlayer = true;
  55. GetRunOptions().ResultStream = &Cout;
  56. GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
  57. opts.AddLongOption("user", "MR user")
  58. .Optional()
  59. .RequiredArgument("USER")
  60. .StoreResult(&GetRunOptions().User);
  61. opts.AddLongOption("mrjob-bin", "Path to mrjob binary")
  62. .Optional()
  63. .StoreResult(&MrJobBin_);
  64. opts.AddLongOption("mrjob-udfsdir", "Path to udfs for mr jobs")
  65. .Optional()
  66. .StoreResult(&MrJobUdfsDir_);
  67. opts.AddLongOption("show-progress", "Report operation progress").NoArgument()
  68. .Handler0([&]() {
  69. SetOperationProgressWriter([](const TOperationProgress& progress) {
  70. TStringBuilder remoteId;
  71. if (progress.RemoteId) {
  72. remoteId << ", remoteId: " << progress.RemoteId;
  73. }
  74. TStringBuilder counters;
  75. if (progress.Counters) {
  76. if (progress.Counters->Running) {
  77. counters << ' ' << progress.Counters->Running;
  78. }
  79. if (progress.Counters->Total) {
  80. counters << TStringBuf(" (") << (100ul * progress.Counters->Completed / progress.Counters->Total) << TStringBuf("%)");
  81. }
  82. }
  83. Cerr << "Operation: [" << progress.Category << "] " << progress.Id
  84. << ", state: " << progress.State << remoteId << counters
  85. << ", current stage: " << progress.Stage.first << Endl;
  86. });
  87. });
  88. opts.AddLongOption("threads", "gateway threads")
  89. .Optional()
  90. .RequiredArgument("COUNT")
  91. .StoreResult(&NumThreads_);
  92. opts.AddLongOption("keep-temp", "keep temporary tables")
  93. .Optional()
  94. .NoArgument()
  95. .SetFlag(&KeepTemp_);
  96. opts.AddLongOption("use-graph-meta", "Use tables metadata from graph")
  97. .Optional()
  98. .NoArgument()
  99. .SetFlag(&GetRunOptions().UseMetaFromGrpah);
  100. });
  101. GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) {
  102. Y_UNUSED(res);
  103. if (!GetRunOptions().GatewaysConfig) {
  104. GetRunOptions().GatewaysConfig = MakeHolder<TGatewaysConfig>();
  105. }
  106. auto ytConfig = GetRunOptions().GatewaysConfig->MutableYt();
  107. ytConfig->SetGatewayThreads(NumThreads_);
  108. if (MrJobBin_.empty()) {
  109. ytConfig->ClearMrJobBin();
  110. } else {
  111. ytConfig->SetMrJobBin(MrJobBin_);
  112. ytConfig->SetMrJobBinMd5(MD5::File(MrJobBin_));
  113. }
  114. if (MrJobUdfsDir_.empty()) {
  115. ytConfig->ClearMrJobUdfsDir();
  116. } else {
  117. ytConfig->SetMrJobUdfsDir(MrJobUdfsDir_);
  118. }
  119. auto attr = ytConfig->MutableDefaultSettings()->Add();
  120. attr->SetName("KeepTempTables");
  121. attr->SetValue(KeepTemp_ ? "yes" : "no");
  122. FillClusterMapping(*ytConfig, TString{YtProviderName});
  123. DefYtServer_ = NYql::TConfigClusters::GetDefaultYtServer(*ytConfig);
  124. });
  125. GetRunOptions().SetSupportedGateways({TString{YtProviderName}});
  126. GetRunOptions().GatewayTypes.emplace(YtProviderName);
  127. AddFsDownloadFactory([this]() -> NFS::IDownloaderPtr {
  128. return MakeYtDownloader(*GetRunOptions().FsConfig, DefYtServer_);
  129. });
  130. AddUrlListerFactory([]() -> IUrlListerPtr {
  131. return MakeYtUrlLister();
  132. });
  133. AddProviderFactory([this]() -> NYql::TDataProviderInitializer {
  134. if (GetRunOptions().GatewayTypes.contains(YtProviderName) && GetRunOptions().GatewaysConfig->HasYt()) {
  135. return GetYtNativeDataProviderInitializer(CreateYtGateway(), CreateCboFactory(), CreateDqHelper());
  136. }
  137. return {};
  138. });
  139. SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE);
  140. }
  141. IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
  142. TYtNativeServices services;
  143. services.FunctionRegistry = GetFuncRegistry().Get();
  144. services.FileStorage = GetFileStorage();
  145. services.Config = std::make_shared<TYtGatewayConfig>(GetRunOptions().GatewaysConfig->GetYt());
  146. auto ytGateway = CreateYtNativeGateway(services);
  147. if (!GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName)) {
  148. return ytGateway;
  149. }
  150. auto coordinator = MakeFmrCoordinator();
  151. auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
  152. while (!cancelFlag->load()) {
  153. Sleep(TDuration::Seconds(3));
  154. return ETaskStatus::Completed;
  155. }
  156. return ETaskStatus::Aborted;
  157. }; // TODO - use function which actually calls Downloader/Uploader based on task params
  158. TFmrJobFactorySettings settings{.Function=func};
  159. auto jobFactory = MakeFmrJobFactory(settings);
  160. TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDefaultRandomProvider(),
  161. .TimeToSleepBetweenRequests=TDuration::Seconds(1)};
  162. FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings);
  163. FmrWorker_->Start();
  164. return CreateYtFmrGateway(ytGateway, coordinator);
  165. }
  166. IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() {
  167. return MakeSimpleCBOOptimizerFactory();
  168. }
  169. IDqHelper::TPtr TYtRunTool::CreateDqHelper() {
  170. return {};
  171. }
  172. int TYtRunTool::DoMain(int argc, const char *argv[]) {
  173. // Init MR/YT for proper work of embedded agent
  174. NYT::Initialize(argc, argv);
  175. NYql::NBacktrace::AddAfterFatalCallback([](int){ FlushYtDebugLogOnSignal(); });
  176. NYql::SetYtLoggerGlobalBackend(LOG_DEF_PRIORITY);
  177. if (NYT::TConfig::Get()->Prefix.empty()) {
  178. NYT::TConfig::Get()->Prefix = "//";
  179. }
  180. int res = TFacadeRunner::DoMain(argc, argv);
  181. if (0 == res) {
  182. NYql::DropYtDebugLog();
  183. }
  184. return res;
  185. }
  186. TProgram::TStatus TYtRunTool::DoRunProgram(TProgramPtr program) {
  187. auto sigHandler = [program](int) {
  188. Cerr << "Aborting..." << Endl;
  189. try {
  190. program->Abort().GetValueSync();
  191. } catch (...) {
  192. Cerr << CurrentExceptionMessage();
  193. }
  194. };
  195. SetAsyncSignalFunction(SIGINT, sigHandler);
  196. SetAsyncSignalFunction(SIGTERM, sigHandler);
  197. TProgram::TStatus status = TFacadeRunner::DoRunProgram(program);
  198. auto dummySigHandler = [](int) { };
  199. SetAsyncSignalFunction(SIGINT, dummySigHandler);
  200. SetAsyncSignalFunction(SIGTERM, dummySigHandler);
  201. return status;
  202. }
  203. } // NYql