On May 10, 1:31 pm, Dennis Lee Bieber <[EMAIL PROTECTED]> wrote:
> On Fri, 9 May 2008 08:40:38 -0700 (PDT), skunkwerk <[EMAIL PROTECTED]>
> declaimed the following in comp.lang.python:
>
>         Coming in late...
>
> > On May 9, 12:12 am, John Nagle <[EMAIL PROTECTED]> wrote:
> > > skunkwerk wrote:
> > > > i've declared a bunch of worker threads (100) and a queue into
> > > > which new requests are inserted, like so:
> <snip>
> > > > queue = Queue.Queue(0)
> > > > WORKERS = 100
> > > > for i in range(WORKERS):
> > > >     thread = SimpleDBThread(queue)
> > > >     thread.setDaemon(True)
> > > >     thread.start()
> > > >
> > > > the thread:
> > > >
> > > > class SimpleDBThread(threading.Thread):
> > > >     def __init__(self, queue):
> > > >         self.__queue = queue
>
>         Note: a double-leading __ means "name mangling" -- typically only
> needed when doing multiple layers of inheritance where different parents
> have similarly named items that need to be kept independent; a single _
> is the convention for "don't touch me unless you know what you are
> doing".
>
> > > >         threading.Thread.__init__(self)
> > > >     def run(self):
> > > >         while 1:
> > > >             item = self.__queue.get()
> > > >             if item != None:
> > > >                 model = domain.get_item(item[0])
> > > >                 logger.debug('sdbthread item:' + item[0])
> > > >                 title = model['title']
> > > >                 scraped = model['scraped']
> > > >                 logger.debug("sdbthread title:" + title)
> > > >
> > > > any suggestions?
> > > > thanks
> <snip>
> > thanks John, Gabriel,
> > here's the 'put' side of the requests:
> >
> > def prepSDBSearch(results):
> >     modelList = [0]
> >     counter = 1
> >     for result in results:
> >         data = [result.item, counter, modelList]
> >         queue.put(data)
> >         counter += 1
> >     while modelList[0] < len(results):
> >         print 'waiting...'    # wait for them to come home
> >     modelList.pop(0)          # now remove '0'
> >     return modelList
>
>         My suggestion, if you really want diagnostic help -- follow the
> common recommendation of posting the minimal /runnable (if erroneous)/
> code... If "domain.get_item()" is some sort of RDBM access, you might
> fake it using a pre-loaded dictionary -- anything that allows it to
> return something when given the key value.
>
> > responses to your follow-ups:
> > 1) 'item' in the threads is a list that corresponds to the 'data'
> > list in the above function. it's not global, and the initial values
> > seem ok, but i'm not sure if every time i pass in data to the queue
> > it passes in the same memory address or declares a new 'data' list
> > (which I guess is what I want)
>
>         Rather confusing usage... In your "put" you have a list whose
> first element is "result.item", but then in the worker thread you refer
> to the entire list as "item".
>
> > 3) the first item in the modelList is a counter that keeps track of
> > the number of threads for this call that have completed - is there
> > any better way of doing this?
>
>         Where? None of your posted code shows either "counter" or
> modelList being used by the threads.
>
>         And yes, if you have threads trying to update a shared mutable,
> you have a race condition.
>
>         You also have a problem if you are using "counter" to define
> where in modelList a thread is supposed to store its results -- as you
> can not access an element that doesn't already exist:
>
> a = [0]
> a[3] = 1    # failure, need to create elements 1, 2, 3 first
>
>         Now, if position is irrelevant, and a thread just appends its
> results to modelList, then you don't need a counter; all you need is to
> check the length of modelList against the count expected.
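To make that race concrete: if several threads really did bump a shared
completion counter, every increment would need a lock around it. A minimal
sketch, with made-up names (`count_lock`, `completed`, `mark_done` are not
from the code above):

    import threading

    count_lock = threading.Lock()
    completed = [0]    # hypothetical shared mutable: finished work items

    def mark_done():
        # without the lock, two threads can read the same value and
        # both write back value+1, silently losing an increment
        count_lock.acquire()
        try:
            completed[0] += 1
        finally:
            count_lock.release()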
>         Overall -- even though you are passing things via the queue, the
> contents being passed via the queue are being treated as if they were
> global entities (you could make modelList a global, remove it from the
> queue entries, and have the same net access)...
>
>         IOW, you have too much coupling between the threads and the feed
> routine...
>
>         As for me... I'd be using a second queue for return values:
>
> WORKERTHREADS = 100
> feed = Queue.Queue()
> results = Queue.Queue()
>
> def worker():
>     while True:
>         (ID, item) = feed.get()    # I leave the queues globals
>                                    # since they perform locking
>                                    # internally
>         model = domain.get_item(item)
>         results.put((ID, model["title"], model["scraped"]))
>
> for i in range(WORKERTHREADS):
>     aThread = threading.Thread(target=worker)
>     # overkill to subclass as there is now no specialized init,
>     # and if I really wanted to make the queues non-global
>     # I'd pass them as arguments:
>     #     threading.Thread(target=worker, args=(feed, results))
>     # where worker is now
>     #     def worker(feed, results):
>     aThread.setDaemon(True)
>     aThread.start()
>
> ...
>
> def prepSearch(searches):
>     modelList = []
>     counter = 0
>     for searchItem in searches:
>         feed.put((counter, searchItem))
>         counter += 1
>         modelList.append(None)    # extend list one element per search
>     while counter:
>         (ID, title, scraped) = results.get()
>         modelList[ID] = (title, scraped)
>         counter -= 1
>     return modelList
>
>         The only place counter and modelList are modified is within
> prepSearch. I'm passing counter out and back to use as an ID value in
> case the final results are supposed to be in order -- that way, if one
> thread finishes before another, the items can still be placed into the
> list where they'd have been sequentially.
>
>         I can only hope that "domain.get_item" is an activity that is
> I/O bound AND that it supports parallel accesses... Otherwise the above
> worker threads seem to be adding a lot of overhead for queue I/O and
> threading swaps for what is otherwise a rather linear process.
>
>         Perhaps your posts don't reveal enough... Maybe you have
> multiple main threads that are posting to the worker feed queue (and you
> were using separate mutables for storing the results). In that
> situation, I'd remove the results queue from being a global entity,
> create one queue per main processing thread, and pass the queue as one
> of the parameters. This way, a worker can return data to any source
> thread by using the supplied queue for the return...
>
>         Modify prepSearch with:
>
> myQueue = Queue.Queue()
> ...
> feed.put((counter, searchItem, myQueue))
> ...
> (ID, title, scraped) = myQueue.get()
>
>         Modify worker with:
>
> (ID, item, retqueue) = feed.get()
> ...
> retqueue.put((ID, model["title"], model["scraped"]))
> --
> Wulfraed        Dennis Lee Bieber               KD6MOG
> [EMAIL PROTECTED]                [EMAIL PROTECTED]
>                 HTTP://wlfraed.home.netcom.com/
>         (Bestiaria Support Staff: [EMAIL PROTECTED])
>                 HTTP://www.bestiaria.com/
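Assembling those two fragments into one self-contained sketch of the
per-caller return queue, with `domain.get_item` faked by a dictionary as
suggested above so it runs stand-alone (`FAKE_DB` and `get_item` are
invented for the demo):

    import threading
    import Queue

    FAKE_DB = {"a": {"title": "Article A", "scraped": "yes"},
               "b": {"title": "Article B", "scraped": "no"}}

    def get_item(key):
        # stand-in for domain.get_item -- returns a record for a key
        return FAKE_DB[key]

    feed = Queue.Queue()

    def worker():
        while True:
            (ID, item, retqueue) = feed.get()
            model = get_item(item)
            # reply on the queue supplied with the request, so any
            # caller can collect its own results independently
            retqueue.put((ID, model["title"], model["scraped"]))

    for i in range(4):    # 100 in the original; 4 is enough for a demo
        t = threading.Thread(target=worker)
        t.setDaemon(True)
        t.start()

    def prepSearch(searches):
        myQueue = Queue.Queue()    # per-caller return queue
        modelList = []
        counter = 0
        for searchItem in searches:
            feed.put((counter, searchItem, myQueue))
            counter += 1
            modelList.append(None)
        while counter:
            (ID, title, scraped) = myQueue.get()
            modelList[ID] = (title, scraped)    # preserve request order
            counter -= 1
        return modelList

    print prepSearch(["a", "b"])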
wow, thanks for the help. I seem to have fixed my problem, though - it
turns out domain.get_item was not thread safe, as it was using the
underlying httplib. The solution was to create a new connection to the
database for each thread (which is OK, as the database is meant to be
queried in a massively parallel fashion). The missing part of the code
included a section where I inserted each result at its given position in
the list.

The only issue I have now is that it takes a long time for 100 threads to
initialize that connection (>5 minutes) - and as I'm doing this on a
webserver, any time I update the code I have to restart all those
threads, which I'm doing right now in a for loop. Is there any way I can
keep the thread stuff separate from the rest of the code in this file,
yet still allow access? A .pyc or Psyco wouldn't help, correct, as the
time is being spent at runtime rather than on compilation?

Maybe something along the lines of "start a new thread every minute until
you get to 100", without blocking execution of the rest of the code in
the file? Or, any time I need to do a search, start a new thread if the
number of threads is < 100?

thanks again
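For the "grow the pool on demand" idea, one possibility is to start a
worker per incoming request until a cap is reached, so the slow per-thread
connections are paid for gradually instead of all at startup. A hedged
sketch - `connect()` stands in for whatever creates the per-thread
database connection, and `submit()` is an invented entry point:

    import threading
    import Queue

    MAX_WORKERS = 100
    feed = Queue.Queue()
    pool_lock = threading.Lock()
    started = [0]    # workers launched so far

    def connect():
        # stand-in for the slow per-thread database connection
        return object()

    def worker():
        conn = connect()    # each thread owns its own connection
        while True:
            item = feed.get()
            # ... query conn with item, hand the results back ...

    def submit(item):
        # lazily grow the pool: at most one new thread per request,
        # so nothing blocks waiting for 100 connections up front
        pool_lock.acquire()
        try:
            if started[0] < MAX_WORKERS:
                t = threading.Thread(target=worker)
                t.setDaemon(True)
                t.start()
                started[0] += 1
        finally:
            pool_lock.release()
        feed.put(item)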