Hi Colleagues: I recently updated to Stackless 2.6.1 and Twisted 8.2. I ran the following programme, which works fine under Stackless 2.5.2 with Twisted (including 8.2), and received the following error:
Traceback (most recent call last):
  File "ToyProcessor5.py", line 60, in twistedReactor
    reactor.run()
  File "/usr/local/lib/python2.6/site-packages/twisted/internet/base.py", line 1048, in run
    self.mainLoop()
--- <exception caught here> ---
  File "/usr/local/lib/python2.6/site-packages/twisted/internet/base.py", line 1057, in mainLoop
    self.runUntilCurrent()
  File "/usr/local/lib/python2.6/site-packages/twisted/internet/base.py", line 707, in runUntilCurrent
    log.deferr()
  File "/usr/local/lib/python2.6/site-packages/twisted/python/log.py", line 153, in err
    _stuff = failure.Failure()
  File "/usr/local/lib/python2.6/site-packages/twisted/python/failure.py", line 265, in __init__
    parentCs = reflect.allYourBase(self.type)
  File "/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 542, in allYourBase
    accumulateBases(classObj, l, baseClass)
  File "/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 550, in accumulateBases
    accumulateBases(base, l, baseClass)
  File "/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 550, in accumulateBases
    accumulateBases(base, l, baseClass)
  File "/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 550, in accumulateBases
    accumulateBases(base, l, baseClass)
  File "/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 550, in accumulateBases
    accumulateBases(base, l, baseClass)
exceptions.RuntimeError: maximum recursion depth exceeded

Similarly written programmes fail in the same fashion. What is suspicious is that there is no recursion in the offending section. Even if I create only one worker tasklet, I get the same error.

def twistedReactor():
    l = task.LoopingCall(stackless.schedule)
    l.start(.01)
    reactor.run()

However, if I change

    l = task.LoopingCall(stackless.schedule)

to

    l = task.LoopingCall(tick)

and tick is

def tick():
    stackless.schedule()

the programme works. Although this is a workaround, I would like to find the real problem.
What particularly worries me is that when I created small test examples, I was not able to recreate the problem. Something else is going on.... What I would appreciate is some hints as to what may be happening. - I normally don't expect "Maximum Recursion Depth" errors in Stackless. - What is the reflect.allYourBase stuff? - What is log.deferr? (I don't recall seeing that method.) If I get a few clues, it would make it easier for me to write new tests and zero in on the problem. Hopefully the problem is with my code rather than Stackless 2.6.1 and/or Twisted 8.2. I have included some sample code. Unfortunately ToyProcessor5.py is a bit large (I have newer code that is smaller but requires more files). Cheers, Andrew
#!/usr/bin/env python
"""
ToyProcessor.py
Andrew Francis
April 3rd, 2008

The purpose of this programme is to illustrate the techniques used in
the presentation "Adventures in Stackless Python / Twisted Integration"

<song>Dead Letter and the Infinite - Wintersleep </song>
"""

import stackless
import time
import sys

from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
from twisted.internet import reactor
from twisted.web import client
from twisted.web import http
from twisted.python import log, logfile
from twisted.internet import task

# Values stored in Response.type; ToyProcessor.__processResponse__
# dispatches on them.
RECEIVE_RESPONSE = 0
REPLY_RESPONSE = 1
INVOKE_RESPONSE = 2
WAIT_RESPONSE = 3

# Sentinel request ids meaning "there is no request to wake up".
REQUEST_ERROR = -1
REPLY_ERROR = -1

__DEBUG__ = False


class CorrelationNotFound(Exception):
    """Raised when no pending receive matches an incoming request path.

    Derives from Exception so that it can be raised and caught like any
    other exception; the offending path is kept in ``data``.
    """

    def __init__(self, data):
        Exception.__init__(self, data)
        self.data = data


class Request(object):
    """Marker base class for messages sent towards a request handler."""
    pass


class Response(object):
    """Marker base class for messages sent to the ToyProcessor."""
    pass


