On Sep 26, 4:47 am, wink <[EMAIL PROTECTED]> wrote: To make it easier to comment on the code, I'm including the "mproc.py" file below. Fredrik was commenting about using Queue, and in fact I do use it. Queue is quite nice and is also thread-safe, which is a requirement for this implementation. But its performance is poor if the number of items on a Queue becomes large, because it is implemented using a list.
# One of the things I was thinking of was doing another implementation of
# Queue which was based on deque.

"""Message Processor Module.

This module allows programmers to create components which communicate
asynchronously via messages. In addition, the receiving component will
only handle one message at a time. This allows the programmer to create
multi-threaded programs with less shared memory and thus fewer mutexes
and semaphores.

The basic communication is via an instance of the Msg class which is
sent to mprocs using the send method. An Mproc is an active component
which has a thread and executes asynchronously from all other Mprocs by
using one MprocDriver for each Mproc. It is also possible for several
mprocs to share one MprocDriver by using BaseMproc.

Each mproc must override the _handler method. When a message arrives for
a mproc it is placed in a Queue and the driver calls the _handler method
passing the Msg as a parameter. The driver uses the Queue to serialize
the message processing, thus the _handler method will be invoked with
one message at a time. Thus the _handler method does not generally need
to use mutexes or semaphores. But because each message's processing must
be completed before the next message will be started, it is important
that the message be processed as quickly as possible.

TODO: add more documentation.
"""

import copy
import threading
import traceback

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # the module was renamed in Python 3


class Msg:
    """A message passed between message processors.

    All fields start out as None; the sender fills in the ones it needs.
    ``dstMpId`` is the index of the destination mproc in
    ``MprocDriver._mpList`` and is the only field ``send`` looks at.
    """

    def __init__(self):
        """Create a message with every field set to None."""
        self.dstMpId = None
        self.dstCnId = None
        self.srcMpId = None
        self.srcCnId = None
        self.mid = None
        self.cmd = None
        self.tag = None
        self.status = None
        self.data = None

    def dup(self):
        """Return a deep copy of this message."""
        return copy.deepcopy(self)

    def send(self):
        """Queue this message for the mproc identified by ``dstMpId``.

        Returns True if the message was placed on the destination's
        queue, False if the destination could not be resolved (no id,
        id out of range, or the mproc has been removed).
        """
        try:
            MprocDriver._mpList[self.dstMpId]._msgQueue.put(self)
        except Exception:
            # Best effort by design: an unroutable message reports False
            # instead of raising into the sender.
            return False
        return True


class BaseMproc:
    """Message Processor.

    A message processor requires a handler method and another driver
    which passes messages to it. This mproc driver has one overriding
    requirement: it must only pass one message at a time to the handler
    method. This eliminates any need for the programmer to worry about
    multi-threading issues while processing messages. This does put a
    burden on the handler routine: it must process messages quickly and
    in a non-blocking fashion so that the mproc may remain lively.

    The name of a BaseMproc must be unique; NameError is raised if the
    name is already used.
    """

    def __init__(self, name, mprocDriver):
        """Register this mproc under ``name`` with ``mprocDriver``."""
        self.name = name
        addMproc(self, mprocDriver)

    def close(self):
        """Remove the mproc from the registry.

        If removal fails the traceback is printed and the registration
        fields are cleared anyway.
        """
        try:
            rmvMproc(self)
        except Exception:
            traceback.print_exc()
            self._unreg()

    def _handler(self, msg=None):
        """Handle one message; subclasses must override this routine.

        The ``msg`` parameter matches the way the driver invokes the
        handler, so a missing override reports this error rather than a
        TypeError about the argument count.
        """
        raise NotImplementedError("BaseMproc._handler needs to be overridden")

    def _reg(self, mprocDriver, id):
        """Record the driver, its queue and the assigned id."""
        self._mprocDriver = mprocDriver
        self._msgQueue = mprocDriver._msgQueue
        self.id = id

    def _unreg(self):
        """Clear the driver, queue and id recorded by ``_reg``."""
        self._mprocDriver = None
        self._msgQueue = None
        self.id = None


