I think it should circumvent many of the problems. If you re-submit the program from the client with a lower DOP, all intermediate state (saved by you in files) will be re-partitioned, because no assumptions about prior properties are made.
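To make that concrete, here is a rough, untested sketch (Java DataSet API) of what such a re-submission could look like. The checkpoint path, the Tuple2 layout, the dummy step function and the remaining-iteration count are made up for illustration; they are not part of your existing code.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public class RestartFromCheckpoint {

    public static void main(String[] args) throws Exception {
        // Assumed values for illustration: the new (lower) parallelism and how
        // many iterations were still outstanding when the job failed.
        final int newDop = 3;
        final int remainingIterations = 10;

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(newDop);

        // The source is the last completed checkpoint instead of the original
        // input. Because the plan goes through the optimizer from scratch, any
        // required partitioning or sorting is re-established for the new DOP.
        DataSet<Tuple2<Long, Double>> checkpointed =
                env.readCsvFile("hdfs:///checkpoints/iteration-42/")
                   .types(Long.class, Double.class);

        IterativeDataSet<Tuple2<Long, Double>> loop =
                checkpointed.iterate(remainingIterations);

        // Same step function as in the original job (dummy update here).
        DataSet<Tuple2<Long, Double>> next = loop.map(
                new MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>>() {
                    @Override
                    public Tuple2<Long, Double> map(Tuple2<Long, Double> v) {
                        return new Tuple2<Long, Double>(v.f0, v.f1 * 0.85);
                    }
                });

        DataSet<Tuple2<Long, Double>> result = loop.closeWith(next);
        result.writeAsCsv("hdfs:///results/");
        env.execute("restarted iterative job");
    }
}
```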
I may not be getting the whole picture here, so sorry if I am not hitting the point, actually...

On Wed, Apr 22, 2015 at 9:44 AM, Markus Holzemer <holzemer.mar...@googlemail.com> wrote:

> Hi Stephan,
> thanks for your answer! I thought about this, too, but I am not sure if
> this solves the problem.
> I think it works if there is only a temporary failure and the job can be
> restarted with the same DOP.
> But it does not work if one node is permanently lost and the DOP needs to
> be adjusted.
> If the iteration expects a sorted or hashed input, I cannot just read from
> a previously stored checkpoint with a lower DOP, because this property
> will be lost.
> I don't see how moving the computation from the JobManager to the driver
> solves this problem.
>
> best,
> Markus
>
> 2015-04-21 19:36 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
> > Hi Markus!
> >
> > I see your point. My first guess would be that it would be simpler to do
> > this logic in the driver program, rather than inside the JobManager.
> > If the checkpoints are all written and the job fails, you check what was
> > the latest completed checkpoint (by file) and then start the program
> > again with the source that refers to those files.
> >
> > That way, you go through the proper stack (optimizer and jobgraph
> > generator) that inserts all the necessary partition and sort operations.
> >
> > Greetings,
> > Stephan
> >
> >
> > On Tue, Apr 21, 2015 at 8:58 AM, Markus Holzemer <
> > holzemer.mar...@googlemail.com> wrote:
> >
> > > Hi everybody,
> > >
> > > I am writing my master thesis about making Flink iterations / iterative
> > > Flink algorithms fault tolerant.
> > > The first approach I implemented is basic checkpointing, where every N
> > > iterations the current state is saved into HDFS.
> > > To do this I enabled data sinks inside of iterations, then attached a
> > > new checkpointing sink to the beginning of each iteration. To recover
> > > from a previous checkpoint I cancel all tasks, add a new data source in
> > > front of the iteration and reschedule the tasks with a lower DOP. I do
> > > this out of the JobManager during runtime, without starting a new job.
> > > The problem is that sometimes the input data to the iteration has some
> > > properties like a certain partitioning or sorting, and I am struggling
> > > with reconstructing these properties from the checkpoint source.
> > > I figured that an easier way to do this is to re-optimize the new plan
> > > (with the new source as input to the iteration) before the rescheduling.
> > > But in the current project structure flink-runtime has no access to
> > > flink-optimizer, and it would be a major design break to change this.
> > > Does anybody have any advice on this?
> > >
> > > best,
> > > Markus
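P.S. Regarding the "check what was the latest completed checkpoint (by file)" step quoted above, here is a hypothetical sketch of how the driver could locate that checkpoint on HDFS before assembling the restart program as in the sketch further up. It assumes each checkpoint is written to a directory named iteration-<n> and marked complete with a _SUCCESS file; both the layout and the class are made up for illustration.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LatestCheckpoint {

    /**
     * Returns the path of the highest-numbered checkpoint directory that was
     * completely written (marked by a _SUCCESS file), or null if none exists.
     */
    public static String find(String checkpointRoot) throws Exception {
        FileSystem fs = FileSystem.get(new URI(checkpointRoot), new Configuration());
        String latest = null;
        int best = -1;
        for (FileStatus dir : fs.listStatus(new Path(checkpointRoot))) {
            String name = dir.getPath().getName();            // e.g. "iteration-42"
            if (!name.startsWith("iteration-")) {
                continue;
            }
            int n = Integer.parseInt(name.substring("iteration-".length()));
            if (n > best && fs.exists(new Path(dir.getPath(), "_SUCCESS"))) {
                best = n;
                latest = dir.getPath().toString();
            }
        }
        return latest;
    }

    public static void main(String[] args) throws Exception {
        String checkpoint = find("hdfs:///checkpoints");
        // The driver would then build and submit the program exactly as in the
        // earlier sketch, using `checkpoint` as the source if it is non-null
        // and the original input otherwise.
        System.out.println("restart from: " + checkpoint);
    }
}
```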