#include "mkql_function_registry.h" #include "mkql_utils.h" #include "mkql_type_builder.h" #include #include #include #include #include #include #include #include namespace { using namespace NKikimr; using namespace NMiniKQL; const char MODULE_NAME_DELIMITER = '.'; const char* RegisterFuncName = "Register"; const char* AbiVersionFuncName = "AbiVersion"; #if defined(_win_) || defined(_darwin_) const char* BindSymbolsFuncName = "BindSymbols"; #endif const char* SetBackTraceCallbackName = "SetBackTraceCallback"; ////////////////////////////////////////////////////////////////////////////// // TMutableFunctionRegistry ////////////////////////////////////////////////////////////////////////////// class TMutableFunctionRegistry: public IMutableFunctionRegistry { struct TUdfModule { TString LibraryPath; std::shared_ptr Impl; }; using TUdfModulesMap = THashMap; struct TUdfLibrary: public TThrRefBase { ui32 AbiVersion = 0; TDynamicLibrary Lib; TUdfLibrary() { } }; using TUdfLibraryPtr = TIntrusivePtr; class TUdfModuleLoader: public NUdf::IRegistrator { public: TUdfModuleLoader( TUdfModulesMap& modulesMap, THashSet* newModules, const TString& libraryPath, const TUdfModuleRemappings& remappings, ui32 abiVersion, const TString& customUdfPrefix = {}) : ModulesMap(modulesMap) , NewModules(newModules) , LibraryPath(libraryPath) , Remappings(remappings) , AbiVersion(NUdf::AbiVersionToStr(abiVersion)) , CustomUdfPrefix(customUdfPrefix) { } void AddModule( const NUdf::TStringRef& name, NUdf::TUniquePtr module) override { Y_DEBUG_ABORT_UNLESS(module, "Module is empty"); if (!HasError()) { TUdfModule m; m.LibraryPath = LibraryPath; m.Impl.reset(module.Release()); auto it = Remappings.find(name); const TString& newName = CustomUdfPrefix + ((it == Remappings.end()) ? TString(name) : it->second); auto i = ModulesMap.insert({ newName, std::move(m) }); if (!i.second) { TUdfModule* oldModule = ModulesMap.FindPtr(newName); Y_DEBUG_ABORT_UNLESS(oldModule != nullptr); Error = (TStringBuilder() << "UDF module duplication: name " << TStringBuf(name) << ", already loaded from " << oldModule->LibraryPath << ", trying to load from " << LibraryPath); } else if (NewModules) { NewModules->insert(newName); } } } const TString& GetError() const { return Error; } bool HasError() const { return !Error.empty(); } private: TUdfModulesMap& ModulesMap; THashSet* NewModules; const TString LibraryPath; const TUdfModuleRemappings& Remappings; const TString AbiVersion; TString Error; const TString CustomUdfPrefix; }; public: TMutableFunctionRegistry(IBuiltinFunctionRegistry::TPtr builtins) : Builtins_(std::move(builtins)) { } TMutableFunctionRegistry(const TMutableFunctionRegistry& rhs) : Builtins_(rhs.Builtins_) , LoadedLibraries_(rhs.LoadedLibraries_) , UdfModules_(rhs.UdfModules_) , SupportsSizedAllocators_(rhs.SupportsSizedAllocators_) { } ~TMutableFunctionRegistry() { } void AllowUdfPatch() override { } void LoadUdfs( const TString& libraryPath, const TUdfModuleRemappings& remmapings, ui32 flags = 0, const TString& customUdfPrefix = {}, THashSet* modules = nullptr) override { TUdfLibraryPtr lib; auto libIt = LoadedLibraries_.find(libraryPath); if (libIt == LoadedLibraries_.end()) { lib = MakeIntrusive(); #ifdef _win32_ ui32 loadFlags = 0; #else ui32 loadFlags = RTLD_GLOBAL | ((flags & NUdf::IRegistrator::TFlags::TypesOnly) ? RTLD_LAZY : RTLD_NOW); #endif TPathSplit absPathSplit(libraryPath); TString absPath = libraryPath; if (!absPathSplit.IsAbsolute) { absPath = JoinPaths(TFsPath::Cwd().PathSplit(), absPathSplit); } lib->Lib.Open(absPath.data(), loadFlags); lib->Lib.SetUnloadable(false); // (1) check ABI version auto abiVersionFunc = reinterpret_cast( lib->Lib.SymOptional(AbiVersionFuncName)); if (!abiVersionFunc) { return; } ui32 version = abiVersionFunc(); Y_ENSURE(NUdf::IsAbiCompatible(version) && version >= NUdf::MakeAbiVersion(2, 8, 0), "Non compatible ABI version of UDF library " << libraryPath << ", expected up to " << NUdf::AbiVersionToStr(NUdf::CurrentCompatibilityAbiVersion() * 100) << ", got " << NUdf::AbiVersionToStr(version) << "; try to re-compile library using " << "YQL_ABI_VERSION(" << UDF_ABI_VERSION_MAJOR << " " << UDF_ABI_VERSION_MINOR << " 0) macro in ya.make"); lib->AbiVersion = version; if (version < NUdf::MakeAbiVersion(2, 8, 0)) { SupportsSizedAllocators_ = false; } #if defined(_win_) || defined(_darwin_) auto bindSymbolsFunc = reinterpret_cast(lib->Lib.Sym(BindSymbolsFuncName)); bindSymbolsFunc(NUdf::GetStaticSymbols()); #endif if (BackTraceCallback_) { auto setter = reinterpret_cast(lib->Lib.SymOptional(SetBackTraceCallbackName)); if (setter) { setter(BackTraceCallback_); } } libIt = LoadedLibraries_.insert({ libraryPath, lib }).first; } else { lib = libIt->second; } // (2) ensure that Register() func is present auto registerFunc = reinterpret_cast( lib->Lib.Sym(RegisterFuncName)); // (3) do load THashSet newModules; TUdfModuleLoader loader( UdfModules_, &newModules, libraryPath, remmapings, lib->AbiVersion, customUdfPrefix); registerFunc(loader, flags); Y_ENSURE(!loader.HasError(), loader.GetError()); if (modules) { *modules = std::move(newModules); } } void AddModule( const TStringBuf& libraryPath, const TStringBuf& moduleName, NUdf::TUniquePtr module) override { TString libraryPathStr(libraryPath); auto inserted = LoadedLibraries_.insert({ libraryPathStr, nullptr }); if (!inserted.second) { return; } TUdfModuleRemappings remappings; TUdfModuleLoader loader( UdfModules_, nullptr, libraryPathStr, remappings, NUdf::CurrentAbiVersion()); loader.AddModule(moduleName, std::move(module)); Y_ENSURE(!loader.HasError(), loader.GetError()); } void SetSystemModulePaths(const TUdfModulePathsMap& paths) override { SystemModulePaths_ = paths; } const IBuiltinFunctionRegistry::TPtr& GetBuiltins() const override { return Builtins_; } TStatus FindFunctionTypeInfo( const TTypeEnvironment& env, NUdf::ITypeInfoHelper::TPtr typeInfoHelper, NUdf::ICountersProvider* countersProvider, const TStringBuf& name, TType* userType, const TStringBuf& typeConfig, ui32 flags, const NUdf::TSourcePosition& pos, const NUdf::ISecureParamsProvider* secureParamsProvider, TFunctionTypeInfo* funcInfo) const override { TStringBuf moduleName, funcName; if (name.TrySplit(MODULE_NAME_DELIMITER, moduleName, funcName)) { auto it = UdfModules_.find(moduleName); if (it != UdfModules_.end()) { TFunctionTypeInfoBuilder typeInfoBuilder(env, typeInfoHelper, moduleName, (flags & NUdf::IUdfModule::TFlags::TypesOnly) ? nullptr : countersProvider, pos, secureParamsProvider); const auto& module = *it->second.Impl; module.BuildFunctionTypeInfo( funcName, userType, typeConfig, flags, typeInfoBuilder); if (typeInfoBuilder.HasError()) { return TStatus::Error() << "Module: " << moduleName << ", function: " << funcName << ", error: " << typeInfoBuilder.GetError(); } try { typeInfoBuilder.Build(funcInfo); } catch (yexception& e) { return TStatus::Error() << "Module: " << moduleName << ", function: " << funcName << ", error: " << e.what(); } if ((flags & NUdf::IRegistrator::TFlags::TypesOnly) && !funcInfo->FunctionType) { return TStatus::Error() << "Module: " << moduleName << ", function: " << funcName << ", function not found"; } if (funcInfo->ModuleIRUniqID) { funcInfo->ModuleIRUniqID.prepend(moduleName); } return TStatus::Ok(); } return TStatus::Error() << "Module " << moduleName << " is not registered"; } return TStatus::Error() << "Function name must be in . scheme. " << "But get " << name; } TMaybe FindUdfPath(const TStringBuf& moduleName) const override { if (const TUdfModule* udf = UdfModules_.FindPtr(moduleName)) { return udf->LibraryPath; } if (const TString* path = SystemModulePaths_.FindPtr(moduleName)) { return *path; } return Nothing(); } bool IsLoadedUdfModule(const TStringBuf& moduleName) const override { return UdfModules_.contains(moduleName); } THashSet GetAllModuleNames() const override { THashSet names; names.reserve(UdfModules_.size()); for (const auto& module: UdfModules_) { names.insert(module.first); } return names; } TFunctionsMap GetModuleFunctions(const TStringBuf& moduleName) const override { struct TFunctionNamesSink: public NUdf::IFunctionNamesSink { TFunctionsMap Functions; class TFuncDescriptor : public NUdf::IFunctionDescriptor { public: TFuncDescriptor(TFunctionProperties& properties) : Properties(properties) {} private: void SetTypeAwareness() final { Properties.IsTypeAwareness = true; } TFunctionProperties& Properties; }; NUdf::IFunctionDescriptor::TPtr Add(const NUdf::TStringRef& name) final { const auto it = Functions.emplace(name, TFunctionProperties{}); return new TFuncDescriptor(it.first->second); } } sink; const auto it = UdfModules_.find(moduleName); if (UdfModules_.cend() == it) return TFunctionsMap(); it->second.Impl->GetAllFunctions(sink); return sink.Functions; } bool SupportsSizedAllocators() const override { return SupportsSizedAllocators_; } void PrintInfoTo(IOutputStream& out) const override { Builtins_->PrintInfoTo(out); } void CleanupModulesOnTerminate() const override { for (const auto& module : UdfModules_) { module.second.Impl->CleanupOnTerminate(); } } TIntrusivePtr Clone() const override { return new TMutableFunctionRegistry(*this); } void SetBackTraceCallback(NUdf::TBackTraceCallback callback) override { BackTraceCallback_ = callback; } private: const IBuiltinFunctionRegistry::TPtr Builtins_; THashMap LoadedLibraries_; TUdfModulesMap UdfModules_; THolder UdfMemoryInfo_; TUdfModulePathsMap SystemModulePaths_; NUdf::TBackTraceCallback BackTraceCallback_ = nullptr; bool SupportsSizedAllocators_ = true; }; ////////////////////////////////////////////////////////////////////////////// // TBuiltinsWrapper ////////////////////////////////////////////////////////////////////////////// class TBuiltinsWrapper: public IFunctionRegistry { public: TBuiltinsWrapper(IBuiltinFunctionRegistry::TPtr&& builtins) : Builtins_(std::move(builtins)) { } const IBuiltinFunctionRegistry::TPtr& GetBuiltins() const override { return Builtins_; } void AllowUdfPatch() override { } TStatus FindFunctionTypeInfo( const TTypeEnvironment& env, NUdf::ITypeInfoHelper::TPtr typeInfoHelper, NUdf::ICountersProvider* countersProvider, const TStringBuf& name, TType* userType, const TStringBuf& typeConfig, ui32 flags, const NUdf::TSourcePosition& pos, const NUdf::ISecureParamsProvider* secureParamsProvider, TFunctionTypeInfo* funcInfo) const override { Y_UNUSED(env); Y_UNUSED(typeInfoHelper); Y_UNUSED(countersProvider); Y_UNUSED(name); Y_UNUSED(userType); Y_UNUSED(typeConfig); Y_UNUSED(flags); Y_UNUSED(pos); Y_UNUSED(secureParamsProvider); Y_UNUSED(funcInfo); return TStatus::Error(TStringBuf("Unsupported access to builtins registry")); } TMaybe FindUdfPath( const TStringBuf& /* moduleName */) const override { return{}; } bool IsLoadedUdfModule(const TStringBuf& /* moduleName */) const override { return false; } THashSet GetAllModuleNames() const override { return {}; } TFunctionsMap GetModuleFunctions(const TStringBuf&) const override { return TFunctionsMap(); } bool SupportsSizedAllocators() const override { return true; } void PrintInfoTo(IOutputStream& out) const override { Builtins_->PrintInfoTo(out); } void CleanupModulesOnTerminate() const override { } TIntrusivePtr Clone() const override { return new TMutableFunctionRegistry(Builtins_); } private: const IBuiltinFunctionRegistry::TPtr Builtins_; }; } // namespace namespace NKikimr { namespace NMiniKQL { void FindUdfsInDir(const TString& dirPath, TVector* paths) { static const TStringBuf libPrefix = TStringBuf(MKQL_UDF_LIB_PREFIX); static const TStringBuf libSuffix = TStringBuf(MKQL_UDF_LIB_SUFFIX); if (!dirPath.empty()) { std::vector dirs; StringSplitter(dirPath).Split(';').AddTo(&dirs); for (auto d : dirs) { TDirIterator dir(d, TDirIterator::TOptions(FTS_LOGICAL).SetMaxLevel(10)); for (auto file = dir.begin(), end = dir.end(); file != end; ++file) { // skip entries with empty name, and all non-files // all valid symlinks are already dereferenced, provided by FTS_LOGICAL if (file->fts_pathlen == file->fts_namelen || file->fts_info != FTS_F) { continue; } TString path(file->fts_path); TString fileName = GetBaseName(path); // skip non shared libraries if (!fileName.StartsWith(libPrefix) || !fileName.EndsWith(libSuffix)) { continue; } // skip test udfs when scanning dir auto udfName = TStringBuf(fileName).Skip(libPrefix.length()); if (udfName.StartsWith(TStringBuf("test_"))) { continue; } paths->push_back(std::move(path)); } } } } bool SplitModuleAndFuncName(const TStringBuf& name, TStringBuf& module, TStringBuf& func) { return name.TrySplit(MODULE_NAME_DELIMITER, module, func); } TString FullName(const TStringBuf& module, const TStringBuf& func) { TString fullName; fullName.reserve(module.size() + func.size() + 1); fullName.append(module); fullName.append(MODULE_NAME_DELIMITER); fullName.append(func); return fullName; } TIntrusivePtr CreateFunctionRegistry(IBuiltinFunctionRegistry::TPtr&& builtins) { return new TBuiltinsWrapper(std::move(builtins)); } TIntrusivePtr CreateFunctionRegistry( NKikimr::NUdf::TBackTraceCallback backtraceCallback, IBuiltinFunctionRegistry::TPtr&& builtins, bool allowUdfPatch, const TVector& udfsPaths, ui32 flags /* = 0 */) { auto registry = MakeHolder(std::move(builtins)); if (allowUdfPatch) { registry->AllowUdfPatch(); } registry->SetBackTraceCallback(backtraceCallback); // system UDFs loaded with default names TUdfModuleRemappings remappings; THashSet usedUdfPaths; for (const TString& udfPath: udfsPaths) { if (usedUdfPaths.insert(udfPath).second) { registry->LoadUdfs(udfPath, remappings, flags); } } return registry.Release(); } void FillStaticModules(IMutableFunctionRegistry& registry) { for (const auto& wrapper : NUdf::GetStaticUdfModuleWrapperList()) { auto [name, ptr] = wrapper(); registry.AddModule(TString(StaticModulePrefix) + name, name, std::move(ptr)); } } } // namespace NMiniKQL } // namespace NKiki