yqlrun.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. #include <yql/tools/yqlrun/lib/yqlrun_lib.h>
  2. #include <yql/tools/yqlrun/http/yql_server.h>
  3. #include <yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h>
  4. #include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
  5. #include <yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h>
  6. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  7. #include <yql/essentials/minikql/mkql_function_registry.h>
  8. #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
  9. #include <yql/essentials/utils/backtrace/backtrace.h>
  10. #include <yql/essentials/utils/log/tls_backend.h>
  11. #include <yql/essentials/utils/log/log.h>
  12. #include <yql/essentials/parser/pg_wrapper/interface/parser.h>
  13. #include <yql/essentials/parser/pg_catalog/catalog.h>
  14. #include <yql/essentials/protos/pg_ext.pb.h>
  15. #include <yql/essentials/core/file_storage/file_storage.h>
  16. #include <yql/essentials/core/file_storage/proto/file_storage.pb.h>
  17. #include <yql/essentials/core/services/mounts/yql_mounts.h>
  18. #include <yql/essentials/core/facade/yql_facade.h>
  19. #include <yql/essentials/core/pg_ext/yql_pg_ext.h>
  20. #include <yql/essentials/core/yql_udf_resolver.h>
  21. #include <yql/essentials/core/yql_udf_index.h>
  22. #include <yql/essentials/core/yql_library_compiler.h>
  23. #include <yql/essentials/ast/yql_expr.h>
  24. #include <yql/essentials/sql/sql.h>
  25. #include <yql/essentials/sql/v1/sql.h>
  26. #include <library/cpp/getopt/last_getopt.h>
  27. #include <library/cpp/logger/stream.h>
  28. #include <google/protobuf/text_format.h>
  29. #include <util/generic/string.h>
  30. #include <util/generic/vector.h>
  31. #include <util/generic/hash.h>
  32. #include <util/datetime/base.h>
  33. using namespace NYql;
  34. using namespace NKikimr::NMiniKQL;
  35. using namespace NYql::NHttp;
  36. namespace NMiniKQL = NKikimr::NMiniKQL;
  37. class TStoreMappingFunctor: public NLastGetopt::IOptHandler {
  38. public:
  39. TStoreMappingFunctor(THashMap<TString, TString>* target, char delim = '@')
  40. : Target(target)
  41. , Delim(delim)
  42. {
  43. }
  44. void HandleOpt(const NLastGetopt::TOptsParser* parser) final {
  45. const TStringBuf val(parser->CurValOrDef());
  46. const auto service = TString(val.After(Delim));
  47. auto res = Target->emplace(TString(val.Before(Delim)), service);
  48. if (!res.second) {
  49. /// force replace already exist parametr
  50. res.first->second = service;
  51. }
  52. }
  53. private:
  54. THashMap<TString, TString>* Target;
  55. char Delim;
  56. };
  57. void CommonInit(const NLastGetopt::TOptsParseResult& res, const TString& udfResolverPath, bool filterSysCalls,
  58. const TVector<TString>& udfsPaths, TFileStoragePtr fileStorage,
  59. IUdfResolver::TPtr& udfResolver, IFunctionRegistry::TPtr funcRegistry, TUdfIndex::TPtr& udfIndex) {
  60. if (fileStorage && res.Has("scan-udfs")) {
  61. if (!udfResolverPath) {
  62. ythrow yexception() << "udf-resolver path must be specified when use 'scan-udfs'";
  63. }
  64. udfResolver = NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), fileStorage, udfResolverPath, {}, {}, filterSysCalls, {});
  65. Cerr << TInstant::Now().ToStringLocalUpToSeconds() << " Udf scanning started for " << udfsPaths.size() << " udfs ..." << Endl;
  66. udfIndex = new TUdfIndex();
  67. LoadRichMetadataToUdfIndex(*udfResolver, udfsPaths, false, TUdfIndex::EOverrideMode::RaiseError, *udfIndex);
  68. Cerr << TInstant::Now().ToStringLocalUpToSeconds() << " UdfIndex done." << Endl;
  69. udfResolver = NCommon::CreateUdfResolverWithIndex(udfIndex, udfResolver, fileStorage);
  70. Cerr << TInstant::Now().ToStringLocalUpToSeconds() << " Udfs scanned" << Endl;
  71. return;
  72. }
  73. udfResolver = fileStorage && udfResolverPath
  74. ? NCommon::CreateOutProcUdfResolver(funcRegistry.Get(), fileStorage, udfResolverPath, {}, {}, false, {})
  75. : NCommon::CreateSimpleUdfResolver(funcRegistry.Get(), fileStorage, true);
  76. }
  77. template <typename TMessage>
  78. THolder<TMessage> ParseProtoConfig(const TString& cfgFile) {
  79. auto config = MakeHolder<TMessage>();
  80. TString configData = TFileInput(cfgFile).ReadAll();;
  81. using ::google::protobuf::TextFormat;
  82. if (!TextFormat::ParseFromString(configData, config.Get())) {
  83. Cerr << "Bad format of gateways configuration";
  84. return {};
  85. }
  86. return config;
  87. }
  88. int RunUI(int argc, const char* argv[])
  89. {
  90. Cerr << "yqlrun ABI version: " << NKikimr::NUdf::CurrentAbiVersionStr() << Endl;
  91. NYql::NBacktrace::RegisterKikimrFatalActions();
  92. NYql::NBacktrace::EnableKikimrSymbolize();
  93. TVector<TString> udfsPaths;
  94. TString udfsDir;
  95. TString mountConfig;
  96. TVector<TString> filesMappingList;
  97. TString udfResolverPath;
  98. bool udfResolverFilterSyscalls = false;
  99. TString gatewaysCfgFile;
  100. TString fsCfgFile;
  101. TString pgExtConfig;
  102. THashMap<TString, TString> clusterMapping;
  103. clusterMapping["plato"] = YtProviderName;
  104. THashSet<TString> sqlFlags;
  105. NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
  106. opts.AddLongOption('u', "udf", "Load shared library with UDF by given path").AppendTo(&udfsPaths);
  107. opts.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory").StoreResult(&udfsDir);
  108. opts.AddLongOption('m', "mounts", "Mount points config file.").StoreResult(&mountConfig);
  109. opts.AddLongOption('f', "file", "name@path").AppendTo(&filesMappingList);
  110. opts.AddLongOption("udf-resolver", "Path to udf-resolver").Optional().RequiredArgument("PATH").StoreResult(&udfResolverPath);
  111. opts.AddLongOption("udf-resolver-filter-syscalls", "Filter syscalls in udf resolver")
  112. .Optional()
  113. .NoArgument()
  114. .SetFlag(&udfResolverFilterSyscalls);
  115. opts.AddLongOption("scan-udfs", "Scan specified udfs with external udf resolver to use static function registry").NoArgument();
  116. opts.AddLongOption('C', "cluster", "set cluster to service mapping").RequiredArgument("name@service").Handler(new TStoreMappingFunctor(&clusterMapping));
  117. opts.AddLongOption("gateways-cfg", "gateways configuration file").Optional().RequiredArgument("FILE").StoreResult(&gatewaysCfgFile);
  118. opts.AddLongOption("fs-cfg", "fs configuration file").Optional().RequiredArgument("FILE").StoreResult(&fsCfgFile);
  119. opts.AddLongOption("pg-ext", "pg extensions config file").StoreResult(&pgExtConfig);
  120. opts.AddLongOption("sql-flags", "SQL translator pragma flags").SplitHandler(&sqlFlags, ',');
  121. TServerConfig config;
  122. config.SetAssetsPath("http/www");
  123. config.InitCliOptions(opts);
  124. NLastGetopt::TOptsParseResult res(&opts, argc, argv);
  125. config.ParseFromCli(res);
  126. TUserDataTable userData;
  127. for (auto& s : filesMappingList) {
  128. TStringBuf fileName, filePath;
  129. TStringBuf(s).Split('@', fileName, filePath);
  130. if (fileName.empty() || filePath.empty()) {
  131. Cerr << "Incorrect file mapping, expected form name@path, e.g. MyFile@file.txt" << Endl;
  132. return 1;
  133. }
  134. auto& file = userData[TUserDataKey::File(GetDefaultFilePrefix() + fileName)];
  135. file.Type = EUserDataType::PATH;
  136. file.Data = filePath;
  137. }
  138. NMiniKQL::FindUdfsInDir(udfsDir, &udfsPaths);
  139. NPg::SetSqlLanguageParser(NSQLTranslationPG::CreateSqlLanguageParser());
  140. NPg::LoadSystemFunctions(*NSQLTranslationPG::CreateSystemFunctionsParser());
  141. if (!pgExtConfig.empty()) {
  142. auto config = ParseProtoConfig<NProto::TPgExtensions>(pgExtConfig);
  143. Y_ABORT_UNLESS(config);
  144. TVector<NPg::TExtensionDesc> extensions;
  145. PgExtensionsFromProto(*config, extensions);
  146. NPg::RegisterExtensions(extensions, false,
  147. *NSQLTranslationPG::CreateExtensionSqlParser(),
  148. NKikimr::NMiniKQL::CreateExtensionLoader().get());
  149. }
  150. NPg::GetSqlLanguageParser()->Freeze();
  151. THolder<TGatewaysConfig> gatewaysConfig;
  152. if (!gatewaysCfgFile.empty()) {
  153. gatewaysConfig = ParseProtoConfig<TGatewaysConfig>(gatewaysCfgFile);
  154. if (!gatewaysConfig) {
  155. return -1;
  156. }
  157. if (gatewaysConfig->HasSqlCore()) {
  158. sqlFlags.insert(gatewaysConfig->GetSqlCore().GetTranslationFlags().begin(), gatewaysConfig->GetSqlCore().GetTranslationFlags().end());
  159. }
  160. }
  161. THolder<TFileStorageConfig> fsConfig;
  162. if (!fsCfgFile.empty()) {
  163. fsConfig = ParseProtoConfig<TFileStorageConfig>(fsCfgFile);
  164. if (!fsConfig) {
  165. return 1;
  166. }
  167. } else {
  168. fsConfig = MakeHolder<TFileStorageConfig>();
  169. }
  170. auto fileStorage = WithAsync(CreateFileStorage(*fsConfig));
  171. IUdfResolver::TPtr udfResolver;
  172. auto funcRegistry = CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, CreateBuiltinRegistry(), false, udfsPaths);
  173. TUdfIndex::TPtr udfIndex;
  174. CommonInit(res, udfResolverPath, udfResolverFilterSyscalls, udfsPaths, fileStorage, udfResolver, funcRegistry, udfIndex);
  175. NSQLTranslation::TTranslators translators(
  176. nullptr,
  177. NSQLTranslationV1::MakeTranslator(),
  178. NSQLTranslationPG::MakeTranslator()
  179. );
  180. TExprContext ctx;
  181. ctx.NextUniqueId = NPg::GetSqlLanguageParser()->GetContext().NextUniqueId;
  182. IModuleResolver::TPtr moduleResolver;
  183. if (!mountConfig.empty()) {
  184. TModulesTable modules;
  185. auto mount = ParseProtoConfig<NYqlMountConfig::TMountConfig>(mountConfig);
  186. Y_ABORT_UNLESS(mount);
  187. FillUserDataTableFromFileSystem(*mount, userData);
  188. if (!CompileLibraries(translators, userData, ctx, modules)) {
  189. Cerr << "Errors on compile libraries:" << Endl;
  190. ctx.IssueManager.GetIssues().PrintTo(Cerr);
  191. return -1;
  192. }
  193. moduleResolver = std::make_shared<TModuleResolver>(translators, std::move(modules), ctx.NextUniqueId, clusterMapping, sqlFlags);
  194. } else {
  195. if (!GetYqlDefaultModuleResolver(ctx, moduleResolver, clusterMapping)) {
  196. Cerr << "Errors loading default YQL libraries:" << Endl;
  197. ctx.IssueManager.GetIssues().PrintTo(Cerr);
  198. return -1;
  199. }
  200. }
  201. TString fn = "pkg/a/b/c.sql";
  202. TString content0 = "$sqr = ($x) -> { return 2 * $x * $x; }; export $sqr;";
  203. TString content1 = "$sqr = ($x) -> { return 3 * $x * $x; }; export $sqr;";
  204. moduleResolver->RegisterPackage("a.b");
  205. if (!moduleResolver->AddFromMemory(fn, content0, ctx, 1, 0) || !moduleResolver->AddFromMemory(fn, content1, ctx, 1, 1)) {
  206. Cerr << "Unable to compile SQL library" << Endl;
  207. ctx.IssueManager.GetIssues().PrintTo(Cerr);
  208. return -1;
  209. }
  210. TExprContext::TFreezeGuard freezeGuard(ctx);
  211. NLog::YqlLoggerScope logger(new NLog::TTlsLogBackend(new TStreamLogBackend(&Cerr)));
  212. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::Core, NLog::ELevel::DEBUG);
  213. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CoreEval, NLog::ELevel::DEBUG);
  214. NLog::YqlLogger().SetComponentLevel(NLog::EComponent::CorePeepHole, NLog::ELevel::DEBUG);
  215. auto server = CreateYqlServer(config,
  216. funcRegistry.Get(), udfIndex, ctx.NextUniqueId,
  217. userData,
  218. std::move(gatewaysConfig),
  219. sqlFlags,
  220. moduleResolver, udfResolver, fileStorage);
  221. server->Start();
  222. server->Wait();
  223. return 0;
  224. }
  225. int main(int argc, const char *argv[]) {
  226. try {
  227. if (argc > 1 && TString(argv[1]) == TStringBuf("ui")) {
  228. return RunUI(argc, argv);
  229. } else {
  230. return NYql::TYqlRunTool().Main(argc, argv);
  231. }
  232. }
  233. catch (...) {
  234. Cerr << CurrentExceptionMessage() << Endl;
  235. return 1;
  236. }
  237. }