A lot of this makes sense, but I am not sure about renaming "OperatorState". The other name is nicer, but why make users' lives harder just for a name?
On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <m...@apache.org> wrote:
> Hi Aljoscha,
>
> Thanks for the informative technical description.
>
> > - function state: this is the state that you get when a user function implements the Checkpointed interface. it is not partitioned
> > - operator state: This is the state that a StreamOperator can snapshot, it is similar to the function state, but for operators. it is not partitioned
> > - partitioned state: state that is scoped to the key of the incoming element, in Flink, this is (confusingly) called OperatorState and KvState (internally)
>
> Let's clean that up! Let's rename the OperatorState interface to KvState.
>
> > Both stream operators and user functions can have partitioned state, and the namespace is the same, i.e. the state can clash. The partitioned state will stay indefinitely if not manually cleared.
>
> I suppose operators currently have to take care to use a unique identifier for the state such that it doesn't clash with the user function. Wouldn't be too hard to introduce a scoping here.
>
> Your proposal makes sense. It seems like this is a rather delicate change which improves the flexibility of the streaming API. What is the motivation behind this? I suppose you are thinking of improvements to the session capabilities of the streaming API.
>
> > If we want to also implement the current WindowOperator on top of these generic facilities we need to have a way to scope state not only by key but also by windows (or better, some generic state scope).
>
> This is currently handled by the WindowOperator itself and would then be delegated to the enhanced state interface? Makes sense if we want to make use of the new state interface. Again, is it just cleaner or does this enable new type of applications?
>
> Cheers,
> Max
>
> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> > Hi All,
> > I want to discuss some ideas about improving the primitives/operations that Flink offers for user-state, timers and windows and how these concepts can be unified.
> >
> > It has come up a lot lately that people have very specific requirements regarding the state that they keep, and it seems necessary to allow users to set their own custom timers (on processing time and watermark time (event-time)) to do both expiration of state and implementation of custom windowing semantics. While we’re at this, we might also think about cleaning up the state handling a bit.
> >
> > Let me first describe the status quo, so that we’re all on the same page. There are three types of state:
> > - function state: this is the state that you get when a user function implements the Checkpointed interface. it is not partitioned
> > - operator state: This is the state that a StreamOperator can snapshot, it is similar to the function state, but for operators. it is not partitioned
> > - partitioned state: state that is scoped to the key of the incoming element, in Flink, this is (confusingly) called OperatorState and KvState (internally)
> >
> > (Operator is the low-level concept, user functions are usually invoked by the operator, for example StreamMap is the operator that handles a MapFunction.)
> >
> > Function state and operator state are not partitioned, which makes things difficult when we want to implement dynamic scale-in/scale-out. Partitioned state can be redistributed when changing the degree of parallelism.
> >
> > Both stream operators and user functions can have partitioned state, and the namespace is the same, i.e. the state can clash. The partitioned state will stay indefinitely if not manually cleared.
> >
> > On to timers: operators can register processing-time callbacks, and they can react to watermarks to implement event-time callbacks. They have to implement the logic themselves, however. For example, the WindowOperator has custom code to keep track of watermark timers and for reacting to watermarks. User functions have no way of registering timers. Also, timers are not scoped to any key. So if you register a timer while processing an element of a certain key, when the timer fires you don’t know what key was active when registering the timer. This might be necessary for cleaning up state for certain keys, or to trigger processing for a certain key only, for example with session windows of some kind.
> >
> > Now, on to new stuff. I propose to add a timer facility that can be used by both operators and user functions. Both partitioned state and timers should be aware of keys, and if a timer fires the partitioned state should be scoped to the same key that was active when the timer was registered.
> >
> > One last bit. If we want to also implement the current WindowOperator on top of these generic facilities we need to have a way to scope state not only by key but also by windows (or better, some generic state scope). The reason is that one key can have several active windows at one point in time and firing timers need to be mapped to the correct window (for example, for sliding windows, or session windows or what have you…).
> >
> > Happy discussing. :D
> >
> > Cheers,
> > Aljoscha
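
To make the key-scoped timer idea from the original proposal concrete, here is a rough, self-contained Java sketch. It is only a toy model under my own assumptions: KeyedTimerSketch, its Timer class, and all method names are hypothetical and are not existing Flink API. It shows a timer capturing the key and a generic scope (for example a window) at registration, and the partitioned state being looked up under that same key and scope when the timer fires.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Toy model of key- and scope-aware timers; none of these names are Flink API. */
class KeyedTimerSketch {

    /** A timer that remembers the key and scope (e.g. a window) active at registration. */
    static final class Timer {
        final long timestamp;
        final String key;
        final String scope;

        Timer(long timestamp, String key, String scope) {
            this.timestamp = timestamp;
            this.key = key;
            this.scope = scope;
        }
    }

    // Pending event-time timers, earliest timestamp first.
    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    // Partitioned state: one counter per (key, scope) pair.
    private final Map<List<String>, Long> counts = new HashMap<>();

    // Key of the element currently being processed.
    private String currentKey;

    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    /** Adds one to the counter for the current key in the given scope. */
    void increment(String scope) {
        counts.merge(Arrays.asList(currentKey, scope), 1L, Long::sum);
    }

    /** Registers a timer that captures the current key together with the scope. */
    void registerEventTimeTimer(long timestamp, String scope) {
        timers.add(new Timer(timestamp, currentKey, scope));
    }

    /**
     * Fires all timers with a timestamp up to the watermark. Before each timer
     * fires, the key that was active at registration is restored, so the state
     * read and cleared here belongs to that key and scope only.
     */
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Timer t = timers.poll();
            setCurrentKey(t.key);
            Long count = counts.remove(Arrays.asList(currentKey, t.scope));
            fired.add(t.key + "@" + t.scope + "=" + count);
        }
        return fired;
    }
}
```

In this sketch, clearing the counter when the timer fires stands in for state expiration, and the scope string stands in for a window, so one key can hold several independent timers and counters at the same time.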