Hi all
I've found a strange bug on win32 using PyQt old-style signals.
Sometimes I get a wrong result from QObject::receivers and the signal is
"lost". It looks like a PyQt bug, because a similar C++ example works correctly
on both Linux and Win32.
Sidenotes:
* changing the signature for every new CRS instance seems to work around the bug
* not deleting (i.e. keeping a reference to) every new CRS instance seems to
work around the bug
* new-style signals seem to work OK
Tested on:
Python 2.7.3 (default, Mar 5 2013, 01:19:40) [GCC 4.7.2] on linux2
PYQT_VERSION_STR='snapshot-4.10.2-d9c6e01fe9ce'
QtCore.QT_VERSION_STR='4.8.2'
Linux 3.3.5 #1 SMP/32bit
Python 2.7.3 (default, Dec 18 2012, 17:53:47) [MSC v.1500 32 bit (Intel)]
PYQT_VERSION_STR='4.9.6'
QT_VERSION_STR='4.8.4'
Windows 7/32bit and XP/32bit
As program output I expect matching emit/receive logs and exactly 1 receiver.
Python and C++ examples are attached.
The Python program uses a "nexplode.txt" logfile in append mode, while the C++
one uses qDebug()
--
Giuseppe Corbelli
WASP Software Engineer, Copan Italia S.p.A
Phone: +390303666318 Fax: +390302659932
E-mail: giuseppe.corbe...@copanitalia.com
#!/usr/bin/python
# -*- coding: utf-8 -*-
import logging
import re
import sys
import types
import weakref
from collections import deque
from PyQt4 import QtCore
# Route log records to the "nexplode.txt" file and let every record from
# DEBUG upwards through the root logger.
logging.basicConfig(filename="nexplode.txt")
logging.getLogger().setLevel(logging.DEBUG)
class CSignal(object):
    """Wrapper around a PyQt4 old-style (string signature) signal of one emitter.

    The emitter is held through a weak reference.  Every slot passed to
    :meth:`connect` is recorded as a ``(weakref, method_name)`` pair so that
    it can be disconnected again when the wrapper or the emitter goes away.
    """

    # A usable signature must start its parameter list with two PyQt_PyObject
    # arguments (emit() always sends the emitter and the signature first).
    _signature_re = re.compile(r".+?\(\s*PyQt_PyObject,\s*PyQt_PyObject(,?.*?)\)")

    def __init__(self, emitter, signature):
        """Bind the wrapper to ``emitter`` and to the given signal signature.

        :param emitter: the QtCore.QObject that emits the signal.
        :param signature: signal signature string, handed to QtCore.SIGNAL.
        :raises TypeError: if ``emitter`` is not a QtCore.QObject.
        :raises ValueError: if the signature does not match ``_signature_re``.
        """
        super(CSignal, self).__init__()
        # Initialised before any validation: if one of the checks below
        # raises, __del__ still runs on the half-built instance and
        # disconnect() must find these attributes.
        self._emitter = None
        self._connected_slots = set()
        if not isinstance(emitter, QtCore.QObject):
            raise TypeError(u"Emitter must be an instance of QtCore.QObject not {0}".format(emitter.__class__.__name__))
        self._emitter = weakref.ref(emitter, self._on_emitter_weakref_destroyed)
        self._original_id_emitter = id(emitter)
        self._signature = QtCore.SIGNAL(signature)
        if not self._signature_re.match(self._signature):
            raise ValueError(u"Signatur must include at least 2 parameter, got {0}".format(self._signature))
        self._logger = logging.getLogger("Signal")

    def __del__(self):
        """Disconnect every recorded slot when the wrapper is destroyed."""
        self.disconnect()

    def _get_emitter(self):
        """Return the live emitter, or None if it is gone or was never set."""
        emitter_ref = self._emitter
        if emitter_ref is None:
            return None
        return emitter_ref()

    def _on_emitter_weakref_destroyed(self, object_weakref):
        """Weakref callback of the emitter: drop all recorded connections."""
        self.disconnect()

    def _on_receiver_weakref_destroyed(self, object_weakref):
        """Weakref callback of a receiver: forget the slots recorded for it."""
        self._clean_connected_slots(object_weakref)

    def _get_receiver_and_method_name(self, code_object):
        """Split a slot into ``(weakref to receiver, method name)``.

        For a bound method the receiver is the instance and the name is the
        name of the underlying function.  Any other callable is weakly
        referenced itself and the name is None.
        """
        if isinstance(code_object, types.MethodType):
            receiver = weakref.ref(code_object.im_self, self._on_receiver_weakref_destroyed)
            return (receiver, code_object.im_func.func_name)
        # Plain functions and every other callable are handled the same way.
        return (weakref.ref(code_object), None)

    def emit(self, *args):
        """Emit the signal if the emitter is still alive.

        The emitter and the signature are always sent as the first two
        arguments, followed by ``args``.
        """
        emitter = self._get_emitter()
        if emitter is not None:
            emitter.emit(self._signature, emitter, self._signature, *args)

    def connect(self, slot, connection_type=QtCore.Qt.AutoConnection):
        """Connect ``slot`` to the signal and record it for later cleanup.

        A debug record is written whenever the emitter does not report exactly
        one receiver for the signature after the connection.

        :returns: the result of QtCore.QObject.connect.
        """
        (receiver, method_name) = self._get_receiver_and_method_name(slot)
        self._connected_slots.add((receiver, method_name))
        emitter = self._get_emitter()
        # The emitter must still be the object this wrapper was created for.
        assert id(emitter) == self._original_id_emitter
        res = QtCore.QObject.connect(emitter, self._signature, slot, connection_type)
        recvs = emitter.receivers(self._signature)
        if recvs != 1:
            self._logger.debug(
                u'CSIGNAL {0}] called connect, emitter: {1!r} slot: {2!r} receivers: "{3}" _connected_slots: "{4}" res: {5}'.format(
                    self, emitter, slot, recvs, self._connected_slots, res
                )
            )
        return res

    def disconnect(self, slot=None):
        """Disconnect ``slot``, or every recorded slot when ``slot`` is None.

        If the emitter no longer exists only the bookkeeping is cleared.
        """
        emitter = self._get_emitter()
        if emitter is None:
            self._connected_slots.clear()
            return
        if slot is None:
            for receiver, method_name in self._connected_slots:
                self._disconnect_slot_reference(emitter, receiver, method_name)
            self._connected_slots.clear()
        else:
            self._disconnect_slot(emitter, slot)

    def _clean_connected_slots(self, ref):
        """Remove every recorded slot whose receiver weakref is ``ref``."""
        # Iterate over a copy because the set is modified inside the loop.
        for elem in list(self._connected_slots):
            receiver_ref, method_name = elem
            if receiver_ref is ref:
                self._connected_slots.remove(elem)

    def _disconnect_slot_reference(self, sender, receiver_ref, method_name):
        """Disconnect one recorded ``(weakref, method_name)`` pair from ``sender``."""
        receiver = receiver_ref()
        if receiver is None:
            # The receiver is already gone, nothing left to disconnect.
            return
        if method_name is None:
            QtCore.QObject.disconnect(sender, self._signature, receiver)
            return
        # getattr() with a default never raises AttributeError, so a missing
        # method has to be detected through the None result instead of being
        # handed to QObject.disconnect as the slot.
        slot = getattr(receiver, method_name, None)
        if slot is not None:
            QtCore.QObject.disconnect(sender, self._signature, slot)

    def _disconnect_slot(self, sender, slot):
        """Disconnect ``slot`` from ``sender`` and forget it on success."""
        if QtCore.QObject.disconnect(sender, self._signature, slot):
            (receiver, method_name) = self._get_receiver_and_method_name(slot)
            self._connected_slots.discard((receiver, method_name))

    @property
    def signature(self):
        """The signature as returned by QtCore.SIGNAL."""
        return self._signature

    def __str__(self):
        return self.signature
