yql_simple_udf_resolver.cpp 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. #include "yql_simple_udf_resolver.h"
  2. #include <yql/essentials/providers/common/mkql/yql_type_mkql.h>
  3. #include <yql/essentials/core/yql_holding_file_storage.h>
  4. #include <yql/essentials/minikql/mkql_node.h>
  5. #include <yql/essentials/minikql/mkql_type_builder.h>
  6. #include <yql/essentials/minikql/mkql_program_builder.h>
  7. #include <yql/essentials/minikql/mkql_utils.h>
  8. #include <yql/essentials/minikql/computation/mkql_computation_node.h>
  9. #include <library/cpp/digest/md5/md5.h>
  10. #include <util/generic/vector.h>
  11. #include <util/generic/hash_set.h>
  12. #include <util/generic/hash.h>
  13. #include <util/generic/string.h>
  14. #include <util/system/guard.h>
  15. #include <util/system/spinlock.h>
  16. namespace NYql {
  17. namespace NCommon {
  18. using namespace NKikimr;
  19. using namespace NKikimr::NMiniKQL;
  20. class TSimpleUdfResolver : public IUdfResolver {
  21. public:
  22. TSimpleUdfResolver(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TFileStoragePtr& fileStorage, bool useFakeMD5)
  23. : FunctionRegistry_(functionRegistry)
  24. , FileStorage_(fileStorage)
  25. , TypeInfoHelper_(new TTypeInfoHelper)
  26. , UseFakeMD5_(useFakeMD5)
  27. {}
  28. TString GetMD5(const TString& path) const {
  29. if (UseFakeMD5_) {
  30. return MD5::Calc(path);
  31. } else {
  32. return {};
  33. }
  34. }
  35. TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const override {
  36. with_lock(Lock_) {
  37. auto path = FunctionRegistry_->FindUdfPath(moduleName);
  38. return path ? MakeMaybe<TFilePathWithMd5>(*path, GetMD5(*path)) : Nothing();
  39. }
  40. }
  41. bool LoadMetadata(const TVector<TImport*>& imports,
  42. const TVector<TFunction*>& functions, TExprContext& ctx) const override {
  43. with_lock(Lock_) {
  44. bool hasErrors = false;
  45. THashSet<TString> requiredModules;
  46. for (auto udfPtr : functions) {
  47. auto& udf = *udfPtr;
  48. TStringBuf moduleName, funcName;
  49. if (!SplitUdfName(udf.Name, moduleName, funcName) || moduleName.empty() || funcName.empty()) {
  50. ctx.AddError(TIssue(udf.Pos, TStringBuilder() <<
  51. "Incorrect format of function name: " << udf.Name));
  52. hasErrors = true;
  53. } else {
  54. requiredModules.insert(TString(moduleName));
  55. }
  56. }
  57. THoldingFileStorage holdingFileStorage(FileStorage_);
  58. auto newRegistry = FunctionRegistry_->Clone();
  59. THashMap<std::pair<TString, TString>, THashSet<TString>> cachedModules;
  60. for (auto import: imports) {
  61. if (import->Modules) {
  62. bool needLibrary = false;
  63. for (auto& m : *import->Modules) {
  64. if (requiredModules.contains(m)) {
  65. needLibrary = true;
  66. break;
  67. }
  68. }
  69. if (!needLibrary) {
  70. continue;
  71. }
  72. } else {
  73. import->Modules.ConstructInPlace();
  74. }
  75. const TString& customUdfPrefix = import->Block->CustomUdfPrefix;
  76. try {
  77. THashSet<TString> modules;
  78. if (FileStorage_) {
  79. auto link = holdingFileStorage.FreezeFile(*import->Block);
  80. auto path = link->GetPath().GetPath();
  81. auto [it, inserted] = cachedModules.emplace(std::make_pair(path, customUdfPrefix), THashSet<TString>());
  82. if (inserted) {
  83. newRegistry->LoadUdfs(
  84. path,
  85. {},
  86. NUdf::IRegistrator::TFlags::TypesOnly,
  87. customUdfPrefix,
  88. &modules);
  89. it->second = modules;
  90. } else {
  91. modules = it->second;
  92. }
  93. } else {
  94. if (import->Block->Type != EUserDataType::PATH) {
  95. ctx.AddError(TIssue(import->Pos, TStringBuilder() <<
  96. "Only path file type is supported, cannot load file with alias: " << import->FileAlias));
  97. hasErrors = true;
  98. continue;
  99. }
  100. auto [it, inserted] = cachedModules.emplace(std::make_pair(import->Block->Data, customUdfPrefix), THashSet<TString>());
  101. if (inserted) {
  102. newRegistry->LoadUdfs(
  103. import->Block->Data,
  104. {},
  105. NUdf::IRegistrator::TFlags::TypesOnly,
  106. customUdfPrefix,
  107. &modules);
  108. it->second = modules;
  109. } else {
  110. modules = it->second;
  111. }
  112. }
  113. import->Modules->assign(modules.begin(), modules.end());
  114. }
  115. catch (yexception& e) {
  116. ctx.AddError(TIssue(import->Pos, TStringBuilder()
  117. << "Internal error of loading udf module: " << import->FileAlias
  118. << ", reason: " << e.what()));
  119. hasErrors = true;
  120. }
  121. }
  122. hasErrors = !LoadFunctionsMetadata(functions, *newRegistry, TypeInfoHelper_, ctx) || hasErrors;
  123. return !hasErrors;
  124. }
  125. }
  126. TResolveResult LoadRichMetadata(const TVector<TImport>& imports) const override {
  127. Y_UNUSED(imports);
  128. ythrow yexception() << "LoadRichMetadata is not supported in SimpleUdfResolver";
  129. }
  130. bool ContainsModule(const TStringBuf& moduleName) const override {
  131. return FunctionRegistry_->IsLoadedUdfModule(moduleName);
  132. }
  133. private:
  134. mutable TAdaptiveLock Lock_;
  135. const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry_;
  136. TFileStoragePtr FileStorage_;
  137. NUdf::ITypeInfoHelper::TPtr TypeInfoHelper_;
  138. const bool UseFakeMD5_;
  139. };
  140. IUdfResolver::TPtr CreateSimpleUdfResolver(
  141. const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
  142. const TFileStoragePtr& fileStorage,
  143. bool useFakeMD5
  144. ) {
  145. return new TSimpleUdfResolver(functionRegistry, fileStorage, useFakeMD5);
  146. }
  147. bool LoadFunctionsMetadata(const TVector<IUdfResolver::TFunction*>& functions,
  148. const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
  149. NUdf::ITypeInfoHelper::TPtr typeInfoHelper,
  150. TExprContext& ctx) {
  151. bool hasErrors = false;
  152. TScopedAlloc alloc(__LOCATION__);
  153. TTypeEnvironment env(alloc);
  154. for (auto udfPtr : functions) {
  155. auto& udf = *udfPtr;
  156. try {
  157. TType* mkqlUserType = nullptr;
  158. if (udf.UserType) {
  159. // scan for error types
  160. TErrorTypeVisitor errorVisitor(ctx);
  161. udf.UserType->Accept(errorVisitor);
  162. if (errorVisitor.HasErrors()) {
  163. hasErrors = true;
  164. continue;
  165. }
  166. TStringStream err;
  167. mkqlUserType = BuildType(*udf.UserType, {env}, err);//
  168. if (!mkqlUserType) {
  169. auto issue = TIssue(udf.Pos, TStringBuilder() << "Invalid user type for function: "
  170. << udf.Name << ", error: " << err.Str());
  171. issue.SetCode(UNEXPECTED_ERROR, ESeverity::TSeverityIds_ESeverityId_S_FATAL);
  172. ctx.AddError(issue);
  173. hasErrors = true;
  174. continue;
  175. }
  176. }
  177. auto secureParamsProvider = MakeSimpleSecureParamsProvider(udf.SecureParams);
  178. TFunctionTypeInfo funcInfo;
  179. auto status = functionRegistry.FindFunctionTypeInfo(env, typeInfoHelper, nullptr,
  180. udf.Name, mkqlUserType, udf.TypeConfig, NUdf::IUdfModule::TFlags::TypesOnly, {}, secureParamsProvider.get(), &funcInfo);
  181. if (!status.IsOk()) {
  182. ctx.AddError(TIssue(udf.Pos, TStringBuilder() << "Failed to find UDF function: " << udf.Name
  183. << ", reason: " << status.GetError()));
  184. hasErrors = true;
  185. continue;
  186. }
  187. udf.NormalizedName = udf.Name;
  188. udf.CallableType = ConvertMiniKQLType(udf.Pos, funcInfo.FunctionType, ctx);
  189. YQL_ENSURE(udf.CallableType);
  190. if (funcInfo.RunConfigType) {
  191. udf.RunConfigType = ConvertMiniKQLType(udf.Pos, const_cast<TType*>(funcInfo.RunConfigType), ctx);
  192. YQL_ENSURE(udf.RunConfigType);
  193. }
  194. if (funcInfo.UserType) {
  195. udf.NormalizedUserType = ConvertMiniKQLType(udf.Pos, const_cast<TType*>(funcInfo.UserType), ctx);
  196. YQL_ENSURE(udf.NormalizedUserType);
  197. }
  198. udf.SupportsBlocks = funcInfo.SupportsBlocks;
  199. udf.IsStrict = funcInfo.IsStrict;
  200. } catch (const std::exception& e) {
  201. auto issue = TIssue(udf.Pos, TStringBuilder()
  202. << "Internal error was found when udf metadata is loading for function: " << udf.Name
  203. << ", reason: " << e.what());
  204. issue.SetCode(UNEXPECTED_ERROR, ESeverity::TSeverityIds_ESeverityId_S_FATAL);
  205. ctx.AddError(issue);
  206. hasErrors = true;
  207. }
  208. }
  209. return !hasErrors;
  210. }
  211. } // namespace NCommon
  212. } // namespace NYql