On Mar 3, 2011, at 2:39 PM, Glyph Lefkowitz wrote:
> On Mar 3, 2011, at 7:31 AM, Fantix King wrote:
>
> > Hi,
> >
> > I tried to make python.context work in asynchronous code between main
> > loops. Does anyone have similar experience to share, please?
> >
> > Not sure if I am rebuilding a wheel :P
> >
> >
http://code.google.com/p/little-site/source/browse/littlesite/custom_reactor.py
>
> This is something I've often thought about doing in Twisted itself,
> actually :). But I wasn't sure that chaining context would actually do
> anything practically useful most of the time. Have you found that it's
> actually useful? Have you managed to leverage this to, for example, get
> more informative error messages out of Deferred failures?
>
> Doing it as a subclass like this is not optimal, as it limits you to one
> reactor (and the Select reactor is not really the best one). A wrapper
> would be slightly more tricky (you'd have to deal with the places that the
> reactor passes itself through to things like Process and Port, so you'd have
> to create wrappers for those as well) but much more general.
Thanks for replying! :)
Yes! Using this context for asynchronous tracebacks is a wonderful idea!
I made some small changes to the code and wrote a patch for Twisted (since
addReader and addWriter differ quite a bit from one reactor implementation
to another, I changed SelectReactor only; I haven't got a better idea for
this, so please advise). Please see the attachment.
With a simple example that raises an exception inside a deferLater-ed function
(call chain: a-b-c-deferLater-d-e-f-g):
from twisted.internet import reactor
from twisted.internet.task import deferLater

reactor.usingAsyncTraceback = True  # opt in to the stack recording added by the attached patch

# Call chain: a -> b -> c -> deferLater (1 second) -> d -> e -> f -> g; g raises.
def g():
    raise Exception('Something happened inside.')

def f():
    return g()

def e():
    return f()

def d():
    return e()

def c():
    deferred = deferLater(reactor, 1, lambda: None)  # fires with None after 1 second
    deferred.addCallback(lambda x: d())  # d() runs in a later reactor iteration
    return deferred

def b():
    return c()

def a():
    return b()

if __name__ == '__main__':
    deferred = a()
    def errback(failure):
        failure.printTraceback()  # print the traceback of the failure raised in g()
    deferred.addErrback(errback)
    deferred.addBoth(lambda x: reactor.stop())  # stop the reactor on success or failure
    reactor.run()
