py_stream.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. #include "py_stream.h"
  2. #include "py_cast.h"
  3. #include "py_errors.h"
  4. #include "py_gil.h"
  5. #include "py_utils.h"
  6. #include <yql/essentials/public/udf/udf_value.h>
  7. #include <yql/essentials/public/udf/udf_value_builder.h>
  8. #include <yql/essentials/public/udf/udf_type_inspection.h>
  9. #include <yql/essentials/public/udf/udf_terminator.h>
  10. #include <util/string/builder.h>
  11. using namespace NKikimr;
  12. namespace NPython {
  13. // will be initialized in InitYqlModule()
  14. PyObject* PyYieldIterationException = nullptr;
  15. //////////////////////////////////////////////////////////////////////////////
  16. // TPyStream
  17. //////////////////////////////////////////////////////////////////////////////
  18. struct TPyStream {
  19. PyObject_HEAD;
  20. TPyCastContext::TPtr CastCtx;
  21. TPyCleanupListItem<NUdf::IBoxedValuePtr> Value;
  22. const NUdf::TType* ItemType;
  23. inline static TPyStream* Cast(PyObject* o) {
  24. return reinterpret_cast<TPyStream*>(o);
  25. }
  26. inline static void Dealloc(PyObject* self) {
  27. delete Cast(self);
  28. }
  29. inline static PyObject* Repr(PyObject* self) {
  30. Y_UNUSED(self);
  31. return PyRepr("<yql.TStream>").Release();
  32. }
  33. static PyObject* New(
  34. const TPyCastContext::TPtr& castCtx,
  35. const NUdf::TType* type,
  36. NUdf::IBoxedValuePtr value);
  37. static PyObject* Next(PyObject* self);
  38. };
// Py_TPFLAGS_HAVE_ITER is a Python 2 flag; under Python 3 it is defined as 0
// here so that the tp_flags initializer below compiles for both versions.
#if PY_MAJOR_VERSION >= 3
#define Py_TPFLAGS_HAVE_ITER 0
#endif

// Type object for yql.TStream. Instances are their own iterators
// (tp_iter = PyObject_SelfIter) and produce items via TPyStream::Next;
// memory is released by TPyStream::Dealloc.
// The #if blocks pick the slots that exist in the PyTypeObject of the CPython
// version being compiled against, keeping the initializer aligned with it.
// NOTE(review): INIT_MEMBER is defined elsewhere - presumably a portable
// designated-initializer helper; confirm.
PyTypeObject PyStreamType = {
    PyVarObject_HEAD_INIT(&PyType_Type, 0)
    INIT_MEMBER(tp_name , "yql.TStream"),
    INIT_MEMBER(tp_basicsize , sizeof(TPyStream)),
    INIT_MEMBER(tp_itemsize , 0),
    INIT_MEMBER(tp_dealloc , TPyStream::Dealloc),
    // This slot position is tp_print before 3.8b4 and tp_vectorcall_offset after.
#if PY_VERSION_HEX < 0x030800b4
    INIT_MEMBER(tp_print , nullptr),
#else
    INIT_MEMBER(tp_vectorcall_offset, 0),
#endif
    INIT_MEMBER(tp_getattr , nullptr),
    INIT_MEMBER(tp_setattr , nullptr),
    // Python 3 replaced tp_compare with tp_as_async at this position.
#if PY_MAJOR_VERSION >= 3
    INIT_MEMBER(tp_as_async , nullptr),
#else
    INIT_MEMBER(tp_compare , nullptr),
#endif
    INIT_MEMBER(tp_repr , TPyStream::Repr),
    INIT_MEMBER(tp_as_number , nullptr),
    INIT_MEMBER(tp_as_sequence , nullptr),
    INIT_MEMBER(tp_as_mapping , nullptr),
    INIT_MEMBER(tp_hash , nullptr),
    INIT_MEMBER(tp_call , nullptr),
    INIT_MEMBER(tp_str , nullptr),
    INIT_MEMBER(tp_getattro , nullptr),
    INIT_MEMBER(tp_setattro , nullptr),
    INIT_MEMBER(tp_as_buffer , nullptr),
    INIT_MEMBER(tp_flags , Py_TPFLAGS_HAVE_ITER),
    INIT_MEMBER(tp_doc , "yql.TStream object"),
    INIT_MEMBER(tp_traverse , nullptr),
    INIT_MEMBER(tp_clear , nullptr),
    INIT_MEMBER(tp_richcompare , nullptr),
    INIT_MEMBER(tp_weaklistoffset , 0),
    // Iterator protocol: iter(stream) is the stream itself, next() fetches.
    INIT_MEMBER(tp_iter , PyObject_SelfIter),
    INIT_MEMBER(tp_iternext , TPyStream::Next),
    INIT_MEMBER(tp_methods , nullptr),
    INIT_MEMBER(tp_members , nullptr),
    INIT_MEMBER(tp_getset , nullptr),
    INIT_MEMBER(tp_base , nullptr),
    INIT_MEMBER(tp_dict , nullptr),
    INIT_MEMBER(tp_descr_get , nullptr),
    INIT_MEMBER(tp_descr_set , nullptr),
    INIT_MEMBER(tp_dictoffset , 0),
    INIT_MEMBER(tp_init , nullptr),
    INIT_MEMBER(tp_alloc , nullptr),
    // No tp_new: objects are only created from C++ via TPyStream::New.
    INIT_MEMBER(tp_new , nullptr),
    INIT_MEMBER(tp_free , nullptr),
    INIT_MEMBER(tp_is_gc , nullptr),
    INIT_MEMBER(tp_bases , nullptr),
    INIT_MEMBER(tp_mro , nullptr),
    INIT_MEMBER(tp_cache , nullptr),
    INIT_MEMBER(tp_subclasses , nullptr),
    INIT_MEMBER(tp_weaklist , nullptr),
    INIT_MEMBER(tp_del , nullptr),
    INIT_MEMBER(tp_version_tag , 0),
#if PY_MAJOR_VERSION >= 3
    INIT_MEMBER(tp_finalize , nullptr),
#endif
#if PY_VERSION_HEX >= 0x030800b1
    INIT_MEMBER(tp_vectorcall , nullptr),
#endif
    // In 3.8.x the deprecated tp_print slot was moved to the end of the struct.
#if PY_VERSION_HEX >= 0x030800b4 && PY_VERSION_HEX < 0x03090000
    INIT_MEMBER(tp_print , nullptr),
#endif
};
  108. PyObject* TPyStream::New(
  109. const TPyCastContext::TPtr& castCtx,
  110. const NUdf::TType* type,
  111. NUdf::IBoxedValuePtr value)
  112. {
  113. TPyStream* stream = new TPyStream;
  114. PyObject_INIT(stream, &PyStreamType);
  115. stream->CastCtx = castCtx;
  116. stream->Value.Set(castCtx->PyCtx, value);
  117. const NUdf::TStreamTypeInspector inspector(*castCtx->PyCtx->TypeInfoHelper, type);
  118. stream->ItemType = inspector.GetItemType();
  119. return reinterpret_cast<PyObject*>(stream);
  120. }
  121. PyObject* TPyStream::Next(PyObject* self) {
  122. PY_TRY {
  123. TPyStream* stream = Cast(self);
  124. NUdf::TUnboxedValue item;
  125. auto status = NUdf::TBoxedValueAccessor::Fetch(*stream->Value.Get(), item);
  126. switch (status) {
  127. case NUdf::EFetchStatus::Ok:
  128. return ToPyObject(stream->CastCtx, stream->ItemType, item)
  129. .Release();
  130. case NUdf::EFetchStatus::Finish:
  131. return nullptr;
  132. case NUdf::EFetchStatus::Yield:
  133. PyErr_SetNone(PyYieldIterationException);
  134. return nullptr;
  135. default:
  136. Y_ABORT("Unknown stream status");
  137. }
  138. } PY_CATCH(nullptr)
  139. }
  140. //////////////////////////////////////////////////////////////////////////////
  141. // TStreamOverPyIter
  142. //////////////////////////////////////////////////////////////////////////////
  143. class TStreamOverPyIter final: public NUdf::TBoxedValue {
  144. public:
  145. TStreamOverPyIter(
  146. TPyCastContext::TPtr castCtx,
  147. const NUdf::TType* itemType,
  148. TPyObjectPtr pyIter,
  149. TPyObjectPtr pyIterable,
  150. TPyObjectPtr pyGeneratorCallable,
  151. TPyObjectPtr pyGeneratorCallableClosure,
  152. TPyObjectPtr pyGeneratorCallableArgs)
  153. : CastCtx_(std::move(castCtx))
  154. , ItemType_(itemType)
  155. , PyIter_(std::move(pyIter))
  156. , PyIterable_(std::move(pyIterable))
  157. , PyGeneratorCallable_(std::move(pyGeneratorCallable))
  158. , PyGeneratorCallableClosure_(std::move(pyGeneratorCallableClosure))
  159. , PyGeneratorCallableArgs_(std::move(pyGeneratorCallableArgs))
  160. {
  161. }
  162. ~TStreamOverPyIter() {
  163. TPyGilLocker lock;
  164. PyIter_.Reset();
  165. PyIterable_.Reset();
  166. PyGeneratorCallableArgs_.Reset();
  167. PyGeneratorCallableClosure_.Reset();
  168. PyGeneratorCallable_.Reset();
  169. }
  170. private:
  171. NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
  172. try {
  173. TPyGilLocker lock;
  174. TPyObjectPtr next(PyIter_Next(PyIter_.Get()));
  175. if (next) {
  176. if (PyErr_GivenExceptionMatches(next.Get(), PyYieldIterationException)) {
  177. return NUdf::EFetchStatus::Yield;
  178. }
  179. result = FromPyObject(CastCtx_, ItemType_, next.Get());
  180. return NUdf::EFetchStatus::Ok;
  181. }
  182. if (PyObject* ex = PyErr_Occurred()) {
  183. if (PyErr_GivenExceptionMatches(ex, PyYieldIterationException)) {
  184. PyErr_Clear();
  185. TPyObjectPtr iterable;
  186. TPyObjectPtr iter;
  187. if (PyIterable_) {
  188. PyIter_.Reset();
  189. iterable = PyIterable_;
  190. } else if (PyGeneratorCallable_) {
  191. PyIter_.Reset();
  192. TPyObjectPtr result(PyObject_CallObject(PyGeneratorCallable_.Get(), PyGeneratorCallableArgs_.Get()));
  193. if (!result) {
  194. UdfTerminate((TStringBuilder() << CastCtx_->PyCtx->Pos << "Failed to execute:\n" << GetLastErrorAsString()).data());
  195. }
  196. if (PyGen_Check(result.Get())) {
  197. iterable = std::move(result);
  198. } else if (PyIter_Check(result.Get())) {
  199. iter = std::move(result);
  200. } else {
  201. UdfTerminate((TStringBuilder() << CastCtx_->PyCtx->Pos << "Expected iterator or generator, but got " << PyObjectRepr(result.Get())).data());
  202. }
  203. } else {
  204. return NUdf::EFetchStatus::Yield;
  205. }
  206. if (!iter) {
  207. iter.ResetSteal(PyObject_GetIter(iterable.Get()));
  208. if (!iter) {
  209. UdfTerminate((TStringBuilder() << CastCtx_->PyCtx->Pos << GetLastErrorAsString()).data());
  210. }
  211. }
  212. PyIter_.ResetAddRef(iter.Get());
  213. return NUdf::EFetchStatus::Yield;
  214. }
  215. UdfTerminate((TStringBuilder() << CastCtx_->PyCtx->Pos << GetLastErrorAsString()).data());
  216. }
  217. return NUdf::EFetchStatus::Finish;
  218. }
  219. catch (const yexception& e) {
  220. UdfTerminate((TStringBuilder() << CastCtx_->PyCtx->Pos << e.what()).data());
  221. }
  222. }
  223. private:
  224. TPyCastContext::TPtr CastCtx_;
  225. const NUdf::TType* ItemType_;
  226. TPyObjectPtr PyIter_;
  227. TPyObjectPtr PyIterable_;
  228. TPyObjectPtr PyGeneratorCallable_;
  229. TPyObjectPtr PyGeneratorCallableClosure_;
  230. TPyObjectPtr PyGeneratorCallableArgs_;
  231. };
  232. //////////////////////////////////////////////////////////////////////////////
  233. // public functions
  234. //////////////////////////////////////////////////////////////////////////////
  235. TPyObjectPtr ToPyStream(
  236. const TPyCastContext::TPtr& castCtx,
  237. const NKikimr::NUdf::TType* type,
  238. const NKikimr::NUdf::TUnboxedValuePod& value)
  239. {
  240. return TPyStream::New(castCtx, type, value.AsBoxed());
  241. }
  242. NKikimr::NUdf::TUnboxedValue FromPyStream(
  243. const TPyCastContext::TPtr& castCtx,
  244. const NKikimr::NUdf::TType* type,
  245. const TPyObjectPtr& value,
  246. const TPyObjectPtr& originalCallable,
  247. const TPyObjectPtr& originalCallableClosure,
  248. const TPyObjectPtr& originalCallableArgs
  249. )
  250. {
  251. const NUdf::TStreamTypeInspector inspector(*castCtx->PyCtx->TypeInfoHelper, type);
  252. const NUdf::TType* itemType = inspector.GetItemType();
  253. if (PyGen_Check(value.Get())) {
  254. TPyObjectPtr iter(PyObject_GetIter(value.Get()));
  255. if (!iter) {
  256. UdfTerminate((TStringBuilder() << castCtx->PyCtx->Pos << GetLastErrorAsString()).data());
  257. }
  258. return NUdf::TUnboxedValuePod(new TStreamOverPyIter(castCtx, itemType, std::move(iter), nullptr,
  259. originalCallable, originalCallableClosure, originalCallableArgs));
  260. }
  261. if (PyIter_Check(value.Get())
  262. #if PY_MAJOR_VERSION < 3
  263. // python 2 iterators must also implement "next" method
  264. && 1 == PyObject_HasAttrString(value.Get(), "next")
  265. #endif
  266. ) {
  267. TPyObjectPtr iter(value.Get(), TPyObjectPtr::ADD_REF);
  268. return NUdf::TUnboxedValuePod(new TStreamOverPyIter(castCtx, itemType, std::move(iter), nullptr,
  269. originalCallable, originalCallableClosure, originalCallableArgs));
  270. }
  271. // assume that this function will returns generator
  272. if (PyCallable_Check(value.Get())) {
  273. TPyObjectPtr generator(PyObject_CallObject(value.Get(), nullptr));
  274. if (!generator || !PyGen_Check(generator.Get())) {
  275. UdfTerminate((TStringBuilder() << castCtx->PyCtx->Pos << "Expected generator as a result of function call").data());
  276. }
  277. TPyObjectPtr iter(PyObject_GetIter(generator.Get()));
  278. if (!iter) {
  279. UdfTerminate((TStringBuilder() << castCtx->PyCtx->Pos << GetLastErrorAsString()).data());
  280. }
  281. TPyObjectPtr callableClosure;
  282. if (PyFunction_Check(value.Get())) {
  283. PyObject* closure = PyFunction_GetClosure(value.Get());
  284. if (closure) {
  285. callableClosure = TPyObjectPtr(closure, TPyObjectPtr::ADD_REF);
  286. }
  287. }
  288. return NUdf::TUnboxedValuePod(new TStreamOverPyIter(castCtx, itemType, std::move(iter), nullptr,
  289. originalCallable ? value : nullptr, originalCallable ? callableClosure : nullptr, nullptr));
  290. }
  291. // must be after checking for callable
  292. if (PySequence_Check(value.Get()) || PyObject_HasAttrString(value.Get(), "__iter__")) {
  293. TPyObjectPtr iter(PyObject_GetIter(value.Get()));
  294. if (!iter) {
  295. UdfTerminate((TStringBuilder() << castCtx->PyCtx->Pos << GetLastErrorAsString()).data());
  296. }
  297. return NUdf::TUnboxedValuePod(new TStreamOverPyIter(castCtx, itemType, std::move(iter), originalCallable ? value : nullptr, nullptr, nullptr, nullptr));
  298. }
  299. UdfTerminate((TStringBuilder() << castCtx->PyCtx->Pos << "Expected iterator, generator, generator factory, "
  300. "or iterable object, but got " << PyObjectRepr(value.Get())).data());
  301. }
  302. } // namespace NPython