Hi Aljoscha,

Thanks for the informative technical description.

>  - function state: this is the state that you get when a user function 
> implements the Checkpointed interface. it is not partitioned
>  - operator state: This is the state that a StreamOperator can snapshot, it 
> is similar to the function state, but for operators. it is not partitioned
> - partitioned state: state that is scoped to the key of the incoming element, 
> in Flink, this is (confusingly) called OperatorState and KvState (internally)

Let's clean that up! Let's rename the OperatorState interface to KvState.

> Both stream operators and user functions can have partitioned state, and the 
> namespace is the same, i.e. the state can clash. The partitioned state will 
> stay indefinitely if not manually cleared.

I suppose operators currently have to take care to use a unique
identifier for their state so that it doesn't clash with the state of
the user function. It wouldn't be too hard to introduce scoping here.
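
To make the scoping idea concrete, here is a rough sketch. It is not
Flink API: ScopedStateStore, the Scope enum and the method names are
all made up for illustration. The only point is that the owner of the
state becomes part of the address, so the same key and state name can
be used by the operator and by the user function without clashing.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink API: partitioned state addressed by
// (scope, key, state name) so operator and user-function state cannot clash.
final class ScopedStateStore {

    // Who owns the state. These two scopes are an assumption for illustration.
    enum Scope { OPERATOR, USER_FUNCTION }

    // A List has value-based equals/hashCode, so it works as a composite map key.
    private final Map<List<Object>, Object> entries = new HashMap<>();

    private static List<Object> address(Scope scope, Object key, String name) {
        return Arrays.asList(scope, key, name);
    }

    void put(Scope scope, Object key, String name, Object value) {
        entries.put(address(scope, key, name), value);
    }

    // Returns null if nothing is stored under this address.
    Object get(Scope scope, Object key, String name) {
        return entries.get(address(scope, key, name));
    }

    // Partitioned state stays until it is cleared explicitly.
    void clear(Scope scope, Object key, String name) {
        entries.remove(address(scope, key, name));
    }

    public static void main(String[] args) {
        ScopedStateStore store = new ScopedStateStore();
        // Same key and same state name, but different owners.
        store.put(Scope.OPERATOR, "user-42", "count", 1L);
        store.put(Scope.USER_FUNCTION, "user-42", "count", 7L);
        System.out.println(store.get(Scope.OPERATOR, "user-42", "count"));
        System.out.println(store.get(Scope.USER_FUNCTION, "user-42", "count"));
    }
}
```

Whether the scope should be a fixed enum like this or something more
generic is open; the sketch only shows that the clash goes away once
the owner is part of the lookup.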

Your proposal makes sense. It seems like this is a rather delicate
change which improves the flexibility of the streaming API. What is
the motivation behind this? I suppose you are thinking of improvements
to the session capabilities of the streaming API.

> If we want to also implement the current WindowOperator on top of these 
> generic facilities we need to have a way to scope state not only by key but 
> also by windows (or better, some generic state scope).

This is currently handled by the WindowOperator itself and would then
be delegated to the enhanced state interface? Makes sense if we want
to make use of the new state interface. Again, is it just cleaner or
does this enable new types of applications?
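
Just to check that I understand the proposal, here is how I picture a
timer that carries its key and a generic scope (for example a window).
This is a sketch under my own assumptions, not existing Flink code:
KeyedTimerService, Timer, TimerCallback and the method names are all
hypothetical, and keys and scopes are plain strings to keep it short.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical sketch, not Flink API: timers remember the key and the
// namespace (for example a window) that were active at registration.
final class KeyedTimerService {

    // One registered event-time timer.
    static final class Timer {
        final long timestamp;
        final String key;
        final String namespace;

        Timer(long timestamp, String key, String namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }
    }

    // Callback that receives the key and namespace the timer was registered with.
    interface TimerCallback {
        void onTimer(long timestamp, String key, String namespace);
    }

    // Timers ordered by timestamp, earliest first.
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));

    void registerEventTimeTimer(long timestamp, String key, String namespace) {
        timers.add(new Timer(timestamp, key, namespace));
    }

    // Fires every timer whose timestamp is at or below the watermark.
    void advanceWatermark(long watermark, TimerCallback callback) {
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Timer t = timers.poll();
            callback.onTimer(t.timestamp, t.key, t.namespace);
        }
    }

    public static void main(String[] args) {
        KeyedTimerService service = new KeyedTimerService();
        // One key with two active windows, plus a second key.
        service.registerEventTimeTimer(10L, "a", "window-1");
        service.registerEventTimeTimer(20L, "a", "window-2");
        service.registerEventTimeTimer(15L, "b", "window-1");
        service.advanceWatermark(15L, (ts, key, ns) ->
                System.out.println("fired at " + ts + " for key " + key + " in " + ns));
    }
}
```

With the key and scope delivered to the callback, the caller could
look up exactly the state that belongs to that key and window, which
is what the WindowOperator does by hand today.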

Cheers,
Max

On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi All,
> I want to discuss some ideas about improving the primitives/operations that 
> Flink offers for user-state, timers and windows and how these concepts can be 
> unified.
>
> It has come up a lot lately that people have very specific requirements 
> regarding the state that they keep and it seems necessary to allow users to 
> set their own custom timers (on processing time and watermark time 
> (event-time)) to do both expiration of state and implementation of custom 
> windowing semantics. While we’re at this, we might also think about cleaning 
> up the state handling a bit.
>
> Let me first describe the status quo, so that we’re all on the same page. 
> There are three types of state:
>  - function state: this is the state that you get when a user function 
> implements the Checkpointed interface. it is not partitioned
>  - operator state: This is the state that a StreamOperator can snapshot, it 
> is similar to the function state, but for operators. it is not partitioned
>  - partitioned state: state that is scoped to the key of the incoming 
> element, in Flink, this is (confusingly) called OperatorState and KvState 
> (internally)
>
> (Operator is the low-level concept, user functions are usually invoked by the 
> operator, for example StreamMap is the operator that handles a MapFunction.)
>
> Function state and operator state are not partitioned, meaning that it becomes 
> difficult when we want to implement dynamic scale-in/scale-out. With 
> partitioned state it can be redistributed when changing the degree of 
> parallelism.
>
> Both stream operators and user functions can have partitioned state, and the 
> namespace is the same, i.e. the state can clash. The partitioned state will 
> stay indefinitely if not manually cleared.
>
> On to timers, operators can register processing-time callbacks, they can 
> react to watermarks to implement event-time callbacks. They have to implement 
> the logic themselves, however. For example, the WindowOperator has custom 
> code to keep track of watermark timers and for reacting to watermarks. User 
> functions have no way of registering timers. Also, timers are not scoped to 
> any key. So if you register a timer while processing an element of a certain 
> key, when the timer fires you don’t know what key was active when registering 
> the timer. This might be necessary for cleaning up state for certain keys, or 
> to trigger processing for a certain key only, for example with session 
> windows of some kind.
>
> Now, on to new stuff. I propose to add a timer facility that can be used by 
> both operators and user functions. Both partitioned state and timers should 
> be aware of keys and if a timer fires the partitioned state should be scoped 
> to the same key that was active when the timer was registered.
>
> One last bit. If we want to also implement the current WindowOperator on top 
> of these generic facilities we need to have a way to scope state not only by 
> key but also by windows (or better, some generic state scope). The reason is, 
> that one key can have several active windows at one point in time and firing 
> timers need to be mapped to the correct window (for example, for sliding 
> windows, or session windows or what have you…).
>
> Happy discussing. :D
>
> Cheers,
> Aljoscha
>
>
