// Library part of the `ytrun` tool: a TFacadeRunner specialization that
// registers YT-specific command-line options and wires the YT provider (native
// gateway, optionally wrapped in an FMR gateway), the YT file downloader, the
// YT URL lister, a CBO factory and a peephole pipeline configurator into the
// facade runner.
//
// NOTE(review): this copy of the file appears damaged. The `#include` targets
// below are missing, and so are several template argument lists
// (`MakeHolder()`, `std::make_shared(...)`, `std::shared_ptr> cancelFlag`).
// As a result it will not compile as-is. Restore them from the upstream
// source before building.
#include "ytrun_lib.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace {

// Pipeline configurator installed via SetPeepholePipelineConfigurator().
// It adds nothing after creation or after type annotation. After optimization
// it appends the YT wide-flow, block-input, peephole and block-output
// transformers.
class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator {
public:
    TPeepHolePipelineConfigurator() = default;

    void AfterCreate(NYql::TTransformationPipeline* pipeline) const final {
        Y_UNUSED(pipeline);
    }

    void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final {
        Y_UNUSED(pipeline);
    }

    void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
        // Stages are appended in this order. The peephole optimization runs
        // after the wide-flow and block-input transformers and before the
        // block-output transformer.
        pipeline->Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow");
        pipeline->Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput");
        pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
        pipeline->Add(NYql::CreateYtBlockOutputTransformer(nullptr), "BlockOutput");
    }
};

// Single shared configurator instance; its address is handed to
// SetPeepholePipelineConfigurator() in the TYtRunTool constructor.
TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE;

// Fatal-signal hook (registered in TYtRunTool::DoMain). It flushes the YT debug
// log only when the allocator has not been flagged as corrupted. The presumed
// reason is that flushing may allocate — TODO confirm.
void FlushYtDebugLogOnSignal() {
    if (!NMalloc::IsAllocatorCorrupted) {
        NYql::FlushYtDebugLog();
    }
}

} // unnamed

