Hi Markus! I see your point. My first thought is that it would be simpler to do this logic in the driver program rather than inside the JobManager. If the checkpoints are all written and the job fails, you check which checkpoint completed last (by file) and then start the program again with a source that reads those files.
That way, you go through the proper stack (optimizer and JobGraph generator) that inserts all the necessary partition and sort operations.

Greetings,
Stephan

On Tue, Apr 21, 2015 at 8:58 AM, Markus Holzemer <holzemer.mar...@googlemail.com> wrote:

> Hi everybody,
>
> I am writing my master thesis about making Flink iterations / iterative
> Flink algorithms fault tolerant.
> The first approach I implemented is basic checkpointing, where every N
> iterations the current state is saved into HDFS.
> To do this I enabled data sinks inside of iterations, then attached a new
> checkpointing sink to the beginning of each iteration. To recover from a
> previous checkpoint I cancel all tasks, add a new data source in front of
> the iteration, and reschedule the tasks with a lower DOP. I do this from the
> JobManager during runtime without starting a new job.
> The problem is that sometimes the input data to the iteration has some
> properties, like a certain partitioning or sorting, and I am struggling
> with reconstructing these properties from the checkpoint source.
> I figured that an easier way to do this is to re-optimize the new plan
> (with the new source as input to the iteration) before the rescheduling.
> But in the current project structure flink-runtime has no access to
> flink-optimizer, and it would be a major design break to change this.
> Does anybody have any advice on this?
>
> best,
> Markus
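For what it's worth, the driver-side step of "check what was the latest completed checkpoint (by file)" could be sketched roughly like this. This is only an illustration, not Flink code: it assumes a hypothetical layout where each checkpoint is a directory named `checkpoint-<n>` and a completed one contains a `_SUCCESS` marker file (the same convention Hadoop output committers use). The driver would then pass the winning directory as the new source path when resubmitting the job.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

public class CheckpointFinder {

    // Returns the latest completed checkpoint directory under baseDir, if any.
    // Assumed (hypothetical) convention: checkpoints are directories named
    // "checkpoint-<n>", and a completed checkpoint contains a "_SUCCESS" marker.
    static Optional<File> latestCompleted(File baseDir) {
        File[] dirs = baseDir.listFiles(f ->
                f.isDirectory() && f.getName().startsWith("checkpoint-"));
        if (dirs == null) {
            return Optional.empty();
        }
        return Arrays.stream(dirs)
                // keep only checkpoints whose marker file exists
                .filter(d -> new File(d, "_SUCCESS").exists())
                // pick the one with the highest numeric suffix
                .max(Comparator.comparingLong(d ->
                        Long.parseLong(d.getName().substring("checkpoint-".length()))));
    }

    public static void main(String[] args) throws Exception {
        // Demo in a temp directory: checkpoints 1 and 2 completed, 3 incomplete.
        File base = java.nio.file.Files.createTempDirectory("ckpts").toFile();
        for (int n : new int[]{1, 2, 3}) {
            File d = new File(base, "checkpoint-" + n);
            d.mkdir();
            if (n < 3) {
                new File(d, "_SUCCESS").createNewFile();
            }
        }
        System.out.println(latestCompleted(base).map(File::getName).orElse("none"));
        // prints "checkpoint-2", since checkpoint-3 lacks the _SUCCESS marker
    }
}
```

The key point matching Stephan's advice: because recovery happens in the driver, the job is simply resubmitted with this directory as the source, so the plan goes through the optimizer again and the required partitioning/sorting is re-established automatically.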