class Mproc(BaseMproc):
    """Active Message Processor.

    An active message processor is a BaseMproc that always creates its
    own MprocDriver instance (named "ActiveMprocDriver_<name>") as its
    driver.
    """

    def __init__(self, name):
        """Create a dedicated driver and register this mproc with it."""
        driver = MprocDriver("ActiveMprocDriver_" + name)
        try:
            BaseMproc.__init__(self, name, driver)
        except Exception:
            # The driver thread is already running; shut it down so a
            # failed registration does not leak a live thread.
            driver.close()
            raise

    def close(self):
        """Close the active mproc and then its private driver."""
        try:
            this_mprocDriver = self._mprocDriver
            BaseMproc.close(self)
            this_mprocDriver.close()
        except Exception:
            print("Mproc.close: excption")
            traceback.print_exc()
            self._unreg()


class MprocDriver(threading.Thread, BaseMproc):
    """Message processor driver.

    Owns one queue and one thread. Every mproc registered with the
    driver shares that queue; the thread takes one message at a time and
    hands it to the destination mproc's ``_handler``.

    Class attributes:
        _mpList -- all mprocs indexed by id (removed slots hold None)
        _mpDict -- all mprocs indexed by name
    """

    _mpList = []
    _mpDict = {}

    def __init__(self, name):
        """Create the queue, register the driver itself and start it."""
        self._thisMpdDict = {}
        self._running = True
        self._msgQueue = Queue.Queue()
        threading.Thread.__init__(self)
        BaseMproc.__init__(self, name, self)
        self.start()

    def _regMproc(self, mproc, id):
        """Attach ``mproc`` to this driver under the given id."""
        self._thisMpdDict[mproc.name] = mproc
        mproc._reg(self, id)

    def _unregMproc(self, mproc):
        """Detach ``mproc`` from this driver."""
        del self._thisMpdDict[mproc.name]
        mproc._unreg()

    def _handler(self, msg):
        """Handle messages addressed to the driver itself.

        Only the shutdown request (mid == -1, cmd == 0) is acted upon:
        it stops the run loop and acknowledges on ``msg.completeQ``.
        """
        if (msg.mid == -1) and (msg.cmd == 0):
            self._running = False
            msg.completeQ.put(0)

    def close(self):
        """Close the mproc driver.

        Removes every other mproc attached to this driver, asks the
        driver thread to stop, waits for its acknowledgement and finally
        removes the driver itself from the registry.
        """
        # Snapshot the values: rmvMproc() deletes from _thisMpdDict while
        # we iterate.
        for mproc in list(self._thisMpdDict.values()):
            if mproc.name != self.name:
                rmvMproc(mproc)

        completeQ = Queue.Queue()
        msg = Msg()
        msg.dstMpId = self.id
        msg.mid = -1
        msg.cmd = 0
        msg.completeQ = completeQ
        # Only wait for the acknowledgement if the request was actually
        # queued; otherwise nobody would ever answer.
        if msg.send():
            completeQ.get()

        # Remove ourself
        BaseMproc.close(self)
        del self._thisMpdDict

    def run(self):
        """Thread body: dispatch queued messages until told to stop.

        Any failure while fetching or handling a message is reported on
        stdout together with a traceback, and the loop carries on.
        """
        # Bind the queue once: _unreg() resets self._msgQueue to None
        # while the thread may still be looping.
        msgQueue = self._msgQueue
        while self._running:
            # Reset per iteration so the diagnostics below never look at
            # unbound names or at leftovers from the previous message.
            msg = None
            mproc = None
            try:
                msg = msgQueue.get()
                mproc = MprocDriver._mpList[msg.dstMpId]
                mproc._handler(msg)
            except Exception:
                if msg is None:
                    print("run: no message")
                elif mproc is None:
                    print("run: no mproc")
                elif not callable(mproc._handler):
                    print("run: mproc._handler is not callable")
                else:
                    print("run: mproc._handler caused an exception")
                traceback.print_exc()


