Hi all,

I'm having trouble with multiprocessing, which I'm using to speed up some simulations. With large queues, when a worker process reaches the poison pill it does not exit, whereas with smaller queues everything works without any problems. Has anyone else run into this? Can anyone suggest a way around it? The code is in the two files below.
Thanks,
Matt

parallel.py
===================================================
import GaussianProcessRegression as GP
import numpy as np
import networkx as nx
import pickle
import multiprocessing

############################################################################################
# Things You Can Change
############################################################################################
#savefiles
savefile = "wattsdata2"
graphfile = "wattsgraphs2"
#sample sizes
num_graphs = 5
num_sets_of_data = 10
#other things...
intervals = np.ceil(np.logspace(-2,1,50)*500)
noise = [np.sqrt(0.1),np.sqrt(0.01),np.sqrt(0.001),np.sqrt(0.0001)]
############################################################################################

#generate graphs
graphs = []
for i in range(0,num_graphs):
    graphs.append(nx.watts_strogatz_graph(500,5,0.01))
#save them for later reference (protocol -1 is binary, so open the file in binary mode)
filehandler = open(graphfile,'wb')
pickle.dump(graphs,filehandler,-1)
filehandler.close()

#queues
easy_work_queue = multiprocessing.Queue()
hard_work_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()

#construct the work items, splitting them between the easy and hard queues
l = 0
for j in range(0,len(intervals)):
    for i in range(0,len(noise)):
        for k in range(0,num_graphs):
            item = {'datapt': l,
                    'graph': graphs[k],
                    'noise': noise[i],
                    'number_of_sets_of_data': num_sets_of_data,
                    'number_of_data_points': int(intervals[j])}
            if int(intervals[j]) <= 4000:
                easy_work_queue.put(item)
            else:
                hard_work_queue.put(item)
            l += 1

#get the number of cores and set the number of concurrent processes
num_hard_workers = 2
num_workers = int(multiprocessing.cpu_count()*1.5)
easy_workers = []
hard_workers = []
#add a poison pill for each worker and create the worker
for i in range(0,num_workers-num_hard_workers):
    easy_work_queue.put(None)
    easy_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(easy_work_queue,result_queue,)))
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    hard_workers.append(multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,)))

#run all workers
for worker in hard_workers:
    worker.start()
for worker in easy_workers:
    worker.start()
#wait for the easy workers to finish
for worker in easy_workers:
    worker.join()
    print('worker joined')

#set off some extra workers on the hard work (roughly doubling the number of hard workers)
for i in range(0,num_hard_workers):
    hard_work_queue.put(None)
    extra_worker = multiprocessing.Process(
        target=GP.RandomWalkGeneralizationErrorParallel,
        args=(hard_work_queue,result_queue,))
    extra_worker.start()
    hard_workers.append(extra_worker)
#wait for all hard workers to finish
for worker in hard_workers:
    worker.join()

#construct the data array from the results in the result queue
tempdata = np.zeros(l)
while not result_queue.empty():
    data = result_queue.get()
    tempdata[data[0]] = data[1]
finaldata = tempdata.reshape((len(intervals),len(noise),num_graphs))
np.save(savefile,finaldata)
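In case it helps to see the shape of what I'm doing without the GP code, the worker/poison-pill pattern above boils down to something like this self-contained sketch (the squaring "work", the queue sizes and the worker count are just made-up stand-ins for illustration):

import multiprocessing

def worker(work_queue, result_queue):
    #keep taking jobs until the poison pill (None) is seen
    while True:
        job = work_queue.get()
        if job is None:
            break
        result_queue.put(job * job)   #toy "work"

if __name__ == '__main__':
    work_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    num_workers = 2
    for i in range(100):
        work_queue.put(i)
    #one poison pill per worker
    for i in range(num_workers):
        work_queue.put(None)
    workers = [multiprocessing.Process(target=worker,
                                       args=(work_queue, result_queue))
               for i in range(num_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    print(len(results))

With small amounts of work like this the workers hit the pills and exit; it is only with the large runs in parallel.py that they hang.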
=======================================================
GaussianProcessRegression.py
=======================================================
import CovarianceFunction as CF
import networkx as nx
import numpy as np
import scipy.linalg as sp
#fortran code from lapack-blas (hopefully when scipy is updated this won't be needed)
import dtrsv
#to use more than one core
import multiprocessing

#Currently we assume Gaussian noise TODO change to general noise
#Assume 0 mean TODO change to general mean

#Gaussian Process class
class GaussianProcessRegression:
    def __init__(self,covariance_function,sigma):
        #a covariance function object defined in CovarianceFunction class
        #note this uses the parent class but any children can be used
        self.C = covariance_function
        #a list of pts that are known and their values
        self.pts = []
        self.vals = []
        #the inverse of K as defined in
        #@book{coolen05:theoryofneural,
        #ISBN = {0-19-853024-2},
        #publisher = {Oxford University Press, USA},
        #author = {Coolen, A. C. C. and K{\"{u}}hn, R. and Sollich, P.},
        #title = {Theory of neural information processing systems},
        #year = {2005},
        #}
        self.K = np.array([])
        #gaussian noise variable
        self.sigma = float(sigma)
        self.cholL = np.array([])

    def add_data_points(self,points,vals):
        #add all points to the list
        self.pts += points
        self.vals += vals
        arraysize = len(self.pts)
        #construct K
        K = np.zeros((arraysize,arraysize))
        #local references for speed
        pts = self.pts
        between_points = self.C.between_points
        if len(self.K):
            K[:-1,:-1] = self.K
        for i in xrange(0,arraysize):
            for j in xrange(arraysize-len(points),arraysize):
                K[i,j] = between_points(pts[i],pts[j])
                K[j,i] = K[i,j]
        #add the noise variance to the diagonal block for the new points
        K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] = \
            K[arraysize-len(points):arraysize,arraysize-len(points):arraysize] \
            + self.sigma**2 * np.eye(len(points))
        self.K = K

    #calculate the prediction of a point based on data previously given
    def point_prediction(self,points):
        mean = []
        variance = []
        arraysize = len(self.pts)
        #cholesky
        #if self.cholL.shape[0] < arraysize:
        L = np.linalg.cholesky(self.K)
        #    self.cholL = L
        #else:
        #    L = self.cholL
        alpha = sp.cho_solve((L,1),self.vals)
        k = np.zeros((arraysize,len(points)))
        ##################################################################
        #for speed, get references to the functions I'm going to use
        between_points = self.C.between_points
        pts = self.pts
        dot = np.dot
        ##################################################################
        #create k
        for j in xrange(0,len(points)):
            for i in xrange(0,arraysize):
                k[i,j] = between_points(pts[i],points[j])
        #calculate mean and variance
        #call the command for forward substitution
        ###############fortran call#######################################
        v = dtrsv.dtrsv('L','N',arraysize,L,k)
        ##################################################################
        #result
        mean = dot(alpha,k)
        for i in xrange(0,len(points)):
            variance.append(between_points(points[i],points[i]) - dot(v[:,i],v[:,i]))
        #return it in dictionary form
        return {'mean':mean,'variance':variance}

    #calculate the error for the data given, where function is a vector
    #of the function evaluated at a sufficiently large number of points
    #that the GP regression has been trying to learn
    def error(self,function):
        #sum up the predictive variances
        result = self.point_prediction(function[::2])
        total = np.sum(result['variance'])
        total = (1/float(len(function)/2))*total
        return total

    #clear what has been learnt so far
    def clear(self):
        self.pts = []
        self.vals = []
        self.K = np.array([])

    #calculate the average error for a function defined in function when given
    #number_of_examples examples
    def average_error_over_samples(self,function,sample_size,number_of_examples):
        avg = 0
        numberofpoints = len(function)/2
        for i in range(0,sample_size):
            self.clear()
            #pick random points of the function
            permpts = np.random.randint(0,numberofpoints,number_of_examples)
            #create the vectors
            pts = []
            vals = []
            for j in range(0,number_of_examples):
                pts.append(function[permpts[j]*2])
                vals.append(function[permpts[j]*2+1])
            #learn these points
            self.add_data_points(pts,vals)
            #print("points added")
            avg = avg + self.error(function)
        avg = avg/sample_size
        return avg

    #calculate the average error over functions for data of size number_of_data_points;
    #for MOST cases this is also the generalization error, a summary of which (and
    #approximations to it) can be found in:
    #@inproceedings{sollich99learningcurves,
    #booktitle = {Neural Computation},
    #author = {Sollich, P.},
    #title = {Learning curves for Gaussian process regression: Approximations and bounds},
    #pages = {200-2},
    #year = {1999},
    #}
    def emprical_average_error_over_functions(self,number_of_functions,number_of_sets_of_data,number_of_data_points,function_detail=0,progress=0):
        avg = 0
        step = float(100)/number_of_functions
        for i in range(0,number_of_functions):
            if progress:
                print step*float(i),"%"
            if function_detail:
                fx = self.C.generate_function(self.sigma,function_detail)
            else:
                fx = self.C.generate_function(self.sigma)
            avg = self.average_error_over_samples(fx,number_of_sets_of_data,number_of_data_points) + avg
        avg = avg / number_of_functions
        return avg

    def average_error_over_functions(self,number_of_sets_of_data,number_of_data_points,function_detail=0):
        if function_detail:
            fx = self.C.generate_function(self.sigma,function_detail)
        else:
            fx = self.C.generate_function(self.sigma)
        avg = self.average_error_over_samples(fx,number_of_sets_of_data,number_of_data_points)
        return avg

    def function_prediction(self,pts):
        temp = self.point_prediction(pts)
        return {'func':temp['mean'],'varpos':temp['variance'],'varneg':-temp['variance']}

#########################################################################################################################################################
#Functions not contained in a class
#########################################################################################################################################################

#function to calculate the generalization error for a RandomWalk kernel, averaging over the given graphs
def RandomWalkGeneralizationError(noise,graphs,number_of_sets_of_data,number_of_data_points,a=2,p=10):
    graph_specific = np.zeros(len(graphs))
    for i in range(0,len(graphs)):
        rw = CF.RandomWalk(a,p,graphs[i])
        GP = GaussianProcessRegression(rw,noise)
        graph_specific[i] = GP.average_error_over_functions(number_of_sets_of_data,number_of_data_points)
    avg = np.sum(graph_specific)/len(graphs)
    return avg, graph_specific

#as above but using queues to create a parallel architecture
def RandomWalkGeneralizationErrorParallel(work_queue,result_queue,a=2,p=10):
    while True:
        input = work_queue.get()
        if input is None:
            #poison pill: stop this worker
            print "poison"
            break
        print 'this should not appear'
        print input['datapt'], ' ', input['number_of_data_points']
        rw = CF.RandomWalk(a,p,input['graph'])
        GP = GaussianProcessRegression(rw,input['noise'])
        err = GP.average_error_over_functions(input['number_of_sets_of_data'],input['number_of_data_points'])
        result_queue.put([input['datapt'],err])
        print 'here'
    return
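For completeness, when a run does finish, the saved output can be read back with something like this (just a sketch: np.save appends the .npy extension to the savefile name, and the array axes follow the loop order in parallel.py, i.e. intervals, then noise, then graphs):

import pickle
import numpy as np

#the graphs were pickled (binary protocol) to "wattsgraphs2"
graphs = pickle.load(open("wattsgraphs2", "rb"))

#np.save("wattsdata2", finaldata) writes "wattsdata2.npy"
finaldata = np.load("wattsdata2.npy")
#axes: (interval index, noise index, graph index)
mean_over_graphs = finaldata.mean(axis=2)
print(finaldata.shape)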