On Fri, Feb 19, 2016 at 10:18 AM, Joseph L. Casale <jcas...@activenetwerx.com> wrote: >> It's still not clear to me specifically what you're trying to do. It >> would really help if you would describe the problem in more detail. >> Here's what I think you're trying to do: >> >> 1) Submit a task to a ThreadPoolExecutor and get back a future. >> >> 2) When the task is complete, submit another task that needs the >> result of the first task to do its work. >> >> 3) When the chained task is complete, return the final result of the >> chained task back to whoever submitted the original task via the >> original task's future. >> >> The problem arises in that the original task's future already >> completed when the original task did. The chained task sets its result >> in a different future that the submitter didn't know about. > > Yes, I may have 2 or more tasks that depend on the previous. > They are each distinct tasks, or continuations, so they each want the result > of the previous task however each one can cancel the set. > > The callback model doesn't apply, its based on the result of _one_ task. > > What I need I are continuations, much like the .NET TPL. > > def task_a(): > return "a" > > def task_b(): > return "b" > > def task_c(): > return "c" > > So I submit task_a, if it completes successfully, task_b runs on its result. > If task_b completes successfully, task_c runs on its result. They "look" > like callbacks, but they are continuations. > > Task_b must be able to cancel the continuation of task_c if it see's task_a > has failed.
Thanks for the clarification. The first suggestion that I gave in my initial reply will handle cancellations implicitly. If task_b() is waiting on future_a.result() and task_a() raises an uncaught exception, then future_a.result() will reraise the exception. If it then propagates uncaught out of task_b() then future_b.result() will again reraise the exception, and task_c() that is waiting on it will reraise it as well. See this in action below. You might also be interested in the promise decorator described at https://bloerg.net/2013/04/05/chaining-python-futures.html. It does essentially the same as what I've described, but it abstracts it into a decorator and allows the caller to determine which arguments to pass as futures and which to pass as concrete values, instead of hard-coding the resolution of futures of dependencies into the task function itself. In the course of digging that up, I also found this: https://github.com/dvdotsenko/python-future-then which is brand-new and looks promising, but I can't see how one would use it with a ThreadPoolExecutor. py> import concurrent.futures py> def task_a(): ... raise ValueError(42) ... py> def task_b(future_a): ... return future_a.result() ... py> def task_c(future_b): ... return future_b.result() ... py> executor = concurrent.futures.ThreadPoolExecutor() py> future_a = executor.submit(task_a) py> future_b = executor.submit(task_b, future_a) py> future_c = executor.submit(task_c, future_b) py> future_c.result() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line 398, in result return self.__get_result() File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result raise self._exception File "/usr/local/lib/python3.5/concurrent/futures/thread.py", line 55, in run result = self.fn(*self.args, **self.kwargs) File "<stdin>", line 2, in task_c File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line 398, in result return self.__get_result() File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result raise self._exception File "/usr/local/lib/python3.5/concurrent/futures/thread.py", line 55, in run result = self.fn(*self.args, **self.kwargs) File "<stdin>", line 2, in task_b File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line 398, in result return self.__get_result() File "/usr/local/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result raise self._exception File "/usr/local/lib/python3.5/concurrent/futures/thread.py", line 55, in run result = self.fn(*self.args, **self.kwargs) File "<stdin>", line 2, in task_a ValueError: 42 -- https://mail.python.org/mailman/listinfo/python-list