12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479 |
- /* interpreters module */
- /* low-level access to interpreter primitives */
- #ifndef Py_BUILD_CORE_BUILTIN
- # define Py_BUILD_CORE_MODULE 1
- #endif
- #include "Python.h"
- #include "interpreteridobject.h"
- #include "pycore_pystate.h" // _PyCrossInterpreterData_ReleaseAndRawFree()
- /*
- This module has the following process-global state:
- _globals (static struct globals):
- module_count (int)
- channels (struct _channels):
- numopen (int64_t)
- next_id (int64_t)
- mutex (PyThread_type_lock)
- head (linked list of struct _channelref *):
- id (int64_t)
- objcount (Py_ssize_t)
- next (struct _channelref *):
- ...
- chan (struct _channel *):
- open (int)
- mutex (PyThread_type_lock)
- closing (struct _channel_closing *):
- ref (struct _channelref *):
- ...
- ends (struct _channelends *):
- numsendopen (int64_t)
- numrecvopen (int64_t)
- send (struct _channelend *):
- interp (int64_t)
- open (int)
- next (struct _channelend *)
- recv (struct _channelend *):
- ...
- queue (struct _channelqueue *):
- count (int64_t)
- first (struct _channelitem *):
- next (struct _channelitem *):
- ...
- data (_PyCrossInterpreterData *):
- data (void *)
- obj (PyObject *)
- interp (int64_t)
- new_object (xid_newobjectfunc)
- free (xid_freefunc)
- last (struct _channelitem *):
- ...
- The above state includes the following allocations by the module:
- * 1 top-level mutex (to protect the rest of the state)
- * for each channel:
- * 1 struct _channelref
- * 1 struct _channel
- * 0-1 struct _channel_closing
- * 1 struct _channelends
- * 2 struct _channelend
- * 1 struct _channelqueue
- * for each item in each channel:
- * 1 struct _channelitem
- * 1 _PyCrossInterpreterData
- The only objects in that global state are the references held by each
- channel's queue, which are safely managed via the _PyCrossInterpreterData_*()
- API. The module does not create any objects that are shared globally.
- */
/* Name used to look this module up and to qualify its exception names. */
#define MODULE_NAME "_xxinterpchannels"

/* Allocation helpers built on the PyMem_Raw* allocator.
   GLOBAL_MALLOC(TYPE) allocates room for one TYPE;
   GLOBAL_FREE(VAR) releases a pointer obtained that way. */
#define GLOBAL_MALLOC(TYPE) PyMem_RawMalloc(sizeof(TYPE))
#define GLOBAL_FREE(VAR) PyMem_RawFree(VAR)
- static PyInterpreterState *
- _get_current_interp(void)
- {
- // PyInterpreterState_Get() aborts if lookup fails, so don't need
- // to check the result for NULL.
- return PyInterpreterState_Get();
- }
- static PyObject *
- _get_current_module(void)
- {
- PyObject *name = PyUnicode_FromString(MODULE_NAME);
- if (name == NULL) {
- return NULL;
- }
- PyObject *mod = PyImport_GetModule(name);
- Py_DECREF(name);
- if (mod == NULL) {
- return NULL;
- }
- assert(mod != Py_None);
- return mod;
- }
- static PyObject *
- get_module_from_owned_type(PyTypeObject *cls)
- {
- assert(cls != NULL);
- return _get_current_module();
- // XXX Use the more efficient API now that we use heap types:
- //return PyType_GetModule(cls);
- }
- static struct PyModuleDef moduledef;
- static PyObject *
- get_module_from_type(PyTypeObject *cls)
- {
- assert(cls != NULL);
- return _get_current_module();
- // XXX Use the more efficient API now that we use heap types:
- //return PyType_GetModuleByDef(cls, &moduledef);
- }
- static PyObject *
- add_new_exception(PyObject *mod, const char *name, PyObject *base)
- {
- assert(!PyObject_HasAttrString(mod, name));
- PyObject *exctype = PyErr_NewException(name, base, NULL);
- if (exctype == NULL) {
- return NULL;
- }
- int res = PyModule_AddType(mod, (PyTypeObject *)exctype);
- if (res < 0) {
- Py_DECREF(exctype);
- return NULL;
- }
- return exctype;
- }
/* Call add_new_exception() with the name "<MODULE_NAME>.<NAME>". */
#define ADD_NEW_EXCEPTION(MOD, NAME, BASE) \
    add_new_exception(MOD, MODULE_NAME "." Py_STRINGIFY(NAME), BASE)
- static PyTypeObject *
- add_new_type(PyObject *mod, PyType_Spec *spec, crossinterpdatafunc shared)
- {
- PyTypeObject *cls = (PyTypeObject *)PyType_FromMetaclass(
- NULL, mod, spec, NULL);
- if (cls == NULL) {
- return NULL;
- }
- if (PyModule_AddType(mod, cls) < 0) {
- Py_DECREF(cls);
- return NULL;
- }
- if (shared != NULL) {
- if (_PyCrossInterpreterData_RegisterClass(cls, shared)) {
- Py_DECREF(cls);
- return NULL;
- }
- }
- return cls;
- }
/* Bit flags for the "flags" argument of _release_xid_data(). */
#define XID_IGNORE_EXC (1 << 0)  /* keep the caller's exception state intact */
#define XID_FREE (1 << 1)        /* release via the "ReleaseAndRawFree" variant */
- static int
- _release_xid_data(_PyCrossInterpreterData *data, int flags)
- {
- int ignoreexc = flags & XID_IGNORE_EXC;
- PyObject *exc;
- if (ignoreexc) {
- exc = PyErr_GetRaisedException();
- }
- int res;
- if (flags & XID_FREE) {
- res = _PyCrossInterpreterData_ReleaseAndRawFree(data);
- }
- else {
- res = _PyCrossInterpreterData_Release(data);
- }
- if (res < 0) {
- /* The owning interpreter is already destroyed. */
- if (ignoreexc) {
- // XXX Emit a warning?
- PyErr_Clear();
- }
- }
- if (flags & XID_FREE) {
- /* Either way, we free the data. */
- }
- if (ignoreexc) {
- PyErr_SetRaisedException(exc);
- }
- return res;
- }
- /* module state *************************************************************/
- typedef struct {
- /* heap types */
- PyTypeObject *ChannelIDType;
- /* exceptions */
- PyObject *ChannelError;
- PyObject *ChannelNotFoundError;
- PyObject *ChannelClosedError;
- PyObject *ChannelEmptyError;
- PyObject *ChannelNotEmptyError;
- } module_state;
- static inline module_state *
- get_module_state(PyObject *mod)
- {
- assert(mod != NULL);
- module_state *state = PyModule_GetState(mod);
- assert(state != NULL);
- return state;
- }
- static int
- traverse_module_state(module_state *state, visitproc visit, void *arg)
- {
- /* heap types */
- Py_VISIT(state->ChannelIDType);
- /* exceptions */
- Py_VISIT(state->ChannelError);
- Py_VISIT(state->ChannelNotFoundError);
- Py_VISIT(state->ChannelClosedError);
- Py_VISIT(state->ChannelEmptyError);
- Py_VISIT(state->ChannelNotEmptyError);
- return 0;
- }
- static int
- clear_module_state(module_state *state)
- {
- /* heap types */
- if (state->ChannelIDType != NULL) {
- (void)_PyCrossInterpreterData_UnregisterClass(state->ChannelIDType);
- }
- Py_CLEAR(state->ChannelIDType);
- /* exceptions */
- Py_CLEAR(state->ChannelError);
- Py_CLEAR(state->ChannelNotFoundError);
- Py_CLEAR(state->ChannelClosedError);
- Py_CLEAR(state->ChannelEmptyError);
- Py_CLEAR(state->ChannelNotEmptyError);
- return 0;
- }
- /* channel-specific code ****************************************************/
/* Which end(s) of a channel an operation applies to.  Code in this file
   tests these with ">= 0" (send or both) and "<= 0" (recv or both). */
#define CHANNEL_SEND (1)
#define CHANNEL_BOTH (0)
#define CHANNEL_RECV (-1)

/* channel errors */
/* Negative codes understood by handle_channel_error(); -1 is left for
   failures that already have an exception set. */
