Hi all,

I've been trying to find the path of least resistance to using cooperative multitasking in PyQt apps.

There are a bunch of different options out there, including Twisted (The initial inqternet.py Qt support I wrote became the basis of the current Qt reactor, and it works great if you are using Twisted) and Kamaelia/Axon (I had a stab at implementing Qt event loop support for that - available here: http://sirtaj.net/projects/axonqt.py). However most of these approaches require you to commit to frameworks that are likely to shape the implementation of the rest of your app.

The two recommended ways to doing this sort of thing in Qt/C++ are:

1) use QTimer with a timeout of 0 to call some function.

2) In a long-running loop, call processEvents to allow other events to be processed to keep the GUI interactive.

...and that's all she wrote, since C++ doesn't really allow many options besides multithreading.

With the yield keyword, however, we can get the same kind of cooperative multitasking that we had back in 1991 with Visual Basic 1.0 (yay!). This is the approach that Axon uses. Using the QTimer method above, a handful of lines of code gets us this in PyQt without having to use a larger framework:

---------------------------------------------------------------

from PyQt4.QtCore import QObject, SIGNAL

def qmicro(iterations=500):
    '''Qt fire-and-forget microprocess decorator.
    '''
    def wrap_qmicro(microfn):
        def call_qmicro(qobj, *call_args, **call_kwargs):
            try:
                call_iter = microfn(qobj, *call_args, **call_kwargs)
            except StopIteration, endex:
                return
            except:
                raise

            return QtMicroProcess(qobj, call_iter.next, iterations)
        return call_qmicro
    return wrap_qmicro


class QtMicroProcess(QObject):
    '''A single running microprocess, scheduled in the event loop using
    timer events until completed or error.
    '''
    def __init__(self, parent, next_fn, iterations):
        QObject.__init__(self, parent)
        self.next_fn = next_fn
        self.iterations = iterations
        self.timer_id = self.startTimer(0)

    def timerEvent(self, tev):
        next_fn = self.next_fn
        try:
            for itidx in xrange(self.iterations):
                next_fn()
            return
        except StopIteration, sex:
            pass
        except Exception, ex:
            print "QMICRO: Unhandled exception:", ex

        try:
            self.killTimer(self.timer_id)
        finally:
            self.deleteLater()

--------------------------------------------------------------

Now we can create "fire and forget" LWPs methods that can "yield" to the Qt event loop by simply using the @qmicro decorator:

class MyApp(QObject):
        @qmicro()
        def beer(n):
                for x in xrange(n):
                        print x, "bottles of beer on the wall"
                        yield

        def some_regular_method():
                beer(99) # returns immediately
                

(a slightly more fleshed out example is in the attached file)

Note that this is a deliberately simplistic implementation that has various limitations, eg the method has to be a method of a QObject subclass, and there is no builtin way to get feedback when the LWP exits. Still, it is a convenient bit of code you can drop into your PyQt project when you want to do some background work while allowing the rest of the app to continue relatively unaffected, while sidestepping the issues that come up when calling Qt code from multiple threads.

Hope someone finds this useful and any feedback appreciated.

-Taj.

from PyQt4.QtCore import QObject, SIGNAL

#################

def qmicro(iterations=500):
    '''Qt fire-and-forget microprocess decorator.

    The 
    '''
    def wrap_qmicro(microfn):
        def call_qmicro(qobj, *call_args, **call_kwargs):
            try:
                call_iter = microfn(qobj, *call_args, **call_kwargs)
            except StopIteration, sex:
                return
            except:
                raise

            return QtMicroProcess(qobj, call_iter.next, iterations)
        return call_qmicro
    return wrap_qmicro


class QtMicroProcess(QObject):
    '''A single running microprocess, scheduled in the event loop using
    timer events until completed or error.
    '''
    def __init__(self, parent, next_fn, iterations):
        QObject.__init__(self, parent)
        self.next_fn = next_fn
        self.iterations = iterations
        self.timer_id = self.startTimer(0)

    def timerEvent(self, tev):
        next_fn = self.next_fn
        try:
            for itidx in xrange(self.iterations):
                next_fn()
            return
        except StopIteration, sex:
            pass
        except Exception, ex:
            print "QMICRO: Unhandled exception:", ex

        try:
            self.killTimer(self.timer_id)
        finally:
            self.deleteLater()



def test_qmicro():
    from PyQt4.QtGui import (QApplication, QWidget, QVBoxLayout, QPushButton, 
                            QLabel, QProgressBar)
    import sys

    class TestWidget(QWidget):
        def __init__(self, parent = None):
            QWidget.__init__(self, parent)

            # create the UI
            self.start_btn = QPushButton("start")
            self.running = QLabel("0")

            layout = QVBoxLayout()
            layout.addWidget(self.start_btn)
            layout.addWidget(self.running)
            self.setLayout(layout)

            # call the microprocess when the user clicks the start button
            QObject.connect(self.start_btn, SIGNAL("clicked()"), 
                            lambda: self.count_from(2, 500000))

        @qmicro()
        def count_from(self, cfrm, cto):
            self.running.setText(str(int(self.running.text())+1))

            lbl = QProgressBar()
            lbl.setMinimum(cfrm)
            lbl.setMaximum(cto)

            self.layout().addWidget(lbl)
            for x in xrange(cfrm, cto): 
                if x % 100 == 0:
                    lbl.setValue(x)
                yield

            lbl.setValue(x)

            lbl.close()
            self.running.setText(str(int(self.running.text())-1))

    app = QApplication(sys.argv)
    thing = TestWidget()
    thing.show()

    app.exec_()

if __name__ == '__main__':
    test_qmicro()

_______________________________________________
BangPypers mailing list
BangPypers@python.org
http://mail.python.org/mailman/listinfo/bangpypers

Reply via email to