A lot of this makes sense, but I am not sure about renaming
"OperatorState". "KvState" is the nicer name, but why make users' lives
harder just for a name?


On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <m...@apache.org> wrote:

> Hi Aljoscha,
>
> Thanks for the informative technical description.
>
> >  - function state: this is the state that you get when a user function
> > implements the Checkpointed interface. It is not partitioned.
> >  - operator state: this is the state that a StreamOperator can snapshot;
> > it is similar to the function state, but for operators. It is not
> > partitioned.
> >  - partitioned state: state that is scoped to the key of the incoming
> > element. In Flink, this is (confusingly) called OperatorState and KvState
> > (internally).
>
> Let's clean that up! Let's rename the OperatorState interface to KvState.
>
> > Both stream operators and user functions can have partitioned state, and
> > the namespace is the same, i.e. the state can clash. The partitioned state
> > will stay indefinitely if not manually cleared.
>
> I suppose operators currently have to take care to use a unique
> identifier for the state such that it doesn't clash with the user
> function. It wouldn't be too hard to introduce scoping here.
>
> Your proposal makes sense. It seems like this is a rather delicate
> change which improves the flexibility of the streaming API. What is
> the motivation behind this? I suppose you are thinking of improvements
> to the session capabilities of the streaming API.
>
> > If we want to also implement the current WindowOperator on top of these
> > generic facilities we need to have a way to scope state not only by key but
> > also by windows (or better, some generic state scope).
>
> This is currently handled by the WindowOperator itself and would then
> be delegated to the enhanced state interface? Makes sense if we want
> to make use of the new state interface. Again, is it just cleaner or
> does this enable new types of applications?
>
> Cheers,
> Max
>
> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> > Hi All,
> > I want to discuss some ideas about improving the primitives/operations
> > that Flink offers for user-state, timers and windows and how these concepts
> > can be unified.
> >
> > It has come up a lot lately that people have very specific requirements
> > regarding the state that they keep, and it seems necessary to allow users
> > to set their own custom timers (on processing time and on watermark time,
> > i.e. event time) to do both expiration of state and implementation of
> > custom windowing semantics. While we’re at this, we might also think about
> > cleaning up the state handling a bit.
> >
> > Let me first describe the status quo, so that we’re all on the same
> > page. There are three types of state:
> >  - function state: this is the state that you get when a user function
> > implements the Checkpointed interface. It is not partitioned.
> >  - operator state: this is the state that a StreamOperator can snapshot;
> > it is similar to the function state, but for operators. It is not
> > partitioned.
> >  - partitioned state: state that is scoped to the key of the incoming
> > element. In Flink, this is (confusingly) called OperatorState and KvState
> > (internally).
> >
> > (The operator is the low-level concept; user functions are usually
> > invoked by the operator. For example, StreamMap is the operator that
> > handles a MapFunction.)
> >
> > Function state and operator state are not partitioned, which makes it
> > difficult to implement dynamic scale-in/scale-out. Partitioned state, by
> > contrast, can be redistributed when the degree of parallelism changes.
> >
> > Both stream operators and user functions can have partitioned state, and
> > the namespace is the same, i.e. the state can clash. The partitioned state
> > will stay indefinitely if not manually cleared.
> >
> > On to timers: operators can register processing-time callbacks, and they
> > can react to watermarks to implement event-time callbacks. They have to
> > implement the logic themselves, however. For example, the WindowOperator
> > has custom code to keep track of watermark timers and for reacting to
> > watermarks. User functions have no way of registering timers. Also, timers
> > are not scoped to any key. So if you register a timer while processing an
> > element of a certain key, when the timer fires you don’t know what key was
> > active when registering the timer. Knowing the key might be necessary for
> > cleaning up state for certain keys, or to trigger processing for a certain
> > key only, for example with session windows of some kind.
> >
> > Now, on to new stuff. I propose to add a timer facility that can be used
> > by both operators and user functions. Both partitioned state and timers
> > should be aware of keys, and if a timer fires the partitioned state should
> > be scoped to the same key that was active when the timer was registered.
> >
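
A rough sketch of the key-scoped timer idea above, in plain Java. This is
only an illustration under assumptions: KeyedTimerSketch, TimerHandler and
all method names are hypothetical and are not existing Flink API, and the
partitioned state is simplified to one Long per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Flink API: each timer remembers the key that was
// active when it was registered, and that key is restored before it fires.
final class KeyedTimerSketch<K> {

    /** Callback for a firing timer; the context is already scoped to the timer's key. */
    interface TimerHandler<K> {
        void onTimer(long timestamp, KeyedTimerSketch<K> context);
    }

    // Simplified partitioned state: one Long per key.
    private final Map<K, Long> partitionedState = new HashMap<>();
    // Pending timers ordered by timestamp, each with the keys registered for it.
    private final TreeMap<Long, List<K>> timers = new TreeMap<>();
    private K currentKey;

    void setCurrentKey(K key) { currentKey = key; }
    K getCurrentKey() { return currentKey; }

    // State access is always relative to the current key.
    void add(long delta) { partitionedState.merge(currentKey, delta, Long::sum); }
    Long get() { return partitionedState.get(currentKey); }
    void clear() { partitionedState.remove(currentKey); }

    /** Registers a timer for the key that is active right now. */
    void registerTimer(long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(currentKey);
    }

    /** Fires all timers up to the watermark, restoring each timer's key first. */
    void advanceWatermark(long watermark, TimerHandler<K> handler) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, List<K>> due = timers.pollFirstEntry();
            for (K key : due.getValue()) {
                currentKey = key;
                handler.onTimer(due.getKey(), this);
            }
        }
    }

    public static void main(String[] args) {
        KeyedTimerSketch<String> operator = new KeyedTimerSketch<>();
        operator.setCurrentKey("a");
        operator.add(1);
        operator.registerTimer(10);
        operator.setCurrentKey("b");
        operator.add(5);
        operator.registerTimer(20);
        // The current key is "b" here, but the timer at 10 sees the state of "a".
        operator.advanceWatermark(15, (timestamp, context) -> {
            System.out.println(context.getCurrentKey() + " -> " + context.get());
            context.clear(); // expire the state of the timer's key only
        });
    }
}
```

The handler never names a key itself, so the same callback can be used for
expiring state or for triggering per-key processing.
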
> > One last bit. If we want to also implement the current WindowOperator on
> > top of these generic facilities we need to have a way to scope state not
> > only by key but also by windows (or better, some generic state scope). The
> > reason is that one key can have several active windows at one point in
> > time, and firing timers need to be mapped to the correct window (for
> > example, for sliding windows, or session windows or what have you…).
> >
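
To illustrate the "generic state scope" point, a minimal sketch in plain
Java of state addressed by (key, namespace), where the namespace stands in
for a window. The names ScopedStateSketch, fireAndPurge and
slidingWindowStarts are hypothetical and not Flink API. The window
assignment assumes non-negative timestamps and a window size that is a
multiple of the slide.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink API: state is scoped by key AND by a generic
// namespace (here a window start), so one key can hold several windows at once.
final class ScopedStateSketch<K, N> {

    private final Map<K, Map<N, Long>> state = new HashMap<>();

    /** Adds delta to the value stored under (key, namespace). */
    void add(K key, N namespace, long delta) {
        state.computeIfAbsent(key, k -> new HashMap<>()).merge(namespace, delta, Long::sum);
    }

    /** Returns the value for (key, namespace), or null if there is none. */
    Long get(K key, N namespace) {
        Map<N, Long> perKey = state.get(key);
        return perKey == null ? null : perKey.get(namespace);
    }

    /** What a firing timer for (key, namespace) would do: emit that window only and drop it. */
    Long fireAndPurge(K key, N namespace) {
        Map<N, Long> perKey = state.get(key);
        if (perKey == null) {
            return null;
        }
        Long result = perKey.remove(namespace);
        if (perKey.isEmpty()) {
            state.remove(key);
        }
        return result;
    }

    /** Start timestamps of all sliding windows that contain the given timestamp. */
    static List<Long> slidingWindowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        ScopedStateSketch<String, Long> counts = new ScopedStateSketch<>();
        // Two elements of the same key; each falls into two overlapping windows.
        for (long timestamp : new long[] {7, 12}) {
            for (Long windowStart : slidingWindowStarts(timestamp, 10, 5)) {
                counts.add("k", windowStart, 1);
            }
        }
        System.out.println(counts.fireAndPurge("k", 0L)); // window [0, 10) fires alone
        System.out.println(counts.get("k", 5L));          // window [5, 15) is still active
    }
}
```

A timer would then carry both the key and the namespace, so that firing it
touches exactly one window of one key.
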
> > Happy discussing. :D
> >
> > Cheers,
> > Aljoscha
> >
> >
>