#define ERR_CHANNEL_NOT_FOUND (-2)
#define ERR_CHANNEL_CLOSED (-3)
#define ERR_CHANNEL_INTERP_CLOSED (-4)
#define ERR_CHANNEL_EMPTY (-5)
#define ERR_CHANNEL_NOT_EMPTY (-6)
#define ERR_CHANNEL_MUTEX_INIT (-7)
#define ERR_CHANNELS_MUTEX_INIT (-8)
#define ERR_NO_NEXT_CHANNEL_ID (-9)
- static int
- exceptions_init(PyObject *mod)
- {
- module_state *state = get_module_state(mod);
- if (state == NULL) {
- return -1;
- }
- #define ADD(NAME, BASE) \
- do { \
- assert(state->NAME == NULL); \
- state->NAME = ADD_NEW_EXCEPTION(mod, NAME, BASE); \
- if (state->NAME == NULL) { \
- return -1; \
- } \
- } while (0)
- // A channel-related operation failed.
- ADD(ChannelError, PyExc_RuntimeError);
- // An operation tried to use a channel that doesn't exist.
- ADD(ChannelNotFoundError, state->ChannelError);
- // An operation tried to use a closed channel.
- ADD(ChannelClosedError, state->ChannelError);
- // An operation tried to pop from an empty channel.
- ADD(ChannelEmptyError, state->ChannelError);
- // An operation tried to close a non-empty channel.
- ADD(ChannelNotEmptyError, state->ChannelError);
- #undef ADD
- return 0;
- }
- static int
- handle_channel_error(int err, PyObject *mod, int64_t cid)
- {
- if (err == 0) {
- assert(!PyErr_Occurred());
- return 0;
- }
- assert(err < 0);
- module_state *state = get_module_state(mod);
- assert(state != NULL);
- if (err == ERR_CHANNEL_NOT_FOUND) {
- PyErr_Format(state->ChannelNotFoundError,
- "channel %" PRId64 " not found", cid);
- }
- else if (err == ERR_CHANNEL_CLOSED) {
- PyErr_Format(state->ChannelClosedError,
- "channel %" PRId64 " is closed", cid);
- }
- else if (err == ERR_CHANNEL_INTERP_CLOSED) {
- PyErr_Format(state->ChannelClosedError,
- "channel %" PRId64 " is already closed", cid);
- }
- else if (err == ERR_CHANNEL_EMPTY) {
- PyErr_Format(state->ChannelEmptyError,
- "channel %" PRId64 " is empty", cid);
- }
- else if (err == ERR_CHANNEL_NOT_EMPTY) {
- PyErr_Format(state->ChannelNotEmptyError,
- "channel %" PRId64 " may not be closed "
- "if not empty (try force=True)",
- cid);
- }
- else if (err == ERR_CHANNEL_MUTEX_INIT) {
- PyErr_SetString(state->ChannelError,
- "can't initialize mutex for new channel");
- }
- else if (err == ERR_CHANNELS_MUTEX_INIT) {
- PyErr_SetString(state->ChannelError,
- "can't initialize mutex for channel management");
- }
- else if (err == ERR_NO_NEXT_CHANNEL_ID) {
- PyErr_SetString(state->ChannelError,
- "failed to get a channel ID");
- }
- else {
- assert(PyErr_Occurred());
- }
- return 1;
- }
- /* the channel queue */
- struct _channelitem;
- typedef struct _channelitem {
- _PyCrossInterpreterData *data;
- struct _channelitem *next;
- } _channelitem;
- static _channelitem *
- _channelitem_new(void)
- {
- _channelitem *item = GLOBAL_MALLOC(_channelitem);
- if (item == NULL) {
- PyErr_NoMemory();
- return NULL;
- }
- item->data = NULL;
- item->next = NULL;
- return item;
- }
- static void
- _channelitem_clear(_channelitem *item)
- {
- if (item->data != NULL) {
- // It was allocated in _channel_send().
- (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
- item->data = NULL;
- }
- item->next = NULL;
- }
- static void
- _channelitem_free(_channelitem *item)
- {
- _channelitem_clear(item);
- GLOBAL_FREE(item);
- }
- static void
- _channelitem_free_all(_channelitem *item)
- {
- while (item != NULL) {
- _channelitem *last = item;
- item = item->next;
- _channelitem_free(last);
- }
- }
- static _PyCrossInterpreterData *
- _channelitem_popped(_channelitem *item)
- {
- _PyCrossInterpreterData *data = item->data;
- item->data = NULL;
- _channelitem_free(item);
- return data;
- }
- typedef struct _channelqueue {
- int64_t count;
- _channelitem *first;
- _channelitem *last;
- } _channelqueue;
- static _channelqueue *
- _channelqueue_new(void)
- {
- _channelqueue *queue = GLOBAL_MALLOC(_channelqueue);
- if (queue == NULL) {
- PyErr_NoMemory();
- return NULL;
- }
- queue->count = 0;
- queue->first = NULL;
- queue->last = NULL;
- return queue;
- }
- static void
- _channelqueue_clear(_channelqueue *queue)
- {
- _channelitem_free_all(queue->first);
- queue->count = 0;
- queue->first = NULL;
- queue->last = NULL;
- }
- static void
- _channelqueue_free(_channelqueue *queue)
- {
- _channelqueue_clear(queue);
- GLOBAL_FREE(queue);
- }
- static int
- _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
- {
- _channelitem *item = _channelitem_new();
- if (item == NULL) {
- return -1;
- }
- item->data = data;
- queue->count += 1;
- if (queue->first == NULL) {
- queue->first = item;
- }
- else {
- queue->last->next = item;
- }
- queue->last = item;
- return 0;
- }
- static _PyCrossInterpreterData *
- _channelqueue_get(_channelqueue *queue)
- {
- _channelitem *item = queue->first;
- if (item == NULL) {
- return NULL;
- }
- queue->first = item->next;
- if (queue->last == item) {
- queue->last = NULL;
- }
- queue->count -= 1;
- return _channelitem_popped(item);
- }
- static void
- _channelqueue_drop_interpreter(_channelqueue *queue, int64_t interp)
- {
- _channelitem *prev = NULL;
- _channelitem *next = queue->first;
- while (next != NULL) {
- _channelitem *item = next;
- next = item->next;
- if (item->data->interp == interp) {
- if (prev == NULL) {
- queue->first = item->next;
- }
- else {
- prev->next = item->next;
- }
- _channelitem_free(item);
- queue->count -= 1;
- }
- else {
- prev = item;
- }
- }
- }
- /* channel-interpreter associations */
struct _channelend;

/* Association between one interpreter and one end of a channel;
   a node in a singly linked list. */
typedef struct _channelend {
    struct _channelend *next;  // following association, or NULL
    int64_t interp;            // interpreter ID
    int open;                  // non-zero until this end is closed
} _channelend;
- static _channelend *
- _channelend_new(int64_t interp)
- {
- _channelend *end = GLOBAL_MALLOC(_channelend);
- if (end == NULL) {
- PyErr_NoMemory();
- return NULL;
- }
- end->next = NULL;
- end->interp = interp;
- end->open = 1;
- return end;
- }
- static void
- _channelend_free(_channelend *end)
- {
- GLOBAL_FREE(end);
- }
- static void
- _channelend_free_all(_channelend *end)
- {
- while (end != NULL) {
- _channelend *last = end;
- end = end->next;
- _channelend_free(last);
- }
- }
- static _channelend *
- _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
- {
- _channelend *prev = NULL;
- _channelend *end = first;
- while (end != NULL) {
- if (end->interp == interp) {
- break;
- }
- prev = end;
- end = end->next;
- }
- if (pprev != NULL) {
- *pprev = prev;
- }
- return end;
- }
- typedef struct _channelassociations {
- // Note that the list entries are never removed for interpreter
- // for which the channel is closed. This should not be a problem in
- // practice. Also, a channel isn't automatically closed when an
- // interpreter is destroyed.
- int64_t numsendopen;
- int64_t numrecvopen;
- _channelend *send;
- _channelend *recv;
- } _channelends;
- static _channelends *
- _channelends_new(void)
- {
- _channelends *ends = GLOBAL_MALLOC(_channelends);
- if (ends== NULL) {
- return NULL;
- }
- ends->numsendopen = 0;
- ends->numrecvopen = 0;
- ends->send = NULL;
- ends->recv = NULL;
- return ends;
- }
- static void
- _channelends_clear(_channelends *ends)
- {
- _channelend_free_all(ends->send);
- ends->send = NULL;
- ends->numsendopen = 0;
- _channelend_free_all(ends->recv);
- ends->recv = NULL;
- ends->numrecvopen = 0;
- }
- static void
- _channelends_free(_channelends *ends)
- {
- _channelends_clear(ends);
- GLOBAL_FREE(ends);
- }
- static _channelend *
- _channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
- int send)
- {
- _channelend *end = _channelend_new(interp);
- if (end == NULL) {
- return NULL;
- }
- if (prev == NULL) {
- if (send) {
- ends->send = end;
- }
- else {
- ends->recv = end;
- }
- }
- else {
- prev->next = end;
- }
- if (send) {
- ends->numsendopen += 1;
- }
- else {
- ends->numrecvopen += 1;
- }
- return end;
- }
- static int
- _channelends_associate(_channelends *ends, int64_t interp, int send)
- {
- _channelend *prev;
- _channelend *end = _channelend_find(send ? ends->send : ends->recv,
- interp, &prev);
- if (end != NULL) {
- if (!end->open) {
- return ERR_CHANNEL_CLOSED;
- }
- // already associated
- return 0;
- }
- if (_channelends_add(ends, prev, interp, send) == NULL) {
- return -1;
- }
- return 0;
- }
- static int
- _channelends_is_open(_channelends *ends)
- {
- if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
- return 1;
- }
- if (ends->send == NULL && ends->recv == NULL) {
- return 1;
- }
- return 0;
- }
- static void
- _channelends_close_end(_channelends *ends, _channelend *end, int send)
- {
- end->open = 0;
- if (send) {
- ends->numsendopen -= 1;
- }
- else {
- ends->numrecvopen -= 1;
- }
- }
- static int
- _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
- {
- _channelend *prev;
- _channelend *end;
- if (which >= 0) { // send/both
- end = _channelend_find(ends->send, interp, &prev);
- if (end == NULL) {
- // never associated so add it
- end = _channelends_add(ends, prev, interp, 1);
- if (end == NULL) {
- return -1;
- }
- }
- _channelends_close_end(ends, end, 1);
- }
- if (which <= 0) { // recv/both
- end = _channelend_find(ends->recv, interp, &prev);
- if (end == NULL) {
- // never associated so add it
- end = _channelends_add(ends, prev, interp, 0);
- if (end == NULL) {
- return -1;
- }
- }
- _channelends_close_end(ends, end, 0);
- }
- return 0;
- }
- static void
- _channelends_drop_interpreter(_channelends *ends, int64_t interp)
- {
- _channelend *end;
- end = _channelend_find(ends->send, interp, NULL);
- if (end != NULL) {
- _channelends_close_end(ends, end, 1);
- }
- end = _channelend_find(ends->recv, interp, NULL);
- if (end != NULL) {
- _channelends_close_end(ends, end, 0);
- }
- }
- static void
- _channelends_close_all(_channelends *ends, int which, int force)
- {
- // XXX Handle the ends.
- // XXX Handle force is True.
- // Ensure all the "send"-associated interpreters are closed.
- _channelend *end;
- for (end = ends->send; end != NULL; end = end->next) {
- _channelends_close_end(ends, end, 1);
- }
- // Ensure all the "recv"-associated interpreters are closed.
- for (end = ends->recv; end != NULL; end = end->next) {
- _channelends_close_end(ends, end, 0);
- }
- }
- /* channels */
- struct _channel;
- struct _channel_closing;
- static void _channel_clear_closing(struct _channel *);
- static void _channel_finish_closing(struct _channel *);
- typedef struct _channel {
- PyThread_type_lock mutex;
- _channelqueue *queue;
- _channelends *ends;
- int open;
- struct _channel_closing *closing;
- } _PyChannelState;
- static _PyChannelState *
- _channel_new(PyThread_type_lock mutex)
- {
- _PyChannelState *chan = GLOBAL_MALLOC(_PyChannelState);
- if (chan == NULL) {
- return NULL;
- }
- chan->mutex = mutex;
- chan->queue = _channelqueue_new();
- if (chan->queue == NULL) {
- GLOBAL_FREE(chan);
- return NULL;
- }
- chan->ends = _channelends_new();
- if (chan->ends == NULL) {
- _channelqueue_free(chan->queue);
- GLOBAL_FREE(chan);
- return NULL;
- }
- chan->open = 1;
- chan->closing = NULL;
- return chan;
- }
- static void
- _channel_free(_PyChannelState *chan)
- {
- _channel_clear_closing(chan);
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- _channelqueue_free(chan->queue);
- _channelends_free(chan->ends);
- PyThread_release_lock(chan->mutex);
- PyThread_free_lock(chan->mutex);
- GLOBAL_FREE(chan);
- }
- static int
- _channel_add(_PyChannelState *chan, int64_t interp,
- _PyCrossInterpreterData *data)
- {
- int res = -1;
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- if (!chan->open) {
- res = ERR_CHANNEL_CLOSED;
- goto done;
- }
- if (_channelends_associate(chan->ends, interp, 1) != 0) {
- res = ERR_CHANNEL_INTERP_CLOSED;
- goto done;
- }
- if (_channelqueue_put(chan->queue, data) != 0) {
- goto done;
- }
- res = 0;
- done:
- PyThread_release_lock(chan->mutex);
- return res;
- }
- static int
- _channel_next(_PyChannelState *chan, int64_t interp,
- _PyCrossInterpreterData **res)
- {
- int err = 0;
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- if (!chan->open) {
- err = ERR_CHANNEL_CLOSED;
- goto done;
- }
- if (_channelends_associate(chan->ends, interp, 0) != 0) {
- err = ERR_CHANNEL_INTERP_CLOSED;
- goto done;
- }
- _PyCrossInterpreterData *data = _channelqueue_get(chan->queue);
- if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
- chan->open = 0;
- }
- *res = data;
- done:
- PyThread_release_lock(chan->mutex);
- if (chan->queue->count == 0) {
- _channel_finish_closing(chan);
- }
- return err;
- }
- static int
- _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
- {
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- int res = -1;
- if (!chan->open) {
- res = ERR_CHANNEL_CLOSED;
- goto done;
- }
- if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
- goto done;
- }
- chan->open = _channelends_is_open(chan->ends);
- res = 0;
- done:
- PyThread_release_lock(chan->mutex);
- return res;
- }
- static void
- _channel_drop_interpreter(_PyChannelState *chan, int64_t interp)
- {
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- _channelqueue_drop_interpreter(chan->queue, interp);
- _channelends_drop_interpreter(chan->ends, interp);
- chan->open = _channelends_is_open(chan->ends);
- PyThread_release_lock(chan->mutex);
- }
- static int
- _channel_close_all(_PyChannelState *chan, int end, int force)
- {
- int res = -1;
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- if (!chan->open) {
- res = ERR_CHANNEL_CLOSED;
- goto done;
- }
- if (!force && chan->queue->count > 0) {
- res = ERR_CHANNEL_NOT_EMPTY;
- goto done;
- }
- chan->open = 0;
- // We *could* also just leave these in place, since we've marked
- // the channel as closed already.
- _channelends_close_all(chan->ends, end, force);
- res = 0;
- done:
- PyThread_release_lock(chan->mutex);
- return res;
- }
- /* the set of channels */
- struct _channelref;
- typedef struct _channelref {
- int64_t id;
- _PyChannelState *chan;
- struct _channelref *next;
- Py_ssize_t objcount;
- } _channelref;
- static _channelref *
- _channelref_new(int64_t id, _PyChannelState *chan)
- {
- _channelref *ref = GLOBAL_MALLOC(_channelref);
- if (ref == NULL) {
- return NULL;
- }
- ref->id = id;
- ref->chan = chan;
- ref->next = NULL;
- ref->objcount = 0;
- return ref;
- }
- //static void
- //_channelref_clear(_channelref *ref)
- //{
- // ref->id = -1;
- // ref->chan = NULL;
- // ref->next = NULL;
- // ref->objcount = 0;
- //}
- static void
- _channelref_free(_channelref *ref)
- {
- if (ref->chan != NULL) {
- _channel_clear_closing(ref->chan);
- }
- //_channelref_clear(ref);
- GLOBAL_FREE(ref);
- }
- static _channelref *
- _channelref_find(_channelref *first, int64_t id, _channelref **pprev)
- {
- _channelref *prev = NULL;
- _channelref *ref = first;
- while (ref != NULL) {
- if (ref->id == id) {
- break;
- }
- prev = ref;
- ref = ref->next;
- }
- if (pprev != NULL) {
- *pprev = prev;
- }
- return ref;
- }
- typedef struct _channels {
- PyThread_type_lock mutex;
- _channelref *head;
- int64_t numopen;
- int64_t next_id;
- } _channels;
- static void
- _channels_init(_channels *channels, PyThread_type_lock mutex)
- {
- channels->mutex = mutex;
- channels->head = NULL;
- channels->numopen = 0;
- channels->next_id = 0;
- }
- static void
- _channels_fini(_channels *channels)
- {
- assert(channels->numopen == 0);
- assert(channels->head == NULL);
- if (channels->mutex != NULL) {
- PyThread_free_lock(channels->mutex);
- channels->mutex = NULL;
- }
- }
- static int64_t
- _channels_next_id(_channels *channels) // needs lock
- {
- int64_t id = channels->next_id;
- if (id < 0) {
- /* overflow */
- return -1;
- }
- channels->next_id += 1;
- return id;
- }
- static int
- _channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex,
- _PyChannelState **res)
- {
- int err = -1;
- _PyChannelState *chan = NULL;
- PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
- if (pmutex != NULL) {
- *pmutex = NULL;
- }
- _channelref *ref = _channelref_find(channels->head, id, NULL);
- if (ref == NULL) {
- err = ERR_CHANNEL_NOT_FOUND;
- goto done;
- }
- if (ref->chan == NULL || !ref->chan->open) {
- err = ERR_CHANNEL_CLOSED;
- goto done;
- }
- if (pmutex != NULL) {
- // The mutex will be closed by the caller.
- *pmutex = channels->mutex;
- }
- chan = ref->chan;
- err = 0;
- done:
- if (pmutex == NULL || *pmutex == NULL) {
- PyThread_release_lock(channels->mutex);
- }
- *res = chan;
- return err;
- }
- static int64_t
- _channels_add(_channels *channels, _PyChannelState *chan)
- {
- int64_t cid = -1;
- PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
- // Create a new ref.
- int64_t id = _channels_next_id(channels);
- if (id < 0) {
- cid = ERR_NO_NEXT_CHANNEL_ID;
- goto done;
- }
- _channelref *ref = _channelref_new(id, chan);
- if (ref == NULL) {
- goto done;
- }
- // Add it to the list.
- // We assume that the channel is a new one (not already in the list).
- ref->next = channels->head;
- channels->head = ref;
- channels->numopen += 1;
- cid = id;
- done:
- PyThread_release_lock(channels->mutex);
- return cid;
- }
- /* forward */
- static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
- static int
- _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
- int end, int force)
- {
- int res = -1;
- PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
- if (pchan != NULL) {
- *pchan = NULL;
- }
- _channelref *ref = _channelref_find(channels->head, cid, NULL);
- if (ref == NULL) {
- res = ERR_CHANNEL_NOT_FOUND;
- goto done;
- }
- if (ref->chan == NULL) {
- res = ERR_CHANNEL_CLOSED;
- goto done;
- }
- else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
- res = ERR_CHANNEL_CLOSED;
- goto done;
- }
- else {
- int err = _channel_close_all(ref->chan, end, force);
- if (err != 0) {
- if (end == CHANNEL_SEND && err == ERR_CHANNEL_NOT_EMPTY) {
- if (ref->chan->closing != NULL) {
- res = ERR_CHANNEL_CLOSED;
- goto done;
- }
- // Mark the channel as closing and return. The channel
- // will be cleaned up in _channel_next().
- PyErr_Clear();
- int err = _channel_set_closing(ref, channels->mutex);
- if (err != 0) {
- res = err;
- goto done;
- }
- if (pchan != NULL) {
- *pchan = ref->chan;
- }
- res = 0;
- }
- else {
- res = err;
- }
- goto done;
- }
- if (pchan != NULL) {
- *pchan = ref->chan;
- }
- else {
- _channel_free(ref->chan);
- }
- ref->chan = NULL;
- }
- res = 0;
- done:
- PyThread_release_lock(channels->mutex);
- return res;
- }
- static void
- _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
- _PyChannelState **pchan)
- {
- if (ref == channels->head) {
- channels->head = ref->next;
- }
- else {
- prev->next = ref->next;
- }
- channels->numopen -= 1;
- if (pchan != NULL) {
- *pchan = ref->chan;
- }
- _channelref_free(ref);
- }
- static int
- _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
- {
- int res = -1;
- PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
- if (pchan != NULL) {
- *pchan = NULL;
- }
- _channelref *prev = NULL;
- _channelref *ref = _channelref_find(channels->head, id, &prev);
- if (ref == NULL) {
- res = ERR_CHANNEL_NOT_FOUND;
- goto done;
- }
- _channels_remove_ref(channels, ref, prev, pchan);
- res = 0;
- done:
- PyThread_release_lock(channels->mutex);
- return res;
- }
- static int
- _channels_add_id_object(_channels *channels, int64_t id)
- {
- int res = -1;
- PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
- _channelref *ref = _channelref_find(channels->head, id, NULL);
- if (ref == NULL) {
- res = ERR_CHANNEL_NOT_FOUND;
- goto done;
- }
- ref->objcount += 1;
- res = 0;
- done:
- PyThread_release_lock(channels->mutex);
- return res;
- }
- static void
- _channels_drop_id_object(_channels *channels, int64_t id)
- {
- PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
- _channelref *prev = NULL;
- _channelref *ref = _channelref_find(channels->head, id, &prev);
- if (ref == NULL) {
- // Already destroyed.
- goto done;
- }
- ref->objcount -= 1;
- // Destroy if no longer used.
- if (ref->objcount == 0) {
- _PyChannelState *chan = NULL;
- _channels_remove_ref(channels, ref, prev, &chan);
- if (chan != NULL) {
- _channel_free(chan);
- }
- }
- done:
- PyThread_release_lock(channels->mutex);
- }
- static int64_t *
- _channels_list_all(_channels *channels, int64_t *count)
- {
- int64_t *cids = NULL;
- PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
- int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
- if (ids == NULL) {
- goto done;
- }
- _channelref *ref = channels->head;
- for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
- ids[i] = ref->id;
- }
- *count = channels->numopen;
- cids = ids;
- done:
- PyThread_release_lock(channels->mutex);
- return cids;
- }
- static void
- _channels_drop_interpreter(_channels *channels, int64_t interp)
- {
- PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
- _channelref *ref = channels->head;
- for (; ref != NULL; ref = ref->next) {
- if (ref->chan != NULL) {
- _channel_drop_interpreter(ref->chan, interp);
- }
- }
- PyThread_release_lock(channels->mutex);
- }
- /* support for closing non-empty channels */
/* State attached to a channel (through its "closing" field) by
   _channel_set_closing().  _channel_finish_closing() later uses "ref" to
   detach the channel from its registry entry. */
