// Native implementation of IYqlPlugin: the constructor configures YQL logging,
// the YT native gateway, file storage, UDFs and a YQL program factory, and
// Run() then parses, compiles and executes SQL queries with that setup.
//
// NOTE(review): in this copy most `#include` directives have no target and many
// template argument lists are missing (e.g. `TVector udfPaths;`,
// `std::make_shared(*ytConfig)`, `std::unique_ptr ProgramFactory_;`). They look
// stripped during extraction; restore them from version control before building.
#include "plugin.h"
#include "error_helpers.h"
#include
#include
#include
#include
#include
#include
#include "ydb/library/yql/providers/common/proto/gateways_config.pb.h"
#include
#include
#include
#include
#include
#include
#include "ydb/library/yql/core/file_storage/proto/file_storage.pb.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace NYT::NYqlPlugin {

namespace NNative {

using namespace NYson;

////////////////////////////////////////////////////////////////////////////////

class TYqlPlugin
    : public IYqlPlugin
{
public:
    // Builds the query-execution environment from `options`.
    //
    // `options` is taken by non-const reference because its LogBackend is moved
    // out. Initialization failures are not thrown to the caller. Each of the
    // following is logged at FATAL level and the process is terminated with
    // exit(1):
    //   - a std::exception;
    //   - a loaded UDF module whose library path cannot be determined;
    //   - a failure to compile the YQL modules.
    TYqlPlugin(TYqlPluginOptions& options)
    {
        try {
            NYql::NLog::InitLogger(std::move(options.LogBackend));

            auto& logger = NYql::NLog::YqlLogger();

            // Log everything: DEBUG as the default priority and for every YQL
            // log component.
            logger.SetDefaultPriority(ELogPriority::TLOG_DEBUG);
            for (int i = 0; i < NYql::NLog::EComponentHelpers::ToInt(NYql::NLog::EComponent::MaxValue); ++i) {
                logger.SetComponentLevel((NYql::NLog::EComponent) i, NYql::NLog::ELevel::DEBUG);
            }

            NYql::SetYtLoggerGlobalBackend(NYT::ILogger::ELevel::DEBUG);
            // Fall back to the "//" path prefix when the YT client config has none.
            if (NYT::TConfig::Get()->Prefix.empty()) {
                NYT::TConfig::Get()->Prefix = "//";
            }

            // NOTE(review): `yqlCoreFlags` is never read in this file; looks like
            // dead code.
            auto yqlCoreFlags = GatewaysConfig_.GetYqlCore()
                .GetFlags();

            // YT gateway settings. ExecuteUdfLocallyIfPossible is forced to true
            // only when the config does not set it explicitly.
            auto ytConfig = GatewaysConfig_.MutableYt();
            if (!ytConfig->HasExecuteUdfLocallyIfPossible()) {
                ytConfig->SetExecuteUdfLocallyIfPossible(true);
            }
            ytConfig->SetYtLogLevel(NYql::EYtLogLevel::YL_DEBUG);
            // The job binary is configured by both its path and its MD5 digest.
            ytConfig->SetMrJobBin(options.MRJobBinary);
            ytConfig->SetMrJobBinMd5(MD5::File(options.MRJobBinary));

            ytConfig->ClearMrJobUdfsDir();

            auto setting = ytConfig->AddDefaultSettings();
            setting->SetName("NativeYtTypeCompatibility");
            setting->SetValue("all");

            // Register every configured cluster with the YT gateway, and map its
            // name to NYql::YtProviderName for SQL translation. Clusters_ is
            // later passed as TTranslationSettings::ClusterMapping. The cluster
            // equal to options.DefaultCluster is flagged as the default one.
            for (const auto& [cluster, address]: options.Clusters) {
                auto item = ytConfig->AddClusterMapping();
                item->SetName(cluster);
                item->SetCluster(address);
                if (cluster == options.DefaultCluster) {
                    item->SetDefault(true);
                }

                Clusters_.insert({item->GetName(), TString(NYql::YtProviderName)});
            }
            DefaultCluster_ = options.DefaultCluster;

            // File storage capped at 1 << 14 = 16384 MB, created with a YT
            // downloader and wrapped with WithAsync.
            NYql::TFileStorageConfig fileStorageConfig;
            fileStorageConfig.SetMaxSizeMb(1 << 14);
            FileStorage_ = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)}));

            // Function registry: the builtin registry plus every UDF library
            // found in options.UdfDirectory.
            FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(
                NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone();

            const NKikimr::NMiniKQL::TUdfModuleRemappings emptyRemappings;

            FuncRegistry_->SetBackTraceCallback(&NYql::NBacktrace::KikimrBackTrace);

            NKikimr::NMiniKQL::TUdfModulePathsMap systemModules;

            TVector udfPaths;
            NKikimr::NMiniKQL::FindUdfsInDir(options.UdfDirectory, &udfPaths);
            for (const auto& path: udfPaths) {
                // Skip YQL plugin shared library itself, it is not a UDF.
                if (path.EndsWith("libyqlplugin.so")) {
                    continue;
                }
                FuncRegistry_->LoadUdfs(path, emptyRemappings, 0);
            }

            // Record the library path of every loaded module as a system
            // module; a module whose path cannot be found is fatal.
            for (auto& m: FuncRegistry_->GetAllModuleNames()) {
                TMaybe path = FuncRegistry_->FindUdfPath(m);
                if (!path) {
                    YQL_LOG(FATAL) << "Unable to detect UDF path for module " << m;
                    exit(1);
                }
                systemModules.emplace(m, *path);
            }
            FuncRegistry_->SetSystemModulePaths(systemModules);

            // Fills ModuleResolver_. A null result means the YQL modules failed
            // to compile; the issues collected in ExprContext_ are then logged.
            auto userDataTable = GetYqlModuleResolver(ExprContext_, ModuleResolver_, {}, Clusters_, {});

            if (!userDataTable) {
                TStringStream err;
                ExprContext_.IssueManager
                    .GetIssues()
                    .PrintTo(err);
                YQL_LOG(FATAL) << "Failed to compile modules:\n" << err.Str();
                exit(1);
            }

            OperationAttributes_ = options.OperationAttributes;

            // The YT native provider is the only data provider registered here.
            TVector dataProvidersInit;

            NYql::TYtNativeServices ytServices;
            ytServices.FunctionRegistry = FuncRegistry_.Get();
            ytServices.FileStorage = FileStorage_;
            ytServices.Config = std::make_shared(*ytConfig);
            auto ytNativeGateway = CreateYtNativeGateway(ytServices);
            dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway));

            ProgramFactory_ = std::make_unique(
                false,
                FuncRegistry_.Get(),
                ExprContext_.NextUniqueId,
                dataProvidersInit,
                "embedded");

            YTTokenPath_ = options.YTTokenPath;

            // Wire the module resolver, UDF resolver, gateway config, file
            // storage and URL preprocessing into the program factory.
            ProgramFactory_->AddUserDataTable(userDataTable);
            ProgramFactory_->SetModules(ModuleResolver_);
            ProgramFactory_->SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get(), FileStorage_));
            ProgramFactory_->SetGatewaysConfig(&GatewaysConfig_);
            ProgramFactory_->SetFileStorage(FileStorage_);
            ProgramFactory_->SetUrlPreprocessing(MakeIntrusive(GatewaysConfig_));
        } catch (const std::exception& ex) {
            YQL_LOG(FATAL) << "Unexpected exception while initializing YQL plugin: " << ex.what();
            exit(1);
        }
        YQL_LOG(INFO) << "YQL plugin initialized";
    }

    // Executes one query on behalf of `impersonationUser`.
    //
    // The query text is parsed as SQL (syntax version 1, V0 behavior disabled),
    // then compiled and run. A parse, compile or run error is returned in
    // TQueryResult::YsonError, built from the program issues.
    //
    // On success the result holds:
    //   - YsonResult: all query results as one binary-YSON list, or unset when
    //     nothing was written;
    //   - Plan, Statistics and TaskInfo, each set when the program reports it.
    //
    // May throw; Run() is the non-throwing entry point.
    TQueryResult GuardedRun(TString impersonationUser, TString queryText, TYsonString settings)
    {
        // Two credentials are registered:
        //   - "default_yt": the token read from YTTokenPath_, when it is set;
        //   - "impersonation_user_yt": the impersonated user name.
        auto credentials = MakeIntrusive();
        if (YTTokenPath_) {
            TFsPath path(YTTokenPath_);
            // NOTE(review): the file content is used verbatim, so a trailing
            // newline in the token file would become part of the token.
            // Confirm whether it should be stripped.
            auto token = TIFStream(path).ReadAll();

            credentials->AddCredential("default_yt", NYql::TCredential("yt", "", token));
        }
        credentials->AddCredential("impersonation_user_yt", NYql::TCredential("yt", "", impersonationUser));
        // NOTE(review): this mutates the program factory shared by all calls.
        // If Run() may be invoked concurrently, one query could pick up another
        // query's credentials. Confirm the threading contract.
        ProgramFactory_->SetCredentials(credentials);

        auto program = ProgramFactory_->Create("-memory-", queryText);
        // Per-query settings override the configured operation attributes.
        program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings));

        NSQLTranslation::TTranslationSettings sqlSettings;
        sqlSettings.ClusterMapping = Clusters_;
        sqlSettings.ModuleMapping = Modules_;
        if (DefaultCluster_) {
            sqlSettings.DefaultCluster = *DefaultCluster_;
        }
        sqlSettings.SyntaxVersion = 1;
        sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;

        if (!program->ParseSql(sqlSettings)) {
            return TQueryResult{
                .YsonError = IssuesToYtErrorYson(program->Issues()),
            };
        }

        if (!program->Compile(GetUsername())) {
            return TQueryResult{
                .YsonError = IssuesToYtErrorYson(program->Issues()),
            };
        }

        NYql::TProgram::TStatus status = NYql::TProgram::TStatus::Error;
        status = program->Run(GetUsername(), nullptr, nullptr, nullptr);

        if (status == NYql::TProgram::TStatus::Error) {
            return TQueryResult{
                .YsonError = IssuesToYtErrorYson(program->Issues()),
            };
        }

        // Serialize all results as one binary-YSON list; each result is written
        // as raw YSON via OnRaw.
        TStringStream result;
        if (program->HasResults()) {
            ::NYson::TYsonWriter yson(&result, EYsonFormat::Binary);
            yson.OnBeginList();
            // NOTE(review): the loop variable `result` shadows the output
            // stream of the same name declared above.
            for (const auto& result: program->Results()) {
                yson.OnListItem();
                yson.OnRaw(result);
            }
            yson.OnEndList();
        }

        // Adapts the program's TMaybe-returning getters to std::optional.
        auto maybeToOptional = [] (const TMaybe& maybeStr) -> std::optional {
            if (!maybeStr) {
                return std::nullopt;
            }

            return *maybeStr;
        };

        return {
            .YsonResult = result.Empty() ? std::nullopt : std::make_optional(result.Str()),
            .Plan = maybeToOptional(program->GetQueryPlan()),
            .Statistics = maybeToOptional(program->GetStatistics()),
            .TaskInfo = maybeToOptional(program->GetTasksInfo()),
        };
    }

    // Non-throwing entry point: runs GuardedRun() and converts any
    // std::exception into TQueryResult::YsonError.
    // NOTE(review): an exception not derived from std::exception is not caught
    // here. Because the function is noexcept, it would call std::terminate.
    TQueryResult Run(TString impersonationUser, TString queryText, TYsonString settings) noexcept override
    {
        try {
            return GuardedRun(impersonationUser, queryText, settings);
        } catch (const std::exception& ex) {
            return TQueryResult{
                .YsonError = ExceptionToYtErrorYson(ex),
            };
        }
    }

