_xxinterpchannelsmodule.c 61 KB


  1. /* interpreters module */
  2. /* low-level access to interpreter primitives */
  3. #ifndef Py_BUILD_CORE_BUILTIN
  4. # define Py_BUILD_CORE_MODULE 1
  5. #endif
  6. #include "Python.h"
  7. #include "interpreteridobject.h"
  8. #include "pycore_pystate.h" // _PyCrossInterpreterData_ReleaseAndRawFree()
  9. /*
  10. This module has the following process-global state:
  11. _globals (static struct globals):
  12. module_count (int)
  13. channels (struct _channels):
  14. numopen (int64_t)
  15. next_id; (int64_t)
  16. mutex (PyThread_type_lock)
  17. head (linked list of struct _channelref *):
  18. id (int64_t)
  19. objcount (Py_ssize_t)
  20. next (struct _channelref *):
  21. ...
  22. chan (struct _channel *):
  23. open (int)
  24. mutex (PyThread_type_lock)
  25. closing (struct _channel_closing *):
  26. ref (struct _channelref *):
  27. ...
  28. ends (struct _channelends *):
  29. numsendopen (int64_t)
  30. numrecvopen (int64_t)
  31. send (struct _channelend *):
  32. interp (int64_t)
  33. open (int)
  34. next (struct _channelend *)
  35. recv (struct _channelend *):
  36. ...
  37. queue (struct _channelqueue *):
  38. count (int64_t)
  39. first (struct _channelitem *):
  40. next (struct _channelitem *):
  41. ...
  42. data (_PyCrossInterpreterData *):
  43. data (void *)
  44. obj (PyObject *)
  45. interp (int64_t)
  46. new_object (xid_newobjectfunc)
  47. free (xid_freefunc)
  48. last (struct _channelitem *):
  49. ...
  50. The above state includes the following allocations by the module:
  51. * 1 top-level mutex (to protect the rest of the state)
  52. * for each channel:
  53. * 1 struct _channelref
  54. * 1 struct _channel
  55. * 0-1 struct _channel_closing
  56. * 1 struct _channelends
  57. * 2 struct _channelend
  58. * 1 struct _channelqueue
  59. * for each item in each channel:
  60. * 1 struct _channelitem
  61. * 1 _PyCrossInterpreterData
  62. The only objects in that global state are the references held by each
  63. channel's queue, which are safely managed via the _PyCrossInterpreterData_*()
  64. API.. The module does not create any objects that are shared globally.
  65. */
  66. #define MODULE_NAME "_xxinterpchannels"
  67. #define GLOBAL_MALLOC(TYPE) \
  68. PyMem_RawMalloc(sizeof(TYPE))
  69. #define GLOBAL_FREE(VAR) \
  70. PyMem_RawFree(VAR)
  71. static PyInterpreterState *
  72. _get_current_interp(void)
  73. {
  74. // PyInterpreterState_Get() aborts if lookup fails, so don't need
  75. // to check the result for NULL.
  76. return PyInterpreterState_Get();
  77. }
  78. static PyObject *
  79. _get_current_module(void)
  80. {
  81. PyObject *name = PyUnicode_FromString(MODULE_NAME);
  82. if (name == NULL) {
  83. return NULL;
  84. }
  85. PyObject *mod = PyImport_GetModule(name);
  86. Py_DECREF(name);
  87. if (mod == NULL) {
  88. return NULL;
  89. }
  90. assert(mod != Py_None);
  91. return mod;
  92. }
  93. static PyObject *
  94. get_module_from_owned_type(PyTypeObject *cls)
  95. {
  96. assert(cls != NULL);
  97. return _get_current_module();
  98. // XXX Use the more efficient API now that we use heap types:
  99. //return PyType_GetModule(cls);
  100. }
  101. static struct PyModuleDef moduledef;
  102. static PyObject *
  103. get_module_from_type(PyTypeObject *cls)
  104. {
  105. assert(cls != NULL);
  106. return _get_current_module();
  107. // XXX Use the more efficient API now that we use heap types:
  108. //return PyType_GetModuleByDef(cls, &moduledef);
  109. }
  110. static PyObject *
  111. add_new_exception(PyObject *mod, const char *name, PyObject *base)
  112. {
  113. assert(!PyObject_HasAttrString(mod, name));
  114. PyObject *exctype = PyErr_NewException(name, base, NULL);
  115. if (exctype == NULL) {
  116. return NULL;
  117. }
  118. int res = PyModule_AddType(mod, (PyTypeObject *)exctype);
  119. if (res < 0) {
  120. Py_DECREF(exctype);
  121. return NULL;
  122. }
  123. return exctype;
  124. }
  125. #define ADD_NEW_EXCEPTION(MOD, NAME, BASE) \
  126. add_new_exception(MOD, MODULE_NAME "." Py_STRINGIFY(NAME), BASE)
  127. static PyTypeObject *
  128. add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared)
  129. {
  130. PyTypeObject *cls = (PyTypeObject *)PyType_FromMetaclass(
  131. NULL, mod, spec, NULL);
  132. if (cls == NULL) {
  133. return NULL;
  134. }
  135. if (PyModule_AddType(mod, cls) < 0) {
  136. Py_DECREF(cls);
  137. return NULL;
  138. }
  139. if (shared != NULL) {
  140. if (_PyCrossInterpreterData_RegisterClass(cls, shared)) {
  141. Py_DECREF(cls);
  142. return NULL;
  143. }
  144. }
  145. return cls;
  146. }
  147. #define XID_IGNORE_EXC 1
  148. #define XID_FREE 2
  149. static int
  150. _release_xid_data(_PyCrossInterpreterData *data, int flags)
  151. {
  152. int ignoreexc = flags & XID_IGNORE_EXC;
  153. PyObject *exc;
  154. if (ignoreexc) {
  155. exc = PyErr_GetRaisedException();
  156. }
  157. int res;
  158. if (flags & XID_FREE) {
  159. res = _PyCrossInterpreterData_ReleaseAndRawFree(data);
  160. }
  161. else {
  162. res = _PyCrossInterpreterData_Release(data);
  163. }
  164. if (res < 0) {
  165. /* The owning interpreter is already destroyed. */
  166. if (ignoreexc) {
  167. // XXX Emit a warning?
  168. PyErr_Clear();
  169. }
  170. }
  171. if (flags & XID_FREE) {
  172. /* Either way, we free the data. */
  173. }
  174. if (ignoreexc) {
  175. PyErr_SetRaisedException(exc);
  176. }
  177. return res;
  178. }
  179. /* module state *************************************************************/
  180. typedef struct {
  181. /* heap types */
  182. PyTypeObject *ChannelIDType;
  183. /* exceptions */
  184. PyObject *ChannelError;
  185. PyObject *ChannelNotFoundError;
  186. PyObject *ChannelClosedError;
  187. PyObject *ChannelEmptyError;
  188. PyObject *ChannelNotEmptyError;
  189. } module_state;
  190. static inline module_state *
  191. get_module_state(PyObject *mod)
  192. {
  193. assert(mod != NULL);
  194. module_state *state = PyModule_GetState(mod);
  195. assert(state != NULL);
  196. return state;
  197. }
  198. static int
  199. traverse_module_state(module_state *state, visitproc visit, void *arg)
  200. {
  201. /* heap types */
  202. Py_VISIT(state->ChannelIDType);
  203. /* exceptions */
  204. Py_VISIT(state->ChannelError);
  205. Py_VISIT(state->ChannelNotFoundError);
  206. Py_VISIT(state->ChannelClosedError);
  207. Py_VISIT(state->ChannelEmptyError);
  208. Py_VISIT(state->ChannelNotEmptyError);
  209. return 0;
  210. }
  211. static int
  212. clear_module_state(module_state *state)
  213. {
  214. /* heap types */
  215. if (state->ChannelIDType != NULL) {
  216. (void)_PyCrossInterpreterData_UnregisterClass(state->ChannelIDType);
  217. }
  218. Py_CLEAR(state->ChannelIDType);
  219. /* exceptions */
  220. Py_CLEAR(state->ChannelError);
  221. Py_CLEAR(state->ChannelNotFoundError);
  222. Py_CLEAR(state->ChannelClosedError);
  223. Py_CLEAR(state->ChannelEmptyError);
  224. Py_CLEAR(state->ChannelNotEmptyError);
  225. return 0;
  226. }
  227. /* channel-specific code ****************************************************/
  228. #define CHANNEL_SEND 1
  229. #define CHANNEL_BOTH 0
  230. #define CHANNEL_RECV -1
  231. /* channel errors */
  232. #define ERR_CHANNEL_NOT_FOUND -2
  233. #define ERR_CHANNEL_CLOSED -3
  234. #define ERR_CHANNEL_INTERP_CLOSED -4
  235. #define ERR_CHANNEL_EMPTY -5
  236. #define ERR_CHANNEL_NOT_EMPTY -6
  237. #define ERR_CHANNEL_MUTEX_INIT -7
  238. #define ERR_CHANNELS_MUTEX_INIT -8
  239. #define ERR_NO_NEXT_CHANNEL_ID -9
  240. static int
  241. exceptions_init(PyObject *mod)
  242. {
  243. module_state *state = get_module_state(mod);
  244. if (state == NULL) {
  245. return -1;
  246. }
  247. #define ADD(NAME, BASE) \
  248. do { \
  249. assert(state->NAME == NULL); \
  250. state->NAME = ADD_NEW_EXCEPTION(mod, NAME, BASE); \
  251. if (state->NAME == NULL) { \
  252. return -1; \
  253. } \
  254. } while (0)
  255. // A channel-related operation failed.
  256. ADD(ChannelError, PyExc_RuntimeError);
  257. // An operation tried to use a channel that doesn't exist.
  258. ADD(ChannelNotFoundError, state->ChannelError);
  259. // An operation tried to use a closed channel.
  260. ADD(ChannelClosedError, state->ChannelError);
  261. // An operation tried to pop from an empty channel.
  262. ADD(ChannelEmptyError, state->ChannelError);
  263. // An operation tried to close a non-empty channel.
  264. ADD(ChannelNotEmptyError, state->ChannelError);
  265. #undef ADD
  266. return 0;
  267. }
  268. static int
  269. handle_channel_error(int err, PyObject *mod, int64_t cid)
  270. {
  271. if (err == 0) {
  272. assert(!PyErr_Occurred());
  273. return 0;
  274. }
  275. assert(err < 0);
  276. module_state *state = get_module_state(mod);
  277. assert(state != NULL);
  278. if (err == ERR_CHANNEL_NOT_FOUND) {
  279. PyErr_Format(state->ChannelNotFoundError,
  280. "channel %" PRId64 " not found", cid);
  281. }
  282. else if (err == ERR_CHANNEL_CLOSED) {
  283. PyErr_Format(state->ChannelClosedError,
  284. "channel %" PRId64 " is closed", cid);
  285. }
  286. else if (err == ERR_CHANNEL_INTERP_CLOSED) {
  287. PyErr_Format(state->ChannelClosedError,
  288. "channel %" PRId64 " is already closed", cid);
  289. }
  290. else if (err == ERR_CHANNEL_EMPTY) {
  291. PyErr_Format(state->ChannelEmptyError,
  292. "channel %" PRId64 " is empty", cid);
  293. }
  294. else if (err == ERR_CHANNEL_NOT_EMPTY) {
  295. PyErr_Format(state->ChannelNotEmptyError,
  296. "channel %" PRId64 " may not be closed "
  297. "if not empty (try force=True)",
  298. cid);
  299. }
  300. else if (err == ERR_CHANNEL_MUTEX_INIT) {
  301. PyErr_SetString(state->ChannelError,
  302. "can't initialize mutex for new channel");
  303. }
  304. else if (err == ERR_CHANNELS_MUTEX_INIT) {
  305. PyErr_SetString(state->ChannelError,
  306. "can't initialize mutex for channel management");
  307. }
  308. else if (err == ERR_NO_NEXT_CHANNEL_ID) {
  309. PyErr_SetString(state->ChannelError,
  310. "failed to get a channel ID");
  311. }
  312. else {
  313. assert(PyErr_Occurred());
  314. }
  315. return 1;
  316. }
  317. /* the channel queue */
  318. struct _channelitem;
  319. typedef struct _channelitem {
  320. _PyCrossInterpreterData *data;
  321. struct _channelitem *next;
  322. } _channelitem;
  323. static _channelitem *
  324. _channelitem_new(void)
  325. {
  326. _channelitem *item = GLOBAL_MALLOC(_channelitem);
  327. if (item == NULL) {
  328. PyErr_NoMemory();
  329. return NULL;
  330. }
  331. item->data = NULL;
  332. item->next = NULL;
  333. return item;
  334. }
  335. static void
  336. _channelitem_clear(_channelitem *item)
  337. {
  338. if (item->data != NULL) {
  339. // It was allocated in _channel_send().
  340. (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
  341. item->data = NULL;
  342. }
  343. item->next = NULL;
  344. }
  345. static void
  346. _channelitem_free(_channelitem *item)
  347. {
  348. _channelitem_clear(item);
  349. GLOBAL_FREE(item);
  350. }
  351. static void
  352. _channelitem_free_all(_channelitem *item)
  353. {
  354. while (item != NULL) {
  355. _channelitem *last = item;
  356. item = item->next;
  357. _channelitem_free(last);
  358. }
  359. }
  360. static _PyCrossInterpreterData *
  361. _channelitem_popped(_channelitem *item)
  362. {
  363. _PyCrossInterpreterData *data = item->data;
  364. item->data = NULL;
  365. _channelitem_free(item);
  366. return data;
  367. }
  368. typedef struct _channelqueue {
  369. int64_t count;
  370. _channelitem *first;
  371. _channelitem *last;
  372. } _channelqueue;
  373. static _channelqueue *
  374. _channelqueue_new(void)
  375. {
  376. _channelqueue *queue = GLOBAL_MALLOC(_channelqueue);
  377. if (queue == NULL) {
  378. PyErr_NoMemory();
  379. return NULL;
  380. }
  381. queue->count = 0;
  382. queue->first = NULL;
  383. queue->last = NULL;
  384. return queue;
  385. }
  386. static void
  387. _channelqueue_clear(_channelqueue *queue)
  388. {
  389. _channelitem_free_all(queue->first);
  390. queue->count = 0;
  391. queue->first = NULL;
  392. queue->last = NULL;
  393. }
  394. static void
  395. _channelqueue_free(_channelqueue *queue)
  396. {
  397. _channelqueue_clear(queue);
  398. GLOBAL_FREE(queue);
  399. }
  400. static int
  401. _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
  402. {
  403. _channelitem *item = _channelitem_new();
  404. if (item == NULL) {
  405. return -1;
  406. }
  407. item->data = data;
  408. queue->count += 1;
  409. if (queue->first == NULL) {
  410. queue->first = item;
  411. }
  412. else {
  413. queue->last->next = item;
  414. }
  415. queue->last = item;
  416. return 0;
  417. }
  418. static _PyCrossInterpreterData *
  419. _channelqueue_get(_channelqueue *queue)
  420. {
  421. _channelitem *item = queue->first;
  422. if (item == NULL) {
  423. return NULL;
  424. }
  425. queue->first = item->next;
  426. if (queue->last == item) {
  427. queue->last = NULL;
  428. }
  429. queue->count -= 1;
  430. return _channelitem_popped(item);
  431. }
  432. static void
  433. _channelqueue_drop_interpreter(_channelqueue *queue, int64_t interp)
  434. {
  435. _channelitem *prev = NULL;
  436. _channelitem *next = queue->first;
  437. while (next != NULL) {
  438. _channelitem *item = next;
  439. next = item->next;
  440. if (item->data->interp == interp) {
  441. if (prev == NULL) {
  442. queue->first = item->next;
  443. }
  444. else {
  445. prev->next = item->next;
  446. }
  447. _channelitem_free(item);
  448. queue->count -= 1;
  449. }
  450. else {
  451. prev = item;
  452. }
  453. }
  454. }
  455. /* channel-interpreter associations */
  456. struct _channelend;
  457. typedef struct _channelend {
  458. struct _channelend *next;
  459. int64_t interp;
  460. int open;
  461. } _channelend;
  462. static _channelend *
  463. _channelend_new(int64_t interp)
  464. {
  465. _channelend *end = GLOBAL_MALLOC(_channelend);
  466. if (end == NULL) {
  467. PyErr_NoMemory();
  468. return NULL;
  469. }
  470. end->next = NULL;
  471. end->interp = interp;
  472. end->open = 1;
  473. return end;
  474. }
  475. static void
  476. _channelend_free(_channelend *end)
  477. {
  478. GLOBAL_FREE(end);
  479. }
  480. static void
  481. _channelend_free_all(_channelend *end)
  482. {
  483. while (end != NULL) {
  484. _channelend *last = end;
  485. end = end->next;
  486. _channelend_free(last);
  487. }
  488. }
  489. static _channelend *
  490. _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
  491. {
  492. _channelend *prev = NULL;
  493. _channelend *end = first;
  494. while (end != NULL) {
  495. if (end->interp == interp) {
  496. break;
  497. }
  498. prev = end;
  499. end = end->next;
  500. }
  501. if (pprev != NULL) {
  502. *pprev = prev;
  503. }
  504. return end;
  505. }
  506. typedef struct _channelassociations {
  507. // Note that the list entries are never removed for interpreter
  508. // for which the channel is closed. This should not be a problem in
  509. // practice. Also, a channel isn't automatically closed when an
  510. // interpreter is destroyed.
  511. int64_t numsendopen;
  512. int64_t numrecvopen;
  513. _channelend *send;
  514. _channelend *recv;
  515. } _channelends;
  516. static _channelends *
  517. _channelends_new(void)
  518. {
  519. _channelends *ends = GLOBAL_MALLOC(_channelends);
  520. if (ends== NULL) {
  521. return NULL;
  522. }
  523. ends->numsendopen = 0;
  524. ends->numrecvopen = 0;
  525. ends->send = NULL;
  526. ends->recv = NULL;
  527. return ends;
  528. }
  529. static void
  530. _channelends_clear(_channelends *ends)
  531. {
  532. _channelend_free_all(ends->send);
  533. ends->send = NULL;
  534. ends->numsendopen = 0;
  535. _channelend_free_all(ends->recv);
  536. ends->recv = NULL;
  537. ends->numrecvopen = 0;
  538. }
  539. static void
  540. _channelends_free(_channelends *ends)
  541. {
  542. _channelends_clear(ends);
  543. GLOBAL_FREE(ends);
  544. }
  545. static _channelend *
  546. _channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
  547. int send)
  548. {
  549. _channelend *end = _channelend_new(interp);
  550. if (end == NULL) {
  551. return NULL;
  552. }
  553. if (prev == NULL) {
  554. if (send) {
  555. ends->send = end;
  556. }
  557. else {
  558. ends->recv = end;
  559. }
  560. }
  561. else {
  562. prev->next = end;
  563. }
  564. if (send) {
  565. ends->numsendopen += 1;
  566. }
  567. else {
  568. ends->numrecvopen += 1;
  569. }
  570. return end;
  571. }
  572. static int
  573. _channelends_associate(_channelends *ends, int64_t interp, int send)
  574. {
  575. _channelend *prev;
  576. _channelend *end = _channelend_find(send ? ends->send : ends->recv,
  577. interp, &prev);
  578. if (end != NULL) {
  579. if (!end->open) {
  580. return ERR_CHANNEL_CLOSED;
  581. }
  582. // already associated
  583. return 0;
  584. }
  585. if (_channelends_add(ends, prev, interp, send) == NULL) {
  586. return -1;
  587. }
  588. return 0;
  589. }
  590. static int
  591. _channelends_is_open(_channelends *ends)
  592. {
  593. if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
  594. return 1;
  595. }
  596. if (ends->send == NULL && ends->recv == NULL) {
  597. return 1;
  598. }
  599. return 0;
  600. }
  601. static void
  602. _channelends_close_end(_channelends *ends, _channelend *end, int send)
  603. {
  604. end->open = 0;
  605. if (send) {
  606. ends->numsendopen -= 1;
  607. }
  608. else {
  609. ends->numrecvopen -= 1;
  610. }
  611. }
  612. static int
  613. _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
  614. {
  615. _channelend *prev;
  616. _channelend *end;
  617. if (which >= 0) { // send/both
  618. end = _channelend_find(ends->send, interp, &prev);
  619. if (end == NULL) {
  620. // never associated so add it
  621. end = _channelends_add(ends, prev, interp, 1);
  622. if (end == NULL) {
  623. return -1;
  624. }
  625. }
  626. _channelends_close_end(ends, end, 1);
  627. }
  628. if (which <= 0) { // recv/both
  629. end = _channelend_find(ends->recv, interp, &prev);
  630. if (end == NULL) {
  631. // never associated so add it
  632. end = _channelends_add(ends, prev, interp, 0);
  633. if (end == NULL) {
  634. return -1;
  635. }
  636. }
  637. _channelends_close_end(ends, end, 0);
  638. }
  639. return 0;
  640. }
  641. static void
  642. _channelends_drop_interpreter(_channelends *ends, int64_t interp)
  643. {
  644. _channelend *end;
  645. end = _channelend_find(ends->send, interp, NULL);
  646. if (end != NULL) {
  647. _channelends_close_end(ends, end, 1);
  648. }
  649. end = _channelend_find(ends->recv, interp, NULL);
  650. if (end != NULL) {
  651. _channelends_close_end(ends, end, 0);
  652. }
  653. }
  654. static void
  655. _channelends_close_all(_channelends *ends, int which, int force)
  656. {
  657. // XXX Handle the ends.
  658. // XXX Handle force is True.
  659. // Ensure all the "send"-associated interpreters are closed.
  660. _channelend *end;
  661. for (end = ends->send; end != NULL; end = end->next) {
  662. _channelends_close_end(ends, end, 1);
  663. }
  664. // Ensure all the "recv"-associated interpreters are closed.
  665. for (end = ends->recv; end != NULL; end = end->next) {
  666. _channelends_close_end(ends, end, 0);
  667. }
  668. }
  669. /* channels */
  670. struct _channel;
  671. struct _channel_closing;
  672. static void _channel_clear_closing(struct _channel *);
  673. static void _channel_finish_closing(struct _channel *);
  674. typedef struct _channel {
  675. PyThread_type_lock mutex;
  676. _channelqueue *queue;
  677. _channelends *ends;
  678. int open;
  679. struct _channel_closing *closing;
  680. } _PyChannelState;
  681. static _PyChannelState *
  682. _channel_new(PyThread_type_lock mutex)
  683. {
  684. _PyChannelState *chan = GLOBAL_MALLOC(_PyChannelState);
  685. if (chan == NULL) {
  686. return NULL;
  687. }
  688. chan->mutex = mutex;
  689. chan->queue = _channelqueue_new();
  690. if (chan->queue == NULL) {
  691. GLOBAL_FREE(chan);
  692. return NULL;
  693. }
  694. chan->ends = _channelends_new();
  695. if (chan->ends == NULL) {
  696. _channelqueue_free(chan->queue);
  697. GLOBAL_FREE(chan);
  698. return NULL;
  699. }
  700. chan->open = 1;
  701. chan->closing = NULL;
  702. return chan;
  703. }
  704. static void
  705. _channel_free(_PyChannelState *chan)
  706. {
  707. _channel_clear_closing(chan);
  708. PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
  709. _channelqueue_free(chan->queue);
  710. _channelends_free(chan->ends);
  711. PyThread_release_lock(chan->mutex);
  712. PyThread_free_lock(chan->mutex);
  713. GLOBAL_FREE(chan);
  714. }
  715. static int
  716. _channel_add(_PyChannelState *chan, int64_t interp,
  717. _PyCrossInterpreterData *data)
  718. {
  719. int res = -1;
  720. PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
  721. if (!chan->open) {
  722. res = ERR_CHANNEL_CLOSED;
  723. goto done;
  724. }
  725. if (_channelends_associate(chan->ends, interp, 1) != 0) {
  726. res = ERR_CHANNEL_INTERP_CLOSED;
  727. goto done;
  728. }
  729. if (_channelqueue_put(chan->queue, data) != 0) {
  730. goto done;
  731. }
  732. res = 0;
  733. done:
  734. PyThread_release_lock(chan->mutex);
  735. return res;
  736. }
  737. static int
  738. _channel_next(_PyChannelState *chan, int64_t interp,
  739. _PyCrossInterpreterData **res)
  740. {
  741. int err = 0;
  742. PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
  743. if (!chan->open) {
  744. err = ERR_CHANNEL_CLOSED;
  745. goto done;
  746. }
  747. if (_channelends_associate(chan->ends, interp, 0) != 0) {
  748. err = ERR_CHANNEL_INTERP_CLOSED;
  749. goto done;
  750. }
  751. _PyCrossInterpreterData *data = _channelqueue_get(chan->queue);
  752. if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
  753. chan->open = 0;
  754. }
  755. *res = data;
  756. done:
  757. PyThread_release_lock(chan->mutex);
  758. if (chan->queue->count == 0) {
  759. _channel_finish_closing(chan);
  760. }
  761. return err;
  762. }
  763. static int
  764. _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
  765. {
  766. PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
  767. int res = -1;
  768. if (!chan->open) {
  769. res = ERR_CHANNEL_CLOSED;
  770. goto done;
  771. }
  772. if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
  773. goto done;
  774. }
  775. chan->open = _channelends_is_open(chan->ends);
  776. res = 0;
  777. done:
  778. PyThread_release_lock(chan->mutex);
  779. return res;
  780. }
  781. static void
  782. _channel_drop_interpreter(_PyChannelState *chan, int64_t interp)
  783. {
  784. PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
  785. _channelqueue_drop_interpreter(chan->queue, interp);
  786. _channelends_drop_interpreter(chan->ends, interp);
  787. chan->open = _channelends_is_open(chan->ends);
  788. PyThread_release_lock(chan->mutex);
  789. }
  790. static int
  791. _channel_close_all(_PyChannelState *chan, int end, int force)
  792. {
  793. int res = -1;
  794. PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
  795. if (!chan->open) {
  796. res = ERR_CHANNEL_CLOSED;
  797. goto done;
  798. }
  799. if (!force && chan->queue->count > 0) {
  800. res = ERR_CHANNEL_NOT_EMPTY;
  801. goto done;
  802. }
  803. chan->open = 0;
  804. // We *could* also just leave these in place, since we've marked
  805. // the channel as closed already.
  806. _channelends_close_all(chan->ends, end, force);
  807. res = 0;
  808. done:
  809. PyThread_release_lock(chan->mutex);
  810. return res;
  811. }
  812. /* the set of channels */
  813. struct _channelref;
  814. typedef struct _channelref {
  815. int64_t id;
  816. _PyChannelState *chan;
  817. struct _channelref *next;
  818. Py_ssize_t objcount;
  819. } _channelref;
  820. static _channelref *
  821. _channelref_new(int64_t id, _PyChannelState *chan)
  822. {
  823. _channelref *ref = GLOBAL_MALLOC(_channelref);
  824. if (ref == NULL) {
  825. return NULL;
  826. }
  827. ref->id = id;
  828. ref->chan = chan;
  829. ref->next = NULL;
  830. ref->objcount = 0;
  831. return ref;
  832. }
  833. //static void
  834. //_channelref_clear(_channelref *ref)
  835. //{
  836. // ref->id = -1;
  837. // ref->chan = NULL;
  838. // ref->next = NULL;
  839. // ref->objcount = 0;
  840. //}
  841. static void
  842. _channelref_free(_channelref *ref)
  843. {
  844. if (ref->chan != NULL) {
  845. _channel_clear_closing(ref->chan);
  846. }
  847. //_channelref_clear(ref);
  848. GLOBAL_FREE(ref);
  849. }
  850. static _channelref *
  851. _channelref_find(_channelref *first, int64_t id, _channelref **pprev)
  852. {
  853. _channelref *prev = NULL;
  854. _channelref *ref = first;
  855. while (ref != NULL) {
  856. if (ref->id == id) {
  857. break;
  858. }
  859. prev = ref;
  860. ref = ref->next;
  861. }
  862. if (pprev != NULL) {
  863. *pprev = prev;
  864. }
  865. return ref;
  866. }
  867. typedef struct _channels {
  868. PyThread_type_lock mutex;
  869. _channelref *head;
  870. int64_t numopen;
  871. int64_t next_id;
  872. } _channels;
  873. static void
  874. _channels_init(_channels *channels, PyThread_type_lock mutex)
  875. {
  876. channels->mutex = mutex;
  877. channels->head = NULL;
  878. channels->numopen = 0;
  879. channels->next_id = 0;
  880. }
  881. static void
  882. _channels_fini(_channels *channels)
  883. {
  884. assert(channels->numopen == 0);
  885. assert(channels->head == NULL);
  886. if (channels->mutex != NULL) {
  887. PyThread_free_lock(channels->mutex);
  888. channels->mutex = NULL;
  889. }
  890. }
  891. static int64_t
  892. _channels_next_id(_channels *channels) // needs lock
  893. {
  894. int64_t id = channels->next_id;
  895. if (id < 0) {
  896. /* overflow */
  897. return -1;
  898. }
  899. channels->next_id += 1;
  900. return id;
  901. }
  902. static int
  903. _channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex,
  904. _PyChannelState **res)
  905. {
  906. int err = -1;
  907. _PyChannelState *chan = NULL;
  908. PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
  909. if (pmutex != NULL) {
  910. *pmutex = NULL;
  911. }
  912. _channelref *ref = _channelref_find(channels->head, id, NULL);
  913. if (ref == NULL) {
  914. err = ERR_CHANNEL_NOT_FOUND;
  915. goto done;
  916. }
  917. if (ref->chan == NULL || !ref->chan->open) {
  918. err = ERR_CHANNEL_CLOSED;
  919. goto done;
  920. }
  921. if (pmutex != NULL) {
  922. // The mutex will be closed by the caller.
  923. *pmutex = channels->mutex;
  924. }
  925. chan = ref->chan;
  926. err = 0;
  927. done:
  928. if (pmutex == NULL || *pmutex == NULL) {
  929. PyThread_release_lock(channels->mutex);
  930. }
  931. *res = chan;
  932. return err;
  933. }
  934. static int64_t
  935. _channels_add(_channels *channels, _PyChannelState *chan)
  936. {
  937. int64_t cid = -1;
  938. PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
  939. // Create a new ref.
  940. int64_t id = _channels_next_id(channels);
  941. if (id < 0) {
  942. cid = ERR_NO_NEXT_CHANNEL_ID;
  943. goto done;
  944. }
  945. _channelref *ref = _channelref_new(id, chan);
  946. if (ref == NULL) {
  947. goto done;
  948. }
  949. // Add it to the list.
  950. // We assume that the channel is a new one (not already in the list).
  951. ref->next = channels->head;
  952. channels->head = ref;
  953. channels->numopen += 1;
  954. cid = id;
  955. done:
  956. PyThread_release_lock(channels->mutex);
  957. return cid;
  958. }
  959. /* forward */
  960. static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
  961. static int
  962. _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
  963. int end, int force)
  964. {
  965. int res = -1;
  966. PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
  967. if (pchan != NULL) {
  968. *pchan = NULL;
  969. }
  970. _channelref *ref = _channelref_find(channels->head, cid, NULL);
  971. if (ref == NULL) {
  972. res = ERR_CHANNEL_NOT_FOUND;
  973. goto done;
  974. }
  975. if (ref->chan == NULL) {
  976. res = ERR_CHANNEL_CLOSED;
  977. goto done;
  978. }
  979. else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
  980. res = ERR_CHANNEL_CLOSED;
  981. goto done;
  982. }
  983. else {
  984. int err = _channel_close_all(ref->chan, end, force);
  985. if (err != 0) {
  986. if (end == CHANNEL_SEND && err == ERR_CHANNEL_NOT_EMPTY) {
  987. if (ref->chan->closing != NULL) {
  988. res = ERR_CHANNEL_CLOSED;
  989. goto done;
  990. }
  991. // Mark the channel as closing and return. The channel
  992. // will be cleaned up in _channel_next().
  993. PyErr_Clear();
  994. int err = _channel_set_closing(ref, channels->mutex);
  995. if (err != 0) {
  996. res = err;
  997. goto done;
  998. }
  999. if (pchan != NULL) {
  1000. *pchan = ref->chan;
  1001. }
  1002. res = 0;
  1003. }
  1004. else {
  1005. res = err;
  1006. }
  1007. goto done;
  1008. }
  1009. if (pchan != NULL) {
  1010. *pchan = ref->chan;
  1011. }
  1012. else {
  1013. _channel_free(ref->chan);
  1014. }
  1015. ref->chan = NULL;
  1016. }
  1017. res = 0;
  1018. done:
  1019. PyThread_release_lock(channels->mutex);
  1020. return res;
  1021. }
  1022. static void
  1023. _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
  1024. _PyChannelState **pchan)
  1025. {
  1026. if (ref == channels->head) {
  1027. channels->head = ref->next;
  1028. }
  1029. else {
  1030. prev->next = ref->next;
  1031. }
  1032. channels->numopen -= 1;
  1033. if (pchan != NULL) {
  1034. *pchan = ref->chan;
  1035. }
  1036. _channelref_free(ref);
  1037. }
  1038. static int
  1039. _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
  1040. {
  1041. int res = -1;
  1042. PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
  1043. if (pchan != NULL) {
  1044. *pchan = NULL;
  1045. }
  1046. _channelref *prev = NULL;
  1047. _channelref *ref = _channelref_find(channels->head, id, &prev);
  1048. if (ref == NULL) {
  1049. res = ERR_CHANNEL_NOT_FOUND;
  1050. goto done;
  1051. }
  1052. _channels_remove_ref(channels, ref, prev, pchan);
  1053. res = 0;
  1054. done:
  1055. PyThread_release_lock(channels->mutex);
  1056. return res;
  1057. }
  1058. static int
  1059. _channels_add_id_object(_channels *channels, int64_t id)
  1060. {
  1061. int res = -1;
  1062. PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
  1063. _channelref *ref = _channelref_find(channels->head, id, NULL);
  1064. if (ref == NULL) {
  1065. res = ERR_CHANNEL_NOT_FOUND;
  1066. goto done;
  1067. }
  1068. ref->objcount += 1;
  1069. res = 0;
  1070. done:
  1071. PyThread_release_lock(channels->mutex);
  1072. return res;
  1073. }
  1074. static void
  1075. _channels_drop_id_object(_channels *channels, int64_t id)
  1076. {
  1077. PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
  1078. _channelref *prev = NULL;
  1079. _channelref *ref = _channelref_find(channels->head, id, &prev);
  1080. if (ref == NULL) {
  1081. // Already destroyed.
  1082. goto done;
  1083. }
  1084. ref->objcount -= 1;
  1085. // Destroy if no longer used.
  1086. if (ref->objcount == 0) {
  1087. _PyChannelState *chan = NULL;
  1088. _channels_remove_ref(channels, ref, prev, &chan);
  1089. if (chan != NULL) {
  1090. _channel_free(chan);
  1091. }
  1092. }
  1093. done:
  1094. PyThread_release_lock(channels->mutex);
  1095. }
  1096. static int64_t *
  1097. _channels_list_all(_channels *channels, int64_t *count)
  1098. {
  1099. int64_t *cids = NULL;
  1100. PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
  1101. int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
  1102. if (ids == NULL) {
  1103. goto done;
  1104. }
  1105. _channelref *ref = channels->head;
  1106. for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
  1107. ids[i] = ref->id;
  1108. }
  1109. *count = channels->numopen;
  1110. cids = ids;
  1111. done:
  1112. PyThread_release_lock(channels->mutex);
  1113. return cids;
  1114. }
  1115. static void
  1116. _channels_drop_interpreter(_channels *channels, int64_t interp)
  1117. {
  1118. PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
  1119. _channelref *ref = channels->head;
  1120. for (; ref != NULL; ref = ref->next) {
  1121. if (ref->chan != NULL) {
  1122. _channel_drop_interpreter(ref->chan, interp);
  1123. }
  1124. }
  1125. PyThread_release_lock(channels->mutex);
  1126. }
  1127. /* support for closing non-empty channels */
  1128. struct _channel_closing {
  1129. struct _channelref *ref;
  1130. };
  1131. static int
  1132. _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
  1133. struct _channel *chan = ref->chan;
  1134. if (chan == NULL) {
  1135. // already closed
  1136. return 0;
  1137. }
  1138. int res = -1;
  1139. PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
  1140. if (chan->closing != NULL) {
  1141. res = ERR_CHANNEL_CLOSED;
  1142. goto done;
  1143. }
  1144. chan->closing = GLOBAL_MALLOC(struct _channel_closing);
  1145. if (chan->closing == NULL) {
  1146. goto done;
  1147. }
  1148. chan->closing->ref = ref;
  1149. res = 0;
  1150. done:
  1151. PyThread_release_lock(chan->mutex);
  1152. return res;
  1153. }
  1154. static void
  1155. _channel_clear_closing(struct _channel *chan) {
  1156. PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
  1157. if (chan->closing != NULL) {
  1158. GLOBAL_FREE(chan->closing);
  1159. chan->closing = NULL;
  1160. }
  1161. PyThread_release_lock(chan->mutex);
  1162. }
  1163. static void
  1164. _channel_finish_closing(struct _channel *chan) {
  1165. struct _channel_closing *closing = chan->closing;
  1166. if (closing == NULL) {
  1167. return;
  1168. }
  1169. _channelref *ref = closing->ref;
  1170. _channel_clear_closing(chan);
  1171. // Do the things that would have been done in _channels_close().
  1172. ref->chan = NULL;
  1173. _channel_free(chan);
  1174. }
  1175. /* "high"-level channel-related functions */
  1176. static int64_t
  1177. _channel_create(_channels *channels)
  1178. {
  1179. PyThread_type_lock mutex = PyThread_allocate_lock();
  1180. if (mutex == NULL) {
  1181. return ERR_CHANNEL_MUTEX_INIT;
  1182. }
  1183. _PyChannelState *chan = _channel_new(mutex);
  1184. if (chan == NULL) {
  1185. PyThread_free_lock(mutex);
  1186. return -1;
  1187. }
  1188. int64_t id = _channels_add(channels, chan);
  1189. if (id < 0) {
  1190. _channel_free(chan);
  1191. }
  1192. return id;
  1193. }
  1194. static int
  1195. _channel_destroy(_channels *channels, int64_t id)
  1196. {
  1197. _PyChannelState *chan = NULL;
  1198. int err = _channels_remove(channels, id, &chan);
  1199. if (err != 0) {
  1200. return err;
  1201. }
  1202. if (chan != NULL) {
  1203. _channel_free(chan);
  1204. }
  1205. return 0;
  1206. }
  1207. static int
  1208. _channel_send(_channels *channels, int64_t id, PyObject *obj)
  1209. {
  1210. PyInterpreterState *interp = _get_current_interp();
  1211. if (interp == NULL) {
  1212. return -1;
  1213. }
  1214. // Look up the channel.
  1215. PyThread_type_lock mutex = NULL;
  1216. _PyChannelState *chan = NULL;
  1217. int err = _channels_lookup(channels, id, &mutex, &chan);
  1218. if (err != 0) {
  1219. return err;
  1220. }
  1221. assert(chan != NULL);
  1222. // Past this point we are responsible for releasing the mutex.
  1223. if (chan->closing != NULL) {
  1224. PyThread_release_lock(mutex);
  1225. return ERR_CHANNEL_CLOSED;
  1226. }
  1227. // Convert the object to cross-interpreter data.
  1228. _PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData);
  1229. if (data == NULL) {
  1230. PyThread_release_lock(mutex);
  1231. return -1;
  1232. }
  1233. if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
  1234. PyThread_release_lock(mutex);
  1235. GLOBAL_FREE(data);
  1236. return -1;
  1237. }
  1238. // Add the data to the channel.
  1239. int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
  1240. PyThread_release_lock(mutex);
  1241. if (res != 0) {
  1242. // We may chain an exception here:
  1243. (void)_release_xid_data(data, 0);
  1244. GLOBAL_FREE(data);
  1245. return res;
  1246. }
  1247. return 0;
  1248. }
  1249. static int
  1250. _channel_recv(_channels *channels, int64_t id, PyObject **res)
  1251. {
  1252. int err;
  1253. *res = NULL;
  1254. PyInterpreterState *interp = _get_current_interp();
  1255. if (interp == NULL) {
  1256. // XXX Is this always an error?
  1257. if (PyErr_Occurred()) {
  1258. return -1;
  1259. }
  1260. return 0;
  1261. }
  1262. // Look up the channel.
  1263. PyThread_type_lock mutex = NULL;
  1264. _PyChannelState *chan = NULL;
  1265. err = _channels_lookup(channels, id, &mutex, &chan);
  1266. if (err != 0) {
  1267. return err;
  1268. }
  1269. assert(chan != NULL);
  1270. // Past this point we are responsible for releasing the mutex.
  1271. // Pop off the next item from the channel.
  1272. _PyCrossInterpreterData *data = NULL;
  1273. err = _channel_next(chan, PyInterpreterState_GetID(interp), &data);
  1274. PyThread_release_lock(mutex);
  1275. if (err != 0) {
  1276. return err;
  1277. }
  1278. else if (data == NULL) {
  1279. assert(!PyErr_Occurred());
  1280. return 0;
  1281. }
  1282. // Convert the data back to an object.
  1283. PyObject *obj = _PyCrossInterpreterData_NewObject(data);
  1284. if (obj == NULL) {
  1285. assert(PyErr_Occurred());
  1286. // It was allocated in _channel_send(), so we free it.
  1287. (void)_release_xid_data(data, XID_IGNORE_EXC | XID_FREE);
  1288. return -1;
  1289. }
  1290. // It was allocated in _channel_send(), so we free it.
  1291. int release_res = _release_xid_data(data, XID_FREE);
  1292. if (release_res < 0) {
  1293. // The source interpreter has been destroyed already.
  1294. assert(PyErr_Occurred());
  1295. Py_DECREF(obj);
  1296. return -1;
  1297. }
  1298. *res = obj;
  1299. return 0;
  1300. }
  1301. static int
  1302. _channel_drop(_channels *channels, int64_t id, int send, int recv)
  1303. {
  1304. PyInterpreterState *interp = _get_current_interp();
  1305. if (interp == NULL) {
  1306. return -1;
  1307. }
  1308. // Look up the channel.
  1309. PyThread_type_lock mutex = NULL;
  1310. _PyChannelState *chan = NULL;
  1311. int err = _channels_lookup(channels, id, &mutex, &chan);
  1312. if (err != 0) {
  1313. return err;
  1314. }
  1315. // Past this point we are responsible for releasing the mutex.
  1316. // Close one or both of the two ends.
  1317. int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
  1318. PyThread_release_lock(mutex);
  1319. return res;
  1320. }
  1321. static int
  1322. _channel_close(_channels *channels, int64_t id, int end, int force)
  1323. {
  1324. return _channels_close(channels, id, NULL, end, force);
  1325. }
  1326. static int
  1327. _channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
  1328. int send)
  1329. {
  1330. _PyChannelState *chan = NULL;
  1331. int err = _channels_lookup(channels, cid, NULL, &chan);
  1332. if (err != 0) {
  1333. return err;
  1334. }
  1335. else if (send && chan->closing != NULL) {
  1336. return ERR_CHANNEL_CLOSED;
  1337. }
  1338. _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
  1339. interp, NULL);
  1340. return (end != NULL && end->open);
  1341. }
  1342. /* ChannelID class */
  1343. typedef struct channelid {
  1344. PyObject_HEAD
  1345. int64_t id;
  1346. int end;
  1347. int resolve;
  1348. _channels *channels;
  1349. } channelid;
  1350. struct channel_id_converter_data {
  1351. PyObject *module;
  1352. int64_t cid;
  1353. };
  1354. static int
  1355. channel_id_converter(PyObject *arg, void *ptr)
  1356. {
  1357. int64_t cid;
  1358. struct channel_id_converter_data *data = ptr;
  1359. module_state *state = get_module_state(data->module);
  1360. assert(state != NULL);
  1361. if (PyObject_TypeCheck(arg, state->ChannelIDType)) {
  1362. cid = ((channelid *)arg)->id;
  1363. }
  1364. else if (PyIndex_Check(arg)) {
  1365. cid = PyLong_AsLongLong(arg);
  1366. if (cid == -1 && PyErr_Occurred()) {
  1367. return 0;
  1368. }
  1369. if (cid < 0) {
  1370. PyErr_Format(PyExc_ValueError,
  1371. "channel ID must be a non-negative int, got %R", arg);
  1372. return 0;
  1373. }
  1374. }
  1375. else {
  1376. PyErr_Format(PyExc_TypeError,
  1377. "channel ID must be an int, got %.100s",
  1378. Py_TYPE(arg)->tp_name);
  1379. return 0;
  1380. }
  1381. data->cid = cid;
  1382. return 1;
  1383. }
  1384. static int
  1385. newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
  1386. int force, int resolve, channelid **res)
  1387. {
  1388. *res = NULL;
  1389. channelid *self = PyObject_New(channelid, cls);
  1390. if (self == NULL) {
  1391. return -1;
  1392. }
  1393. self->id = cid;
  1394. self->end = end;
  1395. self->resolve = resolve;
  1396. self->channels = channels;
  1397. int err = _channels_add_id_object(channels, cid);
  1398. if (err != 0) {
  1399. if (force && err == ERR_CHANNEL_NOT_FOUND) {
  1400. assert(!PyErr_Occurred());
  1401. }
  1402. else {
  1403. Py_DECREF((PyObject *)self);
  1404. return err;
  1405. }
  1406. }
  1407. *res = self;
  1408. return 0;
  1409. }
  1410. static _channels * _global_channels(void);
  1411. static PyObject *
  1412. _channelid_new(PyObject *mod, PyTypeObject *cls,
  1413. PyObject *args, PyObject *kwds)
  1414. {
  1415. static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
  1416. int64_t cid;
  1417. struct channel_id_converter_data cid_data = {
  1418. .module = mod,
  1419. };
  1420. int send = -1;
  1421. int recv = -1;
  1422. int force = 0;
  1423. int resolve = 0;
  1424. if (!PyArg_ParseTupleAndKeywords(args, kwds,
  1425. "O&|$pppp:ChannelID.__new__", kwlist,
  1426. channel_id_converter, &cid_data,
  1427. &send, &recv, &force, &resolve)) {
  1428. return NULL;
  1429. }
  1430. cid = cid_data.cid;
  1431. // Handle "send" and "recv".
  1432. if (send == 0 && recv == 0) {
  1433. PyErr_SetString(PyExc_ValueError,
  1434. "'send' and 'recv' cannot both be False");
  1435. return NULL;
  1436. }
  1437. int end = 0;
  1438. if (send == 1) {
  1439. if (recv == 0 || recv == -1) {
  1440. end = CHANNEL_SEND;
  1441. }
  1442. }
  1443. else if (recv == 1) {
  1444. end = CHANNEL_RECV;
  1445. }
  1446. PyObject *id = NULL;
  1447. int err = newchannelid(cls, cid, end, _global_channels(),
  1448. force, resolve,
  1449. (channelid **)&id);
  1450. if (handle_channel_error(err, mod, cid)) {
  1451. assert(id == NULL);
  1452. return NULL;
  1453. }
  1454. assert(id != NULL);
  1455. return id;
  1456. }
  1457. static void
  1458. channelid_dealloc(PyObject *self)
  1459. {
  1460. int64_t cid = ((channelid *)self)->id;
  1461. _channels *channels = ((channelid *)self)->channels;
  1462. PyTypeObject *tp = Py_TYPE(self);
  1463. tp->tp_free(self);
  1464. /* "Instances of heap-allocated types hold a reference to their type."
  1465. * See: https://docs.python.org/3.11/howto/isolating-extensions.html#garbage-collection-protocol
  1466. * See: https://docs.python.org/3.11/c-api/typeobj.html#c.PyTypeObject.tp_traverse
  1467. */
  1468. // XXX Why don't we implement Py_TPFLAGS_HAVE_GC, e.g. Py_tp_traverse,
  1469. // like we do for _abc._abc_data?
  1470. Py_DECREF(tp);
  1471. _channels_drop_id_object(channels, cid);
  1472. }
  1473. static PyObject *
  1474. channelid_repr(PyObject *self)
  1475. {
  1476. PyTypeObject *type = Py_TYPE(self);
  1477. const char *name = _PyType_Name(type);
  1478. channelid *cid = (channelid *)self;
  1479. const char *fmt;
  1480. if (cid->end == CHANNEL_SEND) {
  1481. fmt = "%s(%" PRId64 ", send=True)";
  1482. }
  1483. else if (cid->end == CHANNEL_RECV) {
  1484. fmt = "%s(%" PRId64 ", recv=True)";
  1485. }
  1486. else {
  1487. fmt = "%s(%" PRId64 ")";
  1488. }
  1489. return PyUnicode_FromFormat(fmt, name, cid->id);
  1490. }
  1491. static PyObject *
  1492. channelid_str(PyObject *self)
  1493. {
  1494. channelid *cid = (channelid *)self;
  1495. return PyUnicode_FromFormat("%" PRId64 "", cid->id);
  1496. }
  1497. static PyObject *
  1498. channelid_int(PyObject *self)
  1499. {
  1500. channelid *cid = (channelid *)self;
  1501. return PyLong_FromLongLong(cid->id);
  1502. }
  1503. static Py_hash_t
  1504. channelid_hash(PyObject *self)
  1505. {
  1506. channelid *cid = (channelid *)self;
  1507. PyObject *id = PyLong_FromLongLong(cid->id);
  1508. if (id == NULL) {
  1509. return -1;
  1510. }
  1511. Py_hash_t hash = PyObject_Hash(id);
  1512. Py_DECREF(id);
  1513. return hash;
  1514. }
  1515. static PyObject *
  1516. channelid_richcompare(PyObject *self, PyObject *other, int op)
  1517. {
  1518. PyObject *res = NULL;
  1519. if (op != Py_EQ && op != Py_NE) {
  1520. Py_RETURN_NOTIMPLEMENTED;
  1521. }
  1522. PyObject *mod = get_module_from_type(Py_TYPE(self));
  1523. if (mod == NULL) {
  1524. return NULL;
  1525. }
  1526. module_state *state = get_module_state(mod);
  1527. if (state == NULL) {
  1528. goto done;
  1529. }
  1530. if (!PyObject_TypeCheck(self, state->ChannelIDType)) {
  1531. res = Py_NewRef(Py_NotImplemented);
  1532. goto done;
  1533. }
  1534. channelid *cid = (channelid *)self;
  1535. int equal;
  1536. if (PyObject_TypeCheck(other, state->ChannelIDType)) {
  1537. channelid *othercid = (channelid *)other;
  1538. equal = (cid->end == othercid->end) && (cid->id == othercid->id);
  1539. }
  1540. else if (PyLong_Check(other)) {
  1541. /* Fast path */
  1542. int overflow;
  1543. long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
  1544. if (othercid == -1 && PyErr_Occurred()) {
  1545. goto done;
  1546. }
  1547. equal = !overflow && (othercid >= 0) && (cid->id == othercid);
  1548. }
  1549. else if (PyNumber_Check(other)) {
  1550. PyObject *pyid = PyLong_FromLongLong(cid->id);
  1551. if (pyid == NULL) {
  1552. goto done;
  1553. }
  1554. res = PyObject_RichCompare(pyid, other, op);
  1555. Py_DECREF(pyid);
  1556. goto done;
  1557. }
  1558. else {
  1559. res = Py_NewRef(Py_NotImplemented);
  1560. goto done;
  1561. }
  1562. if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
  1563. res = Py_NewRef(Py_True);
  1564. }
  1565. else {
  1566. res = Py_NewRef(Py_False);
  1567. }
  1568. done:
  1569. Py_DECREF(mod);
  1570. return res;
  1571. }
  1572. static PyObject *
  1573. _channel_from_cid(PyObject *cid, int end)
  1574. {
  1575. PyObject *highlevel = PyImport_ImportModule("interpreters");
  1576. if (highlevel == NULL) {
  1577. PyErr_Clear();
  1578. highlevel = PyImport_ImportModule("test.support.interpreters");
  1579. if (highlevel == NULL) {
  1580. return NULL;
  1581. }
  1582. }
  1583. const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
  1584. "SendChannel";
  1585. PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
  1586. Py_DECREF(highlevel);
  1587. if (cls == NULL) {
  1588. return NULL;
  1589. }
  1590. PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
  1591. Py_DECREF(cls);
  1592. if (chan == NULL) {
  1593. return NULL;
  1594. }
  1595. return chan;
  1596. }
  1597. struct _channelid_xid {
  1598. int64_t id;
  1599. int end;
  1600. int resolve;
  1601. };
  1602. static PyObject *
  1603. _channelid_from_xid(_PyCrossInterpreterData *data)
  1604. {
  1605. struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
  1606. // It might not be imported yet, so we can't use _get_current_module().
  1607. PyObject *mod = PyImport_ImportModule(MODULE_NAME);
  1608. if (mod == NULL) {
  1609. return NULL;
  1610. }
  1611. assert(mod != Py_None);
  1612. module_state *state = get_module_state(mod);
  1613. if (state == NULL) {
  1614. return NULL;
  1615. }
  1616. // Note that we do not preserve the "resolve" flag.
  1617. PyObject *cid = NULL;
  1618. int err = newchannelid(state->ChannelIDType, xid->id, xid->end,
  1619. _global_channels(), 0, 0,
  1620. (channelid **)&cid);
  1621. if (err != 0) {
  1622. assert(cid == NULL);
  1623. (void)handle_channel_error(err, mod, xid->id);
  1624. goto done;
  1625. }
  1626. assert(cid != NULL);
  1627. if (xid->end == 0) {
  1628. goto done;
  1629. }
  1630. if (!xid->resolve) {
  1631. goto done;
  1632. }
  1633. /* Try returning a high-level channel end but fall back to the ID. */
  1634. PyObject *chan = _channel_from_cid(cid, xid->end);
  1635. if (chan == NULL) {
  1636. PyErr_Clear();
  1637. goto done;
  1638. }
  1639. Py_DECREF(cid);
  1640. cid = chan;
  1641. done:
  1642. Py_DECREF(mod);
  1643. return cid;
  1644. }
  1645. static int
  1646. _channelid_shared(PyThreadState *tstate, PyObject *obj,
  1647. _PyCrossInterpreterData *data)
  1648. {
  1649. if (_PyCrossInterpreterData_InitWithSize(
  1650. data, tstate->interp, sizeof(struct _channelid_xid), obj,
  1651. _channelid_from_xid
  1652. ) < 0)
  1653. {
  1654. return -1;
  1655. }
  1656. struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
  1657. xid->id = ((channelid *)obj)->id;
  1658. xid->end = ((channelid *)obj)->end;
  1659. xid->resolve = ((channelid *)obj)->resolve;
  1660. return 0;
  1661. }
  1662. static PyObject *
  1663. channelid_end(PyObject *self, void *end)
  1664. {
  1665. int force = 1;
  1666. channelid *cid = (channelid *)self;
  1667. if (end != NULL) {
  1668. PyObject *id = NULL;
  1669. int err = newchannelid(Py_TYPE(self), cid->id, *(int *)end,
  1670. cid->channels, force, cid->resolve,
  1671. (channelid **)&id);
  1672. if (err != 0) {
  1673. assert(id == NULL);
  1674. PyObject *mod = get_module_from_type(Py_TYPE(self));
  1675. if (mod == NULL) {
  1676. return NULL;
  1677. }
  1678. (void)handle_channel_error(err, mod, cid->id);
  1679. Py_DECREF(mod);
  1680. return NULL;
  1681. }
  1682. assert(id != NULL);
  1683. return id;
  1684. }
  1685. if (cid->end == CHANNEL_SEND) {
  1686. return PyUnicode_InternFromString("send");
  1687. }
  1688. if (cid->end == CHANNEL_RECV) {
  1689. return PyUnicode_InternFromString("recv");
  1690. }
  1691. return PyUnicode_InternFromString("both");
  1692. }
  1693. static int _channelid_end_send = CHANNEL_SEND;
  1694. static int _channelid_end_recv = CHANNEL_RECV;
  1695. static PyGetSetDef channelid_getsets[] = {
  1696. {"end", (getter)channelid_end, NULL,
  1697. PyDoc_STR("'send', 'recv', or 'both'")},
  1698. {"send", (getter)channelid_end, NULL,
  1699. PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
  1700. {"recv", (getter)channelid_end, NULL,
  1701. PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
  1702. {NULL}
  1703. };
  1704. PyDoc_STRVAR(channelid_doc,
  1705. "A channel ID identifies a channel and may be used as an int.");
  1706. static PyType_Slot ChannelIDType_slots[] = {
  1707. {Py_tp_dealloc, (destructor)channelid_dealloc},
  1708. {Py_tp_doc, (void *)channelid_doc},
  1709. {Py_tp_repr, (reprfunc)channelid_repr},
  1710. {Py_tp_str, (reprfunc)channelid_str},
  1711. {Py_tp_hash, channelid_hash},
  1712. {Py_tp_richcompare, channelid_richcompare},
  1713. {Py_tp_getset, channelid_getsets},
  1714. // number slots
  1715. {Py_nb_int, (unaryfunc)channelid_int},
  1716. {Py_nb_index, (unaryfunc)channelid_int},
  1717. {0, NULL},
  1718. };
  1719. static PyType_Spec ChannelIDType_spec = {
  1720. .name = MODULE_NAME ".ChannelID",
  1721. .basicsize = sizeof(channelid),
  1722. .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
  1723. Py_TPFLAGS_DISALLOW_INSTANTIATION | Py_TPFLAGS_IMMUTABLETYPE),
  1724. .slots = ChannelIDType_slots,
  1725. };
  1726. /* module level code ********************************************************/
  1727. /* globals is the process-global state for the module. It holds all
  1728. the data that we need to share between interpreters, so it cannot
  1729. hold PyObject values. */
  1730. static struct globals {
  1731. int module_count;
  1732. _channels channels;
  1733. } _globals = {0};
  1734. static int
  1735. _globals_init(void)
  1736. {
  1737. // XXX This isn't thread-safe.
  1738. _globals.module_count++;
  1739. if (_globals.module_count > 1) {
  1740. // Already initialized.
  1741. return 0;
  1742. }
  1743. assert(_globals.channels.mutex == NULL);
  1744. PyThread_type_lock mutex = PyThread_allocate_lock();
  1745. if (mutex == NULL) {
  1746. return ERR_CHANNELS_MUTEX_INIT;
  1747. }
  1748. _channels_init(&_globals.channels, mutex);
  1749. return 0;
  1750. }
  1751. static void
  1752. _globals_fini(void)
  1753. {
  1754. // XXX This isn't thread-safe.
  1755. _globals.module_count--;
  1756. if (_globals.module_count > 0) {
  1757. return;
  1758. }
  1759. _channels_fini(&_globals.channels);
  1760. }
  1761. static _channels *
  1762. _global_channels(void) {
  1763. return &_globals.channels;
  1764. }
  1765. static void
  1766. clear_interpreter(void *data)
  1767. {
  1768. if (_globals.module_count == 0) {
  1769. return;
  1770. }
  1771. PyInterpreterState *interp = (PyInterpreterState *)data;
  1772. assert(interp == _get_current_interp());
  1773. int64_t id = PyInterpreterState_GetID(interp);
  1774. _channels_drop_interpreter(&_globals.channels, id);
  1775. }
  1776. static PyObject *
  1777. channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
  1778. {
  1779. int64_t cid = _channel_create(&_globals.channels);
  1780. if (cid < 0) {
  1781. (void)handle_channel_error(-1, self, cid);
  1782. return NULL;
  1783. }
  1784. module_state *state = get_module_state(self);
  1785. if (state == NULL) {
  1786. return NULL;
  1787. }
  1788. PyObject *id = NULL;
  1789. int err = newchannelid(state->ChannelIDType, cid, 0,
  1790. &_globals.channels, 0, 0,
  1791. (channelid **)&id);
  1792. if (handle_channel_error(err, self, cid)) {
  1793. assert(id == NULL);
  1794. err = _channel_destroy(&_globals.channels, cid);
  1795. if (handle_channel_error(err, self, cid)) {
  1796. // XXX issue a warning?
  1797. }
  1798. return NULL;
  1799. }
  1800. assert(id != NULL);
  1801. assert(((channelid *)id)->channels != NULL);
  1802. return id;
  1803. }
  1804. PyDoc_STRVAR(channel_create_doc,
  1805. "channel_create() -> cid\n\
  1806. \n\
  1807. Create a new cross-interpreter channel and return a unique generated ID.");
  1808. static PyObject *
  1809. channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
  1810. {
  1811. static char *kwlist[] = {"cid", NULL};
  1812. int64_t cid;
  1813. struct channel_id_converter_data cid_data = {
  1814. .module = self,
  1815. };
  1816. if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
  1817. channel_id_converter, &cid_data)) {
  1818. return NULL;
  1819. }
  1820. cid = cid_data.cid;
  1821. int err = _channel_destroy(&_globals.channels, cid);
  1822. if (handle_channel_error(err, self, cid)) {
  1823. return NULL;
  1824. }
  1825. Py_RETURN_NONE;
  1826. }
  1827. PyDoc_STRVAR(channel_destroy_doc,
  1828. "channel_destroy(cid)\n\
  1829. \n\
  1830. Close and finalize the channel. Afterward attempts to use the channel\n\
  1831. will behave as though it never existed.");
  1832. static PyObject *
  1833. channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
  1834. {
  1835. int64_t count = 0;
  1836. int64_t *cids = _channels_list_all(&_globals.channels, &count);
  1837. if (cids == NULL) {
  1838. if (count == 0) {
  1839. return PyList_New(0);
  1840. }
  1841. return NULL;
  1842. }
  1843. PyObject *ids = PyList_New((Py_ssize_t)count);
  1844. if (ids == NULL) {
  1845. goto finally;
  1846. }
  1847. module_state *state = get_module_state(self);
  1848. if (state == NULL) {
  1849. Py_DECREF(ids);
  1850. ids = NULL;
  1851. goto finally;
  1852. }
  1853. int64_t *cur = cids;
  1854. for (int64_t i=0; i < count; cur++, i++) {
  1855. PyObject *id = NULL;
  1856. int err = newchannelid(state->ChannelIDType, *cur, 0,
  1857. &_globals.channels, 0, 0,
  1858. (channelid **)&id);
  1859. if (handle_channel_error(err, self, *cur)) {
  1860. assert(id == NULL);
  1861. Py_SETREF(ids, NULL);
  1862. break;
  1863. }
  1864. assert(id != NULL);
  1865. PyList_SET_ITEM(ids, (Py_ssize_t)i, id);
  1866. }
  1867. finally:
  1868. PyMem_Free(cids);
  1869. return ids;
  1870. }
  1871. PyDoc_STRVAR(channel_list_all_doc,
  1872. "channel_list_all() -> [cid]\n\
  1873. \n\
  1874. Return the list of all IDs for active channels.");
  1875. static PyObject *
  1876. channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
  1877. {
  1878. static char *kwlist[] = {"cid", "send", NULL};
  1879. int64_t cid; /* Channel ID */
  1880. struct channel_id_converter_data cid_data = {
  1881. .module = self,
  1882. };
  1883. int send = 0; /* Send or receive end? */
  1884. int64_t id;
  1885. PyObject *ids, *id_obj;
  1886. PyInterpreterState *interp;
  1887. if (!PyArg_ParseTupleAndKeywords(
  1888. args, kwds, "O&$p:channel_list_interpreters",
  1889. kwlist, channel_id_converter, &cid_data, &send)) {
  1890. return NULL;
  1891. }
  1892. cid = cid_data.cid;
  1893. ids = PyList_New(0);
  1894. if (ids == NULL) {
  1895. goto except;
  1896. }
  1897. interp = PyInterpreterState_Head();
  1898. while (interp != NULL) {
  1899. id = PyInterpreterState_GetID(interp);
  1900. assert(id >= 0);
  1901. int res = _channel_is_associated(&_globals.channels, cid, id, send);
  1902. if (res < 0) {
  1903. (void)handle_channel_error(res, self, cid);
  1904. goto except;
  1905. }
  1906. if (res) {
  1907. id_obj = _PyInterpreterState_GetIDObject(interp);
  1908. if (id_obj == NULL) {
  1909. goto except;
  1910. }
  1911. res = PyList_Insert(ids, 0, id_obj);
  1912. Py_DECREF(id_obj);
  1913. if (res < 0) {
  1914. goto except;
  1915. }
  1916. }
  1917. interp = PyInterpreterState_Next(interp);
  1918. }
  1919. goto finally;
  1920. except:
  1921. Py_CLEAR(ids);
  1922. finally:
  1923. return ids;
  1924. }
  1925. PyDoc_STRVAR(channel_list_interpreters_doc,
  1926. "channel_list_interpreters(cid, *, send) -> [id]\n\
  1927. \n\
  1928. Return the list of all interpreter IDs associated with an end of the channel.\n\
  1929. \n\
  1930. The 'send' argument should be a boolean indicating whether to use the send or\n\
  1931. receive end.");
  1932. static PyObject *
  1933. channel_send(PyObject *self, PyObject *args, PyObject *kwds)
  1934. {
  1935. static char *kwlist[] = {"cid", "obj", NULL};
  1936. int64_t cid;
  1937. struct channel_id_converter_data cid_data = {
  1938. .module = self,
  1939. };
  1940. PyObject *obj;
  1941. if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
  1942. channel_id_converter, &cid_data, &obj)) {
  1943. return NULL;
  1944. }
  1945. cid = cid_data.cid;
  1946. int err = _channel_send(&_globals.channels, cid, obj);
  1947. if (handle_channel_error(err, self, cid)) {
  1948. return NULL;
  1949. }
  1950. Py_RETURN_NONE;
  1951. }
  1952. PyDoc_STRVAR(channel_send_doc,
  1953. "channel_send(cid, obj)\n\
  1954. \n\
  1955. Add the object's data to the channel's queue.");
  1956. static PyObject *
  1957. channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
  1958. {
  1959. static char *kwlist[] = {"cid", "default", NULL};
  1960. int64_t cid;
  1961. struct channel_id_converter_data cid_data = {
  1962. .module = self,
  1963. };
  1964. PyObject *dflt = NULL;
  1965. if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist,
  1966. channel_id_converter, &cid_data, &dflt)) {
  1967. return NULL;
  1968. }
  1969. cid = cid_data.cid;
  1970. PyObject *obj = NULL;
  1971. int err = _channel_recv(&_globals.channels, cid, &obj);
  1972. if (handle_channel_error(err, self, cid)) {
  1973. return NULL;
  1974. }
  1975. Py_XINCREF(dflt);
  1976. if (obj == NULL) {
  1977. // Use the default.
  1978. if (dflt == NULL) {
  1979. (void)handle_channel_error(ERR_CHANNEL_EMPTY, self, cid);
  1980. return NULL;
  1981. }
  1982. obj = Py_NewRef(dflt);
  1983. }
  1984. Py_XDECREF(dflt);
  1985. return obj;
  1986. }
  1987. PyDoc_STRVAR(channel_recv_doc,
  1988. "channel_recv(cid, [default]) -> obj\n\
  1989. \n\
  1990. Return a new object from the data at the front of the channel's queue.\n\
  1991. \n\
  1992. If there is nothing to receive then raise ChannelEmptyError, unless\n\
  1993. a default value is provided. In that case return it.");
  1994. static PyObject *
  1995. channel_close(PyObject *self, PyObject *args, PyObject *kwds)
  1996. {
  1997. static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
  1998. int64_t cid;
  1999. struct channel_id_converter_data cid_data = {
  2000. .module = self,
  2001. };
  2002. int send = 0;
  2003. int recv = 0;
  2004. int force = 0;
  2005. if (!PyArg_ParseTupleAndKeywords(args, kwds,
  2006. "O&|$ppp:channel_close", kwlist,
  2007. channel_id_converter, &cid_data,
  2008. &send, &recv, &force)) {
  2009. return NULL;
  2010. }
  2011. cid = cid_data.cid;
  2012. int err = _channel_close(&_globals.channels, cid, send-recv, force);
  2013. if (handle_channel_error(err, self, cid)) {
  2014. return NULL;
  2015. }
  2016. Py_RETURN_NONE;
  2017. }
  2018. PyDoc_STRVAR(channel_close_doc,
  2019. "channel_close(cid, *, send=None, recv=None, force=False)\n\
  2020. \n\
  2021. Close the channel for all interpreters.\n\
  2022. \n\
  2023. If the channel is empty then the keyword args are ignored and both\n\
  2024. ends are immediately closed. Otherwise, if 'force' is True then\n\
  2025. all queued items are released and both ends are immediately\n\
  2026. closed.\n\
  2027. \n\
  2028. If the channel is not empty *and* 'force' is False then following\n\
  2029. happens:\n\
  2030. \n\
  2031. * recv is True (regardless of send):\n\
  2032. - raise ChannelNotEmptyError\n\
  2033. * recv is None and send is None:\n\
  2034. - raise ChannelNotEmptyError\n\
  2035. * send is True and recv is not True:\n\
  2036. - fully close the 'send' end\n\
  2037. - close the 'recv' end to interpreters not already receiving\n\
  2038. - fully close it once empty\n\
  2039. \n\
  2040. Closing an already closed channel results in a ChannelClosedError.\n\
  2041. \n\
  2042. Once the channel's ID has no more ref counts in any interpreter\n\
  2043. the channel will be destroyed.");
  2044. static PyObject *
  2045. channel_release(PyObject *self, PyObject *args, PyObject *kwds)
  2046. {
  2047. // Note that only the current interpreter is affected.
  2048. static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
  2049. int64_t cid;
  2050. struct channel_id_converter_data cid_data = {
  2051. .module = self,
  2052. };
  2053. int send = 0;
  2054. int recv = 0;
  2055. int force = 0;
  2056. if (!PyArg_ParseTupleAndKeywords(args, kwds,
  2057. "O&|$ppp:channel_release", kwlist,
  2058. channel_id_converter, &cid_data,
  2059. &send, &recv, &force)) {
  2060. return NULL;
  2061. }
  2062. cid = cid_data.cid;
  2063. if (send == 0 && recv == 0) {
  2064. send = 1;
  2065. recv = 1;
  2066. }
  2067. // XXX Handle force is True.
  2068. // XXX Fix implicit release.
  2069. int err = _channel_drop(&_globals.channels, cid, send, recv);
  2070. if (handle_channel_error(err, self, cid)) {
  2071. return NULL;
  2072. }
  2073. Py_RETURN_NONE;
  2074. }
  2075. PyDoc_STRVAR(channel_release_doc,
  2076. "channel_release(cid, *, send=None, recv=None, force=True)\n\
  2077. \n\
  2078. Close the channel for the current interpreter. 'send' and 'recv'\n\
  2079. (bool) may be used to indicate the ends to close. By default both\n\
  2080. ends are closed. Closing an already closed end is a noop.");
  2081. static PyObject *
  2082. channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
  2083. {
  2084. module_state *state = get_module_state(self);
  2085. if (state == NULL) {
  2086. return NULL;
  2087. }
  2088. PyTypeObject *cls = state->ChannelIDType;
  2089. PyObject *mod = get_module_from_owned_type(cls);
  2090. if (mod == NULL) {
  2091. return NULL;
  2092. }
  2093. PyObject *cid = _channelid_new(mod, cls, args, kwds);
  2094. Py_DECREF(mod);
  2095. return cid;
  2096. }
  2097. static PyMethodDef module_functions[] = {
  2098. {"create", channel_create,
  2099. METH_NOARGS, channel_create_doc},
  2100. {"destroy", _PyCFunction_CAST(channel_destroy),
  2101. METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
  2102. {"list_all", channel_list_all,
  2103. METH_NOARGS, channel_list_all_doc},
  2104. {"list_interpreters", _PyCFunction_CAST(channel_list_interpreters),
  2105. METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
  2106. {"send", _PyCFunction_CAST(channel_send),
  2107. METH_VARARGS | METH_KEYWORDS, channel_send_doc},
  2108. {"recv", _PyCFunction_CAST(channel_recv),
  2109. METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
  2110. {"close", _PyCFunction_CAST(channel_close),
  2111. METH_VARARGS | METH_KEYWORDS, channel_close_doc},
  2112. {"release", _PyCFunction_CAST(channel_release),
  2113. METH_VARARGS | METH_KEYWORDS, channel_release_doc},
  2114. {"_channel_id", _PyCFunction_CAST(channel__channel_id),
  2115. METH_VARARGS | METH_KEYWORDS, NULL},
  2116. {NULL, NULL} /* sentinel */
  2117. };
  2118. /* initialization function */
  2119. PyDoc_STRVAR(module_doc,
  2120. "This module provides primitive operations to manage Python interpreters.\n\
  2121. The 'interpreters' module provides a more convenient interface.");
  2122. static int
  2123. module_exec(PyObject *mod)
  2124. {
  2125. if (_globals_init() != 0) {
  2126. return -1;
  2127. }
  2128. /* Add exception types */
  2129. if (exceptions_init(mod) != 0) {
  2130. goto error;
  2131. }
  2132. /* Add other types */
  2133. module_state *state = get_module_state(mod);
  2134. if (state == NULL) {
  2135. goto error;
  2136. }
  2137. // ChannelID
  2138. state->ChannelIDType = add_new_type(
  2139. mod, &ChannelIDType_spec, _channelid_shared);
  2140. if (state->ChannelIDType == NULL) {
  2141. goto error;
  2142. }
  2143. // Make sure chnnels drop objects owned by this interpreter
  2144. PyInterpreterState *interp = _get_current_interp();
  2145. _Py_AtExit(interp, clear_interpreter, (void *)interp);
  2146. return 0;
  2147. error:
  2148. _globals_fini();
  2149. return -1;
  2150. }
  2151. static struct PyModuleDef_Slot module_slots[] = {
  2152. {Py_mod_exec, module_exec},
  2153. {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
  2154. {0, NULL},
  2155. };
  2156. static int
  2157. module_traverse(PyObject *mod, visitproc visit, void *arg)
  2158. {
  2159. module_state *state = get_module_state(mod);
  2160. assert(state != NULL);
  2161. traverse_module_state(state, visit, arg);
  2162. return 0;
  2163. }
  2164. static int
  2165. module_clear(PyObject *mod)
  2166. {
  2167. module_state *state = get_module_state(mod);
  2168. assert(state != NULL);
  2169. clear_module_state(state);
  2170. return 0;
  2171. }
  2172. static void
  2173. module_free(void *mod)
  2174. {
  2175. module_state *state = get_module_state(mod);
  2176. assert(state != NULL);
  2177. clear_module_state(state);
  2178. _globals_fini();
  2179. }
  2180. static struct PyModuleDef moduledef = {
  2181. .m_base = PyModuleDef_HEAD_INIT,
  2182. .m_name = MODULE_NAME,
  2183. .m_doc = module_doc,
  2184. .m_size = sizeof(module_state),
  2185. .m_methods = module_functions,
  2186. .m_slots = module_slots,
  2187. .m_traverse = module_traverse,
  2188. .m_clear = module_clear,
  2189. .m_free = (freefunc)module_free,
  2190. };
  2191. PyMODINIT_FUNC
  2192. PyInit__xxinterpchannels(void)
  2193. {
  2194. return PyModuleDef_Init(&moduledef);
  2195. }