- struct _channel_closing {
- struct _channelref *ref;
- };
- static int
- _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
- struct _channel *chan = ref->chan;
- if (chan == NULL) {
- // already closed
- return 0;
- }
- int res = -1;
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- if (chan->closing != NULL) {
- res = ERR_CHANNEL_CLOSED;
- goto done;
- }
- chan->closing = GLOBAL_MALLOC(struct _channel_closing);
- if (chan->closing == NULL) {
- goto done;
- }
- chan->closing->ref = ref;
- res = 0;
- done:
- PyThread_release_lock(chan->mutex);
- return res;
- }
- static void
- _channel_clear_closing(struct _channel *chan) {
- PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
- if (chan->closing != NULL) {
- GLOBAL_FREE(chan->closing);
- chan->closing = NULL;
- }
- PyThread_release_lock(chan->mutex);
- }
- static void
- _channel_finish_closing(struct _channel *chan) {
- struct _channel_closing *closing = chan->closing;
- if (closing == NULL) {
- return;
- }
- _channelref *ref = closing->ref;
- _channel_clear_closing(chan);
- // Do the things that would have been done in _channels_close().
- ref->chan = NULL;
- _channel_free(chan);
- }
- /* "high"-level channel-related functions */
- static int64_t
- _channel_create(_channels *channels)
- {
- PyThread_type_lock mutex = PyThread_allocate_lock();
- if (mutex == NULL) {
- return ERR_CHANNEL_MUTEX_INIT;
- }
- _PyChannelState *chan = _channel_new(mutex);
- if (chan == NULL) {
- PyThread_free_lock(mutex);
- return -1;
- }
- int64_t id = _channels_add(channels, chan);
- if (id < 0) {
- _channel_free(chan);
- }
- return id;
- }
- static int
- _channel_destroy(_channels *channels, int64_t id)
- {
- _PyChannelState *chan = NULL;
- int err = _channels_remove(channels, id, &chan);
- if (err != 0) {
- return err;
- }
- if (chan != NULL) {
- _channel_free(chan);
- }
- return 0;
- }
- static int
- _channel_send(_channels *channels, int64_t id, PyObject *obj)
- {
- PyInterpreterState *interp = _get_current_interp();
- if (interp == NULL) {
- return -1;
- }
- // Look up the channel.
- PyThread_type_lock mutex = NULL;
- _PyChannelState *chan = NULL;
- int err = _channels_lookup(channels, id, &mutex, &chan);
- if (err != 0) {
- return err;
- }
- assert(chan != NULL);
- // Past this point we are responsible for releasing the mutex.
- if (chan->closing != NULL) {
- PyThread_release_lock(mutex);
- return ERR_CHANNEL_CLOSED;
- }
- // Convert the object to cross-interpreter data.
- _PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData);
- if (data == NULL) {
- PyThread_release_lock(mutex);
- return -1;
- }
- if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
- PyThread_release_lock(mutex);
- GLOBAL_FREE(data);
- return -1;
- }
- // Add the data to the channel.
- int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
- PyThread_release_lock(mutex);
- if (res != 0) {
- // We may chain an exception here:
- (void)_release_xid_data(data, 0);
- GLOBAL_FREE(data);
- return res;
- }
- return 0;
- }
- static int
- _channel_recv(_channels *channels, int64_t id, PyObject **res)
- {
- int err;
- *res = NULL;
- PyInterpreterState *interp = _get_current_interp();
- if (interp == NULL) {
- // XXX Is this always an error?
- if (PyErr_Occurred()) {
- return -1;
- }
- return 0;
- }
- // Look up the channel.
- PyThread_type_lock mutex = NULL;
- _PyChannelState *chan = NULL;
- err = _channels_lookup(channels, id, &mutex, &chan);
- if (err != 0) {
- return err;
- }
- assert(chan != NULL);
- // Past this point we are responsible for releasing the mutex.
- // Pop off the next item from the channel.
- _PyCrossInterpreterData *data = NULL;
- err = _channel_next(chan, PyInterpreterState_GetID(interp), &data);
- PyThread_release_lock(mutex);
- if (err != 0) {
- return err;
- }
- else if (data == NULL) {
- assert(!PyErr_Occurred());
- return 0;
- }
- // Convert the data back to an object.
- PyObject *obj = _PyCrossInterpreterData_NewObject(data);
- if (obj == NULL) {
- assert(PyErr_Occurred());
- // It was allocated in _channel_send(), so we free it.
- (void)_release_xid_data(data, XID_IGNORE_EXC | XID_FREE);
- return -1;
- }
- // It was allocated in _channel_send(), so we free it.
- int release_res = _release_xid_data(data, XID_FREE);
- if (release_res < 0) {
- // The source interpreter has been destroyed already.
- assert(PyErr_Occurred());
- Py_DECREF(obj);
- return -1;
- }
- *res = obj;
- return 0;
- }
- static int
- _channel_drop(_channels *channels, int64_t id, int send, int recv)
- {
- PyInterpreterState *interp = _get_current_interp();
- if (interp == NULL) {
- return -1;
- }
- // Look up the channel.
- PyThread_type_lock mutex = NULL;
- _PyChannelState *chan = NULL;
- int err = _channels_lookup(channels, id, &mutex, &chan);
- if (err != 0) {
- return err;
- }
- // Past this point we are responsible for releasing the mutex.
- // Close one or both of the two ends.
- int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
- PyThread_release_lock(mutex);
- return res;
- }
- static int
- _channel_close(_channels *channels, int64_t id, int end, int force)
- {
- return _channels_close(channels, id, NULL, end, force);
- }
- static int
- _channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
- int send)
- {
- _PyChannelState *chan = NULL;
- int err = _channels_lookup(channels, cid, NULL, &chan);
- if (err != 0) {
- return err;
- }
- else if (send && chan->closing != NULL) {
- return ERR_CHANNEL_CLOSED;
- }
- _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
- interp, NULL);
- return (end != NULL && end->open);
- }
- /* ChannelID class */
/* Instance layout of a ChannelID object (fields are set in newchannelid()). */
- typedef struct channelid {
- PyObject_HEAD
- int64_t id;  // ID of the channel this object refers to
- int end;  // CHANNEL_SEND, CHANNEL_RECV, or 0 (reported as "both")
- int resolve;  // carried across interpreters by _channelid_shared()
- _channels *channels;  // registry whose objcount bookkeeping tracks this object
- } channelid;
/* In/out argument block for channel_id_converter(): the caller fills in
   "module" and the converter stores the parsed channel ID in "cid". */
