Thanks, Ufuk, for explaining. The reasons the savepoint was restored successfully make sense, but it seems like the window type (count vs. time) should be taken into account when restoring savepoints. I don't actually see anyone doing this in practice, but I would expect Flink to complain when the windowing semantics change between program versions.
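
To check my understanding of the explanation, here is a toy model of the ID matching. It is only a sketch: it is not Flink's actual ID-generation code, and every name in it (OperatorIdSketch, Op, autoId, stateId) is made up for illustration. It assumes the automatic ID depends only on the operator class, parallelism and position in the topology, which is my reading of the description quoted below.

```java
import java.util.Objects;

// Toy model only: this is NOT Flink's real ID-generation code.
// All names here (OperatorIdSketch, Op, autoId, stateId) are hypothetical.
class OperatorIdSketch {

    /** A simplified description of one operator in a job. */
    static final class Op {
        final String operatorClass; // "WindowOperator" for both count and time windows
        final int parallelism;
        final int position;         // stand-in for operator placement in the topology
        final String windowKind;    // "count" or "time"; deliberately ignored by autoId
        final String uid;           // user-assigned ID, or null if none was set

        Op(String operatorClass, int parallelism, int position, String windowKind, String uid) {
            this.operatorClass = operatorClass;
            this.parallelism = parallelism;
            this.position = position;
            this.windowKind = windowKind;
            this.uid = uid;
        }
    }

    /** Derives an ID from topology attributes only; the window kind is not an input. */
    static String autoId(Op op) {
        return Integer.toHexString(Objects.hash(op.operatorClass, op.parallelism, op.position));
    }

    /** A user-assigned uid wins; otherwise fall back to the automatic ID. */
    static String stateId(Op op) {
        return op.uid != null ? op.uid : autoId(op);
    }

    public static void main(String[] args) {
        // Version 1 uses a count window, version 2 a time window, no uid set on either.
        Op countV1 = new Op("WindowOperator", 4, 2, "count", null);
        Op timeV2 = new Op("WindowOperator", 4, 2, "time", null);
        // Same topology attributes, so the old state is mapped onto the new operator.
        System.out.println("auto IDs match: " + stateId(countV1).equals(stateId(timeV2))); // true

        // With distinct uids, the state of version 1 no longer maps to version 2.
        Op countUid = new Op("WindowOperator", 4, 2, "count", "count-window-v1");
        Op timeUid = new Op("WindowOperator", 4, 2, "time", "time-window-v2");
        System.out.println("manual IDs match: " + stateId(countUid).equals(stateId(timeUid))); // false
    }
}
```

In the real API, per Aljoscha's note below, the uid would go on the result of the apply/reduce/fold call rather than on the WindowedStream itself.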

On Sat, May 14, 2016 at 3:34 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> For a WindowedStream the uid would be set on the result of the
> apply/reduce/fold call. The WindowedStream itself does not represent an
> operation.
>
> On Fri, 13 May 2016 at 00:20 Ufuk Celebi <u...@apache.org> wrote:
>
>> On Thu, May 12, 2016 at 10:44 PM, Andrew Whitaker
>> <andrew.whita...@braintreepayments.com> wrote:
>> > From what I've observed, most of the time when Flink can't successfully
>> > restore a checkpoint it throws an exception saying as much. I was expecting
>> > to see that behavior here. Could someone explain why this "works" (as in,
>> > flink accepts the program with the savepoint from the first version of the
>> > program), and if this is a bug?
>>
>> Hey Andrew! Thanks for reporting this.
>>
>> Flink generates operator IDs and uses these to map the state back to
>> the same operator when restoring from a savepoint. We want these IDs
>> to stay the same as long as the program does not change.
>>
>> The ID can either be generated automatically by Flink or manually by the
>> user.
>>
>> The automatically generated ID is based on certain topology attributes
>> like parallelism, operator placement, etc. If the attribute changes,
>> the operator ID changes and you can't map the savepoint state back. If
>> it stays the same, we assume that the program has not changed.
>>
>> The problem in your example is that to Flink both programs look the
>> same with respect to how the IDs are generated: the topology didn't
>> change and both the time and count window are executed by the
>> WindowOperator with an InternalWindowFunction.
>>
>> The recommended way to work with savepoints is to skip the automatic
>> IDs altogether and assign the IDs manually instead. You can do this
>> via the "uid(String)" method of each operator, which gives you
>> fine-grained control over the "versioning" of state:
>>
>> env.addSource(..).uid("my-source")
>>
>> vs.
>>
>> env.addSource(..).uid("my-source-2")
>>
>> The problem I've just noticed is that you can't specify this on
>> WindowedStreams, but only on DataStreams, which is clearly a bug.
>> Furthermore, it might be a good idea to special case windows when
>> automatically generating the IDs.
>>
>> I hope this helps a little with understanding the core problem. If you
>> have further questions, feel free to ask. I will make sure to fix this
>> soon.
>>
>> – Ufuk
>>
>

--
Andrew Whitaker | andrew.whita...@braintreepayments.com
--
Note: this information is confidential. It is prohibited to share, post online or otherwise publicize without Braintree's prior written consent.