As full disclosure, I posted this question on StackOverflow as well, but it looks like questions with [Python] [Extension-Module] tags are not frequently answered. The link to my question there is https://stackoverflow.com/questions/64559322/python-extension-module-with-callbacks-into-python
I am running into unpredictable behavior with my Python extension module that wraps around a C++ library that starts a new pthread and, after doing some work, generates callbacks back into the caller. I've greatly simplified this to a simplistic example which still demonstrates this problem. The following will sometimes generate a Fatal Python error: PyEval_SaveThread: NULL tstate, usually rather quickly. Sometimes it SIGSEGV on tupledealoc. Occasionally this deadlocks. I am at a loss why. Does anyone have any ideas? The relevant code for this is below, between the ==== sections ==== === python program === import mymod from time import sleep from random import randrange def my_cb1(s): print("Python cb %s" % (s)); for x in range(1,1000): num_cb = randrange(5) + 1 print("Starting %d" % mymod.doit(my_cb1, "myid" + str(x), num_cb)) while True: sleep(1) === Extension Module === #include <pthread.h> #define PY_SSIZE_T_CLEAN #include <Python.h> #include <stddef.h> #include <iostream> #include <map> #include <deque> #include <mutex> #include <functional> #include <thread> static std::map<std::string, PyObject *> cb_map; static std::mutex map_mtx; struct fake_cb_info { fake_cb_info() = delete; fake_cb_info(const unsigned long &num_cb, const std::string &id) : num_cb(num_cb), id(id) { } const unsigned long num_cb; const std::string id; }; static std::deque<struct fake_cb_info> deq; static std::mutex deq_mtx; static bool is_worker_thread_running = false; static std::thread worker_thread; typedef std::function<void(const std::string &id, const std::string &s)> doit_cb_t; static void internal_cb(const std::string &id, const std::string &s) { std::scoped_lock<std::mutex> lk(map_mtx); if (0 != cb_map.count(id)) { PyGILState_STATE gstate; gstate = PyGILState_Ensure(); PyObject *arglist = Py_BuildValue("(s)", s.c_str()); PyObject *result = PyObject_CallObject(cb_map.at(id), arglist); Py_DECREF(arglist); if (NULL == result) { if (NULL == PyErr_Occurred()) { std::cerr << "Unknown error occurred in C callback" << std::endl; } else { PyErr_Print(); } } else { Py_DECREF(result); } PyGILState_Release(gstate); } else { std::cerr << "Unknown callback id " << id << std::endl; } } void static worker() { size_t x = 0; while(true) { std::scoped_lock<std::mutex> lk(deq_mtx); if (deq.size() == 0) { usleep(1000); continue; } auto info = deq.front(); deq.pop_front(); for (unsigned long i=0; i<info.num_cb; i++) { internal_cb(info.id, std::to_string(x++)); } } } PyObject * _wrap_doit(void *self, PyObject *args, PyObject *kwargs) { PyObject *py_retval; PyThreadState *py_thread_state = NULL; PyObject *cb; const char *id = NULL; Py_ssize_t id_len; std::string id_std; unsigned long num_callbacks; const char *keywords[] = {"cb_func", "id", "num_cb", NULL}; if (!PyArg_ParseTupleAndKeywords(args, kwargs, (char *) "Os#k", (char **) keywords, &cb, &id, &id_len, &num_callbacks)) { abort(); } if (!PyCallable_Check(cb)) { abort(); } id_std = std::string(id, id_len); { std::scoped_lock<std::mutex> lk(map_mtx); if (0 == cb_map.count(id_std)) { Py_INCREF(cb); cb_map.insert(std::make_pair(id_std, cb)); // N.B. The corresponding Py_DECREF for the callback function PyObject // is intentionally not here. It is in another extension module method // that is not listed here (just trying to keep this example as small // and lean as possible) } else { std::cerr << "Only one callback for ID!" << std::endl; abort(); } } if (PyEval_ThreadsInitialized ()) { std::cout << "Saving thread" << std::endl; py_thread_state = PyEval_SaveThread(); } { // Stash away the info so that we will know how many callbacks to // generate and sleep a bit. This is to simulate a real external library // doing work which will, in turn, generate callbacks struct fake_cb_info info(num_callbacks, id_std); std::scoped_lock<std::mutex> lk(deq_mtx); deq.push_back(info); if (!is_worker_thread_running) { std::cout << "@@@@ Creating a new thread\n"; worker_thread = std::thread(&worker); pthread_setname_np(worker_thread.native_handle(), "worker_thread"); worker_thread.detach(); is_worker_thread_running = true; } usleep(10000); } if (py_thread_state) { std::cout << "Restoring thread" << std::endl; PyEval_RestoreThread(py_thread_state); } py_retval = Py_BuildValue((char *) "k", num_callbacks); return py_retval; } static PyMethodDef mymod_functions[] = { { (char *) "doit", (PyCFunction) _wrap_doit, METH_KEYWORDS | METH_VARARGS, "Generate requested number of multi-threaded callbacks.\n doit(callback_fn, id, num_callbacks)" }, {NULL, NULL, 0, NULL} }; static struct PyModuleDef moduledef = { PyModuleDef_HEAD_INIT, "mymod", "pthread test module", -1, mymod_functions, }; #define MOD_ERROR NULL #define MOD_INIT(name) PyObject* PyInit_##name(void) #define MOD_RETURN(val) val #if defined(__cplusplus) extern "C" #endif #if defined(__GNUC__) && __GNUC__ >= 4 __attribute__ ((visibility("default"))) #endif MOD_INIT(mymod) { PyObject *m = PyModule_Create(&moduledef); if (m == NULL) { return MOD_ERROR; } return MOD_RETURN(m); } -- https://mail.python.org/mailman/listinfo/python-list