On Mar 3, 2011, at 2:39 PM, Glyph Lefkowitz wrote:
> On Mar 3, 2011, at 7:31 AM, Fantix King wrote:
>
> > Hi,
> >
> > I tried to make python.context work in asynchronous code between main
> > loops. Does anyone have similar experience to share?
> >
> > Not sure if I am reinventing the wheel :P
> >
> >
http://code.google.com/p/little-site/source/browse/littlesite/custom_reactor.py
>
> This is something I've often thought about doing in Twisted itself,
> actually :).  But I wasn't sure that chaining context would actually do
> anything practically useful most of the time.  Have you found that it's
> actually useful?  Have you managed to leverage this to, for example, get
> more informative error messages out of Deferred failures?
>
> Doing it as a subclass like this is not optimal, as it limits you to one
> reactor (and the Select reactor is not really the best one).  A wrapper
> would be slightly more tricky (you'd have to deal with the places that the
> reactor passes itself through to things like Process and Port, so you'd have
> to create wrappers for those as well) but much more general.


Thanks for replying! :)

Yes! It's a wonderful idea to use this context for asynchronous tracebacks!
I made some small changes to the code and wrote a patch for Twisted (as
addReader and addWriter are quite different from one implementation to
another, I changed SelectReactor only; I haven't got a better idea for this,
please advise). Please see the attachment.

With a simple example that raises an exception in a deferLater-ed function
(a-b-c-deferLater-d-e-f-g):

from twisted.internet import reactor
from twisted.internet.task import deferLater
reactor.usingAsyncTraceback = True  # turn on the flag added by the patch below (its class default is False)

def g():  # innermost call: raises the exception shown at the end of the traceback
    raise Exception('Something happened inside.')

def f():  # synchronous hop in the chain d -> e -> f -> g
    return g()

def e():  # synchronous hop in the chain d -> e -> f -> g
    return f()

def d():  # first function run after the delay; called from the callback added in c()
    return e()

def c():  # schedules the delayed call, so the asynchronous break happens here
    deferred = deferLater(reactor, 1, lambda: None)
    deferred.addCallback(lambda x: d())
    return deferred  # passed back up through b() and a() to the caller

def b():  # synchronous hop in the chain a -> b -> c
    return c()

def a():  # entry point of the chain a-b-c-deferLater-d-e-f-g
    return b()

if __name__ == '__main__':
    deferred = a()
    def errback(failure):  # prints the traceback of the failure that reaches this errback
        failure.printTraceback()
    deferred.addErrback(errback)
    deferred.addBoth(lambda x: reactor.stop())  # stop the reactor whether the chain succeeded or failed
    reactor.run()


I could get this:


Traceback (most recent call last):
  File "test.py", line 31, in <module>
    deferred = a()
  File "test.py", line 28, in a
    return b()
  File "test.py", line 25, in b
    return c()
  File "test.py", line 20, in c
    deferred = deferLater(reactor, 1, lambda: None)
  File "/home/fantix/ac/twisted/internet/task.py", line 751, in deferLater
    delayedCall = clock.callLater(delay, d.callback, None)
  File "/home/fantix/ac/twisted/internet/base.py", line 701, in callLater
    _f, args, kw = self._chainContext(_f, args, kw)
*--- <asynchronous break point> ---*
  File "/home/fantix/ac/twisted/python/context.py", line 59, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/home/fantix/ac/twisted/python/context.py", line 37, in callWithContext
    return func(*args,**kw)
  File "/home/fantix/ac/twisted/internet/defer.py", line 361, in callback
    self._startRunCallbacks(result)
  File "/home/fantix/ac/twisted/internet/defer.py", line 455, in _startRunCallbacks
    self._runCallbacks()
--- <exception caught here> ---
  File "/home/fantix/ac/twisted/internet/defer.py", line 542, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
  File "test.py", line 21, in <lambda>
    deferred.addCallback(lambda x: d())
  File "test.py", line 17, in d
    return e()
  File "test.py", line 14, in e
    return f()
  File "test.py", line 11, in f
    return g()
  File "test.py", line 8, in g
    raise Exception('Something happened inside.')


Additionally, in my scenario of a five-year-old asynchronous Twisted web
application, we need the "request" object to be available throughout all the
code between asynchronous network accesses and database accesses, because our
global configuration system needs the request object. Having a context that
works in the asynchronous way would greatly reduce our manual work of passing
the request object through here and there.


BR,
Fantix.
Index: twisted/python/failure.py
===================================================================
--- twisted/python/failure.py	(revision 31014)
+++ twisted/python/failure.py	(working copy)
@@ -18,7 +18,7 @@
 import opcode
 from cStringIO import StringIO
 
-from twisted.python import reflect
+from twisted.python import reflect, context
 
 count = 0
 traceupLength = 4
@@ -286,6 +286,8 @@
         else:
             self.parents = [self.type]
 
+        self.async_stacks = context.get('__stacks__', [])[:]
+
     def trap(self, *errorTypes):
         """Trap this failure if its type is in a predetermined list.
 
@@ -520,6 +522,11 @@
         else:
             w( 'Traceback (most recent call last):\n')
 
+        # Asynchronous stacks
+        for stack in self.async_stacks:
+            format_frames(stack, w, detail)
+            w('--- <asynchronous break point> ---\n')
+
         # Frames, formatted in appropriate style
         if self.frames:
             if not elideFrameworkCode:
Index: twisted/internet/selectreactor.py
===================================================================
--- twisted/internet/selectreactor.py	(revision 31014)
+++ twisted/internet/selectreactor.py	(working copy)
@@ -19,7 +19,7 @@
 from twisted.internet.interfaces import IReactorFDSet
 from twisted.internet import error
 from twisted.internet import posixbase
-from twisted.python import log
+from twisted.python import log, context
 from twisted.python.runtime import platformType
 
 
@@ -137,13 +137,14 @@
                 if selectable not in fdset:
                     continue
                 # This for pausing input when we're not ready for more.
-                _logrun(selectable, _drdw, selectable, method, dict)
+                _logrun(selectable, _drdw, selectable, method, dict, fdset)
 
     doIteration = doSelect
 
-    def _doReadOrWrite(self, selectable, method, dict):
+    def _doReadOrWrite(self, selectable, method, dict, fdset):
         try:
-            why = getattr(selectable, method)()
+            ctx = fdset[selectable]
+            why = context.call(ctx, getattr(selectable, method))
             handfn = getattr(selectable, 'fileno', None)
             if not handfn:
                 why = _NO_FILENO
@@ -159,13 +160,15 @@
         """
         Add a FileDescriptor for notification of data available to read.
         """
-        self._reads[reader] = 1
+        ctx = self._currentContext()
+        self._reads[reader] = ctx
 
     def addWriter(self, writer):
         """
         Add a FileDescriptor for notification of data available to write.
         """
-        self._writes[writer] = 1
+        ctx = self._currentContext()
+        self._writes[writer] = ctx
 
     def removeReader(self, reader):
         """
Index: twisted/internet/base.py
===================================================================
--- twisted/internet/base.py	(revision 31014)
+++ twisted/internet/base.py	(working copy)
@@ -11,6 +11,7 @@
 
 import sys
 import warnings
+from copy import copy
 from heapq import heappush, heappop, heapify
 
 import traceback
@@ -21,7 +22,7 @@
 from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver
 from twisted.internet.interfaces import IConnector, IDelayedCall
 from twisted.internet import fdesc, main, error, abstract, defer, threads
-from twisted.python import log, failure, reflect
+from twisted.python import log, failure, reflect, context
 from twisted.python.runtime import seconds as runtimeSeconds, platform
 from twisted.internet.defer import Deferred, DeferredList
 from twisted.persisted import styles
@@ -459,6 +460,8 @@
     installed = False
     usingThreads = False
     resolver = BlockingResolver()
+    usingAsyncContext = True
+    usingAsyncTraceback = False
 
     __name__ = "twisted.internet.reactor"
 
@@ -695,6 +698,7 @@
         assert callable(_f), "%s is not callable" % _f
         assert sys.maxint >= _seconds >= 0, \
                "%s is not greater than or equal to 0 seconds" % (_seconds,)
+        _f, args, kw = self._chainContext(_f, args, kw)
         tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
                            self._cancelCallLater,
                            self._moveCallLaterSooner,
@@ -913,7 +917,7 @@
             # lists are thread-safe in CPython, but not in Jython
             # this is probably a bug in Jython, but until fixed this code
             # won't work in Jython.
-            self.threadCallQueue.append((f, args, kw))
+            self.threadCallQueue.append(self._chainContext(f, args, kw))
             self.wakeUp()
 
         def _initThreadPool(self):
@@ -975,8 +979,26 @@
         def callFromThread(self, f, *args, **kw):
             assert callable(f), "%s is not callable" % (f,)
             # See comment in the other callFromThread implementation.
-            self.threadCallQueue.append((f, args, kw))
+            self.threadCallQueue.append(self._chainContext(f, args, kw))
 
+    def _currentContext(self):
+        ctx = copy(context.theContextTracker.currentContext().contexts[-1])
+        if self.usingAsyncTraceback:
+            try:
+                raise ZeroDivisionError
+            except:
+                stacks = ctx.get('__stacks__', [])[:]
+                stacks.append(failure.Failure().stack[:-1])
+                ctx['__stacks__'] = stacks
+        return ctx
+
+    def _chainContext(self, f, args, kw):
+        if self.usingAsyncContext:
+            ctx = self._currentContext()
+            return context.call, (ctx, f) + args, kw
+        else:
+            return f, args, kw
+
 if platform.supportsThreads():
     classImplements(ReactorBase, IReactorThreads)
 
Index: twisted/trial/test/test_reporter.py
===================================================================
--- twisted/trial/test/test_reporter.py	(revision 31014)
+++ twisted/trial/test/test_reporter.py	(working copy)
@@ -205,6 +205,10 @@
             'Traceback (most recent call last):',
             re.compile(r'^\s+File .* in runUntilCurrent'),
             re.compile(r'^\s+.*'),
+            re.compile(r'^\s+File .*, in callWithContext'),
+            re.compile(r'^\s+.*'),
+            re.compile(r'^\s+File .*, in callWithContext'),
+            re.compile(r'^\s+.*'),
             re.compile('^\s+File .*erroneous\.py", line \d+, in go'),
             re.compile('^\s+raise RuntimeError\(self.hiddenExceptionMsg\)'),
             'exceptions.RuntimeError: something blew up',
_______________________________________________
Twisted-Python mailing list
[email protected]
http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python

Reply via email to