namespace NYql {

// Configures the run options for a YT-backed run:
// - enables result positions, credentials and QPlayer, and sends results to Cout;
// - registers the YT-specific command-line options and a post-parse handler
//   that fills the YT gateway config;
// - declares YT as the supported gateway;
// - registers the YT downloader, URL lister and provider factories;
// - installs the peephole pipeline configurator.
TYtRunTool::TYtRunTool(TString name)
    : TFacadeRunner(std::move(name))
{
    GetRunOptions().EnableResultPosition = true;
    GetRunOptions().EnableCredentials = true;
    GetRunOptions().EnableQPlayer = true;
    GetRunOptions().ResultStream = &Cout;

    // YT-specific command-line options.
    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
        opts.AddLongOption("user", "MR user")
            .Optional()
            .RequiredArgument("USER")
            .StoreResult(&GetRunOptions().User);
        opts.AddLongOption("mrjob-bin", "Path to mrjob binary")
            .Optional()
            .StoreResult(&MrJobBin_);
        opts.AddLongOption("mrjob-udfsdir", "Path to udfs for mr jobs")
            .Optional()
            .StoreResult(&MrJobUdfsDir_);
        // --show-progress installs a progress writer. The writer prints each
        // progress report as one line to Cerr, containing the category, id,
        // state, optional remote id, optional counters and current stage.
        opts.AddLongOption("show-progress", "Report operation progress").NoArgument()
            .Handler0([&]() {
                SetOperationProgressWriter([](const TOperationProgress& progress) {
                    TStringBuilder remoteId;
                    if (progress.RemoteId) {
                        remoteId << ", remoteId: " << progress.RemoteId;
                    }
                    TStringBuilder counters;
                    if (progress.Counters) {
                        if (progress.Counters->Running) {
                            counters << ' ' << progress.Counters->Running;
                        }
                        // Guarded by Total != 0 so the division below is safe.
                        // Prints Completed as a percentage of Total.
                        if (progress.Counters->Total) {
                            counters << TStringBuf(" (") << (100ul * progress.Counters->Completed / progress.Counters->Total) << TStringBuf("%)");
                        }
                    }
                    Cerr << "Operation: [" << progress.Category << "] " << progress.Id
                         << ", state: " << progress.State << remoteId << counters
                         << ", current stage: " << progress.Stage.first << Endl;
                });
            });
        opts.AddLongOption("threads", "gateway threads")
            .Optional()
            .RequiredArgument("COUNT")
            .StoreResult(&NumThreads_);
        opts.AddLongOption("keep-temp", "keep temporary tables")
            .Optional()
            .NoArgument()
            .SetFlag(&KeepTemp_);
        // NOTE(review): the target field is spelled `UseMetaFromGrpah` (sic).
        // This looks like a typo in the run-options declaration; rename both
        // places together if fixing it.
        opts.AddLongOption("use-graph-meta", "Use tables metadata from graph")
            .Optional()
            .NoArgument()
            .SetFlag(&GetRunOptions().UseMetaFromGrpah);
    });

    // Post-parse handler. It ensures a gateways config exists and then fills its
    // YT section from the options above: thread count, mrjob binary + MD5, UDF
    // dir, the KeepTempTables default setting and the cluster mapping. It also
    // records the default YT server.
    GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) {
        Y_UNUSED(res);
        if (!GetRunOptions().GatewaysConfig) {
            // NOTE(review): template argument appears stripped in this copy.
            GetRunOptions().GatewaysConfig = MakeHolder();
        }

        auto ytConfig = GetRunOptions().GatewaysConfig->MutableYt();
        ytConfig->SetGatewayThreads(NumThreads_);
        if (MrJobBin_.empty()) {
            ytConfig->ClearMrJobBin();
        } else {
            ytConfig->SetMrJobBin(MrJobBin_);
            // The MD5 is computed here from the local file at MrJobBin_.
            ytConfig->SetMrJobBinMd5(MD5::File(MrJobBin_));
        }
        if (MrJobUdfsDir_.empty()) {
            ytConfig->ClearMrJobUdfsDir();
        } else {
            ytConfig->SetMrJobUdfsDir(MrJobUdfsDir_);
        }
        // Appends a KeepTempTables default setting: "yes" with --keep-temp, "no" otherwise.
        auto attr = ytConfig->MutableDefaultSettings()->Add();
        attr->SetName("KeepTempTables");
        attr->SetValue(KeepTemp_ ? "yes" : "no");

        FillClusterMapping(*ytConfig, TString{YtProviderName});
        // Used later by the YT downloader factory registered below.
        DefYtServer_ = NYql::TConfigClusters::GetDefaultYtServer(*ytConfig);
    });

    GetRunOptions().SetSupportedGateways({TString{YtProviderName}});
    GetRunOptions().GatewayTypes.emplace(YtProviderName);

    AddFsDownloadFactory([this]() -> NFS::IDownloaderPtr {
        return MakeYtDownloader(*GetRunOptions().FsConfig, DefYtServer_);
    });

    AddUrlListerFactory([]() -> IUrlListerPtr {
        return MakeYtUrlLister();
    });

    // The YT provider is created only when the YT gateway type is enabled AND the
    // gateways config has a YT section. Otherwise an empty initializer is returned.
    AddProviderFactory([this]() -> NYql::TDataProviderInitializer {
        if (GetRunOptions().GatewayTypes.contains(YtProviderName) && GetRunOptions().GatewaysConfig->HasYt()) {
            return GetYtNativeDataProviderInitializer(CreateYtGateway(), CreateCboFactory(), CreateDqHelper());
        }
        return {};
    });

    SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE);
}

