a portion of my twisted app is having some problems. i think i figured out the issue -- but if I'm right.. i'll be a bit lost.
this portion of the app is essentially a web scraper. it grabs a batch of X urls from a data broker , and then updates a database with data about the URL ( which either comes from an oEmbed endpoint , a third party data provider, or scraping the page if needed ) there's a lot of code that would be messy to follow, so i'll just explain it as best as possible, and provide some highlights. the underlying logic is basically this: reactor starts an UpdateLinksService, that checks for new batches every 30 seconds the UpdateLinksService has an internal marker to check if it's still processing the last batch - or if it's safe to process to process the urls, the UpdateLinksService runs them in a request wrapper , that is supposed to be run through a defer.DeferredSemaphore() service when i'm done with the batch, i clear out internal marker via a `deferred_list_finish` method. looking at some aggressive debugging output, it looks like my work to process a url is happening /after/ i call deferred_list_finish. in other words, i've somehow structured this so that i'm instantly finished. i *thought* i was running out of memory because i had some phantom deferreds running around. now i'm starting to think that i'm just stacking the queue faster than i work on it. i've tried changing things around and using different return values, but then started getting "exceptions.AssertionError:" because "assert not isinstance(result, Deferred)" ( twisted/internet/defer.py", line 381, in callback ) the following is a rough composite of what is going on. if anyone sees an obvious fix, i'd be greatly appreciative. thanks! ================= class UpdateLinksService(): def process_urls(self, urls): requests = [] for url in urls: wrapper = requestWrapper( self.semaphoreService, dbPool ) d = wrapper.queue_url(url) updates.append(d) self.d_list = defer.DeferredList( updates )\ .addCallback( self.deferred_list_finish ) class RequestWrapper(): def __init__(self, semaphore_service, dbPool): self.semaphoreService=semaphore_service self.dbPool = dbPool def queue_url( self, url ): self.url = url d = self.semaphoreService.run( self._to_thread ) return d def _to_thread( self ): d = threads.deferToThread( self._thread_begin ) return d def _thread_begin(self): worker = UrlWorker() d = self.dbPool.runInteraction( worker.process_url , self.url ) class UrlWorker(): def process_url(self,txn, url): #blocking stuff return True/False The reason why I have _to_thread + _thread_begin as 2 functions, and UrlWoker separate is for code re-use. The RequestWrapper functions are mostly all in a base class; i just subclass RequestWrapper and override _thread_begin and an error callback (not shown) UrlWorker's various methods are used througout my twisted daemon. _______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python