private:
    NYql::TFileStoragePtr FileStorage_;
    NYql::TExprContext ExprContext_;
    ::TIntrusivePtr FuncRegistry_;
    NYql::IModuleResolver::TPtr ModuleResolver_;
    NYql::TGatewaysConfig GatewaysConfig_;
    std::unique_ptr ProgramFactory_;
    // Path of the file holding the YT token used for the "default_yt" credential.
    TString YTTokenPath_;
    // Cluster name -> NYql::YtProviderName; used as the SQL ClusterMapping.
    THashMap Clusters_;
    std::optional DefaultCluster_;
    // NOTE(review): never populated in this file, so the SQL ModuleMapping is
    // always empty here.
    THashMap Modules_;
    // Operation attributes from the plugin options; per-query settings are
    // merged on top of them in GuardedRun().
    TYsonString OperationAttributes_;

    // Returns `configAttributes` with every top-level key of `querySettings`
    // copied over it. Query settings win on conflicts, and values are replaced
    // as a whole rather than merged recursively. `querySettings` is expected to
    // parse to a YSON map; presumably AsMap() throws otherwise (confirm).
    TString PatchQueryAttributes(TYsonString configAttributes, TYsonString querySettings)
    {
        auto querySettingsMap = NodeFromYsonString(querySettings.ToString());
        auto resultAttributesMap = NodeFromYsonString(configAttributes.ToString());

        for (const auto& item: querySettingsMap.AsMap()) {
            resultAttributesMap[item.first] = item.second;
        }

        return NodeToYsonString(resultAttributesMap);
    }
};

////////////////////////////////////////////////////////////////////////////////

} // namespace NNative

////////////////////////////////////////////////////////////////////////////////

// Creates the native YQL plugin from `options`.
// NOTE(review): this function is declared noexcept, so an exception from the
// allocation would call std::terminate. The constructor handles its own
// initialization failures by calling exit(1) rather than reporting them to the
// caller.
std::unique_ptr CreateYqlPlugin(TYqlPluginOptions& options) noexcept
{
    return std::make_unique(options);
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NYqlPlugin