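This is a better way of using DeferredSemaphore. (Your current code releases a token twice per item: `_T_thread_end`, which calls `release()`, is attached as a callback both in `_T_to_thread` and again inside `_T_thread_begin`.) `run()` calls the function with no extra arguments, and it releases the token when the Deferred that function returns fires, whether it succeeds or fails. So `_T_to_thread` should take no arguments and return a Deferred that fires when the work is done. `runInteraction` already runs the interaction in a pool thread and returns such a Deferred, so `_T_to_thread` can simply return `self.dbConnectionPool.runInteraction(AnalyzeLink().action_for_data, self.queued_data)`. That drops the `deferToThread` hop and all the manual `release()` calls: def queue_thread( self , data=None ): self.queued_data= data return self.semaphoreService.run( self._T_to_thread )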
It handles acquisition and release for you. This will avoid any code path that might result in a double-release. On Wed, Sep 26, 2012 at 1:07 PM, Jonathan Vanasco <twisted-pyt...@2xlp.com> wrote: > I'm in the process of rewriting a web spider (originally in twisted circa > 2005) , and keep running into an issue with deferreds releasing too many > tokens. > > i've been playing around with this all day, and can't seem to shake this > problem. > > i'm guessing that i designed the application logic wrong. > > Does the following code raise any warning signs for people ? > > The general setup is this: > > AnalyzeLink - > - run-of-the-mill class that performs actual db operations > and link fetching > - doesn't really rely on twisted, aside from being coded to > the specs of runInteraction ( accepts an adbapi txn, raises for a rollback, > is generally happy for a commit ) > > AnalyzeLinksService- > - relies on twisted > - queries the database for a batch of items to update > - each actionable item is wrapped into an > '_AnalyzeLinksRequestWrapper' instance, all of which are tossed into a > defer.DeferredList() > > AnalyzeLinksRequestWrapper- > - relies on twisted > - pushes actual work into callbacks via threads.deferToThread > - uses a defer.DeferredSemaphore provided by > AnalyzeLinksService to acquire locks > > > some cleaned-up code is below : > > ------------------ > > class AnalyzeLink(object): > def get_update_batch(self,txn): > # returns list of ids/data/etc to process > > def action_for_data(self,txn,data): > # processes an entry > > > class _AnalyzeLinksRequestWrapper(RequestWrapper): > dbConnectionPool = None > semaphoreService = None > semaphoreLock = None > > def __init__( self , semaphoreService = None , dbConnectionPool = None ): > self.dbConnectionPool= dbConnectionPool > self.semaphoreService= semaphoreService > > def queue_thread( self , data=None ): > self.queued_data= data > d = self.semaphoreService.acquire()\ > .addCallback( self._T_to_thread ) > 
return d > > def _T_to_thread( self , deferredSemaphore ): > self.semaphoreLock= deferredSemaphore > t = threads.deferToThread( self._T_thread_begin )\ > .addErrback( self._T_errors )\ > .addCallback( self._T_thread_end ) > > def _T_thread_begin( self ): > log.debug("_AnalyzeLinksRequestWrapper._T_thread_begin" ) > updater = AnalyzeLink() > self.dbConnectionPool.runInteraction( updater.action_for_data , > self.queued_data )\ > .addCallback( self._T_thread_end )\ > .addErrback( self._T_errors ) > > def _T_thread_end( self , rval=None ): > self.semaphoreLock.release() > > def _T_errors( self , x ): > self._T_thread_end() > raise x > > > class _AnalyzeLinksService(ServiceScaffold): > SEMAPHORE_TOKENS = 25 > > def __init__( self ): > self.semaphoreService= defer.DeferredSemaphore( > tokens=self.SEMAPHORE_TOKENS ) > > def action( self ): > updater= AnalyzeLink() > database.get_dbPool().runInteraction( updater.get_update_batch , > queued_updates )\ > .addCallback( self._action_2 )\ > .addErrback( self._action_error ) > > def _action_2( self , queued_updates ): > if len( queued_updates ): > updates= [] > for item in queued_updates: > requestWrapper= _AnalyzeLinksRequestWrapper(\ > semaphoreService = self.semaphoreService , > dbConnectionPool = database.get_dbPool() > ) > result= requestWrapper.queue_thread( data=item ) > updates.append(result) > finished= defer.DeferredList( updates )\ > .addCallback( self.deferred_list_finish ) > else: > d= defer.Deferred() > self.deferred_list_finish( d ) > > def _action_error( self , raised ): > log.debug("%s._action_error" % self.__class__.__name__ ) > self.set_processing_status( False ) > if isinstance( raised.value , database.DbRollback ): > print "DB Rollback" > raise raised > elif isinstance( raised.value , database.DbRollbackOk ): > print "DB Rollback ok" > else: > raise raised > > > AnalyzeLinksService= _AnalyzeLinksService() > > > class AnalyzeLinksService_Service(internet.TimerService): > def __init__( self , dbConfigHash=None 
): > internet.TimerService.__init__( self, > CHECK_PERIOD__IMPORT , > AnalyzeLinksService.action > ) > > _______________________________________________ > Twisted-Python mailing list > Twisted-Python@twistedmatrix.com > http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python _______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python