123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- #include "plugin.h"
- #include "error_helpers.h"
- #include "progress_merger.h"
- #include <ydb/library/yql/providers/yt/lib/log/yt_logger.h>
- #include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
- #include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h>
- #include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
- #include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
- #include "ydb/library/yql/providers/common/proto/gateways_config.pb.h"
- #include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
- #include <ydb/library/yql/ast/yql_expr.h>
- #include <ydb/library/yql/minikql/mkql_function_registry.h>
- #include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
- #include <ydb/library/yql/core/facade/yql_facade.h>
- #include <ydb/library/yql/core/file_storage/file_storage.h>
- #include "ydb/library/yql/core/file_storage/proto/file_storage.pb.h"
- #include <ydb/library/yql/core/services/mounts/yql_mounts.h>
- #include <ydb/library/yql/core/services/yql_transform_pipeline.h>
- #include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h>
- #include <ydb/library/yql/utils/log/log.h>
- #include <ydb/library/yql/utils/backtrace/backtrace.h>
- #include <yt/cpp/mapreduce/interface/config.h>
- #include <yt/cpp/mapreduce/interface/logging/logger.h>
- #include <library/cpp/yt/threading/rw_spin_lock.h>
- #include <library/cpp/yson/node/node_io.h>
- #include <library/cpp/yson/parser.h>
- #include <library/cpp/yson/writer.h>
- #include <library/cpp/resource/resource.h>
- #include <library/cpp/digest/md5/md5.h>
- #include <util/folder/path.h>
- #include <util/stream/file.h>
- #include <util/string/builder.h>
- #include <util/system/fs.h>
- namespace NYT::NYqlPlugin {
- namespace NNative {
- using namespace NYson;
- ////////////////////////////////////////////////////////////////////////////////
- std::optional<TString> MaybeToOptional(const TMaybe<TString>& maybeStr)
- {
- if (!maybeStr) {
- return std::nullopt;
- }
- return *maybeStr;
- };
- ////////////////////////////////////////////////////////////////////////////////
- struct TQueryPlan
- {
- std::optional<TString> Plan;
- YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, PlanSpinLock);
- };
- struct TActiveQuery
- {
- TProgressMerger ProgressMerger;
- std::optional<TString> Plan;
- };
- ////////////////////////////////////////////////////////////////////////////////
- class TQueryPipelineConfigurator
- : public NYql::IPipelineConfigurator
- {
- public:
- TQueryPipelineConfigurator(NYql::TProgramPtr program, TQueryPlan& plan)
- : Program_(program)
- , Plan_(plan)
- { }
- void AfterCreate(NYql::TTransformationPipeline* /*pipeline*/) const override
- { }
- void AfterTypeAnnotation(NYql::TTransformationPipeline* /*pipeline*/) const override
- { }
- void AfterOptimize(NYql::TTransformationPipeline* pipeline) const override
- {
- auto transformer = [this](NYql::TExprNode::TPtr input, NYql::TExprNode::TPtr& output, NYql::TExprContext& /*ctx*/) {
- output = input;
- auto guard = WriterGuard(Plan_.PlanSpinLock);
- Plan_.Plan = MaybeToOptional(Program_->GetQueryPlan());
- return NYql::IGraphTransformer::TStatus::Ok;
- };
- pipeline->Add(NYql::CreateFunctorTransformer(transformer), "PlanOutput");
- }
- private:
- NYql::TProgramPtr Program_;
- TQueryPlan& Plan_;
- };
- ////////////////////////////////////////////////////////////////////////////////
- class TYqlPlugin
- : public IYqlPlugin
- {
- public:
- TYqlPlugin(TYqlPluginOptions& options)
- {
- try {
- NYql::NLog::InitLogger(std::move(options.LogBackend));
- auto& logger = NYql::NLog::YqlLogger();
- logger.SetDefaultPriority(ELogPriority::TLOG_DEBUG);
- for (int i = 0; i < NYql::NLog::EComponentHelpers::ToInt(NYql::NLog::EComponent::MaxValue); ++i) {
- logger.SetComponentLevel((NYql::NLog::EComponent) i, NYql::NLog::ELevel::DEBUG);
- }
- NYql::SetYtLoggerGlobalBackend(NYT::ILogger::ELevel::DEBUG);
- if (NYT::TConfig::Get()->Prefix.empty()) {
- NYT::TConfig::Get()->Prefix = "//";
- }
- auto yqlCoreFlags = GatewaysConfig_.GetYqlCore()
- .GetFlags();
- auto ytConfig = GatewaysConfig_.MutableYt();
- if (!ytConfig->HasExecuteUdfLocallyIfPossible()) {
- ytConfig->SetExecuteUdfLocallyIfPossible(true);
- }
- auto pattern = ytConfig->AddRemoteFilePatterns();
- pattern->SetPattern("yt://([a-zA-Z0-9\\-_]+)/([^&@?]+)$");
- pattern->SetCluster("$1");
- pattern->SetPath("$2");
- ytConfig->SetYtLogLevel(NYql::EYtLogLevel::YL_DEBUG);
- ytConfig->SetMrJobBin(options.MRJobBinary);
- ytConfig->SetMrJobBinMd5(MD5::File(options.MRJobBinary));
- ytConfig->ClearMrJobUdfsDir();
- auto setting = ytConfig->AddDefaultSettings();
- setting->SetName("NativeYtTypeCompatibility");
- setting->SetValue("all");
- for (const auto& [cluster, address]: options.Clusters) {
- auto item = ytConfig->AddClusterMapping();
- item->SetName(cluster);
- item->SetCluster(address);
- if (cluster == options.DefaultCluster) {
- item->SetDefault(true);
- }
- Clusters_.insert({item->GetName(), TString(NYql::YtProviderName)});
- }
- DefaultCluster_ = options.DefaultCluster;
- NYql::TFileStorageConfig fileStorageConfig;
- fileStorageConfig.SetMaxSizeMb(options.MaxFilesSizeMb);
- fileStorageConfig.SetMaxFiles(options.MaxFileCount);
- fileStorageConfig.SetRetryCount(options.DownloadFileRetryCount);
- FileStorage_ = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)}));
- FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(
- NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone();
- const NKikimr::NMiniKQL::TUdfModuleRemappings emptyRemappings;
- FuncRegistry_->SetBackTraceCallback(&NYql::NBacktrace::KikimrBackTrace);
- NKikimr::NMiniKQL::TUdfModulePathsMap systemModules;
- TVector<TString> udfPaths;
- NKikimr::NMiniKQL::FindUdfsInDir(options.UdfDirectory, &udfPaths);
- for (const auto& path: udfPaths) {
- // Skip YQL plugin shared library itself, it is not a UDF.
- if (path.EndsWith("libyqlplugin.so")) {
- continue;
- }
- FuncRegistry_->LoadUdfs(path, emptyRemappings, 0);
- }
- for (auto& m: FuncRegistry_->GetAllModuleNames()) {
- TMaybe<TString> path = FuncRegistry_->FindUdfPath(m);
- if (!path) {
- YQL_LOG(FATAL) << "Unable to detect UDF path for module " << m;
- exit(1);
- }
- systemModules.emplace(m, *path);
- }
- FuncRegistry_->SetSystemModulePaths(systemModules);
- auto userDataTable = GetYqlModuleResolver(ExprContext_, ModuleResolver_, {}, Clusters_, {});
- if (!userDataTable) {
- TStringStream err;
- ExprContext_.IssueManager
- .GetIssues()
- .PrintTo(err);
- YQL_LOG(FATAL) << "Failed to compile modules:\n"
- << err.Str();
- exit(1);
- }
- OperationAttributes_ = options.OperationAttributes;
- TVector<NYql::TDataProviderInitializer> dataProvidersInit;
- NYql::TYtNativeServices ytServices;
- ytServices.FunctionRegistry = FuncRegistry_.Get();
- ytServices.FileStorage = FileStorage_;
- ytServices.Config = std::make_shared<NYql::TYtGatewayConfig>(*ytConfig);
- auto ytNativeGateway = CreateYtNativeGateway(ytServices);
- dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway));
- ProgramFactory_ = std::make_unique<NYql::TProgramFactory>(
- false, FuncRegistry_.Get(), ExprContext_.NextUniqueId, dataProvidersInit, "embedded");
- auto credentials = MakeIntrusive<NYql::TCredentials>();
- if (options.YTTokenPath) {
- TFsPath path(options.YTTokenPath);
- auto token = TIFStream(path).ReadAll();
- credentials->AddCredential("default_yt", NYql::TCredential("yt", "", token));
- }
- ProgramFactory_->AddUserDataTable(userDataTable);
- ProgramFactory_->SetCredentials(credentials);
- ProgramFactory_->SetModules(ModuleResolver_);
- ProgramFactory_->SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get(), FileStorage_));
- ProgramFactory_->SetGatewaysConfig(&GatewaysConfig_);
- ProgramFactory_->SetFileStorage(FileStorage_);
- ProgramFactory_->SetUrlPreprocessing(MakeIntrusive<NYql::TUrlPreprocessing>(GatewaysConfig_));
- } catch (const std::exception& ex) {
- YQL_LOG(FATAL) << "Unexpected exception while initializing YQL plugin: " << ex.what();
- exit(1);
- }
- YQL_LOG(INFO) << "YQL plugin initialized";
- }
- TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files)
- {
- auto program = ProgramFactory_->Create("-memory-", queryText);
- program->AddCredentials({{"impersonation_user_yt", NYql::TCredential("yt", "", impersonationUser)}});
- program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings));
- auto userDataTable = FilesToUserTable(files);
- program->AddUserDataTable(userDataTable);
- TQueryPlan queryPlan;
- auto pipelineConfigurator = TQueryPipelineConfigurator(program, queryPlan);
- program->SetProgressWriter([&] (const NYql::TOperationProgress& progress) {
- std::optional<TString> plan;
- {
- auto guard = ReaderGuard(queryPlan.PlanSpinLock);
- plan.swap(queryPlan.Plan);
- }
- auto guard = WriterGuard(ProgressSpinLock);
- ActiveQueriesProgress_[queryId].ProgressMerger.MergeWith(progress);
- if (plan) {
- ActiveQueriesProgress_[queryId].Plan.swap(plan);
- }
- });
- NSQLTranslation::TTranslationSettings sqlSettings;
- sqlSettings.ClusterMapping = Clusters_;
- sqlSettings.ModuleMapping = Modules_;
- if (DefaultCluster_) {
- sqlSettings.DefaultCluster = *DefaultCluster_;
- }
- sqlSettings.SyntaxVersion = 1;
- sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;
- if (!program->ParseSql(sqlSettings)) {
- return TQueryResult{
- .YsonError = IssuesToYtErrorYson(program->Issues()),
- };
- }
- if (!program->Compile(impersonationUser)) {
- return TQueryResult{
- .YsonError = IssuesToYtErrorYson(program->Issues()),
- };
- }
- NYql::TProgram::TStatus status = NYql::TProgram::TStatus::Error;
- status = program->RunWithConfig(impersonationUser, pipelineConfigurator);
- if (status == NYql::TProgram::TStatus::Error) {
- return TQueryResult{
- .YsonError = IssuesToYtErrorYson(program->Issues()),
- };
- }
- TStringStream result;
- if (program->HasResults()) {
- ::NYson::TYsonWriter yson(&result, EYsonFormat::Binary);
- yson.OnBeginList();
- for (const auto& result: program->Results()) {
- yson.OnListItem();
- yson.OnRaw(result);
- }
- yson.OnEndList();
- }
- TString progress;
- {
- auto guard = WriterGuard(ProgressSpinLock);
- progress = ActiveQueriesProgress_[queryId].ProgressMerger.ToYsonString();
- ActiveQueriesProgress_.erase(queryId);
- }
- return {
- .YsonResult = result.Empty() ? std::nullopt : std::make_optional(result.Str()),
- .Plan = MaybeToOptional(program->GetQueryPlan()),
- .Statistics = MaybeToOptional(program->GetStatistics()),
- .Progress = progress,
- .TaskInfo = MaybeToOptional(program->GetTasksInfo()),
- };
- }
- TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) noexcept override
- {
- try {
- return GuardedRun(queryId, impersonationUser, queryText, settings, files);
- } catch (const std::exception& ex) {
- {
- auto guard = WriterGuard(ProgressSpinLock);
- ActiveQueriesProgress_.erase(queryId);
- }
- return TQueryResult{
- .YsonError = MessageToYtErrorYson(ex.what()),
- };
- }
- }
- TQueryResult GetProgress(TQueryId queryId) noexcept override
- {
- auto guard = ReaderGuard(ProgressSpinLock);
- if (ActiveQueriesProgress_.contains(queryId)) {
- TQueryResult result;
- if (ActiveQueriesProgress_[queryId].ProgressMerger.HasChangesSinceLastFlush()) {
- result.Plan = ActiveQueriesProgress_[queryId].Plan;
- result.Progress = ActiveQueriesProgress_[queryId].ProgressMerger.ToYsonString();
- }
- return result;
- } else {
- return TQueryResult{
- .YsonError = MessageToYtErrorYson(Format("No progress for queryId: %v", queryId)),
- };
- }
- }
- private:
- NYql::TFileStoragePtr FileStorage_;
- NYql::TExprContext ExprContext_;
- ::TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FuncRegistry_;
- NYql::IModuleResolver::TPtr ModuleResolver_;
- NYql::TGatewaysConfig GatewaysConfig_;
- std::unique_ptr<NYql::TProgramFactory> ProgramFactory_;
- THashMap<TString, TString> Clusters_;
- std::optional<TString> DefaultCluster_;
- THashMap<TString, TString> Modules_;
- TYsonString OperationAttributes_;
- YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, ProgressSpinLock);
- THashMap<TQueryId, TActiveQuery> ActiveQueriesProgress_;
- TVector<NYql::TDataProviderInitializer> DataProvidersInit_;
- TString PatchQueryAttributes(TYsonString configAttributes, TYsonString querySettings)
- {
- auto querySettingsMap = NodeFromYsonString(querySettings.ToString());
- auto resultAttributesMap = NodeFromYsonString(configAttributes.ToString());
- for (const auto& item: querySettingsMap.AsMap()) {
- resultAttributesMap[item.first] = item.second;
- }
- return NodeToYsonString(resultAttributesMap);
- }
- NYql::TUserDataTable FilesToUserTable(const std::vector<TQueryFile>& files)
- {
- NYql::TUserDataTable table;
- for (const auto& file : files) {
- NYql::TUserDataBlock& block = table[NYql::TUserDataKey::File(NYql::GetDefaultFilePrefix() + file.Name)];
- block.Data = file.Content;
- switch (file.Type) {
- case EQueryFileContentType::RawInlineData: {
- block.Type = NYql::EUserDataType::RAW_INLINE_DATA;
- break;
- }
- case EQueryFileContentType::Url: {
- block.Type = NYql::EUserDataType::URL;
- break;
- }
- default: {
- ythrow yexception() << "Unexpected file content type";
- }
- }
- }
- return table;
- }
- };
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NNative
- ////////////////////////////////////////////////////////////////////////////////
- std::unique_ptr<IYqlPlugin> CreateYqlPlugin(TYqlPluginOptions& options) noexcept
- {
- return std::make_unique<NNative::TYqlPlugin>(options);
- }
- ////////////////////////////////////////////////////////////////////////////////
- } // namespace NYT::NYqlPlugin
|