a portion of my Twisted app is having some problems.  i think i figured out the 
issue, but if i'm right, i'll be a bit lost.

this portion of the app is essentially a web scraper.  it grabs a batch of X 
urls from a data broker, and then updates a database with data about each URL 
(which comes from an oEmbed endpoint, a third party data provider, or 
scraping the page if needed).

there's a lot of code that would be messy to follow, so i'll just explain it as 
best as possible, and provide some highlights.

the underlying logic is basically this:

        the reactor starts an UpdateLinksService, which checks for new
        batches every 30 seconds

        the UpdateLinksService has an internal marker to check if it's still
        processing the last batch, or if it's safe to process a new one

        to process the urls, the UpdateLinksService runs each one in a request
        wrapper, which is supposed to be run through a
        defer.DeferredSemaphore()

        when i'm done with the batch, i clear out the internal marker via a
        `deferred_list_finish` method.

looking at some aggressive debugging output, it looks like my work to process a 
url is happening /after/ i call deferred_list_finish.  

in other words, i've somehow structured this so that i'm instantly finished.  

i *thought* i was running out of memory because i had some phantom deferreds 
running around.  now i'm starting to think that i'm just stacking the queue 
faster than i work on it.

i've tried changing things around and using different return values, but then 
started getting "exceptions.AssertionError" from the line "assert not 
isinstance(result, Deferred)" (twisted/internet/defer.py, line 381, in 
callback).

the following is a rough composite of what is going on.  if anyone sees an 
obvious fix, i'd be greatly appreciative.  

thanks!

=================

from twisted.internet import defer, threads


class UpdateLinksService():

    def process_urls(self, urls):
        # queue every url in the batch; each queue_url() call returns a Deferred
        updates = []
        for url in urls:
            # dbPool is defined elsewhere (not shown)
            wrapper = RequestWrapper(self.semaphoreService, dbPool)
            d = wrapper.queue_url(url)
            updates.append(d)
        # deferred_list_finish (not shown) clears the internal marker
        self.d_list = defer.DeferredList(updates)\
            .addCallback(self.deferred_list_finish)


class RequestWrapper():

    def __init__(self, semaphore_service, dbPool):
        self.semaphoreService = semaphore_service
        self.dbPool = dbPool

    def queue_url(self, url):
        self.url = url
        d = self.semaphoreService.run(self._to_thread)
        return d

    def _to_thread(self):
        d = threads.deferToThread(self._thread_begin)
        return d

    def _thread_begin(self):
        worker = UrlWorker()
        # note: d is not returned in this composite
        d = self.dbPool.runInteraction(worker.process_url, self.url)


class UrlWorker():

    def process_url(self, txn, url):
        # blocking stuff
        return True  # or False
                
                
The reason I have _to_thread + _thread_begin as 2 functions, and UrlWorker 
separate, is for code re-use.

The RequestWrapper functions are mostly all in a base class; i just subclass 
RequestWrapper and override _thread_begin and an error callback (not shown)

UrlWorker's various methods are used throughout my Twisted daemon.
_______________________________________________
Twisted-Python mailing list
Twisted-Python@twistedmatrix.com
http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python