class CRS(QtCore.QObject):
    """QObject carrying a sample id and a ``sample_completed`` CSignal.

    On construction the signal is connected to the ``on_sample_completed``
    method of the current QCoreApplication instance.
    """

    def __init__(self, sample_id):
        super(CRS, self).__init__()
        self.id = sample_id
        self.app = QtCore.QCoreApplication.instance()
        completed = CSignal(self, 'sample_completed(PyQt_PyObject, PyQt_PyObject)')
        self.sample_completed = completed
        completed.connect(self.app.on_sample_completed)

    def __str__(self):
        """The sample id rendered as a string."""
        return str(self.id)
class CApp(QtCore.QCoreApplication):
    """Application that creates a new CRS on every timer tick.

    Each tick builds a CRS, logs and emits its ``sample_completed`` signal
    and stores the CRS in a deque limited to 300 entries.
    """

    def __init__(self, *args, **kwargs):
        super(CApp, self).__init__(*args, **kwargs)
        self._logger = logging.getLogger('QCoreApp')
        self._sample_counter = 0
        # Only the most recent 300 samples are kept referenced.
        self.__crs_list = deque(maxlen=300)
        timer = QtCore.QTimer()
        timer.setInterval(1)
        timer.timeout.connect(self.generate_crs_and_emit)
        self.__timer = timer
        timer.start()

    def generate_crs_and_emit(self):
        """Timer slot: create the next sample, emit its signal, then store it."""
        self._sample_counter += 1
        crs = CRS(self._sample_counter)
        message = u"emit sample_completed for sample {0}".format(crs)
        self._logger.debug(message)
        crs.sample_completed.emit()
        self.__crs_list.append(crs)

    def on_sample_completed(self, sender, signature):
        """Slot for ``sample_completed``: log which sample was received."""
        message = u"received sample_completed for sample {0}".format(sender)
        self._logger.debug(message)