- struct channel_id_converter_data {
- PyObject *module;
- int64_t cid;
- };
- static int
- channel_id_converter(PyObject *arg, void *ptr)
- {
- int64_t cid;
- struct channel_id_converter_data *data = ptr;
- module_state *state = get_module_state(data->module);
- assert(state != NULL);
- if (PyObject_TypeCheck(arg, state->ChannelIDType)) {
- cid = ((channelid *)arg)->id;
- }
- else if (PyIndex_Check(arg)) {
- cid = PyLong_AsLongLong(arg);
- if (cid == -1 && PyErr_Occurred()) {
- return 0;
- }
- if (cid < 0) {
- PyErr_Format(PyExc_ValueError,
- "channel ID must be a non-negative int, got %R", arg);
- return 0;
- }
- }
- else {
- PyErr_Format(PyExc_TypeError,
- "channel ID must be an int, got %.100s",
- Py_TYPE(arg)->tp_name);
- return 0;
- }
- data->cid = cid;
- return 1;
- }
- static int
- newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
- int force, int resolve, channelid **res)
- {
- *res = NULL;
- channelid *self = PyObject_New(channelid, cls);
- if (self == NULL) {
- return -1;
- }
- self->id = cid;
- self->end = end;
- self->resolve = resolve;
- self->channels = channels;
- int err = _channels_add_id_object(channels, cid);
- if (err != 0) {
- if (force && err == ERR_CHANNEL_NOT_FOUND) {
- assert(!PyErr_Occurred());
- }
- else {
- Py_DECREF((PyObject *)self);
- return err;
- }
- }
- *res = self;
- return 0;
- }
- static _channels * _global_channels(void);
- static PyObject *
- _channelid_new(PyObject *mod, PyTypeObject *cls,
- PyObject *args, PyObject *kwds)
- {
- static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
- int64_t cid;
- struct channel_id_converter_data cid_data = {
- .module = mod,
- };
- int send = -1;
- int recv = -1;
- int force = 0;
- int resolve = 0;
- if (!PyArg_ParseTupleAndKeywords(args, kwds,
- "O&|$pppp:ChannelID.__new__", kwlist,
- channel_id_converter, &cid_data,
- &send, &recv, &force, &resolve)) {
- return NULL;
- }
- cid = cid_data.cid;
- // Handle "send" and "recv".
- if (send == 0 && recv == 0) {
- PyErr_SetString(PyExc_ValueError,
- "'send' and 'recv' cannot both be False");
- return NULL;
- }
- int end = 0;
- if (send == 1) {
- if (recv == 0 || recv == -1) {
- end = CHANNEL_SEND;
- }
- }
- else if (recv == 1) {
- end = CHANNEL_RECV;
- }
- PyObject *id = NULL;
- int err = newchannelid(cls, cid, end, _global_channels(),
- force, resolve,
- (channelid **)&id);
- if (handle_channel_error(err, mod, cid)) {
- assert(id == NULL);
- return NULL;
- }
- assert(id != NULL);
- return id;
- }
- static void
- channelid_dealloc(PyObject *self)
- {
- int64_t cid = ((channelid *)self)->id;
- _channels *channels = ((channelid *)self)->channels;
- PyTypeObject *tp = Py_TYPE(self);
- tp->tp_free(self);
- /* "Instances of heap-allocated types hold a reference to their type."
- * See: https://docs.python.org/3.11/howto/isolating-extensions.html#garbage-collection-protocol
- * See: https://docs.python.org/3.11/c-api/typeobj.html#c.PyTypeObject.tp_traverse
- */
- // XXX Why don't we implement Py_TPFLAGS_HAVE_GC, e.g. Py_tp_traverse,
- // like we do for _abc._abc_data?
- Py_DECREF(tp);
- _channels_drop_id_object(channels, cid);
- }
- static PyObject *
- channelid_repr(PyObject *self)
- {
- PyTypeObject *type = Py_TYPE(self);
- const char *name = _PyType_Name(type);
- channelid *cid = (channelid *)self;
- const char *fmt;
- if (cid->end == CHANNEL_SEND) {
- fmt = "%s(%" PRId64 ", send=True)";
- }
- else if (cid->end == CHANNEL_RECV) {
- fmt = "%s(%" PRId64 ", recv=True)";
- }
- else {
- fmt = "%s(%" PRId64 ")";
- }
- return PyUnicode_FromFormat(fmt, name, cid->id);
- }
- static PyObject *
- channelid_str(PyObject *self)
- {
- channelid *cid = (channelid *)self;
- return PyUnicode_FromFormat("%" PRId64 "", cid->id);
- }
- static PyObject *
- channelid_int(PyObject *self)
- {
- channelid *cid = (channelid *)self;
- return PyLong_FromLongLong(cid->id);
- }
- static Py_hash_t
- channelid_hash(PyObject *self)
- {
- channelid *cid = (channelid *)self;
- PyObject *id = PyLong_FromLongLong(cid->id);
- if (id == NULL) {
- return -1;
- }
- Py_hash_t hash = PyObject_Hash(id);
- Py_DECREF(id);
- return hash;
- }
- static PyObject *
- channelid_richcompare(PyObject *self, PyObject *other, int op)
- {
- PyObject *res = NULL;
- if (op != Py_EQ && op != Py_NE) {
- Py_RETURN_NOTIMPLEMENTED;
- }
- PyObject *mod = get_module_from_type(Py_TYPE(self));
- if (mod == NULL) {
- return NULL;
- }
- module_state *state = get_module_state(mod);
- if (state == NULL) {
- goto done;
- }
- if (!PyObject_TypeCheck(self, state->ChannelIDType)) {
- res = Py_NewRef(Py_NotImplemented);
- goto done;
- }
- channelid *cid = (channelid *)self;
- int equal;
- if (PyObject_TypeCheck(other, state->ChannelIDType)) {
- channelid *othercid = (channelid *)other;
- equal = (cid->end == othercid->end) && (cid->id == othercid->id);
- }
- else if (PyLong_Check(other)) {
- /* Fast path */
- int overflow;
- long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
- if (othercid == -1 && PyErr_Occurred()) {
- goto done;
- }
- equal = !overflow && (othercid >= 0) && (cid->id == othercid);
- }
- else if (PyNumber_Check(other)) {
- PyObject *pyid = PyLong_FromLongLong(cid->id);
- if (pyid == NULL) {
- goto done;
- }
- res = PyObject_RichCompare(pyid, other, op);
- Py_DECREF(pyid);
- goto done;
- }
- else {
- res = Py_NewRef(Py_NotImplemented);
- goto done;
- }
- if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
- res = Py_NewRef(Py_True);
- }
- else {
- res = Py_NewRef(Py_False);
- }
- done:
- Py_DECREF(mod);
- return res;
- }
- static PyObject *
- _channel_from_cid(PyObject *cid, int end)
- {
- PyObject *highlevel = PyImport_ImportModule("interpreters");
- if (highlevel == NULL) {
- PyErr_Clear();
- highlevel = PyImport_ImportModule("test.support.interpreters");
- if (highlevel == NULL) {
- return NULL;
- }
- }
- const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
- "SendChannel";
- PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
- Py_DECREF(highlevel);
- if (cls == NULL) {
- return NULL;
- }
- PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
- Py_DECREF(cls);
- if (chan == NULL) {
- return NULL;
- }
- return chan;
- }
/* Payload stored in the cross-interpreter data for a ChannelID; it is
   filled in by _channelid_shared() and read by _channelid_from_xid(). */