def twistedReactor():
    """
    run the Twisted Reactor in its own tasklet
    also run a task that makes the Twisted Reactor yield to the
    Stackless scheduler
    """
    # NOTE(review): passing stackless.schedule directly to LoopingCall is
    # the construct reported to fail with "maximum recursion depth
    # exceeded" under Stackless 2.6.1 / Twisted 8.2. The reported
    # workaround is to pass a plain function that calls
    # stackless.schedule() and returns None. Left unchanged on purpose so
    # that the script still shows the configuration under investigation.
    l = task.LoopingCall(stackless.schedule)
    l.start(.01)
    reactor.run()


class InvokeResponse(Response):
    """Result (or failure) of an outgoing HTTP invoke."""

    def __init__(self, requestId, message, isError = False):
        self.type = INVOKE_RESPONSE
        self.requestId = requestId
        self.message = message
        self.isError = isError


class ReceiveResponse(Response):
    """An incoming HTTP request: its path, body and the reply channel."""

    def __init__(self, path, message, channel):
        self.type = RECEIVE_RESPONSE
        self.path = path
        self.message = message
        self.channel = channel


class ReplyRequest(Request):
    """Reply payload handed to the waiting HTTP request handler."""

    def __init__(self, requestId, message, isError = False):
        self.requestId = requestId
        self.message = message
        self.isError = isError


class ReplyResponse(Response):
    """Acknowledgement that the HTTP request handler wrote its reply."""

    def __init__(self, requestId, isError = False):
        self.type = REPLY_RESPONSE
        self.requestId = requestId
        self.isError = isError


class WaitResponse(Response):
    """Notification that a timer started by a wait request has fired."""

    def __init__(self, requestId):
        self.type = WAIT_RESPONSE
        self.requestId = requestId


class Process(object):
    """
    Process corresponds to a WS-BPEL process

    The class attribute ``processor`` is assigned by the main section
    before any instance is created.
    """

    def __init__(self, url, messageExchange):
        self.url = url
        self.messageExchange = messageExchange
        self.scopeId = Process.processor.__generateScope__()
        self.replyMessage = "<html><head></head><body>hello world from process " + str(self.scopeId) + "</body></html>"

    def execute(self):
        """Wait for a request on ``url``, then answer it with replyMessage."""
        log.msg("Process " + str(self.scopeId) + " started")
        # Passing our own class as factory makes receive() spawn a
        # replacement process for the same url once a request arrives.
        self.processor.receive(self.scopeId, self.url, self.messageExchange, self.__class__)
        #result = self.processor.invoke("http://localhost")
        self.processor.reply(self.scopeId, self.messageExchange, self.replyMessage)


class AlarmProcess(object):
    """Process that logs "Tick" every ``period`` (passed to callLater).

    The class attribute ``processor`` is assigned by the main section
    before any instance is created.
    """

    def __init__(self, period):
        self.period = period
        self.scopeId = Process.processor.__generateScope__()

    def execute(self):
        """Loop forever: wait for the period to elapse, then log."""
        log.msg("Process " + str(self.scopeId) + " started")
        while True:
            self.processor.wait(self.scopeId, self.period)
            #result = self.processor.invoke("http://localhost")
            log.msg("Tick")


class Correlation(object):
    """
    Correlations are used to match incoming messages to the
    corresponding receive activities
    """

    def __init__(self, path, messageExchange, requestId):
        self.path = path
        self.messageExchange = messageExchange
        self.requestId = requestId
        self.aChannel = None

    def __eq__(self, other):
        # Compared against a bare path string so that
        # list.index(path) can locate the correlation.
        return self.path == other

    def __repr__(self):
        return self.path + " " + "requestId: " + str(self.requestId) + " " + self.messageExchange

    def getChannel(self):
        return self.aChannel

    def setChannel(self, channel):
        self.aChannel = channel

    channel = property(getChannel, setChannel)


