mkql_function_registry.cpp 19 KB


  1. #include "mkql_function_registry.h"
  2. #include "mkql_utils.h"
  3. #include "mkql_type_builder.h"
  4. #include <yql/essentials/public/udf/udf_static_registry.h>
  5. #include <util/folder/iterator.h>
  6. #include <util/folder/dirut.h>
  7. #include <util/folder/path.h>
  8. #include <util/system/dynlib.h>
  9. #include <util/stream/str.h>
  10. #include <util/string/builder.h>
  11. #include <util/string/split.h>
  12. namespace {
  13. using namespace NKikimr;
  14. using namespace NMiniKQL;
  15. const char MODULE_NAME_DELIMITER = '.';
  16. const char* RegisterFuncName = "Register";
  17. const char* AbiVersionFuncName = "AbiVersion";
  18. #if defined(_win_) || defined(_darwin_)
  19. const char* BindSymbolsFuncName = "BindSymbols";
  20. #endif
  21. const char* SetBackTraceCallbackName = "SetBackTraceCallback";
  22. //////////////////////////////////////////////////////////////////////////////
  23. // TMutableFunctionRegistry
  24. //////////////////////////////////////////////////////////////////////////////
  25. class TMutableFunctionRegistry: public IMutableFunctionRegistry
  26. {
  27. struct TUdfModule {
  28. TString LibraryPath;
  29. std::shared_ptr<NUdf::IUdfModule> Impl;
  30. };
  31. using TUdfModulesMap = THashMap<TString, TUdfModule>;
  32. struct TUdfLibrary: public TThrRefBase {
  33. ui32 AbiVersion = 0;
  34. TDynamicLibrary Lib;
  35. TUdfLibrary()
  36. {
  37. }
  38. };
  39. using TUdfLibraryPtr = TIntrusivePtr<TUdfLibrary>;
  40. class TUdfModuleLoader: public NUdf::IRegistrator {
  41. public:
  42. TUdfModuleLoader(
  43. TUdfModulesMap& modulesMap,
  44. THashSet<TString>* newModules,
  45. const TString& libraryPath,
  46. const TUdfModuleRemappings& remappings,
  47. ui32 abiVersion,
  48. const TString& customUdfPrefix = {})
  49. : ModulesMap(modulesMap)
  50. , NewModules(newModules)
  51. , LibraryPath(libraryPath)
  52. , Remappings(remappings)
  53. , AbiVersion(NUdf::AbiVersionToStr(abiVersion))
  54. , CustomUdfPrefix(customUdfPrefix)
  55. {
  56. }
  57. void AddModule(
  58. const NUdf::TStringRef& name,
  59. NUdf::TUniquePtr<NUdf::IUdfModule> module) override
  60. {
  61. Y_DEBUG_ABORT_UNLESS(module, "Module is empty");
  62. if (!HasError()) {
  63. TUdfModule m;
  64. m.LibraryPath = LibraryPath;
  65. m.Impl.reset(module.Release());
  66. auto it = Remappings.find(name);
  67. const TString& newName = CustomUdfPrefix
  68. + ((it == Remappings.end())
  69. ? TString(name)
  70. : it->second);
  71. auto i = ModulesMap.insert({ newName, std::move(m) });
  72. if (!i.second) {
  73. TUdfModule* oldModule = ModulesMap.FindPtr(newName);
  74. Y_DEBUG_ABORT_UNLESS(oldModule != nullptr);
  75. Error = (TStringBuilder()
  76. << "UDF module duplication: name " << TStringBuf(name)
  77. << ", already loaded from " << oldModule->LibraryPath
  78. << ", trying to load from " << LibraryPath);
  79. } else if (NewModules) {
  80. NewModules->insert(newName);
  81. }
  82. }
  83. }
  84. const TString& GetError() const { return Error; }
  85. bool HasError() const { return !Error.empty(); }
  86. private:
  87. TUdfModulesMap& ModulesMap;
  88. THashSet<TString>* NewModules;
  89. const TString LibraryPath;
  90. const TUdfModuleRemappings& Remappings;
  91. const TString AbiVersion;
  92. TString Error;
  93. const TString CustomUdfPrefix;
  94. };
  95. public:
  96. TMutableFunctionRegistry(IBuiltinFunctionRegistry::TPtr builtins)
  97. : Builtins_(std::move(builtins))
  98. {
  99. }
  100. TMutableFunctionRegistry(const TMutableFunctionRegistry& rhs)
  101. : Builtins_(rhs.Builtins_)
  102. , LoadedLibraries_(rhs.LoadedLibraries_)
  103. , UdfModules_(rhs.UdfModules_)
  104. , SupportsSizedAllocators_(rhs.SupportsSizedAllocators_)
  105. {
  106. }
  107. ~TMutableFunctionRegistry() {
  108. }
  109. void AllowUdfPatch() override {
  110. }
  111. void LoadUdfs(
  112. const TString& libraryPath,
  113. const TUdfModuleRemappings& remmapings,
  114. ui32 flags = 0,
  115. const TString& customUdfPrefix = {},
  116. THashSet<TString>* modules = nullptr) override
  117. {
  118. TUdfLibraryPtr lib;
  119. auto libIt = LoadedLibraries_.find(libraryPath);
  120. if (libIt == LoadedLibraries_.end()) {
  121. lib = MakeIntrusive<TUdfLibrary>();
  122. #ifdef _win32_
  123. ui32 loadFlags = 0;
  124. #else
  125. ui32 loadFlags = RTLD_GLOBAL | ((flags & NUdf::IRegistrator::TFlags::TypesOnly) ? RTLD_LAZY : RTLD_NOW);
  126. #endif
  127. TPathSplit absPathSplit(libraryPath);
  128. TString absPath = libraryPath;
  129. if (!absPathSplit.IsAbsolute) {
  130. absPath = JoinPaths(TFsPath::Cwd().PathSplit(), absPathSplit);
  131. }
  132. lib->Lib.Open(absPath.data(), loadFlags);
  133. lib->Lib.SetUnloadable(false);
  134. // (1) check ABI version
  135. auto abiVersionFunc = reinterpret_cast<NUdf::TAbiVersionFunctionPtr>(
  136. lib->Lib.SymOptional(AbiVersionFuncName));
  137. if (!abiVersionFunc) {
  138. return;
  139. }
  140. ui32 version = abiVersionFunc();
  141. Y_ENSURE(NUdf::IsAbiCompatible(version) && version >= NUdf::MakeAbiVersion(2, 8, 0),
  142. "Non compatible ABI version of UDF library " << libraryPath
  143. << ", expected up to " << NUdf::AbiVersionToStr(NUdf::CurrentCompatibilityAbiVersion() * 100)
  144. << ", got " << NUdf::AbiVersionToStr(version)
  145. << "; try to re-compile library using "
  146. << "YQL_ABI_VERSION(" << UDF_ABI_VERSION_MAJOR
  147. << " " << UDF_ABI_VERSION_MINOR << " 0) macro in ya.make");
  148. lib->AbiVersion = version;
  149. if (version < NUdf::MakeAbiVersion(2, 8, 0)) {
  150. SupportsSizedAllocators_ = false;
  151. }
  152. #if defined(_win_) || defined(_darwin_)
  153. auto bindSymbolsFunc = reinterpret_cast<NUdf::TBindSymbolsFunctionPtr>(lib->Lib.Sym(BindSymbolsFuncName));
  154. bindSymbolsFunc(NUdf::GetStaticSymbols());
  155. #endif
  156. if (BackTraceCallback_) {
  157. auto setter = reinterpret_cast<NUdf::TSetBackTraceCallbackPtr>(lib->Lib.SymOptional(SetBackTraceCallbackName));
  158. if (setter) {
  159. setter(BackTraceCallback_);
  160. }
  161. }
  162. libIt = LoadedLibraries_.insert({ libraryPath, lib }).first;
  163. } else {
  164. lib = libIt->second;
  165. }
  166. // (2) ensure that Register() func is present
  167. auto registerFunc = reinterpret_cast<NUdf::TRegisterFunctionPtr>(
  168. lib->Lib.Sym(RegisterFuncName));
  169. // (3) do load
  170. THashSet<TString> newModules;
  171. TUdfModuleLoader loader(
  172. UdfModules_,
  173. &newModules,
  174. libraryPath,
  175. remmapings,
  176. lib->AbiVersion, customUdfPrefix);
  177. registerFunc(loader, flags);
  178. Y_ENSURE(!loader.HasError(), loader.GetError());
  179. if (modules) {
  180. *modules = std::move(newModules);
  181. }
  182. }
  183. void AddModule(
  184. const TStringBuf& libraryPath,
  185. const TStringBuf& moduleName,
  186. NUdf::TUniquePtr<NUdf::IUdfModule> module) override
  187. {
  188. TString libraryPathStr(libraryPath);
  189. auto inserted = LoadedLibraries_.insert({ libraryPathStr, nullptr });
  190. if (!inserted.second) {
  191. return;
  192. }
  193. TUdfModuleRemappings remappings;
  194. TUdfModuleLoader loader(
  195. UdfModules_, nullptr, libraryPathStr,
  196. remappings, NUdf::CurrentAbiVersion());
  197. loader.AddModule(moduleName, std::move(module));
  198. Y_ENSURE(!loader.HasError(), loader.GetError());
  199. }
  200. void SetSystemModulePaths(const TUdfModulePathsMap& paths) override {
  201. SystemModulePaths_ = paths;
  202. }
  203. const IBuiltinFunctionRegistry::TPtr& GetBuiltins() const override {
  204. return Builtins_;
  205. }
  206. TStatus FindFunctionTypeInfo(
  207. const TTypeEnvironment& env,
  208. NUdf::ITypeInfoHelper::TPtr typeInfoHelper,
  209. NUdf::ICountersProvider* countersProvider,
  210. const TStringBuf& name,
  211. TType* userType,
  212. const TStringBuf& typeConfig,
  213. ui32 flags,
  214. const NUdf::TSourcePosition& pos,
  215. const NUdf::ISecureParamsProvider* secureParamsProvider,
  216. TFunctionTypeInfo* funcInfo) const override
  217. {
  218. TStringBuf moduleName, funcName;
  219. if (name.TrySplit(MODULE_NAME_DELIMITER, moduleName, funcName)) {
  220. auto it = UdfModules_.find(moduleName);
  221. if (it != UdfModules_.end()) {
  222. TFunctionTypeInfoBuilder typeInfoBuilder(env, typeInfoHelper, moduleName,
  223. (flags & NUdf::IUdfModule::TFlags::TypesOnly) ? nullptr : countersProvider, pos, secureParamsProvider);
  224. const auto& module = *it->second.Impl;
  225. module.BuildFunctionTypeInfo(
  226. funcName, userType, typeConfig, flags, typeInfoBuilder);
  227. if (typeInfoBuilder.HasError()) {
  228. return TStatus::Error()
  229. << "Module: " << moduleName
  230. << ", function: " << funcName
  231. << ", error: " << typeInfoBuilder.GetError();
  232. }
  233. try {
  234. typeInfoBuilder.Build(funcInfo);
  235. }
  236. catch (yexception& e) {
  237. return TStatus::Error()
  238. << "Module: " << moduleName
  239. << ", function: " << funcName
  240. << ", error: " << e.what();
  241. }
  242. if ((flags & NUdf::IRegistrator::TFlags::TypesOnly) &&
  243. !funcInfo->FunctionType)
  244. {
  245. return TStatus::Error()
  246. << "Module: " << moduleName
  247. << ", function: " << funcName
  248. << ", function not found";
  249. }
  250. if (funcInfo->ModuleIRUniqID) {
  251. funcInfo->ModuleIRUniqID.prepend(moduleName);
  252. }
  253. return TStatus::Ok();
  254. }
  255. return TStatus::Error()
  256. << "Module " << moduleName << " is not registered";
  257. }
  258. return TStatus::Error()
  259. << "Function name must be in <module>.<func_name> scheme. "
  260. << "But get " << name;
  261. }
  262. TMaybe<TString> FindUdfPath(const TStringBuf& moduleName) const override {
  263. if (const TUdfModule* udf = UdfModules_.FindPtr(moduleName)) {
  264. return udf->LibraryPath;
  265. }
  266. if (const TString* path = SystemModulePaths_.FindPtr(moduleName)) {
  267. return *path;
  268. }
  269. return Nothing();
  270. }
  271. bool IsLoadedUdfModule(const TStringBuf& moduleName) const override {
  272. return UdfModules_.contains(moduleName);
  273. }
  274. THashSet<TString> GetAllModuleNames() const override {
  275. THashSet<TString> names;
  276. names.reserve(UdfModules_.size());
  277. for (const auto& module: UdfModules_) {
  278. names.insert(module.first);
  279. }
  280. return names;
  281. }
  282. TFunctionsMap GetModuleFunctions(const TStringBuf& moduleName) const override {
  283. struct TFunctionNamesSink: public NUdf::IFunctionNamesSink {
  284. TFunctionsMap Functions;
  285. class TFuncDescriptor : public NUdf::IFunctionDescriptor {
  286. public:
  287. TFuncDescriptor(TFunctionProperties& properties)
  288. : Properties(properties)
  289. {}
  290. private:
  291. void SetTypeAwareness() final {
  292. Properties.IsTypeAwareness = true;
  293. }
  294. TFunctionProperties& Properties;
  295. };
  296. NUdf::IFunctionDescriptor::TPtr Add(const NUdf::TStringRef& name) final {
  297. const auto it = Functions.emplace(name, TFunctionProperties{});
  298. return new TFuncDescriptor(it.first->second);
  299. }
  300. } sink;
  301. const auto it = UdfModules_.find(moduleName);
  302. if (UdfModules_.cend() == it)
  303. return TFunctionsMap();
  304. it->second.Impl->GetAllFunctions(sink);
  305. return sink.Functions;
  306. }
  307. bool SupportsSizedAllocators() const override {
  308. return SupportsSizedAllocators_;
  309. }
  310. void PrintInfoTo(IOutputStream& out) const override {
  311. Builtins_->PrintInfoTo(out);
  312. }
  313. void CleanupModulesOnTerminate() const override {
  314. for (const auto& module : UdfModules_) {
  315. module.second.Impl->CleanupOnTerminate();
  316. }
  317. }
  318. TIntrusivePtr<IMutableFunctionRegistry> Clone() const override {
  319. return new TMutableFunctionRegistry(*this);
  320. }
  321. void SetBackTraceCallback(NUdf::TBackTraceCallback callback) override {
  322. BackTraceCallback_ = callback;
  323. }
  324. private:
  325. const IBuiltinFunctionRegistry::TPtr Builtins_;
  326. THashMap<TString, TUdfLibraryPtr> LoadedLibraries_;
  327. TUdfModulesMap UdfModules_;
  328. THolder<TMemoryUsageInfo> UdfMemoryInfo_;
  329. TUdfModulePathsMap SystemModulePaths_;
  330. NUdf::TBackTraceCallback BackTraceCallback_ = nullptr;
  331. bool SupportsSizedAllocators_ = true;
  332. };
  333. //////////////////////////////////////////////////////////////////////////////
  334. // TBuiltinsWrapper
  335. //////////////////////////////////////////////////////////////////////////////
  336. class TBuiltinsWrapper: public IFunctionRegistry
  337. {
  338. public:
  339. TBuiltinsWrapper(IBuiltinFunctionRegistry::TPtr&& builtins)
  340. : Builtins_(std::move(builtins))
  341. {
  342. }
  343. const IBuiltinFunctionRegistry::TPtr& GetBuiltins() const override {
  344. return Builtins_;
  345. }
  346. void AllowUdfPatch() override {
  347. }
  348. TStatus FindFunctionTypeInfo(
  349. const TTypeEnvironment& env,
  350. NUdf::ITypeInfoHelper::TPtr typeInfoHelper,
  351. NUdf::ICountersProvider* countersProvider,
  352. const TStringBuf& name,
  353. TType* userType,
  354. const TStringBuf& typeConfig,
  355. ui32 flags,
  356. const NUdf::TSourcePosition& pos,
  357. const NUdf::ISecureParamsProvider* secureParamsProvider,
  358. TFunctionTypeInfo* funcInfo) const override
  359. {
  360. Y_UNUSED(env);
  361. Y_UNUSED(typeInfoHelper);
  362. Y_UNUSED(countersProvider);
  363. Y_UNUSED(name);
  364. Y_UNUSED(userType);
  365. Y_UNUSED(typeConfig);
  366. Y_UNUSED(flags);
  367. Y_UNUSED(pos);
  368. Y_UNUSED(secureParamsProvider);
  369. Y_UNUSED(funcInfo);
  370. return TStatus::Error(TStringBuf("Unsupported access to builtins registry"));
  371. }
  372. TMaybe<TString> FindUdfPath(
  373. const TStringBuf& /* moduleName */) const override
  374. {
  375. return{};
  376. }
  377. bool IsLoadedUdfModule(const TStringBuf& /* moduleName */) const override {
  378. return false;
  379. }
  380. THashSet<TString> GetAllModuleNames() const override {
  381. return {};
  382. }
  383. TFunctionsMap GetModuleFunctions(const TStringBuf&) const override {
  384. return TFunctionsMap();
  385. }
  386. bool SupportsSizedAllocators() const override {
  387. return true;
  388. }
  389. void PrintInfoTo(IOutputStream& out) const override {
  390. Builtins_->PrintInfoTo(out);
  391. }
  392. void CleanupModulesOnTerminate() const override {
  393. }
  394. TIntrusivePtr<IMutableFunctionRegistry> Clone() const override {
  395. return new TMutableFunctionRegistry(Builtins_);
  396. }
  397. private:
  398. const IBuiltinFunctionRegistry::TPtr Builtins_;
  399. };
  400. } // namespace
  401. namespace NKikimr {
  402. namespace NMiniKQL {
  403. void FindUdfsInDir(const TString& dirPath, TVector<TString>* paths)
  404. {
  405. static const TStringBuf libPrefix = TStringBuf(MKQL_UDF_LIB_PREFIX);
  406. static const TStringBuf libSuffix = TStringBuf(MKQL_UDF_LIB_SUFFIX);
  407. if (!dirPath.empty()) {
  408. std::vector<TString> dirs;
  409. StringSplitter(dirPath).Split(';').AddTo(&dirs);
  410. for (auto d : dirs) {
  411. TDirIterator dir(d, TDirIterator::TOptions(FTS_LOGICAL).SetMaxLevel(10));
  412. for (auto file = dir.begin(), end = dir.end(); file != end; ++file) {
  413. // skip entries with empty name, and all non-files
  414. // all valid symlinks are already dereferenced, provided by FTS_LOGICAL
  415. if (file->fts_pathlen == file->fts_namelen || file->fts_info != FTS_F) {
  416. continue;
  417. }
  418. TString path(file->fts_path);
  419. TString fileName = GetBaseName(path);
  420. // skip non shared libraries
  421. if (!fileName.StartsWith(libPrefix) ||
  422. !fileName.EndsWith(libSuffix))
  423. {
  424. continue;
  425. }
  426. // skip test udfs when scanning dir
  427. auto udfName = TStringBuf(fileName).Skip(libPrefix.length());
  428. if (udfName.StartsWith(TStringBuf("test_"))) {
  429. continue;
  430. }
  431. paths->push_back(std::move(path));
  432. }
  433. }
  434. }
  435. }
  436. bool SplitModuleAndFuncName(const TStringBuf& name, TStringBuf& module, TStringBuf& func)
  437. {
  438. return name.TrySplit(MODULE_NAME_DELIMITER, module, func);
  439. }
  440. TString FullName(const TStringBuf& module, const TStringBuf& func)
  441. {
  442. TString fullName;
  443. fullName.reserve(module.size() + func.size() + 1);
  444. fullName.append(module);
  445. fullName.append(MODULE_NAME_DELIMITER);
  446. fullName.append(func);
  447. return fullName;
  448. }
  449. TIntrusivePtr<IFunctionRegistry> CreateFunctionRegistry(IBuiltinFunctionRegistry::TPtr&& builtins)
  450. {
  451. return new TBuiltinsWrapper(std::move(builtins));
  452. }
  453. TIntrusivePtr<IFunctionRegistry> CreateFunctionRegistry(
  454. NKikimr::NUdf::TBackTraceCallback backtraceCallback,
  455. IBuiltinFunctionRegistry::TPtr&& builtins,
  456. bool allowUdfPatch,
  457. const TVector<TString>& udfsPaths,
  458. ui32 flags /* = 0 */)
  459. {
  460. auto registry = MakeHolder<TMutableFunctionRegistry>(std::move(builtins));
  461. if (allowUdfPatch) {
  462. registry->AllowUdfPatch();
  463. }
  464. registry->SetBackTraceCallback(backtraceCallback);
  465. // system UDFs loaded with default names
  466. TUdfModuleRemappings remappings;
  467. THashSet<TString> usedUdfPaths;
  468. for (const TString& udfPath: udfsPaths) {
  469. if (usedUdfPaths.insert(udfPath).second) {
  470. registry->LoadUdfs(udfPath, remappings, flags);
  471. }
  472. }
  473. return registry.Release();
  474. }
  475. void FillStaticModules(IMutableFunctionRegistry& registry) {
  476. for (const auto& wrapper : NUdf::GetStaticUdfModuleWrapperList()) {
  477. auto [name, ptr] = wrapper();
  478. registry.AddModule(TString(StaticModulePrefix) + name, name, std::move(ptr));
  479. }
  480. }
  481. } // namespace NMiniKQL
  482. } // namespace NKiki