- struct _channelid_xid {
- int64_t id;
- int end;
- int resolve;
- };
- static PyObject *
- _channelid_from_xid(_PyCrossInterpreterData *data)
- {
- struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
- // It might not be imported yet, so we can't use _get_current_module().
- PyObject *mod = PyImport_ImportModule(MODULE_NAME);
- if (mod == NULL) {
- return NULL;
- }
- assert(mod != Py_None);
- module_state *state = get_module_state(mod);
- if (state == NULL) {
- return NULL;
- }
- // Note that we do not preserve the "resolve" flag.
- PyObject *cid = NULL;
- int err = newchannelid(state->ChannelIDType, xid->id, xid->end,
- _global_channels(), 0, 0,
- (channelid **)&cid);
- if (err != 0) {
- assert(cid == NULL);
- (void)handle_channel_error(err, mod, xid->id);
- goto done;
- }
- assert(cid != NULL);
- if (xid->end == 0) {
- goto done;
- }
- if (!xid->resolve) {
- goto done;
- }
- /* Try returning a high-level channel end but fall back to the ID. */
- PyObject *chan = _channel_from_cid(cid, xid->end);
- if (chan == NULL) {
- PyErr_Clear();
- goto done;
- }
- Py_DECREF(cid);
- cid = chan;
- done:
- Py_DECREF(mod);
- return cid;
- }
- static int
- _channelid_shared(PyThreadState *tstate, PyObject *obj,
- _PyCrossInterpreterData *data)
- {
- if (_PyCrossInterpreterData_InitWithSize(
- data, tstate->interp, sizeof(struct _channelid_xid), obj,
- _channelid_from_xid
- ) < 0)
- {
- return -1;
- }
- struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
- xid->id = ((channelid *)obj)->id;
- xid->end = ((channelid *)obj)->end;
- xid->resolve = ((channelid *)obj)->resolve;
- return 0;
- }
- static PyObject *
- channelid_end(PyObject *self, void *end)
- {
- int force = 1;
- channelid *cid = (channelid *)self;
- if (end != NULL) {
- PyObject *id = NULL;
- int err = newchannelid(Py_TYPE(self), cid->id, *(int *)end,
- cid->channels, force, cid->resolve,
- (channelid **)&id);
- if (err != 0) {
- assert(id == NULL);
- PyObject *mod = get_module_from_type(Py_TYPE(self));
- if (mod == NULL) {
- return NULL;
- }
- (void)handle_channel_error(err, mod, cid->id);
- Py_DECREF(mod);
- return NULL;
- }
- assert(id != NULL);
- return id;
- }
- if (cid->end == CHANNEL_SEND) {
- return PyUnicode_InternFromString("send");
- }
- if (cid->end == CHANNEL_RECV) {
- return PyUnicode_InternFromString("recv");
- }
- return PyUnicode_InternFromString("both");
- }
/* Closure values for the "send" and "recv" getters; channelid_end() reads
   the requested end through the closure pointer. */
