> On 09 Sep 2015, at 19:31, Niklas Semmler <nsemm...@inet.tu-berlin.de> wrote:
>
> Hello Ufuk,
>
> thanks for your amazingly quick reply.
>
> I have seen the markFinished in Execution.java, but if I get it right, this
> is simply used to stop a task. The ScheduleOrUpdateConsumers message in the
> pipeline case on the other hand is notifying the consumers that a pipelined
> partition is ready and can now be consumed. Can you give me a hint on how the
> receiver is notified of a consumable partition in the batch case?
Yes, this transitions the state of the respective execution to FINISHED. But if you look closely, there is a call to “finishAllBlockingPartitions”, which schedules the receivers iff this execution is the last one to finish the result.

You can think about the intermediate data as follows:

- On the job graph level you have: (Operator) -> [Result] -> (Operator), e.g. (map) -> [map result] -> (reduce)
- At runtime, when there are multiple parallel tasks, you have:
  * (Operator subtask 0) -> [Result partition 0] -> (Operator subtask 0)
  * (Operator subtask 1) -> [Result partition 1] -> (Operator subtask 1)

Now, if the exchange is blocking (batch execution mode), the receivers can only be scheduled once the result is finished. The question is: when is the result finished? It is finished iff all subtasks producing it have finished. And that’s where markFinished comes into play: the last subtask to finish triggers the scheduling. Which subtask is last is non-deterministic, i.e. either subtask 0 or subtask 1 can be the last to finish. This is tracked with a simple counter, and the task that decrements it to 0 triggers the scheduling.

(If the result is pipelined, the first data point already triggers the scheduling via the code paths you have looked at.)

Does this help?

– Ufuk
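A minimal Java sketch of the counter mechanism described for the blocking case, assuming one counter per intermediate result initialized to the number of producer subtasks. `BlockingResult`, `producerFinished` and `scheduleConsumers` are hypothetical names chosen for illustration, not Flink's actual classes or methods, and the pipelined case is not modeled. The point is only that an atomic decrement guarantees that exactly one producer, whichever happens to finish last, triggers consumer scheduling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingResultSketch {

    /** Hypothetical stand-in for a blocking intermediate result with N producer subtasks. */
    public static final class BlockingResult {
        private final AtomicInteger runningProducers;
        private final AtomicInteger scheduleCalls = new AtomicInteger();
        private volatile int triggeringSubtask = -1;

        public BlockingResult(int numberOfProducers) {
            runningProducers = new AtomicInteger(numberOfProducers);
        }

        /** Called when producer subtask `index` finishes; returns true if it triggered scheduling. */
        public boolean producerFinished(int index) {
            // decrementAndGet is atomic, so exactly one caller observes the value 0.
            if (runningProducers.decrementAndGet() == 0) {
                scheduleConsumers(index);
                return true;
            }
            return false;
        }

        // Hypothetical hook: a real scheduler would deploy the consuming tasks here.
        private void scheduleConsumers(int index) {
            triggeringSubtask = index;
            scheduleCalls.incrementAndGet();
        }

        public int scheduleCalls() { return scheduleCalls.get(); }

        public int triggeringSubtask() { return triggeringSubtask; }
    }

    /** Starts `parallelism` producer threads that finish concurrently, in no fixed order. */
    public static BlockingResult runProducers(int parallelism) {
        BlockingResult result = new BlockingResult(parallelism);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            final int index = i;
            Thread t = new Thread(() -> result.producerFinished(index));
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        BlockingResult result = runProducers(4);
        // Exactly one schedule call, regardless of which subtask finished last.
        System.out.println("schedule calls: " + result.scheduleCalls());
        System.out.println("triggered by subtask: " + result.triggeringSubtask());
    }
}
```

The triggering subtask index varies from run to run, mirroring the non-determinism described above, while the number of schedule calls is always one.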