yql_outproc_udf_resolver.cpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. #include "yql_outproc_udf_resolver.h"
  2. #include "yql_simple_udf_resolver.h"
  3. #include "yql_files_box.h"
  4. #include <yql/essentials/providers/common/proto/udf_resolver.pb.h>
  5. #include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h>
  6. #include <yql/essentials/core/yql_holding_file_storage.h>
  7. #include <yql/essentials/core/yql_type_annotation.h>
  8. #include <yql/essentials/utils/log/log.h>
  9. #include <yql/essentials/utils/retry.h>
  10. #include <yql/essentials/minikql/mkql_node.h>
  11. #include <yql/essentials/minikql/mkql_type_builder.h>
  12. #include <yql/essentials/minikql/mkql_program_builder.h>
  13. #include <yql/essentials/minikql/mkql_utils.h>
  14. #include <library/cpp/protobuf/util/pb_io.h>
  15. #include <util/generic/scope.h>
  16. #include <util/stream/str.h>
  17. #include <util/string/strip.h>
  18. #include <util/system/shellcommand.h>
  19. #include <util/string/split.h>
  20. #include <regex>
  21. namespace NYql {
  22. namespace NCommon {
  23. using namespace NKikimr;
  24. using namespace NKikimr::NMiniKQL;
  25. namespace {
  26. template <typename F>
  27. void RunResolver(
  28. const TString& resolverPath,
  29. const TList<TString>& args,
  30. IInputStream* input,
  31. const F& outputHandler,
  32. const TString& ldLibraryPath = {}) {
  33. TShellCommandOptions shellOptions;
  34. shellOptions
  35. .SetUseShell(false)
  36. .SetDetachSession(false)
  37. .SetInputStream(input); // input can be nullptr
  38. if (ldLibraryPath) {
  39. YQL_LOG(DEBUG) << "Using LD_LIBRARY_PATH = " << ldLibraryPath << " for Udf resolver";
  40. shellOptions.Environment["LD_LIBRARY_PATH"] = ldLibraryPath;
  41. }
  42. TShellCommand shell(resolverPath, args, shellOptions);
  43. switch (shell.Run().GetStatus()) {
  44. case TShellCommand::SHELL_INTERNAL_ERROR:
  45. ythrow yexception() << "Udf resolver internal error: "
  46. << shell.GetInternalError();
  47. case TShellCommand::SHELL_ERROR:
  48. ythrow yexception() << "Udf resolver shell error: "
  49. << StripString(shell.GetError());
  50. case TShellCommand::SHELL_FINISHED:
  51. break;
  52. default:
  53. ythrow yexception() << "Unexpected udf resolver state: "
  54. << int(shell.GetStatus());
  55. }
  56. if (shell.GetError()) {
  57. YQL_LOG(INFO) << "UdfResolver stderr: " << shell.GetError();
  58. }
  59. outputHandler(shell.GetOutput());
  60. }
  61. template <typename F>
  62. void RunResolver(
  63. const TString& resolverPath,
  64. const TList<TString>& args,
  65. const TResolve& request,
  66. const F& outputHandler,
  67. const TString& ldLibraryPath = {}) {
  68. TStringStream input;
  69. YQL_ENSURE(request.SerializeToArcadiaStream(&input), "Cannot serialize TResolve proto message");
  70. RunResolver(resolverPath, args, &input, outputHandler, ldLibraryPath);
  71. }
  72. TString ExtractSharedObjectNameFromErrorMessage(const char* message) {
  73. if (!message) {
  74. return "";
  75. }
  76. // example:
  77. // util/system/dynlib.cpp:56: libcuda.so.1: cannot open shared object file: No such file or directory
  78. static std::regex re(".*: (.+): cannot open shared object file: No such file or directory");
  79. std::cmatch match;
  80. if (!std::regex_match(message, match, re)) {
  81. return "";
  82. }
  83. return TString(match[1].str());
  84. }
  85. }
  86. class TOutProcUdfResolver : public IUdfResolver {
  87. public:
  88. TOutProcUdfResolver(const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
  89. const TFileStoragePtr& fileStorage, const TString& resolverPath,
  90. const TString& user, const TString& group, bool filterSyscalls,
  91. const TString& udfDependencyStubPath, const TMap<TString, TString>& path2md5)
  92. : FunctionRegistry_(functionRegistry)
  93. , TypeInfoHelper_(new TTypeInfoHelper)
  94. , FileStorage_(fileStorage)
  95. , ResolverPath_(resolverPath)
  96. , UdfDependencyStubPath_(udfDependencyStubPath)
  97. , Path2Md5_(path2md5)
  98. {
  99. if (user) {
  100. UserGroupArgs_ = { "-U", user, "-G", group };
  101. }
  102. if (filterSyscalls) {
  103. UserGroupArgs_.push_back("-F");
  104. }
  105. }
  106. TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const override {
  107. auto path = FunctionRegistry_->FindUdfPath(moduleName);
  108. if (!path) {
  109. return Nothing();
  110. }
  111. const TString md5 = Path2Md5_.Value(*path, "");
  112. return MakeMaybe<TFilePathWithMd5>(*path, md5);
  113. }
  114. bool ContainsModule(const TStringBuf& moduleName) const override {
  115. return FunctionRegistry_->IsLoadedUdfModule(moduleName);
  116. }
  117. bool LoadMetadata(const TVector<TImport*>& imports, const TVector<TFunction*>& functions, TExprContext& ctx) const override {
  118. THashSet<TString> requiredLoadedModules;
  119. THashSet<TString> requiredExternalModules;
  120. TVector<TFunction*> loadedFunctions;
  121. TVector<TFunction*> externalFunctions;
  122. bool hasErrors = false;
  123. for (auto udf : functions) {
  124. TStringBuf moduleName, funcName;
  125. if (!SplitUdfName(udf->Name, moduleName, funcName) || moduleName.empty() || funcName.empty()) {
  126. ctx.AddError(TIssue(udf->Pos, TStringBuilder() <<
  127. "Incorrect format of function name: " << udf->Name));
  128. hasErrors = true;
  129. } else {
  130. if (FunctionRegistry_->IsLoadedUdfModule(moduleName)) {
  131. requiredLoadedModules.insert(TString(moduleName));
  132. loadedFunctions.push_back(udf);
  133. } else {
  134. requiredExternalModules.insert(TString(moduleName));
  135. externalFunctions.push_back(udf);
  136. }
  137. }
  138. }
  139. TResolve request;
  140. TVector<TImport*> usedImports;
  141. THoldingFileStorage holdingFileStorage(FileStorage_);
  142. THolder<TFilesBox> filesBox = CreateFilesBoxOverFileStorageTemp();
  143. THashMap<TString, TImport*> path2LoadedImport;
  144. for (auto import : imports) {
  145. if (import->Modules) {
  146. bool needLibrary = false;
  147. for (auto& m : *import->Modules) {
  148. if (requiredLoadedModules.contains(m)) {
  149. if (import->Block->Type == EUserDataType::PATH) {
  150. path2LoadedImport[import->Block->Data] = import;
  151. }
  152. }
  153. if (requiredExternalModules.contains(m)) {
  154. needLibrary = true;
  155. break;
  156. }
  157. }
  158. if (!needLibrary) {
  159. continue;
  160. }
  161. } else {
  162. import->Modules.ConstructInPlace();
  163. }
  164. try {
  165. LoadImport(holdingFileStorage, *filesBox, *import, request);
  166. usedImports.push_back(import);
  167. } catch (const std::exception& e) {
  168. ctx.AddError(ExceptionToIssue(e));
  169. hasErrors = true;
  170. }
  171. }
  172. for (auto& module : requiredExternalModules) {
  173. if (auto path = FunctionRegistry_->FindUdfPath(module)) {
  174. auto importRequest = request.AddImports();
  175. const TString hiddenPath = filesBox->MakeLinkFrom(*path);
  176. importRequest->SetFileAlias(hiddenPath);
  177. importRequest->SetPath(hiddenPath);
  178. importRequest->SetSystem(true);
  179. }
  180. }
  181. for (auto udf : externalFunctions) {
  182. auto udfRequest = request.AddUdfs();
  183. udfRequest->SetName(udf->Name);
  184. udfRequest->SetTypeConfig(udf->TypeConfig);
  185. if (udf->UserType) {
  186. udfRequest->SetUserType(WriteTypeToYson(udf->UserType));
  187. }
  188. }
  189. TResolveResult response;
  190. try {
  191. response = RunResolverAndParseResult(request, { }, *filesBox);
  192. filesBox->Destroy();
  193. } catch (const std::exception& e) {
  194. ctx.AddError(ExceptionToIssue(e));
  195. return false;
  196. }
  197. // extract regardless of hasErrors value
  198. hasErrors = !ExtractMetadata(response, usedImports, externalFunctions, ctx) || hasErrors;
  199. hasErrors = !LoadFunctionsMetadata(loadedFunctions, *FunctionRegistry_, TypeInfoHelper_, ctx) || hasErrors;
  200. if (!hasErrors) {
  201. for (auto& m : FunctionRegistry_->GetAllModuleNames()) {
  202. auto path = *FunctionRegistry_->FindUdfPath(m);
  203. if (auto import = path2LoadedImport.FindPtr(path)) {
  204. (*import)->Modules->push_back(m);
  205. }
  206. }
  207. }
  208. return !hasErrors;
  209. }
  210. TResolveResult LoadRichMetadata(const TVector<TImport>& imports) const override {
  211. TResolve request;
  212. THoldingFileStorage holdingFileStorage(FileStorage_);
  213. THolder<TFilesBox> filesBox = CreateFilesBoxOverFileStorageTemp();
  214. Y_DEFER {
  215. filesBox->Destroy();
  216. };
  217. for (auto import : imports) {
  218. LoadImport(holdingFileStorage, *filesBox, import, request);
  219. }
  220. return RunResolverAndParseResult(request, { "--discover-proto" }, *filesBox);
  221. }
  222. private:
  223. THolder<TFilesBox> CreateFilesBoxOverFileStorageTemp() const {
  224. return CreateFilesBox(FileStorage_->GetTemp());
  225. }
  226. void LoadImport(THoldingFileStorage& holdingFileStorage, TFilesBox& filesBox, const TImport& import, TResolve& request) const {
  227. const TString path = (import.Block->Type == EUserDataType::PATH) ? import.Block->Data : holdingFileStorage.FreezeFile(*import.Block)->GetPath().GetPath();
  228. const TString hiddenPath = filesBox.MakeLinkFrom(path);
  229. auto importRequest = request.AddImports();
  230. importRequest->SetFileAlias(import.FileAlias);
  231. importRequest->SetPath(hiddenPath);
  232. importRequest->SetCustomUdfPrefix(import.Block->CustomUdfPrefix);
  233. }
  234. TResolveResult RunResolverAndParseResult(const TResolve& request, const TVector<TString>& additionalArgs, TFilesBox& filesBox) const {
  235. auto args = UserGroupArgs_;
  236. args.insert(args.end(), additionalArgs.begin(), additionalArgs.end());
  237. TString ldLibraryPath;
  238. TSet<TString> stubbedLibraries;
  239. return WithRetry<yexception>(10, [&]() {
  240. TResolveResult response;
  241. RunResolver(ResolverPath_, args, request, [&](const TString& output) {
  242. YQL_ENSURE(response.ParseFromString(output), "Cannot deserialize TResolveResult proto message");
  243. }, ldLibraryPath);
  244. return response;
  245. }, [&](const yexception& e, int, int) {
  246. TStringStream stream;
  247. SerializeToTextFormat(request, stream);
  248. YQL_LOG(DEBUG) << "Exception from UdfResolver: " << e.what() << " for request " << stream.Str();
  249. if (!UdfDependencyStubPath_) {
  250. YQL_LOG(DEBUG) << "UdfDependencyStubPath is not specified, unable to recover error " << e.what();
  251. throw e;
  252. }
  253. TString sharedLibrary = ExtractSharedObjectNameFromErrorMessage(e.what());
  254. if (!sharedLibrary) {
  255. throw e;
  256. }
  257. YQL_LOG(DEBUG) << "UdfResolver needs shared library " << sharedLibrary;
  258. if (!stubbedLibraries.emplace(sharedLibrary).second) {
  259. // already tried, giving up
  260. YQL_LOG(ERROR) << "Unable to load shared library " << sharedLibrary << " even after using dependency stub";
  261. throw e;
  262. }
  263. YQL_LOG(DEBUG) << "Using dependency stub for shared library " << sharedLibrary;
  264. PutSharedLibraryStub(sharedLibrary, filesBox);
  265. ldLibraryPath = filesBox.GetDir();
  266. });
  267. }
  268. void PutSharedLibraryStub(const TString& sharedLibrary, TFilesBox& filesBox) const {
  269. YQL_ENSURE(UdfDependencyStubPath_);
  270. filesBox.MakeLinkFrom(UdfDependencyStubPath_, sharedLibrary);
  271. }
  272. static bool ExtractMetadata(const TResolveResult& response, const TVector<TImport*>& usedImports, const TVector<TFunction*>& functions, TExprContext& ctx) {
  273. bool hasErrors = false;
  274. YQL_ENSURE(response.UdfsSize() == functions.size(), "Number of returned udf signatures doesn't match original one");
  275. YQL_ENSURE(response.ImportsSize() >= usedImports.size(), "Number of returned udf modules is too low");
  276. for (size_t i = 0; i < usedImports.size(); ++i) {
  277. const TImportResult& importRes = response.GetImports(i);
  278. TImport* import = usedImports[i];
  279. if (importRes.HasError()) {
  280. ctx.AddError(TIssue(import ? import->Pos : TPosition(), importRes.GetError()));
  281. hasErrors = true;
  282. } else {
  283. import->Modules.ConstructInPlace();
  284. for (auto& module : importRes.GetModules()) {
  285. import->Modules->push_back(module);
  286. }
  287. }
  288. }
  289. for (size_t i = 0; i < response.UdfsSize(); ++i) {
  290. TFunction* udf = functions[i];
  291. const TFunctionResult& udfRes = response.GetUdfs(i);
  292. if (udfRes.HasError()) {
  293. ctx.AddError(TIssue(udf->Pos, udfRes.GetError()));
  294. hasErrors = true;
  295. } else {
  296. udf->NormalizedName = udf->Name;
  297. udf->CallableType = ParseTypeFromYson(TStringBuf{udfRes.GetCallableType()}, ctx, udf->Pos);
  298. if (!udf->CallableType) {
  299. hasErrors = true;
  300. continue;
  301. }
  302. if (udfRes.HasNormalizedUserType()) {
  303. udf->NormalizedUserType = ParseTypeFromYson(TStringBuf{udfRes.GetNormalizedUserType()}, ctx, udf->Pos);
  304. if (!udf->NormalizedUserType) {
  305. hasErrors = true;
  306. continue;
  307. }
  308. }
  309. if (udfRes.HasRunConfigType()) {
  310. udf->RunConfigType = ParseTypeFromYson(TStringBuf{udfRes.GetRunConfigType()}, ctx, udf->Pos);
  311. if (!udf->RunConfigType) {
  312. hasErrors = true;
  313. continue;
  314. }
  315. }
  316. udf->SupportsBlocks = udfRes.GetSupportsBlocks();
  317. udf->IsStrict = udfRes.GetIsStrict();
  318. }
  319. }
  320. return !hasErrors;
  321. }
  322. private:
  323. const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry_;
  324. NUdf::ITypeInfoHelper::TPtr TypeInfoHelper_;
  325. TFileStoragePtr FileStorage_;
  326. const TString ResolverPath_;
  327. const TString UdfDependencyStubPath_;
  328. TList<TString> UserGroupArgs_;
  329. const TMap<TString, TString> Path2Md5_;
  330. };
  331. void LoadSystemModulePaths(
  332. const TString& resolverPath,
  333. const TString& dir,
  334. TUdfModulePathsMap* paths)
  335. {
  336. const TList<TString> args = { TString("--list"), dir };
  337. RunResolver(resolverPath, args, nullptr, [&](const TString& output) {
  338. // output format is:
  339. // {{module_name}}\t{{module_path}}\n
  340. for (const auto& it : StringSplitter(output).Split('\n')) {
  341. TStringBuf moduleName, modulePath;
  342. const TStringBuf& line = it.Token();
  343. if (!line.empty()) {
  344. line.Split('\t', moduleName, modulePath);
  345. paths->emplace(moduleName, modulePath);
  346. }
  347. }
  348. });
  349. }
  350. IUdfResolver::TPtr CreateOutProcUdfResolver(
  351. const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
  352. const TFileStoragePtr& fileStorage,
  353. const TString& resolverPath,
  354. const TString& user,
  355. const TString& group,
  356. bool filterSyscalls,
  357. const TString& udfDependencyStubPath,
  358. const TMap<TString, TString>& path2md5) {
  359. return new TOutProcUdfResolver(functionRegistry, fileStorage, resolverPath, user, group, filterSyscalls, udfDependencyStubPath, path2md5);
  360. }
  361. } // namespace NCommon
  362. } // namespace NYql