Hi Andrew,
The reason the program doesn't fail (and cannot fail, with the current
architecture) is that the partitioned state is dynamic/lazy. For example,
the count trigger might have a partitioned state called "count" that it
uses to keep track of the count. The time trigger requires no state; it
simply reacts to watermark/processing-time progress. The system doesn't
know the names and types of state that user functions or internal operators
will access, and the names and types might even change over the runtime of a
program.

If you restore with one window type, the state for the other type will
simply sit around, not being queried.
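
To make the lazy lookup concrete, here is a rough sketch in plain Java. It is
not Flink code: StateStore, countTriggerOnElement and timeTriggerOnWatermark
are hypothetical names made up for illustration, and only the state name
"count" comes from the example above. It shows a restored "count" entry that
the time trigger never asks for, so nothing can fail.

```java
import java.util.HashMap;
import java.util.Map;

public class LazyStateDemo {

    /** Toy stand-in for partitioned state: entries are looked up by name on access. */
    static final class StateStore {
        private final Map<String, Object> entries = new HashMap<>();

        StateStore(Map<String, Object> restored) {
            entries.putAll(restored);
        }

        /** Returns the named state, registering the default on first access. */
        @SuppressWarnings("unchecked")
        <T> T getOrCreate(String name, T defaultValue) {
            return (T) entries.computeIfAbsent(name, k -> defaultValue);
        }

        void put(String name, Object value) {
            entries.put(name, value);
        }

        boolean contains(String name) {
            return entries.containsKey(name);
        }
    }

    /** Hypothetical count trigger: keeps its counter in state named "count". */
    static boolean countTriggerOnElement(StateStore store, long maxCount) {
        long count = store.getOrCreate("count", 0L) + 1;
        if (count >= maxCount) {
            store.put("count", 0L); // fire and reset
            return true;
        }
        store.put("count", count);
        return false;
    }

    /** Hypothetical time trigger: needs no state, only the watermark. */
    static boolean timeTriggerOnWatermark(long watermark, long windowEnd) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        // Pretend this map came from a savepoint taken by the count-window version.
        Map<String, Object> savepoint = new HashMap<>();
        savepoint.put("count", 2L);
        StateStore restored = new StateStore(savepoint);

        // The time-window version never asks for "count", so nothing is checked.
        boolean fired = timeTriggerOnWatermark(10L, 5L);
        System.out.println("time trigger fired: " + fired);
        System.out.println("stale count state still present: " + restored.contains("count"));
    }
}
```

Since the store only learns about a state name when somebody asks for it, a
restore has nothing to compare the savepoint contents against.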

Cheers,
Aljoscha

On Mon, 16 May 2016 at 17:42 Andrew Whitaker <
andrew.whita...@braintreepayments.com> wrote:

> Thanks Ufuk.
>
> Thanks for explaining. The reasons behind the savepoint being restored
> successfully kind of make sense, but it seems like the window type (count
> vs time) should be taken into account when restoring savepoints. I don't
> actually see anyone doing this, but I would expect Flink to complain about
> changing windowing semantics between program versions.
>
> On Sat, May 14, 2016 at 3:34 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> For a WindowedStream the uid would be set on the result of the
>> apply/reduce/fold call. The WindowedStream itself does not represent an
>> operation.
>>
>> On Fri, 13 May 2016 at 00:20 Ufuk Celebi <u...@apache.org> wrote:
>>
>>> On Thu, May 12, 2016 at 10:44 PM, Andrew Whitaker
>>> <andrew.whita...@braintreepayments.com> wrote:
>>> > From what I've observed, most of the time when Flink can't successfully
>>> > restore a checkpoint it throws an exception saying as much. I was
>>> > expecting to see that behavior here. Could someone explain why this
>>> > "works" (as in, Flink accepts the program with the savepoint from the
>>> > first version of the program), and if this is a bug?
>>>
>>> Hey Andrew! Thanks for reporting this.
>>>
>>> Flink generates operator IDs and uses these to map the state back to
>>> the same operator when restoring from a savepoint. We want these IDs
>>> to stay the same as long as the program does not change.
>>>
>>> The ID can either be generated automatically by Flink or manually by the
>>> user.
>>>
>>> The automatically generated ID is based on certain topology attributes
>>> like parallelism, operator placement, etc. If any of these attributes
>>> changes, the operator ID changes and you can't map the savepoint state
>>> back. If they all stay the same, we assume that the program has not changed.
>>>
>>> The problem in your example is that to Flink both programs look the
>>> same with respect to how the IDs are generated: the topology didn't
>>> change and both the time and count window are executed by the
>>> WindowOperator with an InternalWindowFunction.
>>>
>>> The recommended way to work with savepoints is to skip the automatic
>>> IDs altogether and assign the IDs manually instead. You can do this
>>> via the "uid(String)" method of each operator, which gives you
>>> fine-grained control over the "versioning" of state:
>>>
>>> env.addSource(..).uid("my-source")
>>>
>>> vs.
>>>
>>> env.addSource(..).uid("my-source-2")
>>>
>>> The problem I've just noticed is that you can't specify this on
>>> WindowedStreams, but only on DataStreams, which is clearly a bug.
>>> Furthermore, it might be a good idea to special case windows when
>>> automatically generating the IDs.
>>>
>>> I hope this helps a little with understanding the core problem. If you
>>> have further questions, feel free to ask. I will make sure to fix this
>>> soon.
>>>
>>> – Ufuk
>>>
>>
>
>
> --
> Andrew Whitaker | andrew.whita...@braintreepayments.com
>
