On Thu, May 12, 2016 at 10:44 PM, Andrew Whitaker
<andrew.whita...@braintreepayments.com> wrote:
> From what I've observed, most of the time when Flink can't successfully
> restore a checkpoint it throws an exception saying as much. I was expecting
> to see that behavior here. Could someone explain why this "works" (as in,
> flink accepts the program with the savepoint from the first version of the
> program), and if this is a bug?

Hey Andrew! Thanks for reporting this.

Flink generates operator IDs and uses these to map the state back to
the same operator when restoring from a savepoint. We want these IDs
to stay the same as long as the program does not change.

The ID can either be generated automatically by Flink or manually by the user.

The automatically generated ID is based on certain topology attributes
like parallelism, operator placement, etc. If any of these attributes
changes, the operator ID changes and you can't map the savepoint state
back. If they all stay the same, we assume that the program has not
changed.

The problem in your example is that to Flink both programs look the
same with respect to how the IDs are generated: the topology didn't
change, and both the time window and the count window are executed by
the WindowOperator with an InternalWindowFunction. The generated IDs
therefore match, and the savepoint is accepted.
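
To make the collision concrete, here is a toy model in plain Java. It is
not Flink's actual ID generation: the class AutoIdSketch, the method
autoId and the use of Objects.hash are all made up for illustration. The
only point is that an ID derived from topology attributes alone cannot
tell a time window from a count window.

```java
import java.util.Objects;

final class AutoIdSketch {

    // Hypothetical stand-in for an automatically generated operator ID.
    // Only topology attributes are hashed; windowKind is deliberately
    // ignored, mirroring the situation described above.
    static String autoId(String operatorClass, String windowKind,
                         int parallelism, int position) {
        return Integer.toHexString(Objects.hash(operatorClass, parallelism, position));
    }

    public static void main(String[] args) {
        String v1 = autoId("WindowOperator", "time", 4, 1);
        String v2 = autoId("WindowOperator", "count", 4, 1);
        String rescaled = autoId("WindowOperator", "time", 8, 1);

        System.out.println(v1.equals(v2));       // true: same ID, state is mapped
        System.out.println(v1.equals(rescaled)); // false: parallelism changed
    }
}
```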

The recommended way to work with savepoints is to skip the automatic
IDs altogether and assign the IDs manually instead. You can do this
via the "uid(String)" method of each operator, which gives you
fine-grained control over the "versioning" of state:

env.addSource(..).uid("my-source")

vs.

env.addSource(..).uid("my-source-2")
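
As a rough picture of what the manual IDs buy you, here is another toy
sketch in plain Java (again not Flink code; UidRestoreSketch, restore and
the "offset=42" value are hypothetical). State is looked up by uid, so
keeping the uid carries the state over, and changing it starts that
operator without the old state. What Flink does with savepoint state
that is left unmatched is not modeled here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class UidRestoreSketch {

    // Toy restore step: for each operator uid in the new program, look up
    // the state stored under the same uid in the savepoint (null if absent).
    static Map<String, String> restore(Map<String, String> savepoint,
                                       List<String> programUids) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (String uid : programUids) {
            restored.put(uid, savepoint.get(uid));
        }
        return restored;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = new HashMap<>();
        savepoint.put("my-source", "offset=42");

        // Same uid: the old state is found.
        System.out.println(restore(savepoint, Arrays.asList("my-source")));
        // New uid: nothing is mapped to this operator.
        System.out.println(restore(savepoint, Arrays.asList("my-source-2")));
    }
}
```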

The problem I've just noticed is that you can't specify this on
WindowedStreams, but only on DataStreams, which is clearly a bug.
Furthermore, it might be a good idea to special-case windows when
automatically generating the IDs.

I hope this helps a little with understanding the core problem. If you
have further questions, feel free to ask. I will make sure to fix this
soon.

– Ufuk