class PsMgr:
    """Publish Subscribe Manager.

    Allow mprocs to subscribe to any message having a specified mid/cmd.
    Maybe this should be in a separate module and instead of using class
    variables?

    ``midDict`` maps mid -> {cmd -> [subscribed mprocs]}. Entries are
    removed again once their last subscriber is gone.
    """

    midDict = {}

    @classmethod
    def publish(cls, msg):
        """Send a copy of the message to all subscribers of its mid/cmd."""
        cls._publish(msg)

    @classmethod
    def subscribe(cls, mproc, mid, cmd):
        """Subscribe the mproc to messages with mid/cmd."""
        cls._subscribe(mproc, mid, cmd)

    @classmethod
    def unsubscribe(cls, mproc, mid, cmd):
        """Unsubscribe the mproc from messages with mid/cmd."""
        cls._unsubscribe(mproc, mid, cmd)

    @classmethod
    def _publish(cls, msg):
        """The actual publish routine; a no-op without subscribers."""
        try:
            subscribers = cls.midDict[msg.mid][msg.cmd]
        except KeyError:
            return
        # Iterate over a snapshot so a subscription change made while
        # publishing does not disturb the loop.
        for mproc in list(subscribers):
            msgNew = msg.dup()
            msgNew.dstMpId = mproc.id
            msgNew.send()

    @classmethod
    def _subscribe(cls, mproc, mid, cmd):
        """The actual subscribe routine."""
        cls.midDict.setdefault(mid, {}).setdefault(cmd, []).append(mproc)

    @classmethod
    def _unsubscribe(cls, mproc, mid, cmd):
        """The actual unsubscribe routine.

        Removes every occurrence of ``mproc`` for mid/cmd and drops the
        bookkeeping entries that become empty.
        """
        cmdDict = cls.midDict.get(mid)
        if cmdDict is None:
            return
        subscribers = cmdDict.get(cmd)
        if subscribers is None:
            return
        # Filter in place so anyone holding the list sees the change.
        subscribers[:] = [mp for mp in subscribers if not (mp == mproc)]
        if not subscribers:
            del cmdDict[cmd]
        if not cmdDict:
            del cls.midDict[mid]


def lookupMproc(name, onNotFound=None):
    """Look up a message processor by name.

    Returns the registered mproc, or ``onNotFound`` when no mproc with
    that name exists.
    """
    try:
        return MprocDriver._mpDict[name]
    except (KeyError, TypeError):
        # TypeError covers names that cannot be used as dictionary keys.
        return onNotFound


def addMproc(mproc, mprocDriver):
    """Add a new message processor to the database.

    Raises NameError if an mproc with the same name is already
    registered. The mproc's id is its position in MprocDriver._mpList.
    """
    if lookupMproc(mproc.name) is not None:
        raise NameError("%s BaseMproc already exists" % mproc.name)
    # The new entry goes to the end of the list, so its id is the
    # current length; no search is needed.
    mpId = len(MprocDriver._mpList)
    MprocDriver._mpList.append(mproc)
    MprocDriver._mpDict[mproc.name] = mproc
    mprocDriver._regMproc(mproc, mpId)


def rmvMproc(mproc):
    """Remove a message processor from the database.

    The mproc's slot in MprocDriver._mpList is set to None rather than
    deleted so that the ids of the remaining mprocs stay valid.
    """
    MprocDriver._mpList[mproc.id] = None
    del MprocDriver._mpDict[mproc.name]
    mproc._mprocDriver._unregMproc(mproc)

# --
# http://mail.python.org/mailman/listinfo/python-list