class ToyProcessor(object):
    """Matches process requests with the responses arriving on a channel.

    Each request blocks the calling tasklet on a private channel stored in
    ``requestTable``; ``execute`` reads responses from ``responseChannel``
    and wakes the matching tasklet.
    """

    def __init__(self, channel):
        self.responseChannel = channel
        self.requestId = -1
        self.scopeId = 0
        self.requestTable = {}            # requestId -> channel of the waiting tasklet
        self.correlations = []            # pending receives
        self.messageExchangeStates = {}   # rewritten messageExchange -> handler channel
        self.flag = True

    # ---- level 0 calls ----

    def createProcess(self, factory, *args, **kwargs):
        """Instantiate ``factory`` and run its execute() in a new tasklet."""
        stackless.tasklet(factory(*args, **kwargs).execute)()

    def invoke(self, url):
        """Fetch ``url`` and block the calling tasklet until the result arrives."""
        return self.__processRequest__(self.__invokeRequest__, url, self.responseChannel)

    def receive(self, eventScope, url, messageExchange, factory):
        """Block until a request for ``url`` arrives and return its body.

        When ``factory`` is not None a replacement process is created for
        the same url and messageExchange afterwards.
        """
        result = self.__processRequest__(self.__receiveRequest__, eventScope, url, messageExchange)
        # and create a new daemon
        if factory is not None:
            log.msg("creating replacement daemon")
            self.createProcess(factory, url, messageExchange)
        return result

    def reply(self, eventScope, messageExchange, message):
        """Send ``message`` to the handler of a previously received request."""
        self.__processRequest__(self.__replyRequest__, eventScope, messageExchange, message)

    def wait(self, eventScope, period):
        """Block the calling tasklet until a timer of ``period`` fires."""
        self.__processRequest__(self.__waitRequest__, eventScope, period)

    # ---- level 1 calls ----

    def __rewrite__(self, scope, messageExchange):
        """Qualify a messageExchange name with its scope id."""
        return str(scope) + ":" + messageExchange

    def __processRequest__(self, callable, *args, **kwargs):
        """Register a request, start it, and block until its response.

        ``callable`` is invoked with the new request id followed by
        ``args``/``kwargs``. Returns whatever is sent on the request's
        channel.
        """
        channel = stackless.channel()
        requestId = self.__addRequest__(channel)
        try:
            callable(requestId, *args, **kwargs)
            return channel.receive()
        finally:
            # Always drop the table entry, even when the request callable
            # raises, so that failed requests do not accumulate.
            self.__deleteRequest__(requestId)

    def __processResponse__(self, response):
        """Dispatch one response and wake the tasklet waiting for it."""
        try:
            if response.type == RECEIVE_RESPONSE:
                requestId, message = self.__receiveResponse__(response)
            elif response.type == REPLY_RESPONSE:
                requestId, message = self.__replyResponse__(response)
            elif response.type == INVOKE_RESPONSE:
                requestId, message = self.__invokeResponse__(response)
            elif response.type == WAIT_RESPONSE:
                requestId, message = self.__waitResponse__(response)
            else:
                # Unknown response type: there is no request id to report
                # or to wake up, so log what we got, stop the reactor and
                # give up on this response.
                log.msg("something funny has happened " + str(response.type))
                print(response)
                reactor.stop()
                return

            if requestId != REQUEST_ERROR:
                self.requestTable[requestId].send(message)
        except Exception:
            # Exception (not a bare except) so that SystemExit and
            # KeyboardInterrupt still propagate.
            log.msg(sys.exc_info())

    def __generateScope__(self):
        """Return the next scope (process) id."""
        self.scopeId += 1
        return self.scopeId

    def __addRequest__(self, channel):
        """Store ``channel`` under a fresh request id and return the id."""
        self.requestId = self.requestId + 1
        self.requestTable[self.requestId] = channel
        return self.requestId

    def __deleteRequest__(self, requestId):
        del self.requestTable[requestId]

    def __invokeRequest__(self, requestId, url, responseChannel):
        log.msg("INVOKE_REQUEST Started " + str(requestId) + " " + url)
        Connection(requestId, responseChannel).connect(url)
        log.msg("INVOKE_REQUEST Finished " + str(requestId))

    def __invokeResponse__(self, response):
        log.msg("INVOKE_RESPONSE Started " + str(response.requestId))
        return response.requestId, response.message

    def __receiveRequest__(self, requestId, scope, path, messageExchange):
        log.msg("__RECEIVE_REQUEST___ STARTED " + str(requestId) + " pid:" + str(scope))
        self.__addCorrelation__(requestId, path, self.__rewrite__(scope, messageExchange))
        log.msg("__RECEIVE_REQUEST__ FINISHED__" + str(requestId) + " pid:" + str(scope))

    def __receiveResponse__(self, response):
        """Match an incoming request with a pending receive.

        Returns ``(requestId, message)`` of the matched receive, or
        ``(REQUEST_ERROR, None)`` after sending an error reply to the
        handler when no receive is waiting on that path.
        """
        log.msg("RECEIVE_RESPONSE STARTED")
        try:
            correlation = self.__removeCorrelation__(response.path)
            self.messageExchangeStates[correlation.messageExchange] = response.channel
        except CorrelationNotFound:
            response.channel.send(ReplyRequest(None,"<html><head><body>Path not found</body></html>",True))
            return REQUEST_ERROR, None
        else:
            log.msg("RECEIVE_RESPONSE ENDED " + str(correlation.requestId))
            return correlation.requestId, response.message

    def __replyRequest__(self, requestId, scope, messageExchange, message):
        log.msg("REPLY_REQUEST STARTED")
        newMessageExchange = self.__rewrite__(scope, messageExchange)
        channel = self.messageExchangeStates[newMessageExchange]
        del self.messageExchangeStates[newMessageExchange]
        log.msg("REPLY_REQUEST SENDING MESSAGE")
        channel.send(ReplyRequest(requestId, message))
        log.msg("REQUEST_REQUEST FINISHED")

    def __replyResponse__(self, response):
        return response.requestId, None

    def __waitRequest__(self, requestId, scope, period):
        def alarm(requestId):
            self.responseChannel.send(WaitResponse(requestId))
        reactor.callLater(period, alarm, requestId)

    def __waitResponse__(self, response):
        return response.requestId, None

    def __addCorrelation__(self, requestId, path, messageExchange):
        log.msg("entering __addCorrelation__")
        self.correlations.append(Correlation(path, messageExchange, requestId))
        log.msg("exiting __addCorrelation__")

    def __removeCorrelation__(self, path):
        """Remove and return the first correlation registered for ``path``.

        Raises CorrelationNotFound when there is none.
        """
        try:
            log.msg("Looking for correlation =>" + path)
            # Works because Correlation.__eq__ compares against the path.
            i = self.correlations.index(path)
            print("found => %s" % (self.correlations[i],))
        except ValueError:
            raise CorrelationNotFound(path)
        return self.correlations.pop(i)

    def execute(self):
        """Main loop: process responses until ``flag`` is cleared."""
        while self.flag:
            log.msg("processor - waiting for message")
            self.__processResponse__(self.responseChannel.receive())


