Hi Colleagues:

I recently updated to Stackless 2.6.1 and Twisted 8.2. I executed the following 
programme that works fine under Stackless 2.5.2 and Twisted (including 8.2) and 
I received the following error:

traceback (most recent call last):
          File "ToyProcessor5.py", line 60, in twistedReactor
            reactor.run()
          File "/usr/local/lib/python2.6/site-packages/twisted/internet/base.py"
, line 1048, in run
            self.mainLoop()
        --- <exception caught here> ---
          File "/usr/local/lib/python2.6/site-packages/twisted/internet/base.py"
, line 1057, in mainLoop
            self.runUntilCurrent()
          File "/usr/local/lib/python2.6/site-packages/twisted/internet/base.py"
, line 707, in runUntilCurrent
            log.deferr()
          File "/usr/local/lib/python2.6/site-packages/twisted/python/log.py", l
ine 153, in err
            _stuff = failure.Failure()
          File "/usr/local/lib/python2.6/site-packages/twisted/python/failure.py
", line 265, in __init__
            parentCs = reflect.allYourBase(self.type)
", line 542, in allYourBase
            accumulateBases(classObj, l, baseClass)
          File 
"/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 550, 
in accumulateBases
            accumulateBases(base, l, baseClass)
          File 
"/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 550, 
in accumulateBases
            accumulateBases(base, l, baseClass)
          File 
"/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 550, 
in accumulateBases
            accumulateBases(base, l, baseClass)
          File 
"/usr/local/lib/python2.6/site-packages/twisted/python/reflect.py", line 550, 
in accumulateBases
            accumulateBases(base, l, baseClass)
        exceptions.RuntimeError: maximum recursion depth exceeded

Similarily written programmes fail in the same fashion. What is suspicious is 
that there is no recursion in the offending section. Even if I create one 
worker tasklet, I get the same error.

def twistedReactor():
    l = task.LoopingCall(stackless.schedule)
    l.start(.01)
    reactor.run()

however, if I change

l = task.LoopingCall(stackless.schedule)

to

l = task.LoopingCall(tick)

and tick is

def tick():
    stackless.schedule()

the programme works. Although a work around, I would like to find the real 
problem.

What particular worries me is when I created small test examples, I was not  
able to recreate the problem. Something else is going on....

What I would appreciate is some hints as to what may be happening.  