- static int _channelid_end_send = CHANNEL_SEND;
- static int _channelid_end_recv = CHANNEL_RECV;
/* All three attributes share channelid_end().  "end" passes no closure and
   so yields the name of the end; "send" and "recv" yield a new ChannelID
   bound to that end. */
- static PyGetSetDef channelid_getsets[] = {
- {"end", (getter)channelid_end, NULL,
- PyDoc_STR("'send', 'recv', or 'both'")},
- {"send", (getter)channelid_end, NULL,
- PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
- {"recv", (getter)channelid_end, NULL,
- PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
- {NULL}
- };
/* Docstring of the ChannelID type (installed through Py_tp_doc below). */
- PyDoc_STRVAR(channelid_doc,
- "A channel ID identifies a channel and may be used as an int.");
/* Slot table of the ChannelID type; referenced by ChannelIDType_spec. */
- static PyType_Slot ChannelIDType_slots[] = {
- {Py_tp_dealloc, (destructor)channelid_dealloc},
- {Py_tp_doc, (void *)channelid_doc},
- {Py_tp_repr, (reprfunc)channelid_repr},
- {Py_tp_str, (reprfunc)channelid_str},
- {Py_tp_hash, channelid_hash},
- {Py_tp_richcompare, channelid_richcompare},
- {Py_tp_getset, channelid_getsets},
- // number slots (both conversions return the channel ID as an int)
- {Py_nb_int, (unaryfunc)channelid_int},
- {Py_nb_index, (unaryfunc)channelid_int},
- {0, NULL},
- };
/* Spec that module_exec() passes to add_new_type() to create the ChannelID
   type.  Py_TPFLAGS_DISALLOW_INSTANTIATION is set; instances are built by
   the module's "_channel_id" function (see channel__channel_id()). */
- static PyType_Spec ChannelIDType_spec = {
- .name = MODULE_NAME ".ChannelID",
- .basicsize = sizeof(channelid),
- .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
- Py_TPFLAGS_DISALLOW_INSTANTIATION | Py_TPFLAGS_IMMUTABLETYPE),
- .slots = ChannelIDType_slots,
- };
- /* module level code ********************************************************/
- /* globals is the process-global state for the module. It holds all
- the data that we need to share between interpreters, so it cannot
- hold PyObject values. */
- static struct globals {
- int module_count;  // incremented by _globals_init(), decremented by _globals_fini()
- _channels channels;  // the channel registry used by the module-level functions
- } _globals = {0};
- static int
- _globals_init(void)
- {
- // XXX This isn't thread-safe.
- _globals.module_count++;
- if (_globals.module_count > 1) {
- // Already initialized.
- return 0;
- }
- assert(_globals.channels.mutex == NULL);
- PyThread_type_lock mutex = PyThread_allocate_lock();
- if (mutex == NULL) {
- return ERR_CHANNELS_MUTEX_INIT;
- }
- _channels_init(&_globals.channels, mutex);
- return 0;
- }
- static void
- _globals_fini(void)
- {
- // XXX This isn't thread-safe.
- _globals.module_count--;
- if (_globals.module_count > 0) {
- return;
- }
- _channels_fini(&_globals.channels);
- }
- static _channels *
- _global_channels(void) {
- return &_globals.channels;
- }
- static void
- clear_interpreter(void *data)
- {
- if (_globals.module_count == 0) {
- return;
- }
- PyInterpreterState *interp = (PyInterpreterState *)data;
- assert(interp == _get_current_interp());
- int64_t id = PyInterpreterState_GetID(interp);
- _channels_drop_interpreter(&_globals.channels, id);
- }
- static PyObject *
- channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
- {
- int64_t cid = _channel_create(&_globals.channels);
- if (cid < 0) {
- (void)handle_channel_error(-1, self, cid);
- return NULL;
- }
- module_state *state = get_module_state(self);
- if (state == NULL) {
- return NULL;
- }
- PyObject *id = NULL;
- int err = newchannelid(state->ChannelIDType, cid, 0,
- &_globals.channels, 0, 0,
- (channelid **)&id);
- if (handle_channel_error(err, self, cid)) {
- assert(id == NULL);
- err = _channel_destroy(&_globals.channels, cid);
- if (handle_channel_error(err, self, cid)) {
- // XXX issue a warning?
- }
- return NULL;
- }
- assert(id != NULL);
- assert(((channelid *)id)->channels != NULL);
- return id;
- }
- PyDoc_STRVAR(channel_create_doc,
- "channel_create() -> cid\n\
- \n\
- Create a new cross-interpreter channel and return a unique generated ID.");
- static PyObject *
- channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
- {
- static char *kwlist[] = {"cid", NULL};
- int64_t cid;
- struct channel_id_converter_data cid_data = {
- .module = self,
- };
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
- channel_id_converter, &cid_data)) {
- return NULL;
- }
- cid = cid_data.cid;
- int err = _channel_destroy(&_globals.channels, cid);
- if (handle_channel_error(err, self, cid)) {
- return NULL;
- }
- Py_RETURN_NONE;
- }
- PyDoc_STRVAR(channel_destroy_doc,
- "channel_destroy(cid)\n\
- \n\
- Close and finalize the channel. Afterward attempts to use the channel\n\
- will behave as though it never existed.");
- static PyObject *
- channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
- {
- int64_t count = 0;
- int64_t *cids = _channels_list_all(&_globals.channels, &count);
- if (cids == NULL) {
- if (count == 0) {
- return PyList_New(0);
- }
- return NULL;
- }
- PyObject *ids = PyList_New((Py_ssize_t)count);
- if (ids == NULL) {
- goto finally;
- }
- module_state *state = get_module_state(self);
- if (state == NULL) {
- Py_DECREF(ids);
- ids = NULL;
- goto finally;
- }
- int64_t *cur = cids;
- for (int64_t i=0; i < count; cur++, i++) {
- PyObject *id = NULL;
- int err = newchannelid(state->ChannelIDType, *cur, 0,
- &_globals.channels, 0, 0,
- (channelid **)&id);
- if (handle_channel_error(err, self, *cur)) {
- assert(id == NULL);
- Py_SETREF(ids, NULL);
- break;
- }
- assert(id != NULL);
- PyList_SET_ITEM(ids, (Py_ssize_t)i, id);
- }
- finally:
- PyMem_Free(cids);
- return ids;
- }
- PyDoc_STRVAR(channel_list_all_doc,
- "channel_list_all() -> [cid]\n\
- \n\
- Return the list of all IDs for active channels.");
- static PyObject *
- channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
- {
- static char *kwlist[] = {"cid", "send", NULL};
- int64_t cid; /* Channel ID */
- struct channel_id_converter_data cid_data = {
- .module = self,
- };
- int send = 0; /* Send or receive end? */
- int64_t id;
- PyObject *ids, *id_obj;
- PyInterpreterState *interp;
- if (!PyArg_ParseTupleAndKeywords(
- args, kwds, "O&$p:channel_list_interpreters",
- kwlist, channel_id_converter, &cid_data, &send)) {
- return NULL;
- }
- cid = cid_data.cid;
- ids = PyList_New(0);
- if (ids == NULL) {
- goto except;
- }
- interp = PyInterpreterState_Head();
- while (interp != NULL) {
- id = PyInterpreterState_GetID(interp);
- assert(id >= 0);
- int res = _channel_is_associated(&_globals.channels, cid, id, send);
- if (res < 0) {
- (void)handle_channel_error(res, self, cid);
- goto except;
- }
- if (res) {
- id_obj = _PyInterpreterState_GetIDObject(interp);
- if (id_obj == NULL) {
- goto except;
- }
- res = PyList_Insert(ids, 0, id_obj);
- Py_DECREF(id_obj);
- if (res < 0) {
- goto except;
- }
- }
- interp = PyInterpreterState_Next(interp);
- }
- goto finally;
- except:
- Py_CLEAR(ids);
- finally:
- return ids;
- }
- PyDoc_STRVAR(channel_list_interpreters_doc,
- "channel_list_interpreters(cid, *, send) -> [id]\n\
- \n\
- Return the list of all interpreter IDs associated with an end of the channel.\n\
- \n\
- The 'send' argument should be a boolean indicating whether to use the send or\n\
- receive end.");
- static PyObject *
- channel_send(PyObject *self, PyObject *args, PyObject *kwds)
- {
- static char *kwlist[] = {"cid", "obj", NULL};
- int64_t cid;
- struct channel_id_converter_data cid_data = {
- .module = self,
- };
- PyObject *obj;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
- channel_id_converter, &cid_data, &obj)) {
- return NULL;
- }
- cid = cid_data.cid;
- int err = _channel_send(&_globals.channels, cid, obj);
- if (handle_channel_error(err, self, cid)) {
- return NULL;
- }
- Py_RETURN_NONE;
- }
- PyDoc_STRVAR(channel_send_doc,
- "channel_send(cid, obj)\n\
- \n\
- Add the object's data to the channel's queue.");
- static PyObject *
- channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
- {
- static char *kwlist[] = {"cid", "default", NULL};
- int64_t cid;
- struct channel_id_converter_data cid_data = {
- .module = self,
- };
- PyObject *dflt = NULL;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist,
- channel_id_converter, &cid_data, &dflt)) {
- return NULL;
- }
- cid = cid_data.cid;
- PyObject *obj = NULL;
- int err = _channel_recv(&_globals.channels, cid, &obj);
- if (handle_channel_error(err, self, cid)) {
- return NULL;
- }
- Py_XINCREF(dflt);
- if (obj == NULL) {
- // Use the default.
- if (dflt == NULL) {
- (void)handle_channel_error(ERR_CHANNEL_EMPTY, self, cid);
- return NULL;
- }
- obj = Py_NewRef(dflt);
- }
- Py_XDECREF(dflt);
- return obj;
- }
- PyDoc_STRVAR(channel_recv_doc,
- "channel_recv(cid, [default]) -> obj\n\
- \n\
- Return a new object from the data at the front of the channel's queue.\n\
- \n\
- If there is nothing to receive then raise ChannelEmptyError, unless\n\
- a default value is provided. In that case return it.");
- static PyObject *
- channel_close(PyObject *self, PyObject *args, PyObject *kwds)
- {
- static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
- int64_t cid;
- struct channel_id_converter_data cid_data = {
- .module = self,
- };
- int send = 0;
- int recv = 0;
- int force = 0;
- if (!PyArg_ParseTupleAndKeywords(args, kwds,
- "O&|$ppp:channel_close", kwlist,
- channel_id_converter, &cid_data,
- &send, &recv, &force)) {
- return NULL;
- }
- cid = cid_data.cid;
- int err = _channel_close(&_globals.channels, cid, send-recv, force);
- if (handle_channel_error(err, self, cid)) {
- return NULL;
- }
- Py_RETURN_NONE;
- }
/* Docstring for the module-level "close" function (see module_functions). */
- PyDoc_STRVAR(channel_close_doc,
- "channel_close(cid, *, send=None, recv=None, force=False)\n\
- \n\
- Close the channel for all interpreters.\n\
- \n\
- If the channel is empty then the keyword args are ignored and both\n\
- ends are immediately closed. Otherwise, if 'force' is True then\n\
- all queued items are released and both ends are immediately\n\
- closed.\n\
- \n\
- If the channel is not empty *and* 'force' is False then following\n\
- happens:\n\
- \n\
- * recv is True (regardless of send):\n\
- - raise ChannelNotEmptyError\n\
- * recv is None and send is None:\n\
- - raise ChannelNotEmptyError\n\
- * send is True and recv is not True:\n\
- - fully close the 'send' end\n\
- - close the 'recv' end to interpreters not already receiving\n\
- - fully close it once empty\n\
- \n\
- Closing an already closed channel results in a ChannelClosedError.\n\
- \n\
- Once the channel's ID has no more ref counts in any interpreter\n\
- the channel will be destroyed.");
- static PyObject *
- channel_release(PyObject *self, PyObject *args, PyObject *kwds)
- {
- // Note that only the current interpreter is affected.
- static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
- int64_t cid;
- struct channel_id_converter_data cid_data = {
- .module = self,
- };
- int send = 0;
- int recv = 0;
- int force = 0;
- if (!PyArg_ParseTupleAndKeywords(args, kwds,
- "O&|$ppp:channel_release", kwlist,
- channel_id_converter, &cid_data,
- &send, &recv, &force)) {
- return NULL;
- }
- cid = cid_data.cid;
- if (send == 0 && recv == 0) {
- send = 1;
- recv = 1;
- }
- // XXX Handle force is True.
- // XXX Fix implicit release.
- int err = _channel_drop(&_globals.channels, cid, send, recv);
- if (handle_channel_error(err, self, cid)) {
- return NULL;
- }
- Py_RETURN_NONE;
- }
- PyDoc_STRVAR(channel_release_doc,
- "channel_release(cid, *, send=None, recv=None, force=True)\n\
- \n\
- Close the channel for the current interpreter. 'send' and 'recv'\n\
- (bool) may be used to indicate the ends to close. By default both\n\
- ends are closed. Closing an already closed end is a noop.");
- static PyObject *
- channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
- {
- module_state *state = get_module_state(self);
- if (state == NULL) {
- return NULL;
- }
- PyTypeObject *cls = state->ChannelIDType;
- PyObject *mod = get_module_from_owned_type(cls);
- if (mod == NULL) {
- return NULL;
- }
- PyObject *cid = _channelid_new(mod, cls, args, kwds);
- Py_DECREF(mod);
- return cid;
- }
/* Method table of the module (installed through moduledef.m_methods).
   "_channel_id" is the only entry without a docstring. */
