plugin.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. #include "plugin.h"
  2. #include "error_helpers.h"
  3. #include <ydb/library/yql/providers/yt/lib/log/yt_logger.h>
  4. #include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
  5. #include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h>
  6. #include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
  7. #include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h>
  8. #include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
  9. #include "ydb/library/yql/providers/common/proto/gateways_config.pb.h"
  10. #include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
  11. #include <ydb/library/yql/ast/yql_expr.h>
  12. #include <ydb/library/yql/minikql/mkql_function_registry.h>
  13. #include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
  14. #include <ydb/library/yql/core/facade/yql_facade.h>
  15. #include <ydb/library/yql/core/file_storage/file_storage.h>
  16. #include "ydb/library/yql/core/file_storage/proto/file_storage.pb.h"
  17. #include <ydb/library/yql/core/services/mounts/yql_mounts.h>
  18. #include <ydb/library/yql/utils/log/log.h>
  19. #include <ydb/library/yql/utils/backtrace/backtrace.h>
  20. #include <yt/cpp/mapreduce/interface/config.h>
  21. #include <yt/cpp/mapreduce/interface/logging/logger.h>
  22. #include <library/cpp/yson/node/node_io.h>
  23. #include <library/cpp/yson/parser.h>
  24. #include <library/cpp/yson/writer.h>
  25. #include <library/cpp/resource/resource.h>
  26. #include <library/cpp/digest/md5/md5.h>
  27. #include <util/folder/path.h>
  28. #include <util/stream/file.h>
  29. #include <util/string/builder.h>
  30. #include <util/system/fs.h>
  31. #include <util/system/user.h>
  32. namespace NYT::NYqlPlugin {
  33. namespace NNative {
  34. using namespace NYson;
  35. ////////////////////////////////////////////////////////////////////////////////
  36. class TYqlPlugin
  37. : public IYqlPlugin
  38. {
  39. public:
  40. TYqlPlugin(TYqlPluginOptions& options)
  41. {
  42. try {
  43. NYql::NLog::InitLogger(std::move(options.LogBackend));
  44. auto& logger = NYql::NLog::YqlLogger();
  45. logger.SetDefaultPriority(ELogPriority::TLOG_DEBUG);
  46. for (int i = 0; i < NYql::NLog::EComponentHelpers::ToInt(NYql::NLog::EComponent::MaxValue); ++i) {
  47. logger.SetComponentLevel((NYql::NLog::EComponent) i, NYql::NLog::ELevel::DEBUG);
  48. }
  49. NYql::SetYtLoggerGlobalBackend(NYT::ILogger::ELevel::DEBUG);
  50. if (NYT::TConfig::Get()->Prefix.empty()) {
  51. NYT::TConfig::Get()->Prefix = "//";
  52. }
  53. auto yqlCoreFlags = GatewaysConfig_.GetYqlCore()
  54. .GetFlags();
  55. auto ytConfig = GatewaysConfig_.MutableYt();
  56. if (!ytConfig->HasExecuteUdfLocallyIfPossible()) {
  57. ytConfig->SetExecuteUdfLocallyIfPossible(true);
  58. }
  59. ytConfig->SetYtLogLevel(NYql::EYtLogLevel::YL_DEBUG);
  60. ytConfig->SetMrJobBin(options.MRJobBinary);
  61. ytConfig->SetMrJobBinMd5(MD5::File(options.MRJobBinary));
  62. ytConfig->ClearMrJobUdfsDir();
  63. auto setting = ytConfig->AddDefaultSettings();
  64. setting->SetName("NativeYtTypeCompatibility");
  65. setting->SetValue("all");
  66. for (const auto& [cluster, address]: options.Clusters) {
  67. auto item = ytConfig->AddClusterMapping();
  68. item->SetName(cluster);
  69. item->SetCluster(address);
  70. if (cluster == options.DefaultCluster) {
  71. item->SetDefault(true);
  72. }
  73. Clusters_.insert({item->GetName(), TString(NYql::YtProviderName)});
  74. }
  75. DefaultCluster_ = options.DefaultCluster;
  76. NYql::TFileStorageConfig fileStorageConfig;
  77. fileStorageConfig.SetMaxSizeMb(1 << 14);
  78. FileStorage_ = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)}));
  79. FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(
  80. NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone();
  81. const NKikimr::NMiniKQL::TUdfModuleRemappings emptyRemappings;
  82. FuncRegistry_->SetBackTraceCallback(&NYql::NBacktrace::KikimrBackTrace);
  83. NKikimr::NMiniKQL::TUdfModulePathsMap systemModules;
  84. TVector<TString> udfPaths;
  85. NKikimr::NMiniKQL::FindUdfsInDir(options.UdfDirectory, &udfPaths);
  86. for (const auto& path: udfPaths) {
  87. // Skip YQL plugin shared library itself, it is not a UDF.
  88. if (path.EndsWith("libyqlplugin.so")) {
  89. continue;
  90. }
  91. FuncRegistry_->LoadUdfs(path, emptyRemappings, 0);
  92. }
  93. for (auto& m: FuncRegistry_->GetAllModuleNames()) {
  94. TMaybe<TString> path = FuncRegistry_->FindUdfPath(m);
  95. if (!path) {
  96. YQL_LOG(FATAL) << "Unable to detect UDF path for module " << m;
  97. exit(1);
  98. }
  99. systemModules.emplace(m, *path);
  100. }
  101. FuncRegistry_->SetSystemModulePaths(systemModules);
  102. auto userDataTable = GetYqlModuleResolver(ExprContext_, ModuleResolver_, {}, Clusters_, {});
  103. if (!userDataTable) {
  104. TStringStream err;
  105. ExprContext_.IssueManager
  106. .GetIssues()
  107. .PrintTo(err);
  108. YQL_LOG(FATAL) << "Failed to compile modules:\n"
  109. << err.Str();
  110. exit(1);
  111. }
  112. OperationAttributes_ = options.OperationAttributes;
  113. TVector<NYql::TDataProviderInitializer> dataProvidersInit;
  114. NYql::TYtNativeServices ytServices;
  115. ytServices.FunctionRegistry = FuncRegistry_.Get();
  116. ytServices.FileStorage = FileStorage_;
  117. ytServices.Config = std::make_shared<NYql::TYtGatewayConfig>(*ytConfig);
  118. auto ytNativeGateway = CreateYtNativeGateway(ytServices);
  119. dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway));
  120. ProgramFactory_ = std::make_unique<NYql::TProgramFactory>(
  121. false, FuncRegistry_.Get(), ExprContext_.NextUniqueId, dataProvidersInit, "embedded");
  122. YTTokenPath_ = options.YTTokenPath;
  123. ProgramFactory_->AddUserDataTable(userDataTable);
  124. ProgramFactory_->SetModules(ModuleResolver_);
  125. ProgramFactory_->SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get(), FileStorage_));
  126. ProgramFactory_->SetGatewaysConfig(&GatewaysConfig_);
  127. ProgramFactory_->SetFileStorage(FileStorage_);
  128. ProgramFactory_->SetUrlPreprocessing(MakeIntrusive<NYql::TUrlPreprocessing>(GatewaysConfig_));
  129. } catch (const std::exception& ex) {
  130. YQL_LOG(FATAL) << "Unexpected exception while initializing YQL plugin: " << ex.what();
  131. exit(1);
  132. }
  133. YQL_LOG(INFO) << "YQL plugin initialized";
  134. }
  135. TQueryResult GuardedRun(TString impersonationUser, TString queryText, TYsonString settings)
  136. {
  137. auto credentials = MakeIntrusive<NYql::TCredentials>();
  138. if (YTTokenPath_) {
  139. TFsPath path(YTTokenPath_);
  140. auto token = TIFStream(path).ReadAll();
  141. credentials->AddCredential("default_yt", NYql::TCredential("yt", "", token));
  142. }
  143. credentials->AddCredential("impersonation_user_yt", NYql::TCredential("yt", "", impersonationUser));
  144. ProgramFactory_->SetCredentials(credentials);
  145. auto program = ProgramFactory_->Create("-memory-", queryText);
  146. program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings));
  147. NSQLTranslation::TTranslationSettings sqlSettings;
  148. sqlSettings.ClusterMapping = Clusters_;
  149. sqlSettings.ModuleMapping = Modules_;
  150. if (DefaultCluster_) {
  151. sqlSettings.DefaultCluster = *DefaultCluster_;
  152. }
  153. sqlSettings.SyntaxVersion = 1;
  154. sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;
  155. if (!program->ParseSql(sqlSettings)) {
  156. return TQueryResult{
  157. .YsonError = IssuesToYtErrorYson(program->Issues()),
  158. };
  159. }
  160. if (!program->Compile(GetUsername())) {
  161. return TQueryResult{
  162. .YsonError = IssuesToYtErrorYson(program->Issues()),
  163. };
  164. }
  165. NYql::TProgram::TStatus status = NYql::TProgram::TStatus::Error;
  166. status = program->Run(GetUsername(), nullptr, nullptr, nullptr);
  167. if (status == NYql::TProgram::TStatus::Error) {
  168. return TQueryResult{
  169. .YsonError = IssuesToYtErrorYson(program->Issues()),
  170. };
  171. }
  172. TStringStream result;
  173. if (program->HasResults()) {
  174. ::NYson::TYsonWriter yson(&result, EYsonFormat::Binary);
  175. yson.OnBeginList();
  176. for (const auto& result: program->Results()) {
  177. yson.OnListItem();
  178. yson.OnRaw(result);
  179. }
  180. yson.OnEndList();
  181. }
  182. auto maybeToOptional = [] (const TMaybe<TString>& maybeStr) -> std::optional<TString> {
  183. if (!maybeStr) {
  184. return std::nullopt;
  185. }
  186. return *maybeStr;
  187. };
  188. return {
  189. .YsonResult = result.Empty() ? std::nullopt : std::make_optional(result.Str()),
  190. .Plan = maybeToOptional(program->GetQueryPlan()),
  191. .Statistics = maybeToOptional(program->GetStatistics()),
  192. .TaskInfo = maybeToOptional(program->GetTasksInfo()),
  193. };
  194. }
  195. TQueryResult Run(TString impersonationUser, TString queryText, TYsonString settings) noexcept override
  196. {
  197. try {
  198. return GuardedRun(impersonationUser, queryText, settings);
  199. } catch (const std::exception& ex) {
  200. return TQueryResult{
  201. .YsonError = ExceptionToYtErrorYson(ex),
  202. };
  203. }
  204. }
  205. private:
  206. NYql::TFileStoragePtr FileStorage_;
  207. NYql::TExprContext ExprContext_;
  208. ::TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FuncRegistry_;
  209. NYql::IModuleResolver::TPtr ModuleResolver_;
  210. NYql::TGatewaysConfig GatewaysConfig_;
  211. std::unique_ptr<NYql::TProgramFactory> ProgramFactory_;
  212. TString YTTokenPath_;
  213. THashMap<TString, TString> Clusters_;
  214. std::optional<TString> DefaultCluster_;
  215. THashMap<TString, TString> Modules_;
  216. TYsonString OperationAttributes_;
  217. TString PatchQueryAttributes(TYsonString configAttributes, TYsonString querySettings)
  218. {
  219. auto querySettingsMap = NodeFromYsonString(querySettings.ToString());
  220. auto resultAttributesMap = NodeFromYsonString(configAttributes.ToString());
  221. for (const auto& item: querySettingsMap.AsMap()) {
  222. resultAttributesMap[item.first] = item.second;
  223. }
  224. return NodeToYsonString(resultAttributesMap);
  225. }
  226. };
  227. ////////////////////////////////////////////////////////////////////////////////
  228. } // namespace NNative
  229. ////////////////////////////////////////////////////////////////////////////////
  230. std::unique_ptr<IYqlPlugin> CreateYqlPlugin(TYqlPluginOptions& options) noexcept
  231. {
  232. return std::make_unique<NNative::TYqlPlugin>(options);
  233. }
  234. ////////////////////////////////////////////////////////////////////////////////
  235. } // namespace NYT::NYqlPlugin