Hi Stephan,
thanks for your answer! I thought about this, too, but I am not sure if
this solves the problem.
I think it works if there is only a temporary failure and the job can be
restarted with the same dop.
But it does not work if one node is permanently lost and the dop needs to
be adjusted.
If the iteration expects a sorted or hashed input, I cannot just read from
a previously stored checkpoint with a lower dop because this property will be
lost.
I don't see how moving the computation from the JobManager to the driver
solves this problem.
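
To make the hashed-input case concrete, here is a small sketch. It is plain
Python and not Flink code: partition() and is_hash_partitioned() are made-up
helpers, and key % dop only stands in for a real hash partitioner.

```python
# Hypothetical illustration only; none of these names exist in Flink.

def partition(records, dop):
    """Place each (key, value) record on subtask key % dop."""
    parts = [[] for _ in range(dop)]
    for key, value in records:
        parts[key % dop].append((key, value))
    return parts


def is_hash_partitioned(parts):
    """True if every record sits on the subtask that key % dop selects."""
    dop = len(parts)
    return all(key % dop == i
               for i, part in enumerate(parts)
               for key, _ in part)


records = [(k, "v%d" % k) for k in range(12)]

# Checkpoint written with dop 4: one "file" per subtask.
checkpoint_files = partition(records, 4)

# Naive recovery with dop 3: hand the four files round-robin to three subtasks.
naive = [[] for _ in range(3)]
for i, checkpoint_file in enumerate(checkpoint_files):
    naive[i % 3].extend(checkpoint_file)

# Recovery with an explicit repartitioning step after the checkpoint source.
repartitioned = partition([r for f in checkpoint_files for r in f], 3)

print(is_hash_partitioned(checkpoint_files))  # True
print(is_hash_partitioned(naive))             # False
print(is_hash_partitioned(repartitioned))     # True
```

Reading the old files directly with the lower dop puts, for example, key 4 on
subtask 0 although 4 % 3 selects subtask 1. The property only comes back if
something inserts the repartitioning step after the new source.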

best,
Markus

2015-04-21 19:36 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Hi Markus!
>
> I see your point. My first guess would be that it would be simpler to do
> this logic in the driver program, rather
> than inside the JobManager. If the checkpoints are all written and the job
> fails, you check what was the latest completed
> checkpoint (by file) and then start the program again with the source that
> refers to those files.
>
> That way, you go through the proper stack (optimizer and jobgraph
> generator) that inserts all the necessary partition and
> sort operations.
>
> Greetings,
> Stephan
>
>
>
> On Tue, Apr 21, 2015 at 8:58 AM, Markus Holzemer <
> holzemer.mar...@googlemail.com> wrote:
>
> > Hi everybody,
> >
> > I am writing my master's thesis about making Flink iterations / iterative
> > Flink algorithms fault tolerant.
> > The first approach I implemented is a basic checkpointing, where every N
> > iterations the current state is saved into HDFS.
> > To do this I enabled data sinks inside of iterations, then attached a new
> > checkpointing sink to the beginning of each iteration. To recover from a
> > previous checkpoint I cancel all tasks, add a new datasource in front of
> > the iteration and reschedule the tasks with lower dop. I do this out of the
> > JobManager during runtime without starting a new job.
> > The problem is that sometimes the input data to the iteration has some
> > properties like a certain partitioning or sorting, and I am struggling
> > with reconstructing these properties from the checkpoint source.
> > I figured that an easier way to do this is to re-optimize the new plan
> > (with the new source as input to the iteration) before the rescheduling.
> > But in the current project structure flink-runtime has no access to
> > flink-optimizer and it would be a major design break to change this.
> > Does anybody have any advice on this?
> >
> > best,
> > Markus
> >
>