# ---- level 2 ----

class MyRequestHandler(http.Request):
    """
    it is necessary to run the body of the request handler in its own
    tasklet as to prevent the entire reactor from blocking on a channel

    The class attribute ``responseChannel`` is assigned by the main
    section.
    """

    def process(self):
        stackless.tasklet(self.doWork)()

    def doWork(self):
        """Forward the request to the processor and write back its reply."""
        channel = stackless.channel()
        receiveRequest = ReceiveResponse(self.path, self.content.read(), channel)
        MyRequestHandler.responseChannel.send(receiveRequest)
        log.msg("REQUEST HANDLER WAITING ON CHANNEL")
        reply = channel.receive()

        if reply.isError:
            self.setResponseCode(http.NOT_FOUND)
            theReplyId = REPLY_ERROR
        else:
            theReplyId = reply.requestId

        self.write(reply.message.encode("utf-8"))
        self.finish()
        log.msg("REQUEST HANDLER WRITING REPLY " + str(theReplyId))
        MyRequestHandler.responseChannel.send(ReplyResponse(theReplyId))
        log.msg("REQUEST HANDLER COMPLETED " + str(theReplyId))


class MyHttp(http.HTTPChannel):
    requestFactory = MyRequestHandler


class MyHttpFactory(http.HTTPFactory):
    protocol = MyHttp


class Connection(object):
    """Outgoing HTTP fetch whose outcome is reported on ``channel``."""

    def __init__(self, requestId, channel):
        self.channel = channel
        self.requestId = requestId

    def __handleConnection__(self, result):
        log.msg("INVOKE RESPONSE " + str(self.requestId))
        self.channel.send(InvokeResponse(self.requestId, result))

    def __handleError__(self, failure):
        self.channel.send(InvokeResponse(self.requestId, failure, True))

    def connect(self, url):
        client.getPage(url).addCallback(self.__handleConnection__).addErrback(self.__handleError__)


