yqlrun.cpp 12 KB


  1. #include <yql/tools/yqlrun/lib/yqlrun_lib.h>
  2. #include <yql/tools/yqlrun/http/yql_server.h>
  3. #include <yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h>
  4. #include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
  5. #include <yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h>
  6. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  7. #include <yql/essentials/minikql/mkql_function_registry.h>
  8. #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
  9. #include <yql/essentials/utils/backtrace/backtrace.h>
  10. #include <yql/essentials/utils/log/tls_backend.h>
  11. #include <yql/essentials/utils/log/log.h>
  12. #include <yql/essentials/parser/pg_wrapper/interface/parser.h>
  13. #include <yql/essentials/parser/pg_catalog/catalog.h>
  14. #include <yql/essentials/protos/pg_ext.pb.h>
  15. #include <yql/essentials/core/file_storage/file_storage.h>
  16. #include <yql/essentials/core/file_storage/proto/file_storage.pb.h>
  17. #include <yql/essentials/core/services/mounts/yql_mounts.h>
  18. #include <yql/essentials/core/facade/yql_facade.h>
  19. #include <yql/essentials/core/pg_ext/yql_pg_ext.h>
  20. #include <yql/essentials/core/yql_udf_resolver.h>
  21. #include <yql/essentials/core/yql_udf_index.h>
  22. #include <yql/essentials/core/yql_library_compiler.h>
  23. #include <yql/essentials/ast/yql_expr.h>
  24. #include <yql/essentials/sql/sql.h>
  25. #include <yql/essentials/sql/v1/sql.h>
  26. #include <yql/essentials/sql/v1/lexer/antlr4/lexer.h>
  27. #include <yql/essentials/sql/v1/lexer/antlr4_ansi/lexer.h>
  28. #include <yql/essentials/sql/v1/proto_parser/antlr4/proto_parser.h>
  29. #include <yql/essentials/sql/v1/proto_parser/antlr4_ansi/proto_parser.h>
  30. #include <library/cpp/getopt/last_getopt.h>
  31. #include <library/cpp/logger/stream.h>
  32. #include <google/protobuf/text_format.h>
  33. #include <util/generic/string.h>
  34. #include <util/generic/vector.h>
  35. #include <util/generic/hash.h>
  36. #include <util/datetime/base.h>
  37. using namespace NYql;
  38. using namespace NKikimr::NMiniKQL;
  39. using namespace NYql::NHttp;
  40. namespace NMiniKQL = NKikimr::NMiniKQL;
  41. class TStoreMappingFunctor: public NLastGetopt::IOptHandler {
  42. public:
  43. TStoreMappingFunctor(THashMap<TString, TString>* target, char delim = '@')
  44. : Target(target)
  45. , Delim(delim)
  46. {
  47. }
  48. void HandleOpt(const NLastGetopt::TOptsParser* parser) final {
  49. const TStringBuf val(parser->CurValOrDef());
  50. const auto service = TString(val.After(Delim));
  51. auto res = Target->emplace(TString(val.Before(Delim)), service);
  52. if (!res.second) {
  53. /// force replace already exist parametr
  54. res.first->second = service;
  55. }
  56. }
  57. private:
  58. THashMap<TString, TString>* Target;
  59. char Delim;
  60. };
  61. void CommonInit(const NLastGetopt::TOptsParseResult& res, const TString& udfResolverPath, bool filterSysCalls,
  62. const TVector<TString>& udfsPaths, TFileStoragePtr fileStorage,
  63. IUdfResolver::TPtr& udfResolver, IFunctionRegistry::TPtr funcRegistry, TUdfIndex::TPtr& udfIndex) {
  64. if (fileStorage && res.Has("scan-udfs")) {
  65. if (!udfResolverPath) {
  66. ythrow yexception() << "udf-resolver path must be specified when use 'scan-udfs'";
  67. }
  68. udfResolver = NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), fileStorage, udfResolverPath, {}, {}, filterSysCalls, {});
  69. Cerr << TInstant::Now().ToStringLocalUpToSeconds() << " Udf scanning started for " << udfsPaths.size() << " udfs ..." << Endl;
  70. udfIndex = new TUdfIndex();
  71. LoadRichMetadataToUdfIndex(*udfResolver, udfsPaths, false, TUdfIndex::EOverrideMode::RaiseError, *udfIndex);
  72. Cerr << TInstant::Now().ToStringLocalUpToSeconds() << " UdfIndex done." << Endl;
  73. udfResolver = NCommon::CreateUdfResolverWithIndex(udfIndex, udfResolver, fileStorage);
  74. Cerr << TInstant::Now().ToStringLocalUpToSeconds() << " Udfs scanned" << Endl;
  75. return;
  76. }
  77. udfResolver = fileStorage && udfResolverPath
  78. ? NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), fileStorage, udfResolverPath, {}, {}, false, {})
  79. : NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), fileStorage, true);
  80. }
  81. template <typename TMessage>
  82. THolder<TMessage> ParseProtoConfig(const TString& cfgFile) {
  83. auto config = MakeHolder<TMessage>();
  84. TString configData = TFileInput(cfgFile).ReadAll();;
  85. using ::google::protobuf::TextFormat;
  86. if (!TextFormat::ParseFromString(configData, config.Get())) {
  87. Cerr << "Bad format of gateways configuration";
  88. return {};
  89. }
  90. return config;
  91. }
  92. int RunUI(int argc, const char* argv[])
  93. {
  94. Cerr << "yqlrun ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl;
  95. NYql::NBacktrace::RegisterKikimrFatalActions();
  96. NYql::NBacktrace::EnableKikimrSymbolize();
  97. TVector<TString> udfsPaths;
  98. TString udfsDir;
  99. TString mountConfig;
  100. TVector<TString> filesMappingList;
  101. TString udfResolverPath;
  102. bool udfResolverFilterSyscalls = false;
  103. TString gatewaysCfgFile;
  104. TString fsCfgFile;
  105. TString pgExtConfig;
  106. THashMap<TString, TString> clusterMapping;
  107. clusterMapping["plato"] = YtProviderName;
  108. THashSet<TString> sqlFlags;
  109. NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
  110. opts.AddLongOption('u', "udf", "Load shared library with UDF by given path").AppendTo(&udfsPaths);
  111. opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory").StoreResult(&udfsDir);
  112. opts.AddLongOption('m', "mounts", "Mount points config file.").StoreResult(&mountConfig);
  113. opts.AddLongOption('f', "file", "name@path").AppendTo(&filesMappingList);
  114. opts.AddLongOption("udf-resolver", "Path to udf-resolver").Optional().RequiredArgument("PATH").StoreResult(&udfResolverPath);
  115. opts.AddLongOption("udf-resolver-filter-syscalls", "Filter syscalls in udf resolver")
  116. .Optional()
  117. .NoArgument()
  118. .SetFlag(&udfResolverFilterSyscalls);
  119. opts.AddLongOption("scan-udfs", "Scan specified udfs with external udf resolver to use static function registry").NoArgument();
  120. opts.AddLongOption('C', "cluster", "set cluster to service mapping").RequiredArgument("name@service").Handler(new TStoreMappingFunctor(&clusterMapping));
  121. opts.AddLongOption("gateways-cfg", "gateways configuration file").Optional().RequiredArgument("FILE").StoreResult(&gatewaysCfgFile);
  122. opts.AddLongOption("fs-cfg", "fs configuration file").Optional().RequiredArgument("FILE").StoreResult(&fsCfgFile);
  123. opts.AddLongOption("pg-ext", "pg extensions config file").StoreResult(&pgExtConfig);
  124. opts.AddLongOption("sql-flags", "SQL translator pragma flags").SplitHandler(&sqlFlags, ',');
  125. TServerConfig config;
  126. config.SetAssetsPath("http/www");
  127. config.InitCliOptions(opts);
  128. NLastGetopt::TOptsParseResult res(&opts, argc, argv);
  129. config.ParseFromCli(res);
  130. TUserDataTable userData;
  131. for (auto& s : filesMappingList) {
  132. TStringBuf fileName, filePath;
  133. TStringBuf(s).Split('@', fileName, filePath);
  134. if (fileName.empty() || filePath.empty()) {
  135. Cerr << "Incorrect file mapping, expected form name@path, e.g. MyFile@file.txt" << Endl;
  136. return 1;
  137. }
  138. auto& file = userData[TUserDataKey::File(GetDefaultFilePrefix() + fileName)];
  139. file.Type = EUserDataType::PATH;
  140. file.Data = filePath;
  141. }
  142. NMiniKQL::FindUdfsInDir(udfsDir, &udfsPaths);
  143. NPg::SetSqlLanguageParser(NSQLTranslationPG::CreateSqlLanguageParser());
  144. NPg::LoadSystemFunctions(*NSQLTranslationPG::CreateSystemFunctionsParser());
  145. if (!pgExtConfig.empty()) {
  146. auto config = ParseProtoConfig<NProto::TPgExtensions>(pgExtConfig);
  147. Y_ABORT_UNLESS(config);
  148. TVector<NPg::TExtensionDesc> extensions;
  149. PgExtensionsFromProto(*config, extensions);
  150. NPg::RegisterExtensions(extensions, false,
  151. *NSQLTranslationPG::CreateExtensionSqlParser(),
  152. NKikimr::NMiniKQL::CreateExtensionLoader().get());
  153. }
  154. NPg::GetSqlLanguageParser()->Freeze();
  155. THolder<TGatewaysConfig> gatewaysConfig;
  156. if (!gatewaysCfgFile.empty()) {
  157. gatewaysConfig = ParseProtoConfig<TGatewaysConfig>(gatewaysCfgFile);
  158. if (!gatewaysConfig) {
  159. return -1;
  160. }
  161. if (gatewaysConfig->HasSqlCore()) {
  162. sqlFlags.insert(gatewaysConfig->GetSqlCore().GetTranslationFlags().begin(), gatewaysConfig->GetSqlCore().GetTranslationFlags().end());
  163. }
  164. }
  165. THolder<TFileStorageConfig> fsConfig;
  166. if (!fsCfgFile.empty()) {
  167. fsConfig = ParseProtoConfig<TFileStorageConfig>(fsCfgFile);
  168. if (!fsConfig) {
  169. return 1;
  170. }
  171. } else {
  172. fsConfig = MakeHolder<TFileStorageConfig>();
  173. }
  174. auto fileStorage = WithAsync(CreateFileStorage(*fsConfig));
  175. IUdfResolver::TPtr udfResolver;
  176. auto funcRegistry = CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, CreateBuiltinRegistry(), false, udfsPaths);
  177. TUdfIndex::TPtr udfIndex;
  178. CommonInit(res, udfResolverPath, udfResolverFilterSyscalls, udfsPaths, fileStorage, udfResolver, funcRegistry, udfIndex);
  179. NSQLTranslationV1::TLexers lexers;
  180. lexers.Antlr4 = NSQLTranslationV1::MakeAntlr4LexerFactory();
  181. lexers.Antlr4Ansi = NSQLTranslationV1::MakeAntlr4AnsiLexerFactory();
  182. NSQLTranslationV1::TParsers parsers;
  183. parsers.Antlr4 = NSQLTranslationV1::MakeAntlr4ParserFactory();
  184. parsers.Antlr4Ansi = NSQLTranslationV1::MakeAntlr4AnsiParserFactory();
  185. NSQLTranslation::TTranslators translators(
  186. nullptr,
  187. NSQLTranslationV1::MakeTranslator(lexers, parsers),
  188. NSQLTranslationPG::MakeTranslator()
  189. );
  190. TExprContext ctx;
  191. ctx.NextUniqueId = NPg::GetSqlLanguageParser()->GetContext().NextUniqueId;
  192. IModuleResolver::TPtr moduleResolver;
  193. if (!mountConfig.empty()) {
  194. TModulesTable modules;
  195. auto mount = ParseProtoConfig<NYqlMountConfig::TMountConfig>(mountConfig);
  196. Y_ABORT_UNLESS(mount);
  197. FillUserDataTableFromFileSystem(*mount, userData);
  198. if (!CompileLibraries(translators, userData, ctx, modules)) {
  199. Cerr << "Errors on compile libraries:" << Endl;
  200. ctx.IssueManager.GetIssues().PrintTo(Cerr);
  201. return -1;
  202. }
  203. moduleResolver = std::make_shared<TModuleResolver>(translators, std::move(modules), ctx.NextUniqueId, clusterMapping, sqlFlags);
  204. } else {
  205. if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, clusterMapping)) {
  206. Cerr << "Errors loading default YQL libraries:" << Endl;
  207. ctx.IssueManager.GetIssues().PrintTo(Cerr);
  208. return -1;
  209. }
  210. }
  211. TString fn = "pkg/a/b/c.sql";
  212. TString content0 = "$sqr = ($x) -> { return 2 * $x * $x; }; export $sqr;";
  213. TString content1 = "$sqr = ($x) -> { return 3 * $x * $x; }; export $sqr;";
  214. moduleResolver->RegisterPackage("a.b");
  215. if (!moduleResolver->AddFromMemory(fn, content0, ctx, 1, 0) || !moduleResolver->AddFromMemory(fn, content1, ctx, 1, 1)) {
  216. Cerr << "Unable to compile SQL library" << Endl;
  217. ctx.IssueManager.GetIssues().PrintTo(Cerr);
  218. return -1;
  219. }
  220. TExprContext::TFreezeGuard freezeGuard(ctx);
  221. NLog::YqlLoggerScope logger(new NLog::TTlsLogBackend(new TStreamLogBackend(&Cerr)));
  222. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::DEBUG);
  223. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CoreEval, NLog::ELevel::DEBUG);
  224. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CorePeepHole, NLog::ELevel::DEBUG);
  225. auto server = CreateYqlServer(config,
  226. funcRegistry.Get(), udfIndex, ctx.NextUniqueId,
  227. userData,
  228. std::move(gatewaysConfig),
  229. sqlFlags,
  230. moduleResolver, udfResolver, fileStorage);
  231. server->Start();
  232. server->Wait();
  233. return 0;
  234. }
  235. int main(int argc, const char *argv[]) {
  236. try {
  237. if (argc > 1 && TString(argv[1]) == TStringBuf("ui")) {
  238. return RunUI(argc, argv);
  239. } else {
  240. return NYql::TYqlRunTool().Main(argc, argv);
  241. }
  242. }
  243. catch (...) {
  244. Cerr << CurrentExceptionMessage() << Endl;
  245. return 1;
  246. }
  247. }