123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573 |
- #include "mkql_function_registry.h"
- #include "mkql_utils.h"
- #include "mkql_type_builder.h"
- #include <yql/essentials/public/udf/udf_static_registry.h>
- #include <util/folder/iterator.h>
- #include <util/folder/dirut.h>
- #include <util/folder/path.h>
- #include <util/system/dynlib.h>
- #include <util/stream/str.h>
- #include <util/string/builder.h>
- #include <util/string/split.h>
- namespace {
- using namespace NKikimr;
- using namespace NMiniKQL;
- const char MODULE_NAME_DELIMITER = '.';
- const char* RegisterFuncName = "Register";
- const char* AbiVersionFuncName = "AbiVersion";
- #if defined(_win_) || defined(_darwin_)
- const char* BindSymbolsFuncName = "BindSymbols";
- #endif
- const char* SetBackTraceCallbackName = "SetBackTraceCallback";
- //////////////////////////////////////////////////////////////////////////////
- // TMutableFunctionRegistry
- //////////////////////////////////////////////////////////////////////////////
- class TMutableFunctionRegistry: public IMutableFunctionRegistry
- {
- struct TUdfModule {
- TString LibraryPath;
- std::shared_ptr<NUdf::IUdfModule> Impl;
- };
- using TUdfModulesMap = THashMap<TString, TUdfModule>;
- struct TUdfLibrary: public TThrRefBase {
- ui32 AbiVersion = 0;
- TDynamicLibrary Lib;
- TUdfLibrary()
- {
- }
- };
- using TUdfLibraryPtr = TIntrusivePtr<TUdfLibrary>;
- class TUdfModuleLoader: public NUdf::IRegistrator {
- public:
- TUdfModuleLoader(
- TUdfModulesMap& modulesMap,
- THashSet<TString>* newModules,
- const TString& libraryPath,
- const TUdfModuleRemappings& remappings,
- ui32 abiVersion,
- const TString& customUdfPrefix = {})
- : ModulesMap(modulesMap)
- , NewModules(newModules)
- , LibraryPath(libraryPath)
- , Remappings(remappings)
- , AbiVersion(NUdf::AbiVersionToStr(abiVersion))
- , CustomUdfPrefix(customUdfPrefix)
- {
- }
- void AddModule(
- const NUdf::TStringRef& name,
- NUdf::TUniquePtr<NUdf::IUdfModule> module) override
- {
- Y_DEBUG_ABORT_UNLESS(module, "Module is empty");
- if (!HasError()) {
- TUdfModule m;
- m.LibraryPath = LibraryPath;
- m.Impl.reset(module.Release());
- auto it = Remappings.find(name);
- const TString& newName = CustomUdfPrefix
- + ((it == Remappings.end())
- ? TString(name)
- : it->second);
- auto i = ModulesMap.insert({ newName, std::move(m) });
- if (!i.second) {
- TUdfModule* oldModule = ModulesMap.FindPtr(newName);
- Y_DEBUG_ABORT_UNLESS(oldModule != nullptr);
- Error = (TStringBuilder()
- << "UDF module duplication: name " << TStringBuf(name)
- << ", already loaded from " << oldModule->LibraryPath
- << ", trying to load from " << LibraryPath);
- } else if (NewModules) {
- NewModules->insert(newName);
- }
- }
- }
- const TString& GetError() const { return Error; }
- bool HasError() const { return !Error.empty(); }
- private:
- TUdfModulesMap& ModulesMap;
- THashSet<TString>* NewModules;
- const TString LibraryPath;
- const TUdfModuleRemappings& Remappings;
- const TString AbiVersion;
- TString Error;
- const TString CustomUdfPrefix;
- };
- public:
- TMutableFunctionRegistry(IBuiltinFunctionRegistry::TPtr builtins)
- : Builtins_(std::move(builtins))
- {
- }
- TMutableFunctionRegistry(const TMutableFunctionRegistry& rhs)
- : Builtins_(rhs.Builtins_)
- , LoadedLibraries_(rhs.LoadedLibraries_)
- , UdfModules_(rhs.UdfModules_)
- , SupportsSizedAllocators_(rhs.SupportsSizedAllocators_)
- {
- }
- ~TMutableFunctionRegistry() {
- }
- void AllowUdfPatch() override {
- }
- void LoadUdfs(
- const TString& libraryPath,
- const TUdfModuleRemappings& remmapings,
- ui32 flags = 0,
- const TString& customUdfPrefix = {},
- THashSet<TString>* modules = nullptr) override
- {
- TUdfLibraryPtr lib;
- auto libIt = LoadedLibraries_.find(libraryPath);
- if (libIt == LoadedLibraries_.end()) {
- lib = MakeIntrusive<TUdfLibrary>();
- #ifdef _win32_
- ui32 loadFlags = 0;
- #else
- ui32 loadFlags = RTLD_GLOBAL | ((flags & NUdf::IRegistrator::TFlags::TypesOnly) ? RTLD_LAZY : RTLD_NOW);
- #endif
- TPathSplit absPathSplit(libraryPath);
- TString absPath = libraryPath;
- if (!absPathSplit.IsAbsolute) {
- absPath = JoinPaths(TFsPath::Cwd().PathSplit(), absPathSplit);
- }
- lib->Lib.Open(absPath.data(), loadFlags);
- lib->Lib.SetUnloadable(false);
- // (1) check ABI version
- auto abiVersionFunc = reinterpret_cast<NUdf::TAbiVersionFunctionPtr>(
- lib->Lib.SymOptional(AbiVersionFuncName));
- if (!abiVersionFunc) {
- return;
- }
- ui32 version = abiVersionFunc();
- Y_ENSURE(NUdf::IsAbiCompatible(version) && version >= NUdf::MakeAbiVersion(2, 8, 0),
- "Non compatible ABI version of UDF library " << libraryPath
- << ", expected up to " << NUdf::AbiVersionToStr(NUdf::CurrentCompatibilityAbiVersion() * 100)
- << ", got " << NUdf::AbiVersionToStr(version)
- << "; try to re-compile library using "
- << "YQL_ABI_VERSION(" << UDF_ABI_VERSION_MAJOR
- << " " << UDF_ABI_VERSION_MINOR << " 0) macro in ya.make");
- lib->AbiVersion = version;
- if (version < NUdf::MakeAbiVersion(2, 8, 0)) {
- SupportsSizedAllocators_ = false;
- }
- #if defined(_win_) || defined(_darwin_)
- auto bindSymbolsFunc = reinterpret_cast<NUdf::TBindSymbolsFunctionPtr>(lib->Lib.Sym(BindSymbolsFuncName));
- bindSymbolsFunc(NUdf::GetStaticSymbols());
- #endif
- if (BackTraceCallback_) {
- auto setter = reinterpret_cast<NUdf::TSetBackTraceCallbackPtr>(lib->Lib.SymOptional(SetBackTraceCallbackName));
- if (setter) {
- setter(BackTraceCallback_);
- }
- }
- libIt = LoadedLibraries_.insert({ libraryPath, lib }).first;
- } else {
- lib = libIt->second;
- }
- // (2) ensure that Register() func is present
- auto registerFunc = reinterpret_cast<NUdf::TRegisterFunctionPtr>(
- lib->Lib.Sym(RegisterFuncName));
- // (3) do load
- THashSet<TString> newModules;
- TUdfModuleLoader loader(
- UdfModules_,
- &newModules,
- libraryPath,
- remmapings,
- lib->AbiVersion, customUdfPrefix);
- registerFunc(loader, flags);
- Y_ENSURE(!loader.HasError(), loader.GetError());
- if (modules) {
- *modules = std::move(newModules);
- }
- }
- void AddModule(
- const TStringBuf& libraryPath,
- const TStringBuf& moduleName,
- NUdf::TUniquePtr<NUdf::IUdfModule> module) override
- {
- TString libraryPathStr(libraryPath);
- auto inserted = LoadedLibraries_.insert({ libraryPathStr, nullptr });
- if (!inserted.second) {
- return;
- }
- TUdfModuleRemappings remappings;
- TUdfModuleLoader loader(
- UdfModules_, nullptr, libraryPathStr,
- remappings, NUdf::CurrentAbiVersion());
- loader.AddModule(moduleName, std::move(module));
- Y_ENSURE(!loader.HasError(), loader.GetError());
- }
- void SetSystemModulePaths(const TUdfModulePathsMap& paths) override {
- SystemModulePaths_ = paths;
- }
- const IBuiltinFunctionRegistry::TPtr& GetBuiltins() const override {
- return Builtins_;
- }
- TStatus FindFunctionTypeInfo(
- const TTypeEnvironment& env,
- NUdf::ITypeInfoHelper::TPtr typeInfoHelper,
- NUdf::ICountersProvider* countersProvider,
- const TStringBuf& name,
- TType* userType,
- const TStringBuf& typeConfig,
- ui32 flags,
- const NUdf::TSourcePosition& pos,
- const NUdf::ISecureParamsProvider* secureParamsProvider,
- TFunctionTypeInfo* funcInfo) const override
- {
- TStringBuf moduleName, funcName;
- if (name.TrySplit(MODULE_NAME_DELIMITER, moduleName, funcName)) {
- auto it = UdfModules_.find(moduleName);
- if (it != UdfModules_.end()) {
- TFunctionTypeInfoBuilder typeInfoBuilder(env, typeInfoHelper, moduleName,
- (flags & NUdf::IUdfModule::TFlags::TypesOnly) ? nullptr : countersProvider, pos, secureParamsProvider);
- const auto& module = *it->second.Impl;
- module.BuildFunctionTypeInfo(
- funcName, userType, typeConfig, flags, typeInfoBuilder);
- if (typeInfoBuilder.HasError()) {
- return TStatus::Error()
- << "Module: " << moduleName
- << ", function: " << funcName
- << ", error: " << typeInfoBuilder.GetError();
- }
- try {
- typeInfoBuilder.Build(funcInfo);
- }
- catch (yexception& e) {
- return TStatus::Error()
- << "Module: " << moduleName
- << ", function: " << funcName
- << ", error: " << e.what();
- }
- if ((flags & NUdf::IRegistrator::TFlags::TypesOnly) &&
- !funcInfo->FunctionType)
- {
- return TStatus::Error()
- << "Module: " << moduleName
- << ", function: " << funcName
- << ", function not found";
- }
- if (funcInfo->ModuleIRUniqID) {
- funcInfo->ModuleIRUniqID.prepend(moduleName);
- }
- return TStatus::Ok();
- }
- return TStatus::Error()
- << "Module " << moduleName << " is not registered";
- }
- return TStatus::Error()
- << "Function name must be in <module>.<func_name> scheme. "
- << "But get " << name;
- }
- TMaybe<TString> FindUdfPath(const TStringBuf& moduleName) const override {
- if (const TUdfModule* udf = UdfModules_.FindPtr(moduleName)) {
- return udf->LibraryPath;
- }
- if (const TString* path = SystemModulePaths_.FindPtr(moduleName)) {
- return *path;
- }
- return Nothing();
- }
- bool IsLoadedUdfModule(const TStringBuf& moduleName) const override {
- return UdfModules_.contains(moduleName);
- }
- THashSet<TString> GetAllModuleNames() const override {
- THashSet<TString> names;
- names.reserve(UdfModules_.size());
- for (const auto& module: UdfModules_) {
- names.insert(module.first);
- }
- return names;
- }
- TFunctionsMap GetModuleFunctions(const TStringBuf& moduleName) const override {
- struct TFunctionNamesSink: public NUdf::IFunctionNamesSink {
- TFunctionsMap Functions;
- class TFuncDescriptor : public NUdf::IFunctionDescriptor {
- public:
- TFuncDescriptor(TFunctionProperties& properties)
- : Properties(properties)
- {}
- private:
- void SetTypeAwareness() final {
- Properties.IsTypeAwareness = true;
- }
- TFunctionProperties& Properties;
- };
- NUdf::IFunctionDescriptor::TPtr Add(const NUdf::TStringRef& name) final {
- const auto it = Functions.emplace(name, TFunctionProperties{});
- return new TFuncDescriptor(it.first->second);
- }
- } sink;
- const auto it = UdfModules_.find(moduleName);
- if (UdfModules_.cend() == it)
- return TFunctionsMap();
- it->second.Impl->GetAllFunctions(sink);
- return sink.Functions;
- }
- bool SupportsSizedAllocators() const override {
- return SupportsSizedAllocators_;
- }
- void PrintInfoTo(IOutputStream& out) const override {
- Builtins_->PrintInfoTo(out);
- }
- void CleanupModulesOnTerminate() const override {
- for (const auto& module : UdfModules_) {
- module.second.Impl->CleanupOnTerminate();
- }
- }
- TIntrusivePtr<IMutableFunctionRegistry> Clone() const override {
- return new TMutableFunctionRegistry(*this);
- }
- void SetBackTraceCallback(NUdf::TBackTraceCallback callback) override {
- BackTraceCallback_ = callback;
- }
- private:
- const IBuiltinFunctionRegistry::TPtr Builtins_;
- THashMap<TString, TUdfLibraryPtr> LoadedLibraries_;
- TUdfModulesMap UdfModules_;
- THolder<TMemoryUsageInfo> UdfMemoryInfo_;
- TUdfModulePathsMap SystemModulePaths_;
- NUdf::TBackTraceCallback BackTraceCallback_ = nullptr;
- bool SupportsSizedAllocators_ = true;
- };
- //////////////////////////////////////////////////////////////////////////////
- // TBuiltinsWrapper
- //////////////////////////////////////////////////////////////////////////////
- class TBuiltinsWrapper: public IFunctionRegistry
- {
- public:
- TBuiltinsWrapper(IBuiltinFunctionRegistry::TPtr&& builtins)
- : Builtins_(std::move(builtins))
- {
- }
- const IBuiltinFunctionRegistry::TPtr& GetBuiltins() const override {
- return Builtins_;
- }
- void AllowUdfPatch() override {
- }
- TStatus FindFunctionTypeInfo(
- const TTypeEnvironment& env,
- NUdf::ITypeInfoHelper::TPtr typeInfoHelper,
- NUdf::ICountersProvider* countersProvider,
- const TStringBuf& name,
- TType* userType,
- const TStringBuf& typeConfig,
- ui32 flags,
- const NUdf::TSourcePosition& pos,
- const NUdf::ISecureParamsProvider* secureParamsProvider,
- TFunctionTypeInfo* funcInfo) const override
- {
- Y_UNUSED(env);
- Y_UNUSED(typeInfoHelper);
- Y_UNUSED(countersProvider);
- Y_UNUSED(name);
- Y_UNUSED(userType);
- Y_UNUSED(typeConfig);
- Y_UNUSED(flags);
- Y_UNUSED(pos);
- Y_UNUSED(secureParamsProvider);
- Y_UNUSED(funcInfo);
- return TStatus::Error(TStringBuf("Unsupported access to builtins registry"));
- }
- TMaybe<TString> FindUdfPath(
- const TStringBuf& /* moduleName */) const override
- {
- return{};
- }
- bool IsLoadedUdfModule(const TStringBuf& /* moduleName */) const override {
- return false;
- }
- THashSet<TString> GetAllModuleNames() const override {
- return {};
- }
- TFunctionsMap GetModuleFunctions(const TStringBuf&) const override {
- return TFunctionsMap();
- }
- bool SupportsSizedAllocators() const override {
- return true;
- }
- void PrintInfoTo(IOutputStream& out) const override {
- Builtins_->PrintInfoTo(out);
- }
- void CleanupModulesOnTerminate() const override {
- }
- TIntrusivePtr<IMutableFunctionRegistry> Clone() const override {
- return new TMutableFunctionRegistry(Builtins_);
- }
- private:
- const IBuiltinFunctionRegistry::TPtr Builtins_;
- };
- } // namespace
- namespace NKikimr {
- namespace NMiniKQL {
- void FindUdfsInDir(const TString& dirPath, TVector<TString>* paths)
- {
- static const TStringBuf libPrefix = TStringBuf(MKQL_UDF_LIB_PREFIX);
- static const TStringBuf libSuffix = TStringBuf(MKQL_UDF_LIB_SUFFIX);
- if (!dirPath.empty()) {
- std::vector<TString> dirs;
- StringSplitter(dirPath).Split(';').AddTo(&dirs);
- for (auto d : dirs) {
- TDirIterator dir(d, TDirIterator::TOptions(FTS_LOGICAL).SetMaxLevel(10));
- for (auto file = dir.begin(), end = dir.end(); file != end; ++file) {
- // skip entries with empty name, and all non-files
- // all valid symlinks are already dereferenced, provided by FTS_LOGICAL
- if (file->fts_pathlen == file->fts_namelen || file->fts_info != FTS_F) {
- continue;
- }
- TString path(file->fts_path);
- TString fileName = GetBaseName(path);
- // skip non shared libraries
- if (!fileName.StartsWith(libPrefix) ||
- !fileName.EndsWith(libSuffix))
- {
- continue;
- }
- // skip test udfs when scanning dir
- auto udfName = TStringBuf(fileName).Skip(libPrefix.length());
- if (udfName.StartsWith(TStringBuf("test_"))) {
- continue;
- }
- paths->push_back(std::move(path));
- }
- }
- }
- }
- bool SplitModuleAndFuncName(const TStringBuf& name, TStringBuf& module, TStringBuf& func)
- {
- return name.TrySplit(MODULE_NAME_DELIMITER, module, func);
- }
- TString FullName(const TStringBuf& module, const TStringBuf& func)
- {
- TString fullName;
- fullName.reserve(module.size() + func.size() + 1);
- fullName.append(module);
- fullName.append(MODULE_NAME_DELIMITER);
- fullName.append(func);
- return fullName;
- }
- TIntrusivePtr<IFunctionRegistry> CreateFunctionRegistry(IBuiltinFunctionRegistry::TPtr&& builtins)
- {
- return new TBuiltinsWrapper(std::move(builtins));
- }
- TIntrusivePtr<IFunctionRegistry> CreateFunctionRegistry(
- NKikimr::NUdf::TBackTraceCallback backtraceCallback,
- IBuiltinFunctionRegistry::TPtr&& builtins,
- bool allowUdfPatch,
- const TVector<TString>& udfsPaths,
- ui32 flags /* = 0 */)
- {
- auto registry = MakeHolder<TMutableFunctionRegistry>(std::move(builtins));
- if (allowUdfPatch) {
- registry->AllowUdfPatch();
- }
- registry->SetBackTraceCallback(backtraceCallback);
- // system UDFs loaded with default names
- TUdfModuleRemappings remappings;
- THashSet<TString> usedUdfPaths;
- for (const TString& udfPath: udfsPaths) {
- if (usedUdfPaths.insert(udfPath).second) {
- registry->LoadUdfs(udfPath, remappings, flags);
- }
- }
- return registry.Release();
- }
- void FillStaticModules(IMutableFunctionRegistry& registry) {
- for (const auto& wrapper : NUdf::GetStaticUdfModuleWrapperList()) {
- auto [name, ptr] = wrapper();
- registry.AddModule(TString(StaticModulePrefix) + name, name, std::move(ptr));
- }
- }
- } // namespace NMiniKQL
- } // namespace NKiki
|