On May 19, 10:24 am, Steven D'Aprano <ste...@remove.this.cybersource.com.au> wrote: > On Mon, 18 May 2009 02:27:06 -0700, jeremy wrote: > > Let me clarify what I think par, pmap, pfilter and preduce would mean > > and how they would be implemented. > > [...] > > Just for fun, I've implemented a parallel-map function, and done a couple > of tests. Comments, criticism and improvements welcome! > > import threading > import Queue > import random > import time > > def f(arg): # Simulate a slow function. > time.sleep(0.5) > return 3*arg-2 > > class PMapThread(threading.Thread): > def __init__(self, clients): > super(PMapThread, self).__init__() > self._clients = clients > def start(self): > super(PMapThread, self).start() > def run(self): > while True: > try: > data = self._clients.get_nowait() > except Queue.Empty: > break > target, where, func, arg = data > result = func(arg) > target[where] = result > > class VerbosePMapThread(threading.Thread): > def __init__(self, clients): > super(VerbosePMapThread, self).__init__() > print "Thread %s created at %s" % (self.getName(), time.ctime()) > def start(self): > super(VerbosePMapThread, self).start() > print "Thread %s starting at %s" % (self.getName(), time.ctime()) > def run(self): > super(VerbosePMapThread, self).run() > print "Thread %s finished at %s" % (self.getName(), time.ctime()) > > def pmap(func, seq, verbose=False, numthreads=4): > size = len(seq) > results = [None]*size > if verbose: > print "Initiating threads" > thread = VerbosePMapThread > else: > thread = PMapThread > datapool = Queue.Queue(size) > for i in xrange(size): > datapool.put( (results, i, f, seq[i]) ) > threads = [PMapThread(datapool) for i in xrange(numthreads)] > if verbose: > print "All threads created." > for t in threads: > t.start() > # Block until all threads are done. > while any([t.isAlive() for t in threads]): > if verbose: > time.sleep(0.25) > print results > return results > > And here's the timing results: > > >>> from timeit import Timer > >>> setup = "from __main__ import pmap, f; data = range(50)" > >>> min(Timer('map(f, data)', setup).repeat(repeat=5, number=3)) > 74.999755859375 > >>> min(Timer('pmap(f, data)', setup).repeat(repeat=5, number=3)) > > 20.490942001342773 > > -- > Steven
I was going to write something like this, but you've beat me to it :) Slightly different though; rather than have pmap collate everything together then return it, have it yield results as and when it gets them and stop iteration when it's done, and rename it to par to keep the OP happy and you should get something like what he initially requests (I think): total = 0 for score in par(f, data): total += score Iain -- http://mail.python.org/mailman/listinfo/python-list