- static PyMethodDef module_functions[] = {
- {"create", channel_create,
- METH_NOARGS, channel_create_doc},
- {"destroy", _PyCFunction_CAST(channel_destroy),
- METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
- {"list_all", channel_list_all,
- METH_NOARGS, channel_list_all_doc},
- {"list_interpreters", _PyCFunction_CAST(channel_list_interpreters),
- METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
- {"send", _PyCFunction_CAST(channel_send),
- METH_VARARGS | METH_KEYWORDS, channel_send_doc},
- {"recv", _PyCFunction_CAST(channel_recv),
- METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
- {"close", _PyCFunction_CAST(channel_close),
- METH_VARARGS | METH_KEYWORDS, channel_close_doc},
- {"release", _PyCFunction_CAST(channel_release),
- METH_VARARGS | METH_KEYWORDS, channel_release_doc},
- {"_channel_id", _PyCFunction_CAST(channel__channel_id),
- METH_VARARGS | METH_KEYWORDS, NULL},
- {NULL, NULL} /* sentinel */
- };
- /* initialization function */
- PyDoc_STRVAR(module_doc,
- "This module provides primitive operations to manage Python interpreters.\n\
- The 'interpreters' module provides a more convenient interface.");
- static int
- module_exec(PyObject *mod)
- {
- if (_globals_init() != 0) {
- return -1;
- }
- /* Add exception types */
- if (exceptions_init(mod) != 0) {
- goto error;
- }
- /* Add other types */
- module_state *state = get_module_state(mod);
- if (state == NULL) {
- goto error;
- }
- // ChannelID
- state->ChannelIDType = add_new_type(
- mod, &ChannelIDType_spec, _channelid_shared);
- if (state->ChannelIDType == NULL) {
- goto error;
- }
- // Make sure chnnels drop objects owned by this interpreter
- PyInterpreterState *interp = _get_current_interp();
- _Py_AtExit(interp, clear_interpreter, (void *)interp);
- return 0;
- error:
- _globals_fini();
- return -1;
- }
/* Module slots: module_exec() runs as the exec step, and the module
   declares Py_MOD_PER_INTERPRETER_GIL_SUPPORTED. */
- static struct PyModuleDef_Slot module_slots[] = {
- {Py_mod_exec, module_exec},
- {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
- {0, NULL},
- };
- static int
- module_traverse(PyObject *mod, visitproc visit, void *arg)
- {
- module_state *state = get_module_state(mod);
- assert(state != NULL);
- traverse_module_state(state, visit, arg);
- return 0;
- }
- static int
- module_clear(PyObject *mod)
- {
- module_state *state = get_module_state(mod);
- assert(state != NULL);
- clear_module_state(state);
- return 0;
- }
- static void
- module_free(void *mod)
- {
- module_state *state = get_module_state(mod);
- assert(state != NULL);
- clear_module_state(state);
- _globals_fini();
- }
/* Module definition; PyInit__xxinterpchannels() passes it to
   PyModuleDef_Init().  The per-module state is a module_state. */
- static struct PyModuleDef moduledef = {
- .m_base = PyModuleDef_HEAD_INIT,
- .m_name = MODULE_NAME,
- .m_doc = module_doc,
- .m_size = sizeof(module_state),
- .m_methods = module_functions,
- .m_slots = module_slots,
- .m_traverse = module_traverse,
- .m_clear = module_clear,
- .m_free = (freefunc)module_free,
- };
- PyMODINIT_FUNC
- PyInit__xxinterpchannels(void)
- {
- return PyModuleDef_Init(&moduledef);
- }
|