I'm in the process of rewriting a web spider (originally in twisted circa 2005) , and keep running into an issue with deferreds releasing too many tokens.
i've been playing around with this all day, and can't seem to shake this problem. i'm guessing that i designed the application logic wrong. Does the following code raise any warning signs for people ? The general setup is this: AnalyzeLink - - run-of-the-mill class that performs actual db operations and link fetching - doesn't really rely on twisted, aside from being coded to the specs of runInteraction ( accepts an adbapi txn, raises for a rollback, is generally happy for a commit ) AnalyzeLinksService- - relies on twisted - queries the database for a batch of items to update - each actionable item is wrapped into an '_AnalyzeLinksRequestWrapper' instance, all of which are tossed into a defer.DeferredList() AnalyzeLinksRequestWrapper- - relies on twisted - pushes actual work into callbacks via threads.deferToThread - uses a defer.DeferredSemaphore provided by AnalyzeLinksService to acquire locks some cleaned-up code is below : ------------------ class AnalyzeLink(object): def get_update_batch(self,txn): # returns list of ids/data/etc to process def action_for_data(self,txn,data): # processes an entry class _AnalyzeLinksRequestWrapper(RequestWrapper): dbConnectionPool = None semaphoreService = None semaphoreLock = None def __init__( self , semaphoreService = None , dbConnectionPool = None ): self.dbConnectionPool= dbConnectionPool self.semaphoreService= semaphoreService def queue_thread( self , data=None ): self.queued_data= data d = self.semaphoreService.acquire()\ .addCallback( self._T_to_thread ) return d def _T_to_thread( self , deferredSemaphore ): self.semaphoreLock= deferredSemaphore t = threads.deferToThread( self._T_thread_begin )\ .addErrback( self._T_errors )\ .addCallback( self._T_thread_end ) def _T_thread_begin( self ): log.debug("_AnalyzeLinksRequestWrapper._T_thread_begin" ) updater = AnalyzeLink() self.dbConnectionPool.runInteraction( updater.action_for_data , self.queued_data )\ .addCallback( self._T_thread_end )\ .addErrback( self._T_errors ) def _T_thread_end( self , rval=None ): self.semaphoreLock.release() def _T_errors( self , x ): self._T_thread_end() raise x class _AnalyzeLinksService(ServiceScaffold): SEMAPHORE_TOKENS = 25 def __init__( self ): self.semaphoreService= defer.DeferredSemaphore( tokens=self.SEMAPHORE_TOKENS ) def action( self ): updater= AnalyzeLink() database.get_dbPool().runInteraction( updater.get_update_batch , queued_updates )\ .addCallback( self._action_2 )\ .addErrback( self._action_error ) def _action_2( self , queued_updates ): if len( queued_updates ): updates= [] for item in queued_updates: requestWrapper= _AnalyzeLinksRequestWrapper(\ semaphoreService = self.semaphoreService , dbConnectionPool = database.get_dbPool() ) result= requestWrapper.queue_thread( data=item ) updates.append(result) finished= defer.DeferredList( updates )\ .addCallback( self.deferred_list_finish ) else: d= defer.Deferred() self.deferred_list_finish( d ) def _action_error( self , raised ): log.debug("%s._action_error" % self.__class__.__name__ ) self.set_processing_status( False ) if isinstance( raised.value , database.DbRollback ): print "DB Rollback" raise raised elif isinstance( raised.value , database.DbRollbackOk ): print "DB Rollback ok" else: raise raised AnalyzeLinksService= _AnalyzeLinksService() class AnalyzeLinksService_Service(internet.TimerService): def __init__( self , dbConfigHash=None ): internet.TimerService.__init__( self, CHECK_PERIOD__IMPORT , AnalyzeLinksService.action ) _______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python