Hi Andrew,

the reason why the program doesn't fail (and cannot fail, with the current architecture) is that the partitioned state is dynamic/lazy. For example, the count trigger might have a partitioned state called "count" that it uses to keep track of the count. The time trigger requires no state but simply reacts to watermark/processing time progress. The system doesn't know the names and types of state that user functions or internal operators will access. And the types/names might even change over the runtime of a program.
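
To make the lazy-state point concrete, here is a minimal sketch in plain Java. It does not use Flink's API: `StateStore`, `CountTrigger`, `TimeTrigger` and the two helper methods are toy stand-ins invented for illustration, and the real WindowOperator and state backends are considerably more involved. It only models the idea that state is looked up by name when a trigger asks for it, so a restore has nothing to validate against.

```java
import java.util.HashMap;
import java.util.Map;

public class LazyStateSketch {

    /** Toy state store (hypothetical): named entries that exist only once some code touches them. */
    static final class StateStore {
        private final Map<String, Long> values = new HashMap<>();

        /** Restoring copies the snapshot in; no entry is validated against the new program. */
        static StateStore restore(Map<String, Long> snapshot) {
            StateStore store = new StateStore();
            store.values.putAll(snapshot);
            return store;
        }

        long get(String name) { return values.getOrDefault(name, 0L); }
        void put(String name, long value) { values.put(name, value); }
        Map<String, Long> snapshot() { return new HashMap<>(values); }
    }

    interface Trigger {
        boolean onElement(StateStore state);
    }

    /** Keeps its progress in a state entry named "count", requested lazily by name. */
    static final class CountTrigger implements Trigger {
        private final long maxCount;
        CountTrigger(long maxCount) { this.maxCount = maxCount; }

        @Override
        public boolean onElement(StateStore state) {
            long count = state.get("count") + 1;
            boolean fire = count >= maxCount;
            state.put("count", fire ? 0L : count);
            return fire;
        }
    }

    /** Needs no state at all; in this toy model it never fires on elements. */
    static final class TimeTrigger implements Trigger {
        @Override
        public boolean onElement(StateStore state) { return false; }
    }

    /** Version 1 of the job: count window, two elements seen, then a savepoint. */
    public static Map<String, Long> savepointFromCountJob() {
        StateStore store = new StateStore();
        Trigger trigger = new CountTrigger(3);
        trigger.onElement(store);
        trigger.onElement(store);
        return store.snapshot();
    }

    /** Version 2: restore that savepoint, but run a time trigger instead. */
    public static Map<String, Long> restoreIntoTimeJob(Map<String, Long> savepoint) {
        StateStore store = StateStore.restore(savepoint);
        Trigger trigger = new TimeTrigger();
        trigger.onElement(store); // never asks for "count", so nothing can mismatch
        return store.snapshot();
    }

    public static void main(String[] args) {
        Map<String, Long> savepoint = savepointFromCountJob();
        Map<String, Long> after = restoreIntoTimeJob(savepoint);
        System.out.println("savepoint: " + savepoint);
        System.out.println("after restore with time trigger: " + after);
    }
}
```

In this sketch the restore step has no list of expected state names to check, so it cannot fail. The "count" entry written by the first version is still in the store after the second version runs, untouched.
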
If you restore with one window type the state for the other type will simply sit around, not being queried.

Cheers,
Aljoscha

On Mon, 16 May 2016 at 17:42 Andrew Whitaker <andrew.whita...@braintreepayments.com> wrote:

> Thanks Ufuk.
>
> Thanks for explaining. The reasons behind the savepoint being restored
> successfully kind of make sense, but it seems like the window type (count
> vs time) should be taken into account when restoring savepoints. I don't
> actually see anyone doing this, but I would expect flink to complain about
> changing windowing semantics between program versions.
>
> On Sat, May 14, 2016 at 3:34 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> For a WindowedStream the uid would be set on the result of the
>> apply/reduce/fold call. The WindowedStream itself does not represent an
>> operation.
>>
>> On Fri, 13 May 2016 at 00:20 Ufuk Celebi <u...@apache.org> wrote:
>>
>>> On Thu, May 12, 2016 at 10:44 PM, Andrew Whitaker
>>> <andrew.whita...@braintreepayments.com> wrote:
>>> > From what I've observed, most of the time when Flink can't successfully
>>> > restore a checkpoint it throws an exception saying as much. I was
>>> > expecting to see that behavior here. Could someone explain why this
>>> > "works" (as in, flink accepts the program with the savepoint from the
>>> > first version of the program), and if this is a bug?
>>>
>>> Hey Andrew! Thanks for reporting this.
>>>
>>> Flink generates operator IDs and uses these to map the state back to
>>> the same operator when restoring from a savepoint. We want these IDs
>>> to stay the same as long as the program does not change.
>>>
>>> The ID can either be generated automatically by Flink or manually by the
>>> user.
>>>
>>> The automatically generated ID is based on certain topology attributes
>>> like parallelism, operator placement, etc. If the attribute changes,
>>> the operator ID changes and you can't map the savepoint state back. If
>>> it stays the same, we assume that the program has not changed.
>>>
>>> The problem in your example is that to Flink both programs look the
>>> same with respect to how the IDs are generated: the topology didn't
>>> change and both the time and count window are executed by the
>>> WindowOperator with an InternalWindowFunction.
>>>
>>> The recommended way to work with savepoints is to skip the automatic
>>> IDs altogether and assign the IDs manually instead. You can do this
>>> via the "uid(String)" method of each operator, which gives you
>>> fine-grained control over the "versioning" of state:
>>>
>>> env.addSource(..).uid("my-source")
>>>
>>> vs.
>>>
>>> env.addSource(..).uid("my-source-2")
>>>
>>> The problem I've just noticed is that you can't specify this on
>>> WindowedStreams, but only on DataStreams, which is clearly a bug.
>>> Furthermore, it might be a good idea to special case windows when
>>> automatically generating the IDs.
>>>
>>> I hope this helps a little with understanding the core problem. If you
>>> have further questions, feel free to ask. I will make sure to fix this
>>> soon.
>>>
>>> – Ufuk
>>>
>>
>
>
> --
> Andrew Whitaker | andrew.whita...@braintreepayments.com
> --
> Note: this information is confidential. It is prohibited to share, post
> online or otherwise publicize without Braintree's prior written consent.
>