- I normally don't expect "Maximum Recursion Depth" errors in Stackless. 
- What is the reflect.allYourBases stuff? 
- What is log.deferr? (I don't recall seeing that method).

If I get few clues, it would make it easier for me to write new tests and zero 
in on the problem. Hopefully the problem is with my code rather than Stackless 
2.6.1 and/or Twisted 8.2

I have included some sample code. Unfortunately ToyProcessor5.py is a bit large 
(I have newer code that is smaller but requires more files)

Cheers,
Andrew












      
#!/usr/bin/env python

"""
ToyProcessor.py
Andrew Francis
April 3rd, 2008

The purpose of this programme is to illustrate the techniques
used in the presentation "Adventures in Stackless Python / Twisted Integration"

<song>Dead Letter and the Infinite - Wintersleep </song> 
"""


import stackless
import time
import sys

from twisted.internet.defer                           import Deferred
from twisted.python.failure                           import Failure
from twisted.internet                                 import reactor
from twisted.web                                      import client
from twisted.web                                      import http
from twisted.python                                   import log, logfile
from twisted.internet                                 import task

RECEIVE_RESPONSE = 0
REPLY_RESPONSE = 1
INVOKE_RESPONSE = 2
WAIT_RESPONSE = 3

REQUEST_ERROR = -1
REPLY_ERROR = -1


      
__DEBUG__ = False


class CorrelationNotFound:
    def __init__(self, data):
        self.data = data


class Request(object):
    pass


class Response(object):
    pass


"""
run the Twisted Reactor in its own tasklet
also run a task that makes the Twisted Reactor yield to the Stackless scheduler
"""
def twistedReactor():
    l = task.LoopingCall(stackless.schedule)
    l.start(.01)
    reactor.run()


class InvokeResponse(Response):
    def __init__(self, requestId, message, isError = False):
        self.type = INVOKE_RESPONSE
        self.requestId = requestId
        self.message = message
        self.isError = isError
        return


class ReceiveResponse(Response):
    def __init__(self, path, message, channel):
        self.type = RECEIVE_RESPONSE
        self.path = path
        self.message = message
        self.channel = channel
        return


class ReplyRequest(Request):
    def __init__(self, requestId, message, isError = False):
        self.requestId = requestId
        self.message = message
        self.isError = isError


class ReplyResponse(Response):
    def __init__(self, requestId, isError = False):
        self.type = REPLY_RESPONSE
        self.requestId = requestId
        self.isError = isError
        return


class WaitResponse(Response):
    def __init__(self, requestId):
        self.type = WAIT_RESPONSE
        self.requestId = requestId
        return


"""
Process corresponds to a WS-BPEL process
"""
class Process(object):
    def __init__(self, url, messageExchange):
        self.url = url
        self.messageExchange = messageExchange
        self.scopeId = Process.processor.__generateScope__()
        self.replyMessage = "<html><head></head><body>hello world from process " + str(self.scopeId) + "</body></html>"
        return


    def execute(self):
        log.msg("Process " + str(self.scopeId) + " started")
        self.processor.receive(self.scopeId, self.url, self.messageExchange, self.__class__)
        #result = self.processor.invoke("http://localhost";)
        self.processor.reply(self.scopeId, self.messageExchange, self.replyMessage)
        return        

    
class AlarmProcess(object):
    def __init__(self, period):
        self.period = period
        self.scopeId = Process.processor.__generateScope__()
        return


    def execute(self):
        log.msg("Process " + str(self.scopeId) + " started")
        while (1):
            self.processor.wait(self.scopeId, self.period)
            #result = self.processor.invoke("http://localhost";)
            log.msg("Tick")
        return      

"""
Correlations are used to match incoming messages to the corresponding receive 
activities
"""

class Correlation(object):
    def __init__(self, path, messageExchange, requestId):
        self.path = path
        self.messageExchange = messageExchange
        self.requestId = requestId
        self.aChannel = None
        return


    def __eq__(self, other):
        return self.path == other


    def __repr__(self):
        return self.path + " " + "requestId: " + str(self.requestId) + " " + self.messageExchange


    def getChannel(self):
        return self.aChannel


    def setChannel(self, channel):
        self.aChannel = channel

    channel = property(getChannel, setChannel)


class ToyProcessor(object):
    def __init__(self, channel):
        self.responseChannel = channel
        self.requestId = -1
        self.scopeId = 0
        self.requestTable = {}
        self.correlations = []
        self.messageExchangeStates = {}
        self.flag = True
        return


    """
    level 0 calls
    """

    def createProcess(self, factory, *args,**kwargs):
        stackless.tasklet(factory(*args, **kwargs).execute)()


    def invoke(self, url):
        return self.__processRequest__(self.__invokeRequest__, url, self.responseChannel)


    def receive(self, eventScope, url, messageExchange, factory):
        result = self.__processRequest__(self.__receiveRequest__, eventScope, url, messageExchange)

        # and create a new daemon
        if factory != None:
            log.msg("creating replacement daemon")
            self.createProcess(factory, url, messageExchange)
        return result

    def reply(self, eventScope, messageExchange, message):
        self.__processRequest__(self.__replyRequest__, eventScope, messageExchange, message)


    def wait(self, eventScope, period):
        self.__processRequest__(self.__waitRequest__, eventScope, period)

    """
    level 1 calls
    """

    def __rewrite__(self, scope, messageExchange):
        return str(scope) + ":" + messageExchange


    def __processRequest__(self, callable, *args, **kwargs):
        channel = stackless.channel()
        requestId = self.__addRequest__(channel)
        callable(requestId, *args, **kwargs)
        result = channel.receive()
        self.__deleteRequest__(requestId)
        return result


    def __processResponse__(self, response):
        try:
            if response.type == RECEIVE_RESPONSE:
                requestId, message = self.__receiveResponse__(response)
            elif response.type == REPLY_RESPONSE:
                requestId, message = self.__replyResponse__(response)
            elif response.type == INVOKE_RESPONSE:
                requestId, message = self.__invokeResponse__(response)
            elif response.type == WAIT_RESPONSE:
                requestId, message = self.__waitResponse__(response)
            else:
                log.message("something funny has happened " + str(requestId))
                print response
                reactor.stop

            if requestId != REQUEST_ERROR:
                self.requestTable[requestId].send(message)
        except:
            log.msg(sys.exc_info())


    def __generateScope__(self):
        self.scopeId += 1
        return self.scopeId


    def __addRequest__(self, channel):
        self.requestId = self.requestId + 1
        self.requestTable[self.requestId] = channel
        return self.requestId


    def __deleteRequest__(self, requestId):
        del self.requestTable[requestId]
        return     


    def __invokeRequest__(self, requestId, url, responseChannel):
        log.msg("INVOKE_REQUEST Started " + str(requestId) + " " + url)
        Connection(requestId, responseChannel).connect(url)
        log.msg("INVOKE_REQUEST Finished " + str(requestId))


    def __invokeResponse__(self, response):
        log.msg("INVOKE_RESPONSE Started " + str(response.requestId))
        return response.requestId, response.message


    def __receiveRequest__(self, requestId, scope, path, messageExchange):
        log.msg("__RECEIVE_REQUEST___ STARTED " + str(requestId) + " pid:" + str(scope))
        self.__addCorrelation__(requestId, path, self.__rewrite__(scope, messageExchange))
        log.msg("__RECEIVE_REQUEST__ FINISHED__" + str(requestId) + " pid:" + str(scope))


    def __receiveResponse__(self, response):
        log.msg("RECEIVE_RESPONSE STARTED")
        try:
            correlation = self.__removeCorrelation__(response.path)
            self.messageExchangeStates[correlation.messageExchange] = response.channel

        except CorrelationNotFound, E:
            response.channel.send(ReplyRequest(None,"<html><head><body>Path not found</body></html>",True))
            return REQUEST_ERROR, None
        else:                          
            log.msg("RECEIVE_RESPONSE ENDED " + str(correlation.requestId))
            return correlation.requestId, response.message


    def __replyRequest__(self, requestId, scope, messageExchange, message):
        log.msg("REPLY_REQUEST STARTED")
        newMessageExchange = self.__rewrite__(scope, messageExchange)
        channel = self.messageExchangeStates[newMessageExchange]
        del self.messageExchangeStates[newMessageExchange]
        log.msg("REPLY_REQUEST SENDING MESSAGE")
        channel.send(ReplyRequest(requestId, message))
        log.msg("REQUEST_REQUEST FINISHED")
        return


    def __replyResponse__(self, response):
        return response.requestId, None


    def __waitRequest__(self, requestId, scope, period):
        def alarm(requestId):
            self.responseChannel.send(WaitResponse(requestId))
        reactor.callLater(period, alarm, requestId)


    def __waitResponse__(self, response):
        return response.requestId, None


    def __addCorrelation__(self, requestId, path, messageExchange): 
        log.msg("entering __addCorrelation__")
        self.correlations.append(Correlation(path, messageExchange, requestId))
        log.msg("exiting __addCorrelation__")


    def __removeCorrelation__(self, path):
        try:
            log.msg("Looking for correlation =>" + path)
            i = self.correlations.index(path)
            print "found =>", self.correlations[i]
        except ValueError:
            raise CorrelationNotFound(path)
        return self.correlations.pop(i)   


    def execute(self):
        while (self.flag):
            log.msg("processor - waiting for message")
            self.__processResponse__(self.responseChannel.receive())

"""
level 2
"""
class MyRequestHandler(http.Request):

    """
    it is necessary to run the body of the request handler in its own tasklet
    as to prevent the entire reactor from blocking on a channel
    """
    def process(self):
        stackless.tasklet(self.doWork)()

    def doWork(self):
        channel = stackless.channel()
        receiveRequest = ReceiveResponse(self.path, self.content.read(), channel)
        MyRequestHandler.responseChannel.send(receiveRequest)
        log.msg("REQUEST HANDLER WAITING ON CHANNEL")
        reply = channel.receive()

        if reply.isError:
            self.setResponseCode(http.NOT_FOUND)
            theReplyId = REPLY_ERROR
        else:
            theReplyId = reply.requestId
            
        self.write(reply.message.encode("utf-8"))
        self.finish()
        
        log.msg("REQUEST HANDLER WRITING REPLY " + str(theReplyId))
        MyRequestHandler.responseChannel.send(ReplyResponse(theReplyId))
        log.msg("REQUEST HANDLER COMPLETED " + str(theReplyId))
        return

    
class MyHttp(http.HTTPChannel):
    requestFactory = MyRequestHandler

    
class MyHttpFactory(http.HTTPFactory):
    protocol = MyHttp    

    
class Connection(object):
    def __init__(self, requestId, channel):
        self.channel = channel
        self.requestId = requestId

    def __handleConnection__(self, result):
        log.msg("INVOKE RESPONSE " + str(self.requestId))
        self.channel.send(InvokeResponse(self.requestId, result))


    def __handleError__(self, failure):
        self.channel.send(InvokeResponse(self.requestId, failure, True))


    def connect(self, url):
        client.getPage(url).addCallback(self.__handleConnection__).addErrback(self.__handleError__)
            
            

if __name__ == "__main__":
    
    log.startLogging(sys.stdout)
    log.msg("test starting Solution")
    log.msg("test starting now")

    """
    responses are send on a channel shared by Twisted and the ToyProcessor
    """
    responseChannel = stackless.channel()
    processor = ToyProcessor(responseChannel)
    MyRequestHandler.responseChannel = responseChannel
    MyRequestHandler.processor = processor

    #make all Process instances inherit a processor
    #should create an abstract Process
    Process.processor = processor
    AlarmProcess.processor = processor

    """
    okay start the processor    
    """
    stackless.tasklet(processor.execute)()

    """
    okay, let us run some processes
    """ 
    
    for i in range(0,1000):
        processor.createProcess(Process, "/" + str(i), "message" + str(i))
            
    processor.createProcess(AlarmProcess, 10)

    
    """
    and create a HTTP server
    """
    reactor.listenTCP(8000, MyHttpFactory())

    stackless.tasklet(twistedReactor)()

    while (stackless.getruncount() > 1):
        stackless.schedule()

    log.msg("programme has abnormally terminated?")
   
#!/usr/bin/env python

"""
BadProcessor.py
Andrew Francis
January 31th, 2009

Track down error in Python 2.6.1/Twisted 8.2
This version should work

<song> Wrapped Around My Finger -The Police </song>
"""


import stackless
import time
import sys

from twisted.internet                                 import defer
from twisted.python.failure                           import Failure
from twisted.internet                                 import reactor
from twisted.web                                      import client
from twisted.web                                      import http
from twisted.python                                   import log, logfile
from twisted.internet                                 import task

from BufferedChannel                                  import BufferedChannel

REQUEST_ERROR = -1
REPLY_ERROR = -1
OKAY = 200

MAX_PROCESSES = 2

__DEBUG__ = False

def tick():
    while(True):
       print "tick"
       stackless.schedule()


class BadProcessor(object):
    def __init__(self):
        return


    def startNetworking(self):
        """
        run the Twisted Reactor in its own tasklet
        also run a task that makes the Twisted Reactor yield to the Stackless 
	scheduler
        """
        l = task.LoopingCall(stackless.schedule)
        l.start(.01)
        reactor.run()


if __name__ == "__main__":
   processor = BadProcessor()
   stackless.tasklet(processor.startNetworking)() 
   stackless.tasklet(tick)()
   stackless.run()
#!/usr/bin/env python

"""
GoodBPELProcessor.py
Andrew Francis
January 31th, 2009

Track down error in Python 2.6.1/Twisted 8.2
This version should work

<song> Hardcore UFOs -Guided By Voices </song>
"""


import stackless
import time
import sys

from twisted.internet                                 import defer
from twisted.python.failure                           import Failure
from twisted.internet                                 import reactor
from twisted.web                                      import client
from twisted.web                                      import http
from twisted.python                                   import log, logfile
from twisted.internet                                 import task

from BufferedChannel                                  import BufferedChannel

REQUEST_ERROR = -1
REPLY_ERROR = -1
OKAY = 200

MAX_PROCESSES = 2

__DEBUG__ = False


class GoodProcessor(object):
    def __init__(self):
        return


    def tick(self):
        print "tick"
        stackless.schedule()


    def startNetworking(self):
        """
        run the Twisted Reactor in its own tasklet
        also run a task that makes the Twisted Reactor yield to the Stackless 
	scheduler
        """
        l = task.LoopingCall(self.tick)
        l.start(.01)
        reactor.run()


if __name__ == "__main__":
   processor = GoodProcessor()
   stackless.tasklet(processor.startNetworking)() 
   stackless.run()
_______________________________________________
Twisted-Python mailing list
Twisted-Python@twistedmatrix.com
http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python

Reply via email to