plugin.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. #include "plugin.h"
  2. #include "error_helpers.h"
  3. #include "progress_merger.h"
  4. #include <ydb/library/yql/providers/yt/lib/log/yt_logger.h>
  5. #include <ydb/library/yql/providers/yt/lib/yt_download/yt_download.h>
  6. #include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h>
  7. #include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
  8. #include <ydb/library/yql/providers/common/udf_resolve/yql_simple_udf_resolver.h>
  9. #include "ydb/library/yql/providers/common/proto/gateways_config.pb.h"
  10. #include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
  11. #include <ydb/library/yql/ast/yql_expr.h>
  12. #include <ydb/library/yql/minikql/mkql_function_registry.h>
  13. #include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
  14. #include <ydb/library/yql/core/facade/yql_facade.h>
  15. #include <ydb/library/yql/core/file_storage/file_storage.h>
  16. #include "ydb/library/yql/core/file_storage/proto/file_storage.pb.h"
  17. #include <ydb/library/yql/core/services/mounts/yql_mounts.h>
  18. #include <ydb/library/yql/core/services/yql_transform_pipeline.h>
  19. #include <ydb/library/yql/core/url_preprocessing/url_preprocessing.h>
  20. #include <ydb/library/yql/utils/log/log.h>
  21. #include <ydb/library/yql/utils/backtrace/backtrace.h>
  22. #include <yt/cpp/mapreduce/interface/config.h>
  23. #include <yt/cpp/mapreduce/interface/logging/logger.h>
  24. #include <library/cpp/yt/threading/rw_spin_lock.h>
  25. #include <library/cpp/yson/node/node_io.h>
  26. #include <library/cpp/yson/parser.h>
  27. #include <library/cpp/yson/writer.h>
  28. #include <library/cpp/resource/resource.h>
  29. #include <library/cpp/digest/md5/md5.h>
  30. #include <util/folder/path.h>
  31. #include <util/stream/file.h>
  32. #include <util/string/builder.h>
  33. #include <util/system/fs.h>
  34. namespace NYT::NYqlPlugin {
  35. namespace NNative {
  36. using namespace NYson;
  37. ////////////////////////////////////////////////////////////////////////////////
  38. std::optional<TString> MaybeToOptional(const TMaybe<TString>& maybeStr)
  39. {
  40. if (!maybeStr) {
  41. return std::nullopt;
  42. }
  43. return *maybeStr;
  44. };
  45. ////////////////////////////////////////////////////////////////////////////////
  46. struct TQueryPlan
  47. {
  48. std::optional<TString> Plan;
  49. YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, PlanSpinLock);
  50. };
  51. struct TActiveQuery
  52. {
  53. TProgressMerger ProgressMerger;
  54. std::optional<TString> Plan;
  55. };
  56. ////////////////////////////////////////////////////////////////////////////////
  57. class TQueryPipelineConfigurator
  58. : public NYql::IPipelineConfigurator
  59. {
  60. public:
  61. TQueryPipelineConfigurator(NYql::TProgramPtr program, TQueryPlan& plan)
  62. : Program_(program)
  63. , Plan_(plan)
  64. { }
  65. void AfterCreate(NYql::TTransformationPipeline* /*pipeline*/) const override
  66. { }
  67. void AfterTypeAnnotation(NYql::TTransformationPipeline* /*pipeline*/) const override
  68. { }
  69. void AfterOptimize(NYql::TTransformationPipeline* pipeline) const override
  70. {
  71. auto transformer = [this](NYql::TExprNode::TPtr input, NYql::TExprNode::TPtr& output, NYql::TExprContext& /*ctx*/) {
  72. output = input;
  73. auto guard = WriterGuard(Plan_.PlanSpinLock);
  74. Plan_.Plan = MaybeToOptional(Program_->GetQueryPlan());
  75. return NYql::IGraphTransformer::TStatus::Ok;
  76. };
  77. pipeline->Add(NYql::CreateFunctorTransformer(transformer), "PlanOutput");
  78. }
  79. private:
  80. NYql::TProgramPtr Program_;
  81. TQueryPlan& Plan_;
  82. };
  83. ////////////////////////////////////////////////////////////////////////////////
  84. class TYqlPlugin
  85. : public IYqlPlugin
  86. {
  87. public:
  88. TYqlPlugin(TYqlPluginOptions& options)
  89. {
  90. try {
  91. NYql::NLog::InitLogger(std::move(options.LogBackend));
  92. auto& logger = NYql::NLog::YqlLogger();
  93. logger.SetDefaultPriority(ELogPriority::TLOG_DEBUG);
  94. for (int i = 0; i < NYql::NLog::EComponentHelpers::ToInt(NYql::NLog::EComponent::MaxValue); ++i) {
  95. logger.SetComponentLevel((NYql::NLog::EComponent) i, NYql::NLog::ELevel::DEBUG);
  96. }
  97. NYql::SetYtLoggerGlobalBackend(NYT::ILogger::ELevel::DEBUG);
  98. if (NYT::TConfig::Get()->Prefix.empty()) {
  99. NYT::TConfig::Get()->Prefix = "//";
  100. }
  101. auto yqlCoreFlags = GatewaysConfig_.GetYqlCore()
  102. .GetFlags();
  103. auto ytConfig = GatewaysConfig_.MutableYt();
  104. if (!ytConfig->HasExecuteUdfLocallyIfPossible()) {
  105. ytConfig->SetExecuteUdfLocallyIfPossible(true);
  106. }
  107. auto pattern = ytConfig->AddRemoteFilePatterns();
  108. pattern->SetPattern("yt://([a-zA-Z0-9\\-_]+)/([^&@?]+)$");
  109. pattern->SetCluster("$1");
  110. pattern->SetPath("$2");
  111. ytConfig->SetYtLogLevel(NYql::EYtLogLevel::YL_DEBUG);
  112. ytConfig->SetMrJobBin(options.MRJobBinary);
  113. ytConfig->SetMrJobBinMd5(MD5::File(options.MRJobBinary));
  114. ytConfig->ClearMrJobUdfsDir();
  115. auto setting = ytConfig->AddDefaultSettings();
  116. setting->SetName("NativeYtTypeCompatibility");
  117. setting->SetValue("all");
  118. for (const auto& [cluster, address]: options.Clusters) {
  119. auto item = ytConfig->AddClusterMapping();
  120. item->SetName(cluster);
  121. item->SetCluster(address);
  122. if (cluster == options.DefaultCluster) {
  123. item->SetDefault(true);
  124. }
  125. Clusters_.insert({item->GetName(), TString(NYql::YtProviderName)});
  126. }
  127. DefaultCluster_ = options.DefaultCluster;
  128. NYql::TFileStorageConfig fileStorageConfig;
  129. fileStorageConfig.SetMaxSizeMb(options.MaxFilesSizeMb);
  130. fileStorageConfig.SetMaxFiles(options.MaxFileCount);
  131. fileStorageConfig.SetRetryCount(options.DownloadFileRetryCount);
  132. FileStorage_ = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)}));
  133. FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(
  134. NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone();
  135. const NKikimr::NMiniKQL::TUdfModuleRemappings emptyRemappings;
  136. FuncRegistry_->SetBackTraceCallback(&NYql::NBacktrace::KikimrBackTrace);
  137. NKikimr::NMiniKQL::TUdfModulePathsMap systemModules;
  138. TVector<TString> udfPaths;
  139. NKikimr::NMiniKQL::FindUdfsInDir(options.UdfDirectory, &udfPaths);
  140. for (const auto& path: udfPaths) {
  141. // Skip YQL plugin shared library itself, it is not a UDF.
  142. if (path.EndsWith("libyqlplugin.so")) {
  143. continue;
  144. }
  145. FuncRegistry_->LoadUdfs(path, emptyRemappings, 0);
  146. }
  147. for (auto& m: FuncRegistry_->GetAllModuleNames()) {
  148. TMaybe<TString> path = FuncRegistry_->FindUdfPath(m);
  149. if (!path) {
  150. YQL_LOG(FATAL) << "Unable to detect UDF path for module " << m;
  151. exit(1);
  152. }
  153. systemModules.emplace(m, *path);
  154. }
  155. FuncRegistry_->SetSystemModulePaths(systemModules);
  156. auto userDataTable = GetYqlModuleResolver(ExprContext_, ModuleResolver_, {}, Clusters_, {});
  157. if (!userDataTable) {
  158. TStringStream err;
  159. ExprContext_.IssueManager
  160. .GetIssues()
  161. .PrintTo(err);
  162. YQL_LOG(FATAL) << "Failed to compile modules:\n"
  163. << err.Str();
  164. exit(1);
  165. }
  166. OperationAttributes_ = options.OperationAttributes;
  167. TVector<NYql::TDataProviderInitializer> dataProvidersInit;
  168. NYql::TYtNativeServices ytServices;
  169. ytServices.FunctionRegistry = FuncRegistry_.Get();
  170. ytServices.FileStorage = FileStorage_;
  171. ytServices.Config = std::make_shared<NYql::TYtGatewayConfig>(*ytConfig);
  172. auto ytNativeGateway = CreateYtNativeGateway(ytServices);
  173. dataProvidersInit.push_back(GetYtNativeDataProviderInitializer(ytNativeGateway));
  174. ProgramFactory_ = std::make_unique<NYql::TProgramFactory>(
  175. false, FuncRegistry_.Get(), ExprContext_.NextUniqueId, dataProvidersInit, "embedded");
  176. auto credentials = MakeIntrusive<NYql::TCredentials>();
  177. if (options.YTTokenPath) {
  178. TFsPath path(options.YTTokenPath);
  179. auto token = TIFStream(path).ReadAll();
  180. credentials->AddCredential("default_yt", NYql::TCredential("yt", "", token));
  181. }
  182. ProgramFactory_->AddUserDataTable(userDataTable);
  183. ProgramFactory_->SetCredentials(credentials);
  184. ProgramFactory_->SetModules(ModuleResolver_);
  185. ProgramFactory_->SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(FuncRegistry_.Get(), FileStorage_));
  186. ProgramFactory_->SetGatewaysConfig(&GatewaysConfig_);
  187. ProgramFactory_->SetFileStorage(FileStorage_);
  188. ProgramFactory_->SetUrlPreprocessing(MakeIntrusive<NYql::TUrlPreprocessing>(GatewaysConfig_));
  189. } catch (const std::exception& ex) {
  190. YQL_LOG(FATAL) << "Unexpected exception while initializing YQL plugin: " << ex.what();
  191. exit(1);
  192. }
  193. YQL_LOG(INFO) << "YQL plugin initialized";
  194. }
  195. TQueryResult GuardedRun(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files)
  196. {
  197. auto program = ProgramFactory_->Create("-memory-", queryText);
  198. program->AddCredentials({{"impersonation_user_yt", NYql::TCredential("yt", "", impersonationUser)}});
  199. program->SetOperationAttrsYson(PatchQueryAttributes(OperationAttributes_, settings));
  200. auto userDataTable = FilesToUserTable(files);
  201. program->AddUserDataTable(userDataTable);
  202. TQueryPlan queryPlan;
  203. auto pipelineConfigurator = TQueryPipelineConfigurator(program, queryPlan);
  204. program->SetProgressWriter([&] (const NYql::TOperationProgress& progress) {
  205. std::optional<TString> plan;
  206. {
  207. auto guard = ReaderGuard(queryPlan.PlanSpinLock);
  208. plan.swap(queryPlan.Plan);
  209. }
  210. auto guard = WriterGuard(ProgressSpinLock);
  211. ActiveQueriesProgress_[queryId].ProgressMerger.MergeWith(progress);
  212. if (plan) {
  213. ActiveQueriesProgress_[queryId].Plan.swap(plan);
  214. }
  215. });
  216. NSQLTranslation::TTranslationSettings sqlSettings;
  217. sqlSettings.ClusterMapping = Clusters_;
  218. sqlSettings.ModuleMapping = Modules_;
  219. if (DefaultCluster_) {
  220. sqlSettings.DefaultCluster = *DefaultCluster_;
  221. }
  222. sqlSettings.SyntaxVersion = 1;
  223. sqlSettings.V0Behavior = NSQLTranslation::EV0Behavior::Disable;
  224. if (!program->ParseSql(sqlSettings)) {
  225. return TQueryResult{
  226. .YsonError = IssuesToYtErrorYson(program->Issues()),
  227. };
  228. }
  229. if (!program->Compile(impersonationUser)) {
  230. return TQueryResult{
  231. .YsonError = IssuesToYtErrorYson(program->Issues()),
  232. };
  233. }
  234. NYql::TProgram::TStatus status = NYql::TProgram::TStatus::Error;
  235. status = program->RunWithConfig(impersonationUser, pipelineConfigurator);
  236. if (status == NYql::TProgram::TStatus::Error) {
  237. return TQueryResult{
  238. .YsonError = IssuesToYtErrorYson(program->Issues()),
  239. };
  240. }
  241. TStringStream result;
  242. if (program->HasResults()) {
  243. ::NYson::TYsonWriter yson(&result, EYsonFormat::Binary);
  244. yson.OnBeginList();
  245. for (const auto& result: program->Results()) {
  246. yson.OnListItem();
  247. yson.OnRaw(result);
  248. }
  249. yson.OnEndList();
  250. }
  251. TString progress;
  252. {
  253. auto guard = WriterGuard(ProgressSpinLock);
  254. progress = ActiveQueriesProgress_[queryId].ProgressMerger.ToYsonString();
  255. ActiveQueriesProgress_.erase(queryId);
  256. }
  257. return {
  258. .YsonResult = result.Empty() ? std::nullopt : std::make_optional(result.Str()),
  259. .Plan = MaybeToOptional(program->GetQueryPlan()),
  260. .Statistics = MaybeToOptional(program->GetStatistics()),
  261. .Progress = progress,
  262. .TaskInfo = MaybeToOptional(program->GetTasksInfo()),
  263. };
  264. }
  265. TQueryResult Run(TQueryId queryId, TString impersonationUser, TString queryText, TYsonString settings, std::vector<TQueryFile> files) noexcept override
  266. {
  267. try {
  268. return GuardedRun(queryId, impersonationUser, queryText, settings, files);
  269. } catch (const std::exception& ex) {
  270. {
  271. auto guard = WriterGuard(ProgressSpinLock);
  272. ActiveQueriesProgress_.erase(queryId);
  273. }
  274. return TQueryResult{
  275. .YsonError = MessageToYtErrorYson(ex.what()),
  276. };
  277. }
  278. }
  279. TQueryResult GetProgress(TQueryId queryId) noexcept override
  280. {
  281. auto guard = ReaderGuard(ProgressSpinLock);
  282. if (ActiveQueriesProgress_.contains(queryId)) {
  283. TQueryResult result;
  284. if (ActiveQueriesProgress_[queryId].ProgressMerger.HasChangesSinceLastFlush()) {
  285. result.Plan = ActiveQueriesProgress_[queryId].Plan;
  286. result.Progress = ActiveQueriesProgress_[queryId].ProgressMerger.ToYsonString();
  287. }
  288. return result;
  289. } else {
  290. return TQueryResult{
  291. .YsonError = MessageToYtErrorYson(Format("No progress for queryId: %v", queryId)),
  292. };
  293. }
  294. }
  295. private:
  296. NYql::TFileStoragePtr FileStorage_;
  297. NYql::TExprContext ExprContext_;
  298. ::TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FuncRegistry_;
  299. NYql::IModuleResolver::TPtr ModuleResolver_;
  300. NYql::TGatewaysConfig GatewaysConfig_;
  301. std::unique_ptr<NYql::TProgramFactory> ProgramFactory_;
  302. THashMap<TString, TString> Clusters_;
  303. std::optional<TString> DefaultCluster_;
  304. THashMap<TString, TString> Modules_;
  305. TYsonString OperationAttributes_;
  306. YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, ProgressSpinLock);
  307. THashMap<TQueryId, TActiveQuery> ActiveQueriesProgress_;
  308. TVector<NYql::TDataProviderInitializer> DataProvidersInit_;
  309. TString PatchQueryAttributes(TYsonString configAttributes, TYsonString querySettings)
  310. {
  311. auto querySettingsMap = NodeFromYsonString(querySettings.ToString());
  312. auto resultAttributesMap = NodeFromYsonString(configAttributes.ToString());
  313. for (const auto& item: querySettingsMap.AsMap()) {
  314. resultAttributesMap[item.first] = item.second;
  315. }
  316. return NodeToYsonString(resultAttributesMap);
  317. }
  318. NYql::TUserDataTable FilesToUserTable(const std::vector<TQueryFile>& files)
  319. {
  320. NYql::TUserDataTable table;
  321. for (const auto& file : files) {
  322. NYql::TUserDataBlock& block = table[NYql::TUserDataKey::File(NYql::GetDefaultFilePrefix() + file.Name)];
  323. block.Data = file.Content;
  324. switch (file.Type) {
  325. case EQueryFileContentType::RawInlineData: {
  326. block.Type = NYql::EUserDataType::RAW_INLINE_DATA;
  327. break;
  328. }
  329. case EQueryFileContentType::Url: {
  330. block.Type = NYql::EUserDataType::URL;
  331. break;
  332. }
  333. default: {
  334. ythrow yexception() << "Unexpected file content type";
  335. }
  336. }
  337. }
  338. return table;
  339. }
  340. };
  341. ////////////////////////////////////////////////////////////////////////////////
  342. } // namespace NNative
  343. ////////////////////////////////////////////////////////////////////////////////
  344. std::unique_ptr<IYqlPlugin> CreateYqlPlugin(TYqlPluginOptions& options) noexcept
  345. {
  346. return std::make_unique<NNative::TYqlPlugin>(options);
  347. }
  348. ////////////////////////////////////////////////////////////////////////////////
  349. } // namespace NYT::NYqlPlugin