# Create the application, run its event loop and hand the loop's result to
# the interpreter as the process exit status.
app = CApp(sys.argv)
exit_code = app.exec_()
sys.exit(exit_code)
#include "main.h"
#include <QDebug>
// Builds the application, zeroes the sample counter and starts a timer with
// an interval of 1 whose timeout() signal drives generate_crs_and_emit().
CMyApplication::CMyApplication(int& argc, char** argv)
    : QCoreApplication(argc, argv),
      m_SampleCounter(0)
{
    m_Timer.setInterval(1);
    QObject::connect(&m_Timer, SIGNAL(timeout()), this, SLOT(generate_crs_and_emit()));
    m_Timer.start();
}
void CMyApplication::generate_crs_and_emit()
{
this->m_SampleCounter++;
CRS* sample(new CRS(this->m_SampleCounter));
this->m_CRSQueue.enqueue(sample);
while (this->m_CRSQueue.size() > 300) {
CRS* x(this->m_CRSQueue.dequeue());
delete x;
}
}
void CMyApplication::onSampleCompleted()
{
CRS* s = (CRS*)this->sender();
qDebug() << "received sample_completed for sample" << s->m_Id;
}
// Stores the sample id, connects sampleCompleted() to the application's
// onSampleCompleted() slot, reports the receiver count for the signal and
// finally emits the signal from inside the constructor.
CRS::CRS(unsigned sample_id)
    : m_Id(sample_id)
{
    CMyApplication* const app = static_cast<CMyApplication*>(QCoreApplication::instance());
    QObject::connect(this, SIGNAL(sampleCompleted()), app, SLOT(onSampleCompleted()));

    const int n_receivers = receivers(SIGNAL(sampleCompleted()));
    qDebug() << "Receivers for sample" << m_Id << ":" << n_receivers;
    // Exactly one receiver is expected right after the connection.
    Q_ASSERT(n_receivers == 1);

    qDebug() << "emit sample_completed for sample" << m_Id;
    emit sampleCompleted();
}
// Program entry point: runs the application's event loop and returns its
// result as the process exit code.
int main(int argc, char* argv[])
{
    CMyApplication app(argc, argv);
    const int exit_code = app.exec();
    return exit_code;
}
// Declarations for the old-style signal/slot test program.
// The guard name avoids double underscores, which are reserved identifiers.
#ifndef TESTSIGNAL_MAIN_H
#define TESTSIGNAL_MAIN_H
#include <QCoreApplication>
#include <QObject>
#include <QQueue>
#include <QTimer>

// One sample: a QObject with an id and a sampleCompleted() signal.
class CRS : public QObject
{
    Q_OBJECT
public:
    // explicit: an unsigned value must never convert to a CRS implicitly.
    explicit CRS(unsigned sample_id);
    unsigned m_Id;  // id of this sample, as passed to the constructor
signals:
    void sampleCompleted();
};

// Application object that creates CRS samples from a timer slot and
// receives their sampleCompleted() signal.
class CMyApplication : public QCoreApplication
{
    Q_OBJECT
public:
    CMyApplication(int& argc, char** argv);
    unsigned m_SampleCounter;   // id handed to the next CRS
    QTimer m_Timer;             // drives generate_crs_and_emit()
    QQueue<CRS*> m_CRSQueue;    // samples currently kept alive
public slots:
    void generate_crs_and_emit();
    void onSampleCompleted();
};
#endif // TESTSIGNAL_MAIN_H
# qmake project for the C++ comparison example.
HEADERS = main.h
SOURCES = main.cpp
# Only the QtCore module is used by this program.
QT = core
# Append to CONFIG rather than overwrite it: a plain '=' discards whatever
# values were already set in CONFIG before this line.
# NOTE(review): Q_ASSERT is normally compiled out in release builds, so the
# receiver-count assertion in main.cpp is presumably inactive on win32 - confirm.
win32: CONFIG += release console
_______________________________________________
PyQt mailing list PyQt@riverbankcomputing.com
http://www.riverbankcomputing.com/mailman/listinfo/pyqt