On 19 May, 10:24, Steven D'Aprano <ste...@remove.this.cybersource.com.au> wrote:
> On Mon, 18 May 2009 02:27:06 -0700, jeremy wrote:
> > Let me clarify what I think par, pmap, pfilter and preduce would mean
> > and how they would be implemented.
>
> [...]
>
> Just for fun, I've implemented a parallel-map function, and done a couple
> of tests. Comments, criticism and improvements welcome!
>
> import threading
> import Queue
> import random
> import time
>
> def f(arg):  # Simulate a slow function.
>     time.sleep(0.5)
>     return 3*arg-2
>
> class PMapThread(threading.Thread):
>     def __init__(self, clients):
>         super(PMapThread, self).__init__()
>         self._clients = clients
>     def start(self):
>         super(PMapThread, self).start()
>     def run(self):
>         while True:
>             try:
>                 data = self._clients.get_nowait()
>             except Queue.Empty:
>                 break
>             target, where, func, arg = data
>             result = func(arg)
>             target[where] = result
>
> class VerbosePMapThread(threading.Thread):
>     def __init__(self, clients):
>         super(VerbosePMapThread, self).__init__()
>         print "Thread %s created at %s" % (self.getName(), time.ctime())
>     def start(self):
>         super(VerbosePMapThread, self).start()
>         print "Thread %s starting at %s" % (self.getName(), time.ctime())
>     def run(self):
>         super(VerbosePMapThread, self).run()
>         print "Thread %s finished at %s" % (self.getName(), time.ctime())
>
> def pmap(func, seq, verbose=False, numthreads=4):
>     size = len(seq)
>     results = [None]*size
>     if verbose:
>         print "Initiating threads"
>         thread = VerbosePMapThread
>     else:
>         thread = PMapThread
>     datapool = Queue.Queue(size)
>     for i in xrange(size):
>         datapool.put( (results, i, f, seq[i]) )
>     threads = [PMapThread(datapool) for i in xrange(numthreads)]
>     if verbose:
>         print "All threads created."
>     for t in threads:
>         t.start()
>     # Block until all threads are done.
>     while any([t.isAlive() for t in threads]):
>         if verbose:
>             time.sleep(0.25)
>             print results
>     return results
>
> And here's the timing results:
>
> >>> from timeit import Timer
> >>> setup = "from __main__ import pmap, f; data = range(50)"
> >>> min(Timer('map(f, data)', setup).repeat(repeat=5, number=3))
> 74.999755859375
> >>> min(Timer('pmap(f, data)', setup).repeat(repeat=5, number=3))
>
> 20.490942001342773
>
> --
> Steven
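For comparison, here is a minimal sketch of the same technique, assuming Python 3 (the quoted code is Python 2: `Queue`, `xrange`, `print` statements). A queue of (index, item) work items is drained by a fixed number of threads, and each result is written back to its own slot so the output order matches the input. The sketch also sidesteps three quirks visible in the quoted version: `pmap` enqueues the module-level `f` instead of its `func` argument, it always builds `PMapThread` even when `verbose` selects `VerbosePMapThread`, and it waits by polling `isAlive()` in a loop rather than calling `join()`. `slow_f` is a hypothetical stand-in for Steven's `f` with a shorter sleep.

```python
import queue
import threading
import time


def pmap(func, seq, numthreads=4):
    """Apply func to every item of seq using a pool of worker threads.

    Results are returned in the same order as seq, like the built-in map.
    """
    items = list(seq)
    results = [None] * len(items)
    work = queue.Queue()
    # Each work item carries the slot its result belongs in, so workers
    # can finish in any order without scrambling the output.
    for index, item in enumerate(items):
        work.put((index, item))

    def worker():
        while True:
            try:
                index, item = work.get_nowait()
            except queue.Empty:
                return  # Queue drained: let this thread exit.
            results[index] = func(item)

    threads = [threading.Thread(target=worker) for _ in range(numthreads)]
    for t in threads:
        t.start()
    # join() blocks until each thread exits, without spinning on is_alive().
    for t in threads:
        t.join()
    return results


def slow_f(arg):
    """Hypothetical slow call; time.sleep releases the GIL while waiting."""
    time.sleep(0.05)
    return 3 * arg - 2


if __name__ == "__main__":
    start = time.perf_counter()
    print(pmap(slow_f, range(8)))
    print("elapsed: %.2fs" % (time.perf_counter() - start))
```

Like the quoted version, this sketch does not propagate an exception raised by `func`; the affected slot is simply left as `None`. As a rough check on the quoted timings: `f` sleeps 0.5 s, so `map` over 50 items costs about 25 s per call, or 75 s for `number=3`, which matches the 74.99 s reported. With 4 threads draining the queue, 50 items take 13 rounds of 0.5 s, about 6.5 s per call or 19.5 s for three calls, close to the 20.49 s measured. Threads help here because `time.sleep` releases the GIL; a CPU-bound pure Python `func` would see little or no speedup under CPython's GIL.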
Hi Steven,

I am impressed by this: it shows the potential speedup that pmap could give, although as things stand the GIL would be a problem for speeding up pure Python code.

Do Jython and IronPython include the threading module?

Jeremy
--
http://mail.python.org/mailman/listinfo/python-list