// Builds the native YT gateway from the YT section of the gateways config.
// When the FastMapReduce gateway type is enabled, it additionally:
// - creates an FMR coordinator;
// - creates and starts a single FMR worker (WorkerId 1), stored in FmrWorker_;
// - returns an FMR gateway wrapping the native one.
IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
    TYtNativeServices services;
    services.FunctionRegistry = GetFuncRegistry().Get();
    services.FileStorage = GetFileStorage();
    // NOTE(review): template argument appears stripped in this copy.
    services.Config = std::make_shared(GetRunOptions().GatewaysConfig->GetYt());
    auto ytGateway = CreateYtNativeGateway(services);
    if (!GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName)) {
        return ytGateway;
    }

    auto coordinator = MakeFmrCoordinator();
    // Placeholder job body (see the TODO below).
    // NOTE(review): the `while` always returns on its first iteration, so it is
    // effectively an `if`. If the cancel flag is not set on entry, the job
    // sleeps 3 seconds and reports Completed; if it is already set, the job
    // reports Aborted.
    // NOTE(review): a copy of this lambda ends up in the job factory used by the
    // long-lived FmrWorker_. It captures nothing today, but `[&]` would silently
    // allow dangling references to locals of this function; `[]` would be safer.
    // NOTE(review): the shared_ptr template argument appears stripped in this copy.
    auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr> cancelFlag) {
        while (!cancelFlag->load()) {
            Sleep(TDuration::Seconds(3));
            return ETaskStatus::Completed;
        }
        return ETaskStatus::Aborted;
    };
    // TODO - use function which actually calls Downloader/Uploader based on task params
    TFmrJobFactorySettings settings{.Function=func};
    auto jobFactory = MakeFmrJobFactory(settings);
    TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDefaultRandomProvider(),
        .TimeToSleepBetweenRequests=TDuration::Seconds(1)};
    FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings);
    FmrWorker_->Start();
    return CreateYtFmrGateway(ytGateway, coordinator);
}

// Cost-based optimizer factory handed to the YT provider: the simple CBO.
IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() {
    return MakeSimpleCBOOptimizerFactory();
}

// This tool supplies no DQ helper: returns a default-constructed (empty) TPtr.
IDqHelper::TPtr TYtRunTool::CreateDqHelper() {
    return {};
}

// Entry point. It performs these steps in order:
// 1. initializes the YT client;
// 2. registers a fatal-signal callback that flushes the YT debug log;
// 3. sets the YT logger backend;
// 4. defaults the YT path prefix to "//" when it is unset;
// 5. runs the facade.
// The YT debug log is dropped only when the run returns 0.
int TYtRunTool::DoMain(int argc, const char *argv[]) {
    // Init MR/YT for proper work of embedded agent
    NYT::Initialize(argc, argv);
    NYql::NBacktrace::AddAfterFatalCallback([](int){ FlushYtDebugLogOnSignal(); });
    NYql::SetYtLoggerGlobalBackend(LOG_DEF_PRIORITY);
    if (NYT::TConfig::Get()->Prefix.empty()) {
        NYT::TConfig::Get()->Prefix = "//";
    }

    int res = TFacadeRunner::DoMain(argc, argv);
    if (0 == res) {
        NYql::DropYtDebugLog();
    }
    return res;
}

// Runs the program with SIGINT/SIGTERM handlers that request Abort() on it.
// After the run returns, both handlers are replaced with no-ops.
// NOTE(review): if TFacadeRunner::DoRunProgram throws, the no-op handlers are
// never installed. The abort handler, which holds its own copy of `program`,
// then stays active. A scope guard would make the reset unconditional.
TProgram::TStatus TYtRunTool::DoRunProgram(TProgramPtr program) {
    auto sigHandler = [program](int) {
        Cerr << "Aborting..." << Endl;
        try {
            // Waits synchronously for the result of Abort().
            program->Abort().GetValueSync();
        } catch (...) {
            // NOTE(review): written without a trailing newline/Endl.
            Cerr << CurrentExceptionMessage();
        }
    };
    SetAsyncSignalFunction(SIGINT, sigHandler);
    SetAsyncSignalFunction(SIGTERM, sigHandler);

    TProgram::TStatus status = TFacadeRunner::DoRunProgram(program);

    auto dummySigHandler = [](int) { };
    SetAsyncSignalFunction(SIGINT, dummySigHandler);
    SetAsyncSignalFunction(SIGTERM, dummySigHandler);
    return status;
}

} // NYql