Hi! I wrote a little class to make multithreading easier. It's based on one of aahz's threading example scripts. What it does:
It spawns a number of CollectorThreads and one ProcessThread. The CollectorThreads listen on one queue (the inputQueue), read the data, process it (with colfunc), and put the result onto the outputQueue. The ProcessThread listens on the outputQueue, reads, and processes the results (with prfunc). That's it. (More details in the attached file.) It seems to work fine with test functions, but when I use a network-intensive function (SNMP queries) it just gets slower with maxThreads set to more than 1. Any help? Thanks. See the class attached. PS: Maybe I basically don't understand something...

--
--arutz
#!/usr/local/bin/python
"""
Multithreaded class for the task: multiple collectors - one dataprocessor

Usage:
    collector = Collector(data, colfunc, prfunc, maxThreads)
    collector.run()

Internals:
    Collector spawns up the CollectorThreads and a ProcessThread and puts
    the data onto the inputQueue.  The CollectorThread reads the data from
    the inputQueue and processes it through 'colfunc()', then puts the
    result onto the outputQueue.  The ProcessThread only listens on the
    outputQueue (blocks on it) and feeds the data to `prfunc()`.

Thread shutdown:
    collectorthreads: inputQueue.put(shutdown=True)
    processthread:    outputQueue.put(shutdown=True)
"""
import threading
import Queue

#from operator import truth as _truth
#def _xor(a, b):
#    return _truth(a) ^ _truth(b)


class _Token:
    """Message passed on the queues: either carries data or a shutdown flag."""
    def __init__(self, data=None, shutdown=None):
        #if not _xor(data, shutdown):
        #    raise "Tsk, tsk, need to set either data or shutdown (not both)"
        self.data = data
        self.shutdown = shutdown


class _CollectorThread(threading.Thread):
    """Worker thread blocking on inputQueue.

    The result goes to outputQueue after being processed by self.func.
    """
    def __init__(self, inQueue, outQueue, func):
        threading.Thread.__init__(self)
        self.inQ = inQueue
        self.outQ = outQueue
        self.func = func

    def run(self):
        while True:
            token = self.inQ.get()
            if token.shutdown is not None:
                break
            else:
                #collect data from the routers
                #print token.data
                result = self.func(token.data)
                self.outQ.put_nowait(_Token(data=result))


class _ProcessThread(threading.Thread):
    """'Reader-only' thread processing the outputQueue."""
    def __init__(self, outQueue, func):
        threading.Thread.__init__(self)
        self.outQ = outQueue
        self.func = func

    def run(self):
        while True:
            token = self.outQ.get()
            if token.shutdown is not None:
                break
            else:
                #insert into db or do anything
                self.func(token.data)


class Collector:
    """Spawns up the threadpool (worker and process threads) and puts the
    data onto the inputQueue of the worker threads.  Then shuts them down."""

    def __init__(self, data, colfunc, prfunc, maxThreads=5):
        """Parameters:
            - data:       data for colfunc (a sequence)
            - colfunc:    function to process inputQueue items into outputQueue
            - prfunc:     function to process outputQueue items
            - maxThreads: number of worker threads (MAX_THREADS)
        """
        self.data = data
        self.inputQueue = Queue.Queue()
        self.outputQueue = Queue.Queue()
        self.threadPool = []

        #Start the worker threads
        for i in range(maxThreads):
            collector = _CollectorThread(self.inputQueue, self.outputQueue,
                                         colfunc)
            collector.start()
            self.threadPool.append(collector)

        #Start the db thread
        self.processthread = _ProcessThread(self.outputQueue, prfunc)
        self.processthread.start()

    def run(self):
        """Queue the data and shut down the threads."""
        self._queueData()
        self._shutdown()

    def _queueData(self):
        """Put data onto the inputQueue."""
        for d in self.data:
            self.inputQueue.put_nowait(_Token(data=d))

    def _shutdown(self):
        #one shutdown token per worker, then wait for them all
        for i in self.threadPool:
            self.inputQueue.put(_Token(shutdown=True))
        for thread in self.threadPool:
            thread.join()
        self.outputQueue.put(_Token(shutdown=True))
        self.processthread.join()


if __name__ == '__main__':
    def myprint(s):
        print s

    def hashdata(a):
        return a + ': OK'

    MAX_THREADS = 5
    data = ['1', '2', 'asd', 'qwe']
    collect = Collector(data=data, colfunc=hashdata, prfunc=myprint,
                        maxThreads=MAX_THREADS)
    collect.run()
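In case it helps, here is a small timing sketch (not part of the class) for comparing maxThreads values. slow_query() and report() are placeholder names made up for this test, time.sleep() stands in for one SNMP round-trip, and it assumes the Collector class above is defined in the same module. With a sleep-based colfunc like this, more threads should finish the batch sooner; with my real SNMP function they don't.

#Timing sketch: placeholder functions only, paste below the Collector class.
import time

def slow_query(d):
    time.sleep(0.2)     #pretend this is one network round-trip
    return d + ': OK'

def report(result):
    pass                #pretend this is the db insert

data = ['router%d' % i for i in range(20)]
for n in (1, 5):
    start = time.time()
    Collector(data=data, colfunc=slow_query, prfunc=report,
              maxThreads=n).run()
    print '%d thread(s): %.2f seconds' % (n, time.time() - start)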