ytrun_lib.cpp 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. #include "ytrun_lib.h"
  2. #include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
  3. #include <yt/yql/providers/yt/provider/yql_yt_provider.h>
  4. #include <yt/yql/providers/yt/lib/config_clusters/config_clusters.h>
  5. #include <yt/yql/providers/yt/lib/yt_download/yt_download.h>
  6. #include <yt/yql/providers/yt/lib/yt_url_lister/yt_url_lister.h>
  7. #include <yt/yql/providers/yt/lib/log/yt_logger.h>
  8. #include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
  9. #include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h>
  10. #include <yt/yql/providers/yt/fmr/coordinator/client/yql_yt_coordinator_client.h>
  11. #include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
  12. #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
  13. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  14. #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
  15. #include <yql/essentials/core/services/yql_transform_pipeline.h>
  16. #include <yql/essentials/core/cbo/simple/cbo_simple.h>
  17. #include <yql/essentials/utils/backtrace/backtrace.h>
  18. #include <yt/cpp/mapreduce/client/init.h>
  19. #include <yt/cpp/mapreduce/interface/config.h>
  20. #include <library/cpp/digest/md5/md5.h>
  21. #include <library/cpp/malloc/api/malloc.h>
  22. #include <library/cpp/sighandler/async_signals_handler.h>
  23. #include <util/folder/path.h>
  24. #include <util/stream/file.h>
  25. namespace {
  26. class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator {
  27. public:
  28. TPeepHolePipelineConfigurator() = default;
  29. void AfterCreate(NYql::TTransformationPipeline* pipeline) const final {
  30. Y_UNUSED(pipeline);
  31. }
  32. void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final {
  33. Y_UNUSED(pipeline);
  34. }
  35. void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
  36. pipeline->Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow");
  37. pipeline->Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput");
  38. pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
  39. pipeline->Add(NYql::CreateYtBlockOutputTransformer(nullptr), "BlockOutput");
  40. }
  41. };
  42. TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE;
  43. void FlushYtDebugLogOnSignal() {
  44. if (!NMalloc::IsAllocatorCorrupted) {
  45. NYql::FlushYtDebugLog();
  46. }
  47. }
  48. } // unnamed
  49. namespace NYql {
  50. TYtRunTool::TYtRunTool(TString name)
  51. : TFacadeRunner(std::move(name))
  52. {
  53. GetRunOptions().EnableResultPosition = true;
  54. GetRunOptions().EnableCredentials = true;
  55. GetRunOptions().EnableQPlayer = true;
  56. GetRunOptions().ResultStream = &Cout;
  57. GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
  58. opts.AddLongOption("user", "MR user")
  59. .Optional()
  60. .RequiredArgument("USER")
  61. .StoreResult(&GetRunOptions().User);
  62. opts.AddLongOption("mrjob-bin", "Path to mrjob binary")
  63. .Optional()
  64. .StoreResult(&MrJobBin_);
  65. opts.AddLongOption("mrjob-udfsdir", "Path to udfs for mr jobs")
  66. .Optional()
  67. .StoreResult(&MrJobUdfsDir_);
  68. opts.AddLongOption("show-progress", "Report operation progress").NoArgument()
  69. .Handler0([&]() {
  70. SetOperationProgressWriter([](const TOperationProgress& progress) {
  71. TStringBuilder remoteId;
  72. if (progress.RemoteId) {
  73. remoteId << ", remoteId: " << progress.RemoteId;
  74. }
  75. TStringBuilder counters;
  76. if (progress.Counters) {
  77. if (progress.Counters->Running) {
  78. counters << ' ' << progress.Counters->Running;
  79. }
  80. if (progress.Counters->Total) {
  81. counters << TStringBuf(" (") << (100ul * progress.Counters->Completed / progress.Counters->Total) << TStringBuf("%)");
  82. }
  83. }
  84. Cerr << "Operation: [" << progress.Category << "] " << progress.Id
  85. << ", state: " << progress.State << remoteId << counters
  86. << ", current stage: " << progress.Stage.first << Endl;
  87. });
  88. });
  89. opts.AddLongOption("threads", "gateway threads")
  90. .Optional()
  91. .RequiredArgument("COUNT")
  92. .StoreResult(&NumThreads_);
  93. opts.AddLongOption("keep-temp", "keep temporary tables")
  94. .Optional()
  95. .NoArgument()
  96. .SetFlag(&KeepTemp_);
  97. opts.AddLongOption("use-graph-meta", "Use tables metadata from graph")
  98. .Optional()
  99. .NoArgument()
  100. .SetFlag(&GetRunOptions().UseMetaFromGrpah);
  101. opts.AddLongOption("fmr-coordinator-server-url", "Fast map reduce coordinator server url")
  102. .Optional()
  103. .StoreResult(&FmrCoordinatorServerUrl_);
  104. opts.AddLongOption("disable-local-fmr-worker", "Disable local fast map reduce worker")
  105. .Optional()
  106. .NoArgument()
  107. .SetFlag(&DisableLocalFmrWorker_);
  108. });
  109. GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) {
  110. Y_UNUSED(res);
  111. if (!GetRunOptions().GatewaysConfig) {
  112. GetRunOptions().GatewaysConfig = MakeHolder<TGatewaysConfig>();
  113. }
  114. auto ytConfig = GetRunOptions().GatewaysConfig->MutableYt();
  115. ytConfig->SetGatewayThreads(NumThreads_);
  116. if (MrJobBin_.empty()) {
  117. ytConfig->ClearMrJobBin();
  118. } else {
  119. ytConfig->SetMrJobBin(MrJobBin_);
  120. ytConfig->SetMrJobBinMd5(MD5::File(MrJobBin_));
  121. }
  122. if (MrJobUdfsDir_.empty()) {
  123. ytConfig->ClearMrJobUdfsDir();
  124. } else {
  125. ytConfig->SetMrJobUdfsDir(MrJobUdfsDir_);
  126. }
  127. auto attr = ytConfig->MutableDefaultSettings()->Add();
  128. attr->SetName("KeepTempTables");
  129. attr->SetValue(KeepTemp_ ? "yes" : "no");
  130. FillClusterMapping(*ytConfig, TString{YtProviderName});
  131. DefYtServer_ = NYql::TConfigClusters::GetDefaultYtServer(*ytConfig);
  132. });
  133. GetRunOptions().SetSupportedGateways({TString{YtProviderName}});
  134. GetRunOptions().GatewayTypes.emplace(YtProviderName);
  135. AddFsDownloadFactory([this]() -> NFS::IDownloaderPtr {
  136. return MakeYtDownloader(*GetRunOptions().FsConfig, DefYtServer_);
  137. });
  138. AddUrlListerFactory([]() -> IUrlListerPtr {
  139. return MakeYtUrlLister();
  140. });
  141. AddProviderFactory([this]() -> NYql::TDataProviderInitializer {
  142. if (GetRunOptions().GatewayTypes.contains(YtProviderName) && GetRunOptions().GatewaysConfig->HasYt()) {
  143. return GetYtNativeDataProviderInitializer(CreateYtGateway(), CreateCboFactory(), CreateDqHelper());
  144. }
  145. return {};
  146. });
  147. SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE);
  148. }
  149. IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
  150. TYtNativeServices services;
  151. services.FunctionRegistry = GetFuncRegistry().Get();
  152. services.FileStorage = GetFileStorage();
  153. services.Config = std::make_shared<TYtGatewayConfig>(GetRunOptions().GatewaysConfig->GetYt());
  154. auto ytGateway = CreateYtNativeGateway(services);
  155. if (!GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName)) {
  156. return ytGateway;
  157. }
  158. auto coordinator = NFmr::MakeFmrCoordinator();
  159. if (!FmrCoordinatorServerUrl_.empty()) {
  160. NFmr::TFmrCoordinatorClientSettings coordinatorClientSettings;
  161. THttpURL parsedUrl;
  162. if (parsedUrl.Parse(FmrCoordinatorServerUrl_) != THttpURL::ParsedOK) {
  163. ythrow yexception() << "Invalid fast map reduce coordinator server url passed in parameters";
  164. }
  165. coordinatorClientSettings.Port = parsedUrl.GetPort();
  166. coordinatorClientSettings.Host = parsedUrl.GetHost();
  167. coordinator = NFmr::MakeFmrCoordinatorClient(coordinatorClientSettings);
  168. }
  169. if (!DisableLocalFmrWorker_) {
  170. auto func = [&] (NFmr::TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
  171. while (!cancelFlag->load()) {
  172. Sleep(TDuration::Seconds(3));
  173. return NFmr::ETaskStatus::Completed;
  174. }
  175. return NFmr::ETaskStatus::Failed;
  176. }; // TODO - use function which actually calls Downloader/Uploader based on task params
  177. NFmr::TFmrJobFactorySettings settings{.Function=func};
  178. auto jobFactory = MakeFmrJobFactory(settings);
  179. NFmr::TFmrWorkerSettings workerSettings{.WorkerId = 0, .RandomProvider = CreateDefaultRandomProvider(),
  180. .TimeToSleepBetweenRequests=TDuration::Seconds(1)};
  181. FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings);
  182. FmrWorker_->Start();
  183. }
  184. return NFmr::CreateYtFmrGateway(ytGateway, coordinator);
  185. }
  186. IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() {
  187. return MakeSimpleCBOOptimizerFactory();
  188. }
  189. IDqHelper::TPtr TYtRunTool::CreateDqHelper() {
  190. return {};
  191. }
  192. int TYtRunTool::DoMain(int argc, const char *argv[]) {
  193. // Init MR/YT for proper work of embedded agent
  194. NYT::Initialize(argc, argv);
  195. NYql::NBacktrace::AddAfterFatalCallback([](int){ FlushYtDebugLogOnSignal(); });
  196. NYql::SetYtLoggerGlobalBackend(LOG_DEF_PRIORITY);
  197. if (NYT::TConfig::Get()->Prefix.empty()) {
  198. NYT::TConfig::Get()->Prefix = "//";
  199. }
  200. int res = TFacadeRunner::DoMain(argc, argv);
  201. if (0 == res) {
  202. NYql::DropYtDebugLog();
  203. }
  204. return res;
  205. }
  206. TProgram::TStatus TYtRunTool::DoRunProgram(TProgramPtr program) {
  207. auto sigHandler = [program](int) {
  208. Cerr << "Aborting..." << Endl;
  209. try {
  210. program->Abort().GetValueSync();
  211. } catch (...) {
  212. Cerr << CurrentExceptionMessage();
  213. }
  214. };
  215. SetAsyncSignalFunction(SIGINT, sigHandler);
  216. SetAsyncSignalFunction(SIGTERM, sigHandler);
  217. TProgram::TStatus status = TFacadeRunner::DoRunProgram(program);
  218. auto dummySigHandler = [](int) { };
  219. SetAsyncSignalFunction(SIGINT, dummySigHandler);
  220. SetAsyncSignalFunction(SIGTERM, dummySigHandler);
  221. return status;
  222. }
  223. } // NYql