I think it should circumvent many of the problems. If you re-submit the
program from the client with a lower DOP, all intermediate state (saved by
you in files) will be re-partitioned, because no assumptions about prior
properties are made.
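
To make the idea concrete, here is a rough sketch of what the driver-side
logic could look like. It is only an illustration under assumptions that
are not from this thread: the directory layout (one "iteration-<n>"
directory per checkpoint), the "_COMPLETE" marker file, and the
JobSubmitter hook are all hypothetical. The hook stands in for "build the
plan with the checkpoint files as the source and submit it through the
normal client path", so the optimizer gets to re-insert the partition and
sort operations for the new DOP.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

/**
 * Driver-side recovery sketch. Assumed (hypothetical) layout: each checkpoint
 * lives in <root>/iteration-<n>/ and contains an empty _COMPLETE marker file
 * once it has been written entirely.
 */
final class CheckpointRecovery {

    /** Hypothetical hook: builds the program and submits it via the client. */
    interface JobSubmitter {
        void submit(Optional<Path> checkpointSource, int dop) throws Exception;
    }

    /** Returns the completed checkpoint directory with the highest iteration number. */
    static Optional<Path> latestCompleted(Path root) throws IOException {
        if (!Files.isDirectory(root)) {
            return Optional.empty();
        }
        try (Stream<Path> dirs = Files.list(root)) {
            return dirs
                .filter(d -> d.getFileName().toString().matches("iteration-\\d{1,9}"))
                // Skip checkpoints that were still being written when the job failed.
                .filter(d -> Files.exists(d.resolve("_COMPLETE")))
                .max(Comparator.comparingInt(CheckpointRecovery::iterationOf));
        }
    }

    static int iterationOf(Path dir) {
        String name = dir.getFileName().toString();
        return Integer.parseInt(name.substring("iteration-".length()));
    }

    /**
     * Tries the DOPs in the given order (e.g. descending). Each attempt starts
     * from the latest completed checkpoint, if there is one, otherwise from the
     * original input. Returns the DOP of the attempt that succeeded.
     */
    static int runWithRecovery(JobSubmitter submitter, Path root, int[] dops)
            throws Exception {
        Exception last = null;
        for (int dop : dops) {
            try {
                submitter.submit(latestCompleted(root), dop);
                return dop;
            } catch (Exception e) {
                last = e; // job failed; retry from a checkpoint with the next DOP
            }
        }
        if (last == null) {
            throw new IllegalArgumentException("no DOP given");
        }
        throw last;
    }
}
```

Since every attempt is a fresh submission, nothing in the runtime has to
know about the optimizer; the sketch only decides which files to read and
which DOP to use.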

I may not be seeing the whole picture here, so apologies if I am missing
the point.

On Wed, Apr 22, 2015 at 9:44 AM, Markus Holzemer <
holzemer.mar...@googlemail.com> wrote:

> Hi Stephan,
> thanks for your answer! I thought about this, too, but I am not sure if
> this solves the problem.
> I think it works if there is only a temporary failure and the job can be
> restarted with the same dop.
> But it does not work if one node is permanently lost and the dop needs to
> be adjusted.
> If the iteration expects a sorted or hashed input, I cannot just read from
> a previously stored checkpoint with a lower dop because this property will
> be lost.
> I don't see how moving the computation from the JobManager to the driver
> solves this problem.
>
> best,
> Markus
>
> 2015-04-21 19:36 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
> > Hi Markus!
> >
> > I see your point. My first guess would be that it would be simpler to do
> > this logic in the driver program, rather than inside the JobManager. If
> > the checkpoints are all written and the job fails, you check what was the
> > latest completed checkpoint (by file) and then start the program again
> > with the source that refers to those files.
> >
> > That way, you go through the proper stack (optimizer and jobgraph
> > generator) that inserts all the necessary partition and sort operations.
> >
> > Greetings,
> > Stephan
> >
> >
> >
> > On Tue, Apr 21, 2015 at 8:58 AM, Markus Holzemer <
> > holzemer.mar...@googlemail.com> wrote:
> >
> > > Hi everybody,
> > >
> > > I am writing my master's thesis about making Flink iterations /
> > > iterative Flink algorithms fault tolerant.
> > > The first approach I implemented is basic checkpointing, where every N
> > > iterations the current state is saved into HDFS.
> > > To do this I enabled data sinks inside of iterations, then attached a
> > > new checkpointing sink to the beginning of each iteration. To recover
> > > from a previous checkpoint I cancel all tasks, add a new data source in
> > > front of the iteration and reschedule the tasks with a lower dop. I do
> > > this from within the JobManager during runtime without starting a new
> > > job.
> > > The problem is that sometimes the input data to the iteration has some
> > > properties like a certain partitioning or sorting, and I am struggling
> > > with reconstructing these properties from the checkpoint source.
> > > I figured that an easier way to do this is to re-optimize the new plan
> > > (with the new source as input to the iteration) before the rescheduling.
> > > But in the current project structure flink-runtime has no access to
> > > flink-optimizer and it would be a major design break to change this.
> > > Does anybody have any advice on this?
> > >
> > > best,
> > > Markus
> > >
> >
>