if __name__ == "__main__":
    log.startLogging(sys.stdout)
    log.msg("test starting Solution")
    log.msg("test starting now")

    # responses are send on a channel shared by Twisted and the
    # ToyProcessor
    responseChannel = stackless.channel()
    processor = ToyProcessor(responseChannel)
    MyRequestHandler.responseChannel = responseChannel
    MyRequestHandler.processor = processor

    #make all Process instances inherit a processor
    #should create an abstract Process
    Process.processor = processor
    AlarmProcess.processor = processor

    # okay start the processor
    stackless.tasklet(processor.execute)()

    # okay, let us run some processes
    for i in range(0,1000):
        processor.createProcess(Process, "/" + str(i), "message" + str(i))

    # NOTE(review): the original indentation was lost; a single alarm
    # process created after the loop is assumed here -- confirm.
    processor.createProcess(AlarmProcess, 10)

    # and create a HTTP server
    reactor.listenTCP(8000, MyHttpFactory())
    stackless.tasklet(twistedReactor)()

    while stackless.getruncount() > 1:
        stackless.schedule()

    # NOTE(review): assumed to run once, after the scheduling loop ends.
    log.msg("programme has abnormally terminated?")
#!/usr/bin/env python
"""
BadProcessor.py
Andrew Francis
January 31th, 2009

Track down error in Python 2.6.1/Twisted 8.2

This variant hands stackless.schedule straight to LoopingCall and runs a
separate "tick" tasklet alongside the reactor tasklet.

NOTE(review): the header this file was copied from said "This version
should work"; given the file name this is presumably the variant meant to
show the failure -- confirm.

<song> Wrapped Around My Finger -The Police </song>
"""

import stackless
import time
import sys

from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.internet import reactor
from twisted.web import client
from twisted.web import http
from twisted.python import log, logfile
from twisted.internet import task
from BufferedChannel import BufferedChannel

REQUEST_ERROR = -1
REPLY_ERROR = -1
OKAY = 200
MAX_PROCESSES = 2

__DEBUG__ = False


def tick():
    """Print "tick" and yield to the Stackless scheduler, forever."""
    while True:
        print("tick")
        stackless.schedule()


class BadProcessor(object):
    """Runs the Twisted reactor inside a Stackless tasklet."""

    def __init__(self):
        pass

    def startNetworking(self):
        """
        run the Twisted Reactor in its own tasklet
        also run a task that makes the Twisted Reactor yield to the
        Stackless scheduler
        """
        # stackless.schedule itself is the looping callable here; this is
        # the difference from the variant that wraps it in a method.
        yield_loop = task.LoopingCall(stackless.schedule)
        yield_loop.start(.01)
        reactor.run()


if __name__ == "__main__":
    processor = BadProcessor()
    # Tasklets are created in the same order as before: reactor, then tick.
    for entry_point in (processor.startNetworking, tick):
        stackless.tasklet(entry_point)()
    stackless.run()
#!/usr/bin/env python
"""
GoodBPELProcessor.py
Andrew Francis
January 31th, 2009

Track down error in Python 2.6.1/Twisted 8.2

This version should work: the LoopingCall is given a small method that
calls stackless.schedule() instead of stackless.schedule itself.

<song> Hardcore UFOs -Guided By Voices </song>
"""

import stackless
import time
import sys

from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.internet import reactor
from twisted.web import client
from twisted.web import http
from twisted.python import log, logfile
from twisted.internet import task
from BufferedChannel import BufferedChannel

REQUEST_ERROR = -1
REPLY_ERROR = -1
OKAY = 200
MAX_PROCESSES = 2

__DEBUG__ = False


class GoodProcessor(object):
    """Runs the Twisted reactor inside a Stackless tasklet."""

    def __init__(self):
        pass

    def tick(self):
        """Print "tick" and yield once to the Stackless scheduler."""
        print("tick")
        stackless.schedule()

    def startNetworking(self):
        """
        run the Twisted Reactor in its own tasklet
        also run a task that makes the Twisted Reactor yield to the
        Stackless scheduler
        """
        # The bound method, not stackless.schedule, is what gets looped.
        task.LoopingCall(self.tick).start(.01)
        reactor.run()


if __name__ == "__main__":
    processor = GoodProcessor()
    networking = stackless.tasklet(processor.startNetworking)
    networking()
    stackless.run()
_______________________________________________ Twisted-Python mailing list Twisted-Python@twistedmatrix.com http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python