Browse Source

Intermediate changes
commit_hash:5d5aa527dd6fcb0dfb1e62748135e5f04f92b1b0

robot-piglet 2 months ago
parent
commit
3f61c3b132

+ 2 - 0
yql/essentials/tools/yql_facade_run/ya.make

@@ -21,6 +21,8 @@ PEERDIR(
     yql/essentials/core/url_lister
     yql/essentials/core/url_preprocessing
     yql/essentials/core/peephole_opt
+    yql/essentials/core/qplayer/storage/interface
+    yql/essentials/core/qplayer/storage/file
     yql/essentials/core
     yql/essentials/minikql/invoke_builtins
     yql/essentials/minikql

+ 164 - 66
yql/essentials/tools/yql_facade_run/yql_facade_run.cpp

@@ -21,6 +21,7 @@
 #include <yql/essentials/core/facade/yql_facade.h>
 #include <yql/essentials/core/url_lister/url_lister_manager.h>
 #include <yql/essentials/core/url_preprocessing/url_preprocessing.h>
+#include <yql/essentials/core/qplayer/storage/file/yql_qstorage_file.h>
 #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
 #include <yql/essentials/minikql/mkql_function_registry.h>
 #include <yql/essentials/ast/yql_expr.h>
@@ -51,9 +52,11 @@
 #include <util/stream/file.h>
 #include <util/stream/null.h>
 #include <util/system/user.h>
+#include <util/system/env.h>
 #include <util/string/split.h>
 #include <util/string/join.h>
 #include <util/string/builder.h>
+#include <util/string/strip.h>
 #include <util/generic/vector.h>
 #include <util/generic/ptr.h>
 #include <util/generic/yexception.h>
@@ -155,7 +158,6 @@ public:
 namespace NYql {
 
 TFacadeRunOptions::TFacadeRunOptions() {
-    User = GetUsername();
 }
 
 TFacadeRunOptions::~TFacadeRunOptions() {
@@ -185,11 +187,23 @@ void TFacadeRunOptions::PrintInfo(const TString& msg) {
 }
 
 void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
+    User = GetUsername();
+
+    if (EnableCredentials) {
+        Token = GetEnv("YQL_TOKEN");
+        if (!Token) {
+            const TString home = GetEnv("HOME");
+            auto tokenPath = TFsPath(home) / ".yql" / "token";
+            if (tokenPath.Exists()) {
+                Token = StripStringRight(TFileInput(tokenPath).ReadAll());
+            }
+        }
+    }
 
     NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
 
     opts.AddHelpOption();
-    opts.AddLongOption('p', "program", "Program file").Required().RequiredArgument("FILE")
+    opts.AddLongOption('p', "program", "Program file (use - to read from stdin)").Required().RequiredArgument("FILE")
         .Handler1T<TString>([this](const TString& file) {
             ProgramFile = file;
             if (ProgramFile == "-") {
@@ -203,7 +217,7 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
     opts.AddLongOption('s', "sql", "Program is SQL query").NoArgument().StoreValue(&ProgramType, EProgramType::Sql);
     if (PgSupport) {
         opts.AddLongOption("pg", "Program has PG syntax").NoArgument().StoreValue(&ProgramType, EProgramType::Pg);
-        opts.AddLongOption("pg-ext", "pg extensions config file").Optional().RequiredArgument("FILE")
+        opts.AddLongOption("pg-ext", "Pg extensions config file").Optional().RequiredArgument("FILE")
             .Handler1T<TString>([this](const TString& file) {
                 PgExtConfig = ParseProtoConfig<NProto::TPgExtensions>(file);
             });
@@ -238,7 +252,7 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
         .Handler1T<TString>([this](const TString& file) {
             Params = TFileInput(file).ReadAll();
         });
-    opts.AddLongOption('G', "gateways", "Used gateways").DefaultValue(JoinSeq(",", SupportedGateways_))
+    opts.AddLongOption('G', "gateways", TStringBuilder() << "Used gateways, available: " << JoinSeq(",", SupportedGateways_)).DefaultValue(JoinSeq(",", GatewayTypes))
         .Handler1T<TString>([this](const TString& gateways) {
             ::StringSplitter(gateways).Split(',').Consume([&](const TStringBuf& val) {
                 if (!SupportedGateways_.contains(val)) {
@@ -256,8 +270,8 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
             FsConfig = MakeHolder<TFileStorageConfig>();
             LoadFsConfigFromFile(file, *FsConfig);
         });
-    opts.AddLongOption('u', "udf", "Load shared library with UDF by given path").AppendTo(&UdfsPaths);
-    opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory")
+    opts.AddLongOption('u', "udf", "Load shared library with UDF by given path").RequiredArgument("PATH").AppendTo(&UdfsPaths);
+    opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory").RequiredArgument("DIR")
         .Handler1T<TString>([this](const TString& dir) {
             NKikimr::NMiniKQL::FindUdfsInDir(dir, &UdfsPaths);
         });
@@ -265,14 +279,14 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
     opts.AddLongOption("udf-resolver-filter-syscalls", "Filter syscalls in udf resolver").Optional().NoArgument().SetFlag(&UdfResolverFilterSyscalls);
     opts.AddLongOption("scan-udfs", "Scan specified udfs with external udf-resolver to use static function registry").NoArgument().SetFlag(&ScanUdfs);
 
-    opts.AddLongOption("parse-only", "Exit after program has been parsed").NoArgument().StoreValue(&Mode, ERunMode::Parse);
-    opts.AddLongOption("compile-only", "Exit after program has been compiled").NoArgument().StoreValue(&Mode, ERunMode::Compile);
-    opts.AddLongOption("validate", "Exit after program has been validated").NoArgument().StoreValue(&Mode, ERunMode::Validate);
-    opts.AddLongOption("lineage", "Exit after data lineage has been calculated").NoArgument().StoreValue(&Mode, ERunMode::Lineage);
-    opts.AddLongOption('O',"optimize", "Optimize expression").NoArgument().StoreValue(&Mode, ERunMode::Optimize);
-    opts.AddLongOption('R',"run", "Run expression using input/output tables").NoArgument().StoreValue(&Mode, ERunMode::Run);
-    opts.AddLongOption('D', "discover", "Discover tables in the program").NoArgument().StoreValue(&Mode, ERunMode::Discover);
-    opts.AddLongOption("peephole", "Perform peephole stage of expression using input/output tables").NoArgument().StoreValue(&Mode, ERunMode::Peephole);
+    opts.AddLongOption("parse-only", "Parse program and exit").NoArgument().StoreValue(&Mode, ERunMode::Parse);
+    opts.AddLongOption("compile-only", "Compiled program and exit").NoArgument().StoreValue(&Mode, ERunMode::Compile);
+    opts.AddLongOption("validate", "Validate program and exit").NoArgument().StoreValue(&Mode, ERunMode::Validate);
+    opts.AddLongOption("lineage", "Calculate program lineage and exit").NoArgument().StoreValue(&Mode, ERunMode::Lineage);
+    opts.AddLongOption('O',"optimize", "Optimize program and exir").NoArgument().StoreValue(&Mode, ERunMode::Optimize);
+    opts.AddLongOption('D', "discover", "Discover tables in the program and exit").NoArgument().StoreValue(&Mode, ERunMode::Discover);
+    opts.AddLongOption("peephole", "Perform peephole program optimization and exit").NoArgument().StoreValue(&Mode, ERunMode::Peephole);
+    opts.AddLongOption('R',"run", "Run progrum (use by default)").NoArgument().StoreValue(&Mode, ERunMode::Run);
 
     opts.AddLongOption('L', "show-log", "Show transformation log").Optional().NoArgument().SetFlag(&ShowLog);
     opts.AddLongOption('v', "verbosity", "Log verbosity level").Optional().RequiredArgument("LEVEL").StoreResult(&Verbosity);
@@ -290,8 +304,8 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
         });
     opts.AddLongOption("expr-file", "Print AST to that file instead of stdout").Optional().RequiredArgument("FILE")
         .Handler1T<TString>([this](const TString& file) {
-            ExprStreamHolder = MakeHolder<TFixedBufferFileOutput>(file);
-            ExprStream = ExprStreamHolder.Get();
+            ExprStreamHolder_ = MakeHolder<TFixedBufferFileOutput>(file);
+            ExprStream = ExprStreamHolder_.Get();
         });
     opts.AddLongOption("print-result", "Print program execution result to stdout").NoArgument()
         .Handler0([this]() {
@@ -317,8 +331,8 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
 
     opts.AddLongOption("result-file", "Print program execution result to file").Optional().RequiredArgument("FILE")
         .Handler1T<TString>([this](const TString& file) {
-            ResultStreamHolder = MakeHolder<TFixedBufferFileOutput>(file);
-            ResultStream = ResultStreamHolder.Get();
+            ResultStreamHolder_ = MakeHolder<TFixedBufferFileOutput>(file);
+            ResultStream = ResultStreamHolder_.Get();
         });
     opts.AddLongOption('P',"trace-plan", "Print plan before execution").NoArgument()
         .Handler0([this]() {
@@ -328,13 +342,13 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
         });
     opts.AddLongOption("plan-file", "Print program plan to file").Optional().RequiredArgument("FILE")
         .Handler1T<TString>([this](const TString& file) {
-            PlanStreamHolder = MakeHolder<TFixedBufferFileOutput>(file);
-            PlanStream = PlanStreamHolder.Get();
+            PlanStreamHolder_ = MakeHolder<TFixedBufferFileOutput>(file);
+            PlanStream = PlanStreamHolder_.Get();
         });
     opts.AddLongOption("err-file", "Print validate/optimize/runtime errors to file")
         .Handler1T<TString>([this](const TString& file) {
-            ErrStreamHolder = MakeHolder<TFixedBufferFileOutput>(file);
-            ErrStream = ErrStreamHolder.Get();
+            ErrStreamHolder_ = MakeHolder<TFixedBufferFileOutput>(file);
+            ErrStream = ErrStreamHolder_.Get();
         });
     opts.AddLongOption("full-expr", "Avoid buffering of expr/plan").NoArgument().SetFlag(&FullExpr);
     opts.AddLongOption("mem-limit", "Set memory limit in megabytes")
@@ -366,8 +380,8 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
     opts.AddLongOption("stat", "Print execution statistics").Optional().OptionalArgument("FILE")
         .Handler1T<TString>([this](const TString& file) {
             if (file) {
-                StatStreamHolder = MakeHolder<TFileOutput>(file);
-                StatStream = StatStreamHolder.Get();
+                StatStreamHolder_ = MakeHolder<TFileOutput>(file);
+                StatStream = StatStreamHolder_.Get();
             } else {
                 StatStream = &Cerr;
             }
@@ -396,6 +410,42 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
                 }
             });
     }
+    if (EnableCredentials) {
+        opts.AddLongOption("token", "YQL token")
+            .Optional()
+            .RequiredArgument("VALUE")
+            .StoreResult(&Token);
+        opts.AddLongOption("custom-tokens", "Custom tokens")
+            .Optional()
+            .RequiredArgument("NAME=VALUE or NAME=@PATH")
+            .KVHandler([this](TString key, TString value) {
+                if (value.StartsWith('@')) {
+                    value = StripStringRight(TFileInput(value.substr(1)).ReadAll());
+                }
+                Credentials->AddCredential(key, TCredential("custom", "", value));
+            });
+    }
+    if (EnableQPlayer) {
+        opts.AddLongOption("qstorage-dir", "Directory for QStorage").RequiredArgument("DIR")
+            .Handler1T<TString>([this](const TString& dir) {
+                QPlayerStorage_ = MakeFileQStorage(dir);
+            });
+        opts.AddLongOption("op-id", "QStorage operation id").StoreResult(&OperationId).DefaultValue("dummy_op");
+        opts.AddLongOption("capture", "Write query metadata to QStorage").NoArgument()
+            .Handler0([this]() {
+                if (EQPlayerMode::Replay == QPlayerMode) {
+                    throw yexception() << "replay and capture options can't be used simultaneously";
+                }
+                QPlayerMode = EQPlayerMode::Capture;
+            });
+        opts.AddLongOption("replay", "Read query metadata from QStorage").NoArgument()
+            .Handler0([this]() {
+                if (EQPlayerMode::Capture == QPlayerMode) {
+                    throw yexception() << "replay and capture options can't be used simultaneously";
+                }
+                QPlayerMode = EQPlayerMode::Replay;
+            });
+    }
 
     opts.SetFreeArgsMax(0);
 
@@ -405,8 +455,17 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
 
     auto res = NLastGetopt::TOptsParseResult(&opts, argc, argv);
 
-    for (auto& handle: OptHandlers_) {
-        handle(res);
+    if (QPlayerMode != EQPlayerMode::None) {
+        if (!QPlayerStorage_) {
+            QPlayerStorage_ = MakeFileQStorage(".");
+        }
+        if (EQPlayerMode::Replay == QPlayerMode) {
+            QPlayerContext = TQContext(QPlayerStorage_->MakeReader(OperationId, {}));
+            ProgramFile = "-replay-";
+            ProgramText = "";
+        } else if (EQPlayerMode::Capture == QPlayerMode) {
+            QPlayerContext = TQContext(QPlayerStorage_->MakeWriter(OperationId, {}));
+        }
     }
 
     if (Mode >= ERunMode::Validate && GatewayTypes.empty()) {
@@ -420,6 +479,7 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
     if (GatewaysConfig && GatewaysConfig->HasSqlCore()) {
         SqlFlags.insert(GatewaysConfig->GetSqlCore().GetTranslationFlags().begin(), GatewaysConfig->GetSqlCore().GetTranslationFlags().end());
     }
+    UpdateSqlFlagsFromQContext(QPlayerContext, SqlFlags);
 
     if (!FsConfig) {
         FsConfig = MakeHolder<TFileStorageConfig>();
@@ -427,6 +487,28 @@ void TFacadeRunOptions::Parse(int argc, const char *argv[]) {
             LoadFsConfigFromResource("fs.conf", *FsConfig);
         }
     }
+
+    if (EnableCredentials && Token) {
+        for (auto name: SupportedGateways_) {
+            Credentials->AddCredential(TStringBuilder() << "default_" << name, TCredential(name, "", Token));
+        }
+    }
+
+    for (auto& handle: OptHandlers_) {
+        handle(res);
+    }
+}
+
+TFacadeRunner::TFacadeRunner(TString name)
+    : Name_(std::move(name))
+{
+}
+
+TFacadeRunner::~TFacadeRunner() {
+}
+
+TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> TFacadeRunner::GetFuncRegistry() {
+    return FuncRegistry_;
 }
 
 int TFacadeRunner::Main(int argc, const char *argv[]) {
@@ -463,7 +545,7 @@ int TFacadeRunner::DoMain(int argc, const char *argv[]) {
         if (RunOptions_.PgExtConfig) {
             TVector<NPg::TExtensionDesc> extensions;
             PgExtensionsFromProto(*RunOptions_.PgExtConfig, extensions);
-            NPg::RegisterExtensions(extensions, false,
+            NPg::RegisterExtensions(extensions, RunOptions_.QPlayerContext.CanRead(),
                 *NSQLTranslationPG::CreateExtensionSqlParser(),
                 NKikimr::NMiniKQL::CreateExtensionLoader().get());
         }
@@ -503,7 +585,9 @@ int TFacadeRunner::DoMain(int argc, const char *argv[]) {
     if (RunOptions_.Mode >= ERunMode::Validate) {
         std::vector<NFS::IDownloaderPtr> downloaders;
         for (auto& factory: FsDownloadFactories_) {
-            downloaders.push_back(factory());
+            if (auto download = factory()) {
+                downloaders.push_back(std::move(download));
+            }
         }
 
         FileStorage_ = WithAsync(CreateFileStorage(*RunOptions_.FsConfig, downloaders));
@@ -537,12 +621,16 @@ int TFacadeRunner::DoMain(int argc, const char *argv[]) {
         dataProvidersInit.push_back(GetPgDataProviderInitializer());
     }
     for (auto& factory: ProviderFactories_) {
-        dataProvidersInit.push_back(factory());
+        if (auto init = factory()) {
+            dataProvidersInit.push_back(std::move(init));
+        }
     }
 
     TVector<IUrlListerPtr> urlListers;
     for (auto& factory: UrlListerFactories_) {
-        urlListers.push_back(factory());
+        if (auto listener = factory()) {
+            urlListers.push_back(std::move(listener));
+        }
     }
 
     TProgramFactory factory(RunOptions_.UseRepeatableRandomAndTimeProviders, FuncRegistry_.Get(), ctx.NextUniqueId, dataProvidersInit, Name_);
@@ -555,18 +643,22 @@ int TFacadeRunner::DoMain(int argc, const char *argv[]) {
     factory.SetUdfIndex(udfIndex, new TUdfIndexPackageSet());
     factory.SetUdfResolver(udfResolver);
     factory.SetGatewaysConfig(RunOptions_.GatewaysConfig.Get());
-    factory.SetCredentials(Credentials_);
+    factory.SetCredentials(RunOptions_.Credentials);
     factory.EnableRangeComputeFor();
     if (!urlListers.empty()) {
         factory.SetUrlListerManager(MakeUrlListerManager(urlListers));
     }
 
-    return RunProgram(factory);
+    int result = DoRun(factory);
+    if (result == 0 && EQPlayerMode::Capture == RunOptions_.QPlayerMode) {
+        RunOptions_.QPlayerContext.GetWriter()->Commit().GetValueSync();
+    }
+    return result;
 }
 
-int TFacadeRunner::RunProgram(TProgramFactory& factory) {
+int TFacadeRunner::DoRun(TProgramFactory& factory) {
 
-    TProgramPtr program = factory.Create(RunOptions_.ProgramFile, RunOptions_.ProgramText);;
+    TProgramPtr program = factory.Create(RunOptions_.ProgramFile, RunOptions_.ProgramText, RunOptions_.OperationId, EHiddenMode::Disable, RunOptions_.QPlayerContext);;
     if (RunOptions_.Params) {
         program->SetParametersYson(RunOptions_.Params);
     }
@@ -673,40 +765,15 @@ int TFacadeRunner::RunProgram(TProgramFactory& factory) {
         return 0;
     }
 
-    auto defOptConfig = TOptPipelineConfigurator(program, RunOptions_.FullExpr ? RunOptions_.PlanStream : nullptr, RunOptions_.FullExpr ? RunOptions_.ExprStream : nullptr, RunOptions_.WithTypes);
-    IPipelineConfigurator* optConfig = OptPipelineConfigurator_ ? OptPipelineConfigurator_  : &defOptConfig;
-
-    TProgram::TStatus status = TProgram::TStatus::Ok;
-    if (ERunMode::Peephole == RunOptions_.Mode) {
-        RunOptions_.PrintInfo("Peephole...");
-        auto defConfig = TPeepHolePipelineConfigurator();
-        IPipelineConfigurator* config = PeepholePipelineConfigurator_ ? PeepholePipelineConfigurator_  : &defConfig;
-        status = program->OptimizeWithConfig(RunOptions_.User, *config);
+    TProgram::TStatus status = DoRunProgram(program);
 
-        if (RunOptions_.ExprStream && program->ExprRoot()) {
-            auto ast = ConvertToAst(*program->ExprRoot(), program->ExprCtx(), RunOptions_.WithTypes ? TExprAnnotationFlags::Types : TExprAnnotationFlags::None, true);
-            ui32 prettyFlags = TAstPrintFlags::ShortQuote;
-            if (!RunOptions_.WithTypes) {
-                prettyFlags |= TAstPrintFlags::PerLine;
-            }
-            ast.Root->PrettyPrintTo(*RunOptions_.ExprStream, prettyFlags);
+    if (ERunMode::Peephole == RunOptions_.Mode && RunOptions_.ExprStream && program->ExprRoot()) {
+        auto ast = ConvertToAst(*program->ExprRoot(), program->ExprCtx(), RunOptions_.WithTypes ? TExprAnnotationFlags::Types : TExprAnnotationFlags::None, true);
+        ui32 prettyFlags = TAstPrintFlags::ShortQuote;
+        if (!RunOptions_.WithTypes) {
+            prettyFlags |= TAstPrintFlags::PerLine;
         }
-
-    } else if (ERunMode::Run == RunOptions_.Mode) {
-        RunOptions_.PrintInfo("Run program...");
-        status = program->RunWithConfig(RunOptions_.User, *optConfig);
-    } else if (ERunMode::Optimize == RunOptions_.Mode) {
-        RunOptions_.PrintInfo("Optimize program...");
-        status = program->OptimizeWithConfig(RunOptions_.User, *optConfig);
-    } else if (ERunMode::Validate == RunOptions_.Mode) {
-        RunOptions_.PrintInfo("Validate program...");
-        status = program->Validate(RunOptions_.User, RunOptions_.ExprStream, RunOptions_.WithTypes);
-    } else if (ERunMode::Discover == RunOptions_.Mode) {
-        RunOptions_.PrintInfo("Discover program...");
-        status = program->Discover(RunOptions_.User);
-    } else if (ERunMode::Lineage == RunOptions_.Mode) {
-        RunOptions_.PrintInfo("Calculating lineage in program...");
-        status = program->LineageWithConfig(RunOptions_.User, *optConfig);
+        ast.Root->PrettyPrintTo(*RunOptions_.ExprStream, prettyFlags);
     }
 
     if (RunOptions_.WithFinalIssues) {
@@ -776,4 +843,35 @@ int TFacadeRunner::RunProgram(TProgramFactory& factory) {
     return 0;
 }
 
+TProgram::TStatus TFacadeRunner::DoRunProgram(TProgramPtr program) {
+    TProgram::TStatus status = TProgram::TStatus::Ok;
+
+    auto defOptConfig = TOptPipelineConfigurator(program, RunOptions_.FullExpr ? RunOptions_.PlanStream : nullptr, RunOptions_.FullExpr ? RunOptions_.ExprStream : nullptr, RunOptions_.WithTypes);
+    IPipelineConfigurator* optConfig = OptPipelineConfigurator_ ? OptPipelineConfigurator_  : &defOptConfig;
+
+    if (ERunMode::Peephole == RunOptions_.Mode) {
+        RunOptions_.PrintInfo("Peephole...");
+        auto defConfig = TPeepHolePipelineConfigurator();
+        IPipelineConfigurator* config = PeepholePipelineConfigurator_ ? PeepholePipelineConfigurator_  : &defConfig;
+        status = program->OptimizeWithConfig(RunOptions_.User, *config);
+    } else if (ERunMode::Run == RunOptions_.Mode) {
+        RunOptions_.PrintInfo("Run program...");
+        status = program->RunWithConfig(RunOptions_.User, *optConfig);
+    } else if (ERunMode::Optimize == RunOptions_.Mode) {
+        RunOptions_.PrintInfo("Optimize program...");
+        status = program->OptimizeWithConfig(RunOptions_.User, *optConfig);
+    } else if (ERunMode::Validate == RunOptions_.Mode) {
+        RunOptions_.PrintInfo("Validate program...");
+        status = program->Validate(RunOptions_.User, RunOptions_.ExprStream, RunOptions_.WithTypes);
+    } else if (ERunMode::Discover == RunOptions_.Mode) {
+        RunOptions_.PrintInfo("Discover program...");
+        status = program->Discover(RunOptions_.User);
+    } else if (ERunMode::Lineage == RunOptions_.Mode) {
+        RunOptions_.PrintInfo("Calculating lineage in program...");
+        status = program->LineageWithConfig(RunOptions_.User, *optConfig);
+    }
+
+    return status;
+}
+
 } // NYql

+ 29 - 20
yql/essentials/tools/yql_facade_run/yql_facade_run.h

@@ -6,6 +6,8 @@
 #include <yql/essentials/core/url_lister/interface/url_lister.h>
 #include <yql/essentials/core/yql_data_provider.h>
 #include <yql/essentials/core/yql_user_data.h>
+#include <yql/essentials/core/facade/yql_facade.h>
+#include <yql/essentials/core/qplayer/storage/interface/yql_qstorage.h>
 
 #include <library/cpp/getopt/last_getopt.h>
 #include <library/cpp/yson/public.h>
@@ -25,7 +27,6 @@ namespace NKikimr::NMiniKQL {
 namespace NYql {
     class TFileStorageConfig;
     class TGatewaysConfig;
-    class TProgramFactory;
 }
 
 namespace NYql::NProto {
@@ -55,6 +56,12 @@ enum class EProgramType {
     Pg      /* "pg" */,
 };
 
+enum class EQPlayerMode {
+    None    /* "none" */,
+    Capture /* "capture" */,
+    Replay  /* "replay" */,
+};
+
 class TFacadeRunOptions {
 public:
     TFacadeRunOptions();
@@ -66,7 +73,11 @@ public:
     TString ProgramFile;
     TString ProgramText;
     TString User;
+    TString Token;
     ui64 MemLimit = 0;
+    EQPlayerMode QPlayerMode = EQPlayerMode::None;
+    TString OperationId;
+    TQContext QPlayerContext;
 
     THashSet<TString> SqlFlags;
     ui16 SyntaxVersion = 1;
@@ -85,20 +96,16 @@ public:
     IOutputStream* TraceOptStream = nullptr;
 
     IOutputStream* ErrStream = &Cerr;
-    THolder<IOutputStream> ErrStreamHolder;
     IOutputStream* PlanStream = nullptr;
-    THolder<IOutputStream> PlanStreamHolder;
     IOutputStream* ExprStream = nullptr;
-    THolder<IOutputStream> ExprStreamHolder;
     IOutputStream* ResultStream = nullptr;
-    THolder<IOutputStream> ResultStreamHolder;
     IOutputStream* StatStream = nullptr;
-    THolder<IOutputStream> StatStreamHolder;
 
     NYql::TUserDataTable DataTable;
     TVector<TString> UdfsPaths;
     TString Params;
     NUdf::EValidateMode ValidateMode = NUdf::EValidateMode::Greedy;
+    TCredentials::TPtr Credentials = MakeIntrusive<TCredentials>();
 
     THashSet<TString> GatewayTypes;
     TString UdfResolverPath;
@@ -118,6 +125,8 @@ public:
     bool TestSqlFormat = false;
     bool ValidateResultFormat = false;
     bool EnableResultPosition = false;
+    bool EnableCredentials = false;
+    bool EnableQPlayer = false;
 
     void Parse(int argc, const char *argv[]);
 
@@ -139,14 +148,19 @@ private:
     std::vector<std::function<void(NLastGetopt::TOpts&)>> OptExtenders_;
     std::vector<std::function<void(const NLastGetopt::TOptsParseResult&)>> OptHandlers_;
     THashSet<TString> SupportedGateways_;
+    THolder<IOutputStream> ErrStreamHolder_;
+    THolder<IOutputStream> PlanStreamHolder_;
+    THolder<IOutputStream> ExprStreamHolder_;
+    THolder<IOutputStream> ResultStreamHolder_;
+    THolder<IOutputStream> StatStreamHolder_;
+    IQStoragePtr QPlayerStorage_;
 };
 
 class TFacadeRunner {
 public:
-    TFacadeRunner(TString name)
-        : Name_(std::move(name))
-    {
-    }
+    TFacadeRunner(TString name);
+    ~TFacadeRunner();
+
     int Main(int argc, const char *argv[]);
 
     void AddFsDownloadFactory(std::function<NFS::IDownloaderPtr()> factory) {
@@ -180,19 +194,15 @@ public:
     TFileStoragePtr GetFileStorage() const {
         return FileStorage_;
     }
-    TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> GetFuncRegistry() {
-        return FuncRegistry_;
-    }
-    TCredentials::TPtr GetCredentials() {
-        return Credentials_;
-    }
+    TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> GetFuncRegistry();
     TFacadeRunOptions& GetRunOptions() {
         return RunOptions_;
     }
 
-private:
-    int DoMain(int argc, const char *argv[]);
-    int RunProgram(TProgramFactory& factory);
+protected:
+    virtual int DoMain(int argc, const char *argv[]);
+    virtual int DoRun(TProgramFactory& factory);
+    virtual TProgram::TStatus DoRunProgram(TProgramPtr program);
 
 private:
     TString Name_;
@@ -203,7 +213,6 @@ private:
     THolder<TFileStorageConfig> FileStorageConfig_;
     TFileStoragePtr FileStorage_;
     TIntrusivePtr<NKikimr::NMiniKQL::IFunctionRegistry> FuncRegistry_;
-    TCredentials::TPtr Credentials_ = MakeIntrusive<TCredentials>();
     TOperationProgressWriter ProgressWriter_;
     IPipelineConfigurator* OptPipelineConfigurator_ = nullptr;
     IPipelineConfigurator* PeepholePipelineConfigurator_ = nullptr;

+ 0 - 17
yql/tools/yqlrun/lib/yqlrun_lib.cpp

@@ -58,8 +58,6 @@ TYqlRunTool::TYqlRunTool()
                 TablesMapping_[name] = path;
             }, '@');
 
-    });
-    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
         opts.AddLongOption("tables-dir", "Table dirs mapping").RequiredArgument("cluster@dir")
             .KVHandler([&](TString cluster, TString dir) {
                 if (cluster.empty() || dir.empty()) {
@@ -75,8 +73,6 @@ TYqlRunTool::TYqlRunTool()
                 }
             }, '@');
 
-    });
-    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
         opts.AddLongOption('C', "cluster", "Cluster to service mapping").RequiredArgument("name@service")
             .KVHandler([&](TString cluster, TString provider) {
                 if (cluster.empty() || provider.empty()) {
@@ -85,29 +81,16 @@ TYqlRunTool::TYqlRunTool()
                 AddClusterMapping(std::move(cluster), std::move(provider));
             }, '@');
 
-    });
-    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
         opts.AddLongOption("ndebug", "Do not show debug info in error output").NoArgument().SetFlag(&GetRunOptions().NoDebug);
-    });
-    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
         opts.AddLongOption("keep-temp", "Keep temporary tables").NoArgument().SetFlag(&KeepTemp_);
-    });
-    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
         opts.AddLongOption("show-progress", "Report operation progress").NoArgument()
             .Handler0([&]() {
                 SetOperationProgressWriter([](const TOperationProgress& progress) {
                     Cerr << "Operation: [" << progress.Category << "] " << progress.Id << ", state: " << progress.State << "\n";
                 });
             });
-    });
-    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
         opts.AddLongOption("tmp-dir", "Directory for temporary tables").RequiredArgument("DIR").StoreResult(&TmpDir_);
-    });
-
-    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
         opts.AddLongOption("test-format", "Compare formatted query's AST with the original query's AST (only syntaxVersion=1 is supported)").NoArgument().SetFlag(&GetRunOptions().TestSqlFormat);
-    });
-    GetRunOptions().AddOptExtension([this](NLastGetopt::TOpts& opts) {
         opts.AddLongOption("validate-result-format", "Check that result-format can parse Result").NoArgument().SetFlag(&GetRunOptions().ValidateResultFormat);
     });
 

+ 3 - 0
yt/yt/library/profiling/solomon/config.cpp

@@ -92,6 +92,9 @@ void TSolomonExporterConfig::Register(TRegistrar registrar)
         .Default(DefaultProducerCollectionBatchSize)
         .GreaterThan(0);
 
+    registrar.Parameter("label_sanitization_policy", &TThis::LabelSanitizationPolicy)
+        .Default(ELabelSanitizationPolicy::None);
+
     registrar.Postprocessor([] (TThis* config) {
         if (config->LingerTimeout.GetValue() % config->GridStep.GetValue() != 0) {
             THROW_ERROR_EXCEPTION("\"linger_timeout\" must be multiple of \"grid_step\"");

+ 2 - 0
yt/yt/library/profiling/solomon/config.h

@@ -74,6 +74,8 @@ struct TSolomonExporterConfig
 
     int ProducerCollectionBatchSize;
 
+    ELabelSanitizationPolicy LabelSanitizationPolicy;
+
     TShardConfigPtr MatchShard(const std::string& sensorName);
 
     ESummaryPolicy GetSummaryPolicy() const;

+ 1 - 0
yt/yt/library/profiling/solomon/exporter.cpp

@@ -88,6 +88,7 @@ TSolomonExporter::TSolomonExporter(
 
         return shard->GridStep->GetValue() / config->GridStep.GetValue();
     });
+    Registry_->SetLabelSanitizationPolicy(Config_->LabelSanitizationPolicy);
 
     if (Config_->ReportBuildInfo) {
         TProfiler profiler{registry, ""};

+ 9 - 0
yt/yt/library/profiling/solomon/public.h

@@ -1,6 +1,7 @@
 #pragma once
 
 #include <library/cpp/yt/memory/ref_counted.h>
+#include <library/cpp/yt/misc/enum.h>
 
 namespace NYT::NProfiling {
 
@@ -18,4 +19,12 @@ DECLARE_REFCOUNTED_STRUCT(IEndpointProvider)
 
 ////////////////////////////////////////////////////////////////////////////////
 
+DEFINE_ENUM(ELabelSanitizationPolicy,
+    ((None)   (0))
+    ((Weak)   (1)) // Escape only zero symbol and trim label to 200 symbols
+    ((Strong) (2)) // Escape all forbidden symbols and trim label to 200 symbols
+);
+
+////////////////////////////////////////////////////////////////////////////////
+
 } // namespace NYT::NProfiling

+ 5 - 0
yt/yt/library/profiling/solomon/registry.cpp

@@ -281,6 +281,11 @@ void TSolomonRegistry::SetProducerCollectionBatchSize(int batchSize)
     Producers_.SetCollectionBatchSize(batchSize);
 }
 
+void TSolomonRegistry::SetLabelSanitizationPolicy(ELabelSanitizationPolicy LabelSanitizationPolicy)
+{
+    Tags_.SetLabelSanitizationPolicy(LabelSanitizationPolicy);
+}
+
 int TSolomonRegistry::GetWindowSize() const
 {
     if (!WindowSize_) {

+ 1 - 0
yt/yt/library/profiling/solomon/registry.h

@@ -129,6 +129,7 @@ public:
     void SetGridFactor(std::function<int(const std::string&)> gridFactor);
     void SetWindowSize(int windowSize);
     void SetProducerCollectionBatchSize(int batchSize);
+    void SetLabelSanitizationPolicy(ELabelSanitizationPolicy LabelSanitizationPolicy);
     void ProcessRegistrations();
     void Collect(IInvokerPtr offloadInvoker = GetSyncInvoker());
     void ReadSensors(

Some files were not shown because too many files changed in this diff