123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- #include "ytrun_lib.h"
- #include <yt/yql/providers/yt/provider/yql_yt_provider_impl.h>
- #include <yt/yql/providers/yt/provider/yql_yt_provider.h>
- #include <yt/yql/providers/yt/lib/config_clusters/config_clusters.h>
- #include <yt/yql/providers/yt/lib/yt_download/yt_download.h>
- #include <yt/yql/providers/yt/lib/yt_url_lister/yt_url_lister.h>
- #include <yt/yql/providers/yt/lib/log/yt_logger.h>
- #include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
- #include <yt/yql/providers/yt/gateway/fmr/yql_yt_fmr.h>
- #include <yt/yql/providers/yt/fmr/coordinator/impl/yql_yt_coordinator_impl.h>
- #include <yt/yql/providers/yt/fmr/job_factory/impl/yql_yt_job_factory_impl.h>
- #include <yql/essentials/providers/common/provider/yql_provider_names.h>
- #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
- #include <yql/essentials/core/services/yql_transform_pipeline.h>
- #include <yql/essentials/core/cbo/simple/cbo_simple.h>
- #include <yql/essentials/utils/backtrace/backtrace.h>
- #include <yt/cpp/mapreduce/client/init.h>
- #include <yt/cpp/mapreduce/interface/config.h>
- #include <library/cpp/digest/md5/md5.h>
- #include <library/cpp/malloc/api/malloc.h>
- #include <library/cpp/sighandler/async_signals_handler.h>
- #include <util/folder/path.h>
- #include <util/stream/file.h>
- namespace {
- class TPeepHolePipelineConfigurator : public NYql::IPipelineConfigurator {
- public:
- TPeepHolePipelineConfigurator() = default;
- void AfterCreate(NYql::TTransformationPipeline* pipeline) const final {
- Y_UNUSED(pipeline);
- }
- void AfterTypeAnnotation(NYql::TTransformationPipeline* pipeline) const final {
- Y_UNUSED(pipeline);
- }
- void AfterOptimize(NYql::TTransformationPipeline* pipeline) const final {
- pipeline->Add(NYql::CreateYtWideFlowTransformer(nullptr), "WideFlow");
- pipeline->Add(NYql::CreateYtBlockInputTransformer(nullptr), "BlockInput");
- pipeline->Add(NYql::MakePeepholeOptimization(pipeline->GetTypeAnnotationContext()), "PeepHole");
- pipeline->Add(NYql::CreateYtBlockOutputTransformer(nullptr), "BlockOutput");
- }
- };
- TPeepHolePipelineConfigurator PEEPHOLE_CONFIG_INSTANCE;
- void FlushYtDebugLogOnSignal() {
- if (!NMalloc::IsAllocatorCorrupted) {
- NYql::FlushYtDebugLog();
- }
- }
- } // unnamed
- namespace NYql {
- TYtRunTool::TYtRunTool(TString name)
- : TFacadeRunner(std::move(name))
- {
- GetRunOptions().EnableResultPosition = true;
- GetRunOptions().EnableCredentials = true;
- GetRunOptions().EnableQPlayer = true;
- GetRunOptions().ResultStream = &Cout;
- GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
- opts.AddLongOption("user", "MR user")
- .Optional()
- .RequiredArgument("USER")
- .StoreResult(&GetRunOptions().User);
- opts.AddLongOption("mrjob-bin", "Path to mrjob binary")
- .Optional()
- .StoreResult(&MrJobBin_);
- opts.AddLongOption("mrjob-udfsdir", "Path to udfs for mr jobs")
- .Optional()
- .StoreResult(&MrJobUdfsDir_);
- opts.AddLongOption("show-progress", "Report operation progress").NoArgument()
- .Handler0([&]() {
- SetOperationProgressWriter([](const TOperationProgress& progress) {
- TStringBuilder remoteId;
- if (progress.RemoteId) {
- remoteId << ", remoteId: " << progress.RemoteId;
- }
- TStringBuilder counters;
- if (progress.Counters) {
- if (progress.Counters->Running) {
- counters << ' ' << progress.Counters->Running;
- }
- if (progress.Counters->Total) {
- counters << TStringBuf(" (") << (100ul * progress.Counters->Completed / progress.Counters->Total) << TStringBuf("%)");
- }
- }
- Cerr << "Operation: [" << progress.Category << "] " << progress.Id
- << ", state: " << progress.State << remoteId << counters
- << ", current stage: " << progress.Stage.first << Endl;
- });
- });
- opts.AddLongOption("threads", "gateway threads")
- .Optional()
- .RequiredArgument("COUNT")
- .StoreResult(&NumThreads_);
- opts.AddLongOption("keep-temp", "keep temporary tables")
- .Optional()
- .NoArgument()
- .SetFlag(&KeepTemp_);
- opts.AddLongOption("use-graph-meta", "Use tables metadata from graph")
- .Optional()
- .NoArgument()
- .SetFlag(&GetRunOptions().UseMetaFromGrpah);
- });
- GetRunOptions().AddOptHandler([this](const NLastGetopt::TOptsParseResult& res) {
- Y_UNUSED(res);
- if (!GetRunOptions().GatewaysConfig) {
- GetRunOptions().GatewaysConfig = MakeHolder<TGatewaysConfig>();
- }
- auto ytConfig = GetRunOptions().GatewaysConfig->MutableYt();
- ytConfig->SetGatewayThreads(NumThreads_);
- if (MrJobBin_.empty()) {
- ytConfig->ClearMrJobBin();
- } else {
- ytConfig->SetMrJobBin(MrJobBin_);
- ytConfig->SetMrJobBinMd5(MD5::File(MrJobBin_));
- }
- if (MrJobUdfsDir_.empty()) {
- ytConfig->ClearMrJobUdfsDir();
- } else {
- ytConfig->SetMrJobUdfsDir(MrJobUdfsDir_);
- }
- auto attr = ytConfig->MutableDefaultSettings()->Add();
- attr->SetName("KeepTempTables");
- attr->SetValue(KeepTemp_ ? "yes" : "no");
- FillClusterMapping(*ytConfig, TString{YtProviderName});
- DefYtServer_ = NYql::TConfigClusters::GetDefaultYtServer(*ytConfig);
- });
- GetRunOptions().SetSupportedGateways({TString{YtProviderName}});
- GetRunOptions().GatewayTypes.emplace(YtProviderName);
- AddFsDownloadFactory([this]() -> NFS::IDownloaderPtr {
- return MakeYtDownloader(*GetRunOptions().FsConfig, DefYtServer_);
- });
- AddUrlListerFactory([]() -> IUrlListerPtr {
- return MakeYtUrlLister();
- });
- AddProviderFactory([this]() -> NYql::TDataProviderInitializer {
- if (GetRunOptions().GatewayTypes.contains(YtProviderName) && GetRunOptions().GatewaysConfig->HasYt()) {
- return GetYtNativeDataProviderInitializer(CreateYtGateway(), CreateCboFactory(), CreateDqHelper());
- }
- return {};
- });
- SetPeepholePipelineConfigurator(&PEEPHOLE_CONFIG_INSTANCE);
- }
- IYtGateway::TPtr TYtRunTool::CreateYtGateway() {
- TYtNativeServices services;
- services.FunctionRegistry = GetFuncRegistry().Get();
- services.FileStorage = GetFileStorage();
- services.Config = std::make_shared<TYtGatewayConfig>(GetRunOptions().GatewaysConfig->GetYt());
- auto ytGateway = CreateYtNativeGateway(services);
- if (!GetRunOptions().GatewayTypes.contains(FastMapReduceGatewayName)) {
- return ytGateway;
- }
- auto coordinator = MakeFmrCoordinator();
- auto func = [&] (TTask::TPtr /*task*/, std::shared_ptr<std::atomic<bool>> cancelFlag) {
- while (!cancelFlag->load()) {
- Sleep(TDuration::Seconds(3));
- return ETaskStatus::Completed;
- }
- return ETaskStatus::Aborted;
- }; // TODO - use function which actually calls Downloader/Uploader based on task params
- TFmrJobFactorySettings settings{.Function=func};
- auto jobFactory = MakeFmrJobFactory(settings);
- TFmrWorkerSettings workerSettings{.WorkerId = 1, .RandomProvider = CreateDefaultRandomProvider(),
- .TimeToSleepBetweenRequests=TDuration::Seconds(1)};
- FmrWorker_ = MakeFmrWorker(coordinator, jobFactory, workerSettings);
- FmrWorker_->Start();
- return CreateYtFmrGateway(ytGateway, coordinator);
- }
- IOptimizerFactory::TPtr TYtRunTool::CreateCboFactory() {
- return MakeSimpleCBOOptimizerFactory();
- }
- IDqHelper::TPtr TYtRunTool::CreateDqHelper() {
- return {};
- }
- int TYtRunTool::DoMain(int argc, const char *argv[]) {
- // Init MR/YT for proper work of embedded agent
- NYT::Initialize(argc, argv);
- NYql::NBacktrace::AddAfterFatalCallback([](int){ FlushYtDebugLogOnSignal(); });
- NYql::SetYtLoggerGlobalBackend(LOG_DEF_PRIORITY);
- if (NYT::TConfig::Get()->Prefix.empty()) {
- NYT::TConfig::Get()->Prefix = "//";
- }
- int res = TFacadeRunner::DoMain(argc, argv);
- if (0 == res) {
- NYql::DropYtDebugLog();
- }
- return res;
- }
- TProgram::TStatus TYtRunTool::DoRunProgram(TProgramPtr program) {
- auto sigHandler = [program](int) {
- Cerr << "Aborting..." << Endl;
- try {
- program->Abort().GetValueSync();
- } catch (...) {
- Cerr << CurrentExceptionMessage();
- }
- };
- SetAsyncSignalFunction(SIGINT, sigHandler);
- SetAsyncSignalFunction(SIGTERM, sigHandler);
- TProgram::TStatus status = TFacadeRunner::DoRunProgram(program);
- auto dummySigHandler = [](int) { };
- SetAsyncSignalFunction(SIGINT, dummySigHandler);
- SetAsyncSignalFunction(SIGTERM, dummySigHandler);
- return status;
- }
- } // NYql
|