// python_udf.cpp (6.8 KB)
  1. #include "python_udf.h"
  2. #include "python_function_factory.h"
  3. #include <yql/essentials/public/udf/udf_version.h>
  4. #include <yql/essentials/udfs/common/python/bindings/py_utils.h>
  5. #include <util/generic/vector.h>
  6. #include <util/system/execpath.h>
  7. namespace {
  8. #if PY_MAJOR_VERSION >= 3
  9. #define PYTHON_PROGRAMM_NAME L"YQL::Python3"
  10. #else
  11. #define PYTHON_PROGRAMM_NAME "YQL::Python2"
  12. #endif
  13. int AddToPythonPath(const TVector<TStringBuf>& pathVals)
  14. {
  15. char pathVar[] = "path"; // PySys_{Get,Set}Object take a non-const char* arg
  16. TPyObjectPtr sysPath(PySys_GetObject(pathVar), TPyObjectPtr::ADD_REF);
  17. if (!sysPath) return -1;
  18. for (const auto& val: pathVals) {
  19. TPyObjectPtr pyStr = PyRepr(val.data());
  20. int rc = PyList_Append(sysPath.Get(), pyStr.Get());
  21. if (rc != 0) {
  22. return rc;
  23. }
  24. }
  25. return PySys_SetObject(pathVar, sysPath.Get());
  26. }
  27. void InitArcadiaPythonRuntime()
  28. {
  29. // Arcadia static python import hook resides in __res module
  30. // It modifies sys.meta_path upon import
  31. TPyObjectPtr mod(PyImport_ImportModule("__res"));
  32. Y_ABORT_UNLESS(mod, "Can't import arcadia python runtime");
  33. }
  34. //////////////////////////////////////////////////////////////////////////////
  35. // TPythonModule
  36. //////////////////////////////////////////////////////////////////////////////
  37. class TPythonModule: public IUdfModule
  38. {
  39. public:
  40. TPythonModule(const TString& resourceName, EPythonFlavor pythonFlavor, bool standalone = true)
  41. : ResourceName(resourceName), Standalone(standalone)
  42. {
  43. if (Standalone) {
  44. Py_SetProgramName(PYTHON_PROGRAMM_NAME);
  45. PrepareYqlModule();
  46. Py_Initialize();
  47. }
  48. InitYqlModule(pythonFlavor, standalone);
  49. const auto rc = PyRun_SimpleString(R"(
  50. # numpy on import may find installed openblas library and load it,
  51. # which in turn causes it to start CPUCOUNT threads
  52. # with approx. 40Mb memory reserved for each thread;
  53. #
  54. # See more detailed explanation here: https://st.yandex-team.ru/STATLIBS-1715#5bfc68ecbbc039001cec572a
  55. #
  56. # Thus, we reduce negative effects as much as possible
  57. import os
  58. os.environ['OPENBLAS_NUM_THREADS'] = '1'
  59. # Following part allows us later to format tracebacks via sys.excepthook
  60. # in thread-safe manner
  61. import sys
  62. import threading
  63. if sys.version_info >= (3, 0):
  64. from io import StringIO, TextIOWrapper as SysStderrType
  65. else:
  66. from cStringIO import StringIO
  67. SysStderrType = file
  68. class StderrLocal(threading.local):
  69. def __init__(self):
  70. self.is_real_mode = True
  71. self.buffer = StringIO()
  72. class StderrProxy(object):
  73. def __init__(self, stderr):
  74. self._stderr = stderr
  75. self._tls = StderrLocal()
  76. def _toggle_real_mode(self):
  77. self._tls.is_real_mode = not self._tls.is_real_mode
  78. if not self._tls.is_real_mode:
  79. self._tls.buffer.clear()
  80. def _get_value(self):
  81. assert not self._tls.is_real_mode
  82. return self._tls.buffer.getvalue()
  83. def __getattr__(self, attr):
  84. target = self._stderr
  85. if not self._tls.is_real_mode:
  86. target = self._tls.buffer
  87. return getattr(target, attr)
  88. if isinstance(sys.stderr, SysStderrType):
  89. sys.stderr = StderrProxy(sys.stderr)
  90. )");
  91. Y_ABORT_UNLESS(rc >= 0, "Can't setup module");
  92. if (pythonFlavor == EPythonFlavor::Arcadia) {
  93. InitArcadiaPythonRuntime();
  94. }
  95. #ifndef _win_
  96. if (Standalone) {
  97. TVector<TStringBuf> paths;
  98. if (pythonFlavor == EPythonFlavor::System) {
  99. paths.push_back(TStringBuf("/usr/lib/python2.7/dist-packages"));
  100. }
  101. paths.push_back(TStringBuf("."));
  102. const auto r = AddToPythonPath(paths);
  103. Y_ABORT_UNLESS(r >= 0, "Can't add dist-packages into sys.path");
  104. }
  105. #endif
  106. char executableVar[] = "executable"; // PySys_{Get,Set}Object take a non-const char* arg
  107. TPyObjectPtr pyExecutableStr = PyRepr(GetExecPath().data());
  108. Y_ABORT_UNLESS(PySys_SetObject(executableVar, pyExecutableStr.Get()) >= 0, "Can't set sys.executable");
  109. if (Standalone) {
  110. PyEval_InitThreads();
  111. MainThreadState_ = PyEval_SaveThread();
  112. }
  113. }
  114. ~TPythonModule() {
  115. if (Standalone) {
  116. PyEval_RestoreThread(MainThreadState_);
  117. Py_Finalize();
  118. }
  119. }
  120. void CleanupOnTerminate() const final {
  121. PyCleanup();
  122. }
  123. void GetAllFunctions(IFunctionsSink&) const final {}
  124. void BuildFunctionTypeInfo(
  125. const TStringRef& name,
  126. TType* userType,
  127. const TStringRef& typeConfig,
  128. ui32 flags,
  129. IFunctionTypeInfoBuilder& builder) const final
  130. {
  131. Y_UNUSED(typeConfig);
  132. if (flags & TFlags::TypesOnly) {
  133. return;
  134. }
  135. try {
  136. auto typeHelper = builder.TypeInfoHelper();
  137. if (ETypeKind::Callable != typeHelper->GetTypeKind(userType)) {
  138. return builder.SetError(TStringRef::Of("Expected callable type"));
  139. }
  140. const auto pos = builder.GetSourcePosition();
  141. builder.Implementation(new TPythonFunctionFactory(name, ResourceName, userType, std::move(typeHelper), pos));
  142. } catch (const yexception& e) {
  143. builder.SetError(TStringBuf(e.what()));
  144. }
  145. }
  146. private:
  147. TString ResourceName;
  148. bool Standalone;
  149. PyThreadState* MainThreadState_;
  150. };
  151. //////////////////////////////////////////////////////////////////////////////
  152. // TStubModule
  153. //////////////////////////////////////////////////////////////////////////////
  154. class TStubModule: public IUdfModule {
  155. void GetAllFunctions(IFunctionsSink&) const final {}
  156. void BuildFunctionTypeInfo(
  157. const TStringRef& /*name*/,
  158. TType* /*userType*/,
  159. const TStringRef& /*typeConfig*/,
  160. ui32 flags,
  161. IFunctionTypeInfoBuilder& /*builder*/) const final
  162. {
  163. Y_DEBUG_ABORT_UNLESS(flags & TFlags::TypesOnly,
  164. "in stub module this function can be called only for types loading");
  165. }
  166. void CleanupOnTerminate() const final {}
  167. };
  168. } // namespace
  169. void NKikimr::NUdf::RegisterYqlPythonUdf(
  170. IRegistrator& registrator,
  171. ui32 flags,
  172. TStringBuf moduleName,
  173. TStringBuf resourceName,
  174. EPythonFlavor pythonFlavor)
  175. {
  176. if (flags & IRegistrator::TFlags::TypesOnly) {
  177. registrator.AddModule(moduleName, new TStubModule);
  178. } else {
  179. registrator.AddModule(
  180. moduleName,
  181. NKikimr::NUdf::GetYqlPythonUdfModule(resourceName, pythonFlavor, true)
  182. );
  183. }
  184. }
  185. TUniquePtr<NKikimr::NUdf::IUdfModule> NKikimr::NUdf::GetYqlPythonUdfModule(
  186. TStringBuf resourceName, NKikimr::NUdf::EPythonFlavor pythonFlavor,
  187. bool standalone
  188. ) {
  189. return new TPythonModule(TString(resourceName), pythonFlavor, standalone);
  190. }