On 7 Aug, 16:02, MRAB <pyt...@mrabarnett.plus.com> wrote:
> ma3mju wrote:
> > On 3 Aug, 09:36, ma3mju <matt.u...@googlemail.com> wrote:
> >> On 2 Aug, 21:49, Piet van Oostrum <p...@cs.uu.nl> wrote:
> >
> >>>>>>>> MRAB <pyt...@mrabarnett.plus.com> (M) wrote:
> >>>> M> I wonder whether one of the workers is raising an exception,
> >>>> M> perhaps due to lack of memory, when there is a large number of
> >>>> M> jobs to process.
> >>> But that wouldn't prevent the join. And you would probably get an
> >>> exception traceback printed.
> >>> I wonder if something fishy is happening in the multiprocessing
> >>> infrastructure. Or maybe the Fortran code goes wrong because it has no
> >>> protection against buffer overruns and similar problems, I think.
> >>> --
> >>> Piet van Oostrum <p...@cs.uu.nl>
> >>> URL: http://pietvanoostrum.com [PGP 8DAE142BE17999C4]
> >>> Private email: p...@vanoostrum.org
> >> I don't think it's a memory problem. The reason for the hard and easy
> >> queues is that the larger examples use far more RAM. If I run all of
> >> the workers on harder problems I do begin to run out of RAM and end up
> >> spending all my time swapping in and out, so I limit the number of
> >> harder problems I run at the same time. I've watched it run to the end
> >> (a very boring couple of hours) and it stays out of my swap space;
> >> everything appears to stay in RAM. It just hangs after "poison" has
> >> been printed for each process.
> >>
> >> The other thing is that I get the message "here" telling me I broke
> >> out of the loop after seeing the poison pill in each process, and I
> >> get everything that was queued listed as output. Surely if I were
> >> running out of memory I wouldn't expect all of the jobs to be listed
> >> as output.
> >>
> >> I have a serial script that works fine, so I know the Fortran code
> >> works for each individual example.
> >>
> >> Thanks
> >>
> >> Matt
> >
> > Any ideas for a solution?
>
> A workaround is to do them in small batches.
>
> You could put each job in a queue with a flag to say whether it's hard
> or easy, then:
>
>     while have more jobs:
>         move up to BATCH_SIZE jobs into worker queues
>         create and start workers
>         wait for workers to finish
>         discard workers
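Just to make that concrete, the batch-at-a-time idea would look something
like the sketch below. It is only a rough illustration: run_job,
process_in_batches and the BATCH_SIZE value are placeholder names of mine,
not part of my actual script.

import multiprocessing as mp

BATCH_SIZE = 4

def run_job(job):
    # placeholder for the real (Fortran-backed) computation
    return job['datapt'], 0.0

def _worker(job, queue):
    # run one job and send a (key, value) pair back to the parent
    queue.put(run_job(job))

def process_in_batches(jobs):
    results = {}
    for start in range(0, len(jobs), BATCH_SIZE):
        batch = jobs[start:start + BATCH_SIZE]
        queue = mp.Queue()
        workers = [mp.Process(target=_worker, args=(job, queue))
                   for job in batch]
        for w in workers:
            w.start()
        # drain the queue before joining so the feeder threads can flush
        for _ in workers:
            key, value = queue.get()
            results[key] = value
        for w in workers:
            w.join()
        # the workers from this batch are discarded here; the next batch
        # gets a fresh set
    return results

if __name__ == '__main__':
    print(process_in_batches([{'datapt': i} for i in range(10)]))

Each batch gets its own set of processes, and the results are pulled off the
queue before the join so the feeder threads can empty their buffers.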
Yeah, I was hoping for something with a bit more finesse. In the end I used a
Pool with a callback function instead, and that has solved the problem. I did
find this snippet today, though:

Joining processes that use queues

    Bear in mind that a process that has put items in a queue will wait
    before terminating until all the buffered items are fed by the "feeder"
    thread to the underlying pipe. (The child process can call the
    Queue.cancel_join_thread() method of the queue to avoid this behaviour.)

    This means that whenever you use a queue you need to make sure that all
    items which have been put on the queue will eventually be removed before
    the process is joined. Otherwise you cannot be sure that processes which
    have put items on the queue will terminate. Remember also that
    non-daemonic processes will be joined automatically.

I don't know (I'm not a computer scientist), but could it have been the pipe
getting full?
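To make that concrete, here is a stripped-down toy example of the behaviour
that passage describes. It is not my actual code; the loop count and string
size are arbitrary, just large enough to fill the pipe buffer.

import multiprocessing as mp

def worker(q):
    # put enough data that the queue's feeder thread fills the pipe buffer
    for i in range(100000):
        q.put('x' * 100)
    q.put(None)   # sentinel: this worker has finished

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()

    # Calling p.join() here would be the mistake: the child cannot exit
    # until its buffered items are flushed to the pipe, so the join hangs.

    # Drain the queue first, then join.
    while True:
        item = q.get()
        if item is None:
            break
    p.join()

With the join placed before the queue is drained, the child never terminates
and the join blocks forever; draining first (or having the child call
cancel_join_thread()) avoids the hang.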
In case anyone else is affected by this, I've attached the new code so you
can see the changes I made to fix it.

Thanks for all your help

Matt

============================================================================================================================
parallel.py
============================================================================================================================

import GaussianProcessRegression as GP
import numpy as np
import networkx as nx
import pickle
from multiprocessing import Pool

global result

def cb(r):
    global result
    print r
    result[r[0]] = r[1]

############################################################################################
# Things You Can Change
############################################################################################
#savefiles
savefile = "powerlaw"
graphfile = "powerlawgraph"
#sample sizes
num_graphs = 5
num_sets_of_data = 10
#other things...
intervals = np.ceil(np.logspace(-2,1,50)*500)
noise = [np.sqrt(0.1),np.sqrt(0.01),np.sqrt(0.001),np.sqrt(0.0001)]
num_hard_workers = 5
hard_work_threshold = 4000
############################################################################################

#generate graphs
graphs = []
for i in range(0,num_graphs):
    graphs.append(nx.powerlaw_cluster_graph(500,0.1,0.05))
#save them for later reference
filehandler = open(graphfile,'w')
pickle.dump(graphs,filehandler,-1)
filehandler.close()

#queues
easy_work = []
hard_work = []

#construct the items in the hard queue
l = 0
for j in range(0,len(intervals)):
    for i in range(0,len(noise)):
        for k in range(0,num_graphs):
            if int(intervals[j]) <= hard_work_threshold:
                easy_work.append({'datapt': l,'graph': graphs[k],'noise': noise[i],'number_of_sets_of_data': num_sets_of_data,'number_of_data_points': int(intervals[j])})
            else:
                hard_work.append({'datapt': l,'graph': graphs[k],'noise': noise[i],'number_of_sets_of_data': num_sets_of_data,'number_of_data_points': int(intervals[j])})
            l += 1

result = np.zeros(l)

#create pool with all cores possible
worker_pool = Pool()
for i in xrange(0,len(easy_work)):
    worker_pool.apply_async(GP.RandomWalkGeneralizationErrorParallel,(easy_work[i],),callback=cb)
worker_pool.close()
worker_pool.join()

#create hard work queue
worker_pool = Pool(processes = num_hard_workers)
for i in xrange(0,len(hard_work)):
    worker_pool.apply_async(GP.RandomWalkGeneralizationErrorParallel,(hard_work[i],),callback=cb)
worker_pool.close()
worker_pool.join()

finaldata = result.reshape((len(intervals),len(noise),num_graphs))
np.save(savefile,finaldata)

================================================================================================================================================
GaussianProcessRegression.py
================================================================================================================================================

import CovarianceFunction as CF
import networkx as nx
import numpy as np
import scipy.linalg as sp
#fortran code from lapack-blas (hopefully when scipy is updated this won't be needed)
import dtrsv

#Currently we assume Gaussian noise TODO change to general noise
#Assume 0 mean TODO change to general mean Gaussian Process
class GaussianProcessRegression:
    def __init__(self,covariance_function,sigma):
        #a covariance function object defined in CovarianceFunction class
        #note this uses the parent class but any children can be used
        self.C = covariance_function
        #a list of pts that are known and their values
        self.pts = []
        self.vals = []
        #the inverse of K as defined in
        #...@book{coolen05:theoryofneural,
        #ISBN = {0-19-853024-2},
        #publisher = {Oxford University Press, USA},
        #author = {Coolen, A. C. C. and K{\"{u}}hn, R. and Sollich, P.},
        #title = {Theory of neural information processing systems},
        #year = {2005},
        #}
        self.K = np.array([])
        #gaussian noise variable
        self.sigma = float(sigma)
        self.cholL = np.array([])

    def add_data_points(self,points,vals):
        #add all points to list
        self.pts += points
        self.vals += vals
        arraysize = len(self.pts)
        #construct K
        K = np.zeros((arraysize,arraysize))
        #for speed
        pts = self.pts
        between_points = self.C.between_points
        if len(self.K):
            K[:-1,:-1] = self.K
        for i in xrange(0,arraysize):
            for j in xrange(arraysize-len(points),arraysize):
                K[i,j] = between_points(pts[i],pts[j])
                K[j,i] = K[i,j]
        K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] = K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] + self.sigma**2 * np.eye(len(points))
        self.K = K

    #calculate the prediction of a point based on data previously given
    def point_prediction(self,points):
        mean = []
        variance = []
        arraysize = len(self.pts)
        #cholesky
        #if self.cholL.shape[0] < arraysize:
        L = np.linalg.cholesky(self.K)
        #    self.cholL = L
        #else:
        #    L = self.cholL
        alpha = sp.cho_solve((L,1),self.vals)
        #create L in banded form
        k = np.zeros((arraysize,len(points)))
        ##################################################################
        #for speed get ref to functions im going to use and save them
        between_points = self.C.between_points
        pts = self.pts
        dot = np.dot
        ##################################################################
        for j in xrange(0,len(points)):
            #create k
            for i in xrange(0,arraysize):
                k[i,j] = between_points(pts[i],points[j])
        #calculate mean and variance
        #call the command for forward substitution
        ###############fortran call#######################################
        v = dtrsv.dtrsv('L','N',arraysize,L,k)
        ##################################################################
        #result
        mean = dot(alpha,k)
        for i in xrange(0,len(points)):
            variance.append(between_points(points[i],points[i]) - dot(v[:,i],v[:,i]))
        #return it in dictionary form
        return {'mean':mean,'variance':variance}

    # calculate the error for data given, where function is a vector
    # of the function evaluated at a sufficiently large number of points
    # that the GPregression has been trying to learn
    def error(self,function):
        total = 0
        #sum up variances
        result = self.point_prediction(function[::2])
        total = np.sum(result['variance'])
        total = (1/float(len(function)/2))*total
        return total

    #clear what has been learnt so far
    def clear(self):
        self.pts = []
        self.vals = []
        self.K = np.array([])

    #calculate the average error for a function defined in function when given
    #number_of_examples examples
    def average_error_over_samples(self,function, sample_size, number_of_examples):
        avg = 0
        numberofpoints = len(function)/2
        for i in range(0,sample_size):
            self.clear()
            #generate points of the function
            permpts = np.random.randint(0,numberofpoints,number_of_examples)
            #create the vectors
            pts = []
            vals = []
            for j in range(0,number_of_examples):
                pts.append(function[permpts[j]*2])
                vals.append(function[permpts[j]*2+1])
            #learn these points
            self.add_data_points(pts,vals)
            #print("points added")
            avg = avg + self.error(function)
        avg = avg/sample_size
        return avg

--
http://mail.python.org/mailman/listinfo/python-list