I could get this:
Traceback (most recent call last):
File "test.py", line 31, in <module>
deferred = a()
File "test.py", line 28, in a
return b()
File "test.py", line 25, in b
return c()
File "test.py", line 20, in c
deferred = deferLater(reactor, 1, lambda: None)
File "/home/fantix/ac/twisted/internet/task.py", line 751, in deferLater
delayedCall = clock.callLater(delay, d.callback, None)
File "/home/fantix/ac/twisted/internet/base.py", line 701, in callLater
_f, args, kw = self._chainContext(_f, args, kw)
*--- <asynchronous break point> ---*
File "/home/fantix/ac/twisted/python/context.py", line 59, in
callWithContext
return self.currentContext().callWithContext(ctx, func, *args, **kw)
File "/home/fantix/ac/twisted/python/context.py", line 37, in
callWithContext
return func(*args,**kw)
File "/home/fantix/ac/twisted/internet/defer.py", line 361, in callback
self._startRunCallbacks(result)
File "/home/fantix/ac/twisted/internet/defer.py", line 455, in
_startRunCallbacks
self._runCallbacks()
--- <exception caught here> ---
File "/home/fantix/ac/twisted/internet/defer.py", line 542, in
_runCallbacks
current.result = callback(current.result, *args, **kw)
File "test.py", line 21, in <lambda>
deferred.addCallback(lambda x: d())
File "test.py", line 17, in d
return e()
File "test.py", line 14, in e
return f()
File "test.py", line 11, in f
return g()
File "test.py", line 8, in g
raise Exception('Something happened inside.')
Additionally, in my scenario, a five-year-old asynchronous Twisted web
application, we need the "request" object to be available throughout all the
code that runs between asynchronous network and database accesses, because
our global configuration system needs the request object. Having a context
that works asynchronously would save us a great deal of manual work passing
the request object around here and there.
BR,
Fantix.
Index: twisted/python/failure.py
===================================================================
--- twisted/python/failure.py (revision 31014)
+++ twisted/python/failure.py (working copy)
@@ -18,7 +18,7 @@
import opcode
from cStringIO import StringIO
-from twisted.python import reflect
+from twisted.python import reflect, context
count = 0
traceupLength = 4
@@ -286,6 +286,8 @@
else:
self.parents = [self.type]
+ self.async_stacks = context.get('__stacks__', [])[:]
+
def trap(self, *errorTypes):
"""Trap this failure if its type is in a predetermined list.
@@ -520,6 +522,11 @@
else:
w( 'Traceback (most recent call last):\n')
+ # Asynchronous stacks
+ for stack in self.async_stacks:
+ format_frames(stack, w, detail)
+ w('--- <asynchronous break point> ---\n')
+
# Frames, formatted in appropriate style
if self.frames:
if not elideFrameworkCode:
Index: twisted/internet/selectreactor.py
===================================================================
--- twisted/internet/selectreactor.py (revision 31014)
+++ twisted/internet/selectreactor.py (working copy)
@@ -19,7 +19,7 @@
from twisted.internet.interfaces import IReactorFDSet
from twisted.internet import error
from twisted.internet import posixbase
-from twisted.python import log
+from twisted.python import log, context
from twisted.python.runtime import platformType
@@ -137,13 +137,14 @@
if selectable not in fdset:
continue
# This for pausing input when we're not ready for more.
- _logrun(selectable, _drdw, selectable, method, dict)
+ _logrun(selectable, _drdw, selectable, method, dict, fdset)
doIteration = doSelect
- def _doReadOrWrite(self, selectable, method, dict):
+ def _doReadOrWrite(self, selectable, method, dict, fdset):
try:
- why = getattr(selectable, method)()
+ ctx = fdset[selectable]
+ why = context.call(ctx, getattr(selectable, method))
handfn = getattr(selectable, 'fileno', None)
if not handfn:
why = _NO_FILENO
@@ -159,13 +160,15 @@
"""
Add a FileDescriptor for notification of data available to read.
"""
- self._reads[reader] = 1
+ ctx = self._currentContext()
+ self._reads[reader] = ctx
def addWriter(self, writer):
"""
Add a FileDescriptor for notification of data available to write.
"""
- self._writes[writer] = 1
+ ctx = self._currentContext()
+ self._writes[writer] = ctx
def removeReader(self, reader):
"""
Index: twisted/internet/base.py
===================================================================
--- twisted/internet/base.py (revision 31014)
+++ twisted/internet/base.py (working copy)
@@ -11,6 +11,7 @@
import sys
import warnings
+from copy import copy
from heapq import heappush, heappop, heapify
import traceback
@@ -21,7 +22,7 @@
from twisted.internet.interfaces import IResolverSimple, IReactorPluggableResolver
from twisted.internet.interfaces import IConnector, IDelayedCall
from twisted.internet import fdesc, main, error, abstract, defer, threads
-from twisted.python import log, failure, reflect
+from twisted.python import log, failure, reflect, context
from twisted.python.runtime import seconds as runtimeSeconds, platform
from twisted.internet.defer import Deferred, DeferredList
from twisted.persisted import styles
@@ -459,6 +460,8 @@
installed = False
usingThreads = False
resolver = BlockingResolver()
+ usingAsyncContext = True
+ usingAsyncTraceback = False
__name__ = "twisted.internet.reactor"
@@ -695,6 +698,7 @@
assert callable(_f), "%s is not callable" % _f
assert sys.maxint >= _seconds >= 0, \
"%s is not greater than or equal to 0 seconds" % (_seconds,)
+ _f, args, kw = self._chainContext(_f, args, kw)
tple = DelayedCall(self.seconds() + _seconds, _f, args, kw,
self._cancelCallLater,
self._moveCallLaterSooner,
@@ -913,7 +917,7 @@
# lists are thread-safe in CPython, but not in Jython
# this is probably a bug in Jython, but until fixed this code
# won't work in Jython.
- self.threadCallQueue.append((f, args, kw))
+ self.threadCallQueue.append(self._chainContext(f, args, kw))
self.wakeUp()
def _initThreadPool(self):
@@ -975,8 +979,26 @@
def callFromThread(self, f, *args, **kw):
assert callable(f), "%s is not callable" % (f,)
# See comment in the other callFromThread implementation.
- self.threadCallQueue.append((f, args, kw))
+ self.threadCallQueue.append(self._chainContext(f, args, kw))
+ def _currentContext(self):
+ ctx = copy(context.theContextTracker.currentContext().contexts[-1])
+ if self.usingAsyncTraceback:
+ try:
+ raise ZeroDivisionError
+ except:
+ stacks = ctx.get('__stacks__', [])[:]
+ stacks.append(failure.Failure().stack[:-1])
+ ctx['__stacks__'] = stacks
+ return ctx
+
+ def _chainContext(self, f, args, kw):
+ if self.usingAsyncContext:
+ ctx = self._currentContext()
+ return context.call, (ctx, f) + args, kw
+ else:
+ return f, args, kw
+
if platform.supportsThreads():
classImplements(ReactorBase, IReactorThreads)
Index: twisted/trial/test/test_reporter.py
===================================================================
--- twisted/trial/test/test_reporter.py (revision 31014)
+++ twisted/trial/test/test_reporter.py (working copy)
@@ -205,6 +205,10 @@
'Traceback (most recent call last):',
re.compile(r'^\s+File .* in runUntilCurrent'),
re.compile(r'^\s+.*'),
+ re.compile(r'^\s+File .*, in callWithContext'),
+ re.compile(r'^\s+.*'),
+ re.compile(r'^\s+File .*, in callWithContext'),
+ re.compile(r'^\s+.*'),
re.compile('^\s+File .*erroneous\.py", line \d+, in go'),
re.compile('^\s+raise RuntimeError\(self.hiddenExceptionMsg\)'),
'exceptions.RuntimeError: something blew up',
_______________________________________________
Twisted-Python mailing list
[email protected]
http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python