Oh, sorry, I misread. Just my +1 to renaming OperatorState then :-)

On Mon, Dec 14, 2015 at 11:38 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> As I mentioned in my previous mail, I think that OperatorState would need to
> be replaced by more specific types of state (ValueState, ListState, …).
>
>
> On 14 Dec 2015, at 11:34, Maximilian Michels <m...@apache.org> wrote:
> >
> >>
> >> On a side note, why would you call it KvState? And what would be called
> >> KvState?
> >
> >
> > The OperatorState interface would be called KvState.
> >
> >
> > On Mon, Dec 14, 2015 at 11:18 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> >> Yes, as Kostas said, it would initially not provide more functionality but
> >> it would enable us to add it later.
> >>
> >> On a side note, why would you call it KvState? And what would be called
> >> KvState?
> >>
> >>> On 14 Dec 2015, at 11:14, Kostas Tzoumas <ktzou...@apache.org> wrote:
> >>>
> >>> I suppose that they can start as sugar and evolve to a different
> >>> implementation.
> >>>
> >>> I would +1 the name change to KVState, OperatorState is indeed somewhat
> >>> confusing, and it will only get harder to rename later.
> >>>
> >>> On Mon, Dec 14, 2015 at 11:09 AM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >>>
> >>>> Would the Reducing/Folding states just be some API sugar on top of what we
> >>>> have now (ValueState) or do they have some added functionality (like
> >>>> incremental checkpoints for list states)?
> >>>>
> >>>> Gyula
> >>>>
> >>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, Dec 14, 2015, 11:03):
> >>>>
> >>>>> While enhancing the state interfaces we would also need to introduce new
> >>>>> types of state. I was thinking of these, for a start:
> >>>>> - ValueState (works like OperatorState works now, i.e.
> >>>>> provides methods to get/set one state value)
> >>>>> - ListState, provides methods to add one element to a list of elements and
> >>>>> to iterate over all contained elements
> >>>>> - ReducingState, somewhat similar to value state but combines the added
> >>>>> value with the existing value using a ReduceFunction
> >>>>> - FoldingState, same as above but with fold
> >>>>>
> >>>>> I think these are necessary to give the system more knowledge about the
> >>>>> semantics of state so that it can handle the state more efficiently. Think
> >>>>> of incremental checkpoints, for example: these are easy to do if you know
> >>>>> that state is a list to which stuff is only appended.
> >>>>>> On 14 Dec 2015, at 10:52, Stephan Ewen <se...@apache.org> wrote:
> >>>>>>
> >>>>>> A lot of this makes sense, but I am not sure about renaming
> >>>>>> "OperatorState". The other name is nicer, but why make users' life hard
> >>>>>> just for a name?
> >>>>>>
> >>>>>>
> >>>>>> On Mon, Dec 14, 2015 at 10:46 AM, Maximilian Michels <m...@apache.org> wrote:
> >>>>>>
> >>>>>>> Hi Aljoscha,
> >>>>>>>
> >>>>>>> Thanks for the informative technical description.
> >>>>>>>
> >>>>>>>> - function state: this is the state that you get when a user function
> >>>>>>>> implements the Checkpointed interface. It is not partitioned.
> >>>>>>>> - operator state: this is the state that a StreamOperator can snapshot;
> >>>>>>>> it is similar to the function state, but for operators. It is not
> >>>>>>>> partitioned.
> >>>>>>>> - partitioned state: state that is scoped to the key of the incoming
> >>>>>>>> element; in Flink, this is (confusingly) called OperatorState and KvState
> >>>>>>>> (internally)
> >>>>>>>
> >>>>>>> Let's clean that up! Let's rename the OperatorState interface to KvState.
> >>>>>>>
> >>>>>>>> Both stream operators and user functions can have partitioned state, and
> >>>>>>>> the namespace is the same, i.e.
> >>>>>>>> the state can clash. The partitioned state
> >>>>>>>> will stay indefinitely if not manually cleared.
> >>>>>>>
> >>>>>>> I suppose operators currently have to take care to use a unique
> >>>>>>> identifier for the state such that it doesn't clash with the user
> >>>>>>> function. Wouldn't be too hard to introduce a scoping here.
> >>>>>>>
> >>>>>>> Your proposal makes sense. It seems like this is a rather delicate
> >>>>>>> change which improves the flexibility of the streaming API. What is
> >>>>>>> the motivation behind this? I suppose you are thinking of improvements
> >>>>>>> to the session capabilities of the streaming API.
> >>>>>>>
> >>>>>>>> If we want to also implement the current WindowOperator on top of these
> >>>>>>>> generic facilities we need to have a way to scope state not only by key but
> >>>>>>>> also by windows (or better, some generic state scope).
> >>>>>>>
> >>>>>>> This is currently handled by the WindowOperator itself and would then
> >>>>>>> be delegated to the enhanced state interface? Makes sense if we want
> >>>>>>> to make use of the new state interface. Again, is it just cleaner or
> >>>>>>> does this enable new types of applications?
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Max
> >>>>>>>
> >>>>>>> On Thu, Dec 10, 2015 at 4:47 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>>> Hi All,
> >>>>>>>> I want to discuss some ideas about improving the primitives/operations
> >>>>>>>> that Flink offers for user-state, timers and windows and how these concepts
> >>>>>>>> can be unified.
> >>>>>>>>
> >>>>>>>> It has come up a lot lately that people have very specific requirements
> >>>>>>>> regarding the state that they keep and it seems necessary to allow users
> >>>>>>>> to set their own custom timers (on processing time and watermark time
> >>>>>>>> (event-time)) to do both expiration of state and implementation of custom
> >>>>>>>> windowing semantics. While we’re at this, we might also think about
> >>>>>>>> cleaning up the state handling a bit.
> >>>>>>>>
> >>>>>>>> Let me first describe the status quo, so that we’re all on the same
> >>>>>>>> page. There are three types of state:
> >>>>>>>> - function state: this is the state that you get when a user function
> >>>>>>>> implements the Checkpointed interface. It is not partitioned.
> >>>>>>>> - operator state: this is the state that a StreamOperator can snapshot;
> >>>>>>>> it is similar to the function state, but for operators. It is not
> >>>>>>>> partitioned.
> >>>>>>>> - partitioned state: state that is scoped to the key of the incoming
> >>>>>>>> element; in Flink, this is (confusingly) called OperatorState and KvState
> >>>>>>>> (internally)
> >>>>>>>>
> >>>>>>>> (Operator is the low-level concept; user functions are usually invoked
> >>>>>>>> by the operator. For example, StreamMap is the operator that handles a
> >>>>>>>> MapFunction.)
> >>>>>>>>
> >>>>>>>> Function state and operator state are not partitioned, meaning that it
> >>>>>>>> becomes difficult when we want to implement dynamic scale-in/scale-out.
> >>>>>>>> Partitioned state, in contrast, can be redistributed when changing the
> >>>>>>>> degree of parallelism.
> >>>>>>>>
> >>>>>>>> Both stream operators and user functions can have partitioned state, and
> >>>>>>>> the namespace is the same, i.e. the state can clash. The partitioned state
> >>>>>>>> will stay indefinitely if not manually cleared.
> >>>>>>>>
> >>>>>>>> On to timers: operators can register processing-time callbacks, and they
> >>>>>>>> can react to watermarks to implement event-time callbacks. They have to
> >>>>>>>> implement the logic themselves, however. For example, the WindowOperator
> >>>>>>>> has custom code to keep track of watermark timers and for reacting to
> >>>>>>>> watermarks. User functions have no way of registering timers. Also, timers
> >>>>>>>> are not scoped to any key. So if you register a timer while processing an
> >>>>>>>> element of a certain key, when the timer fires you don’t know what key was
> >>>>>>>> active when registering the timer. Knowing the key might be necessary for
> >>>>>>>> cleaning up state for certain keys, or to trigger processing for a certain
> >>>>>>>> key only, for example with session windows of some kind.
> >>>>>>>>
> >>>>>>>> Now, on to new stuff. I propose to add a timer facility that can be used
> >>>>>>>> by both operators and user functions. Both partitioned state and timers
> >>>>>>>> should be aware of keys, and if a timer fires the partitioned state should
> >>>>>>>> be scoped to the same key that was active when the timer was registered.
> >>>>>>>>
> >>>>>>>> One last bit. If we want to also implement the current WindowOperator on
> >>>>>>>> top of these generic facilities we need to have a way to scope state not
> >>>>>>>> only by key but also by windows (or better, some generic state scope). The
> >>>>>>>> reason is that one key can have several active windows at one point in
> >>>>>>>> time and firing timers need to be mapped to the correct window (for
> >>>>>>>> example, for sliding windows, or session windows or what have you…).
> >>>>>>>>
> >>>>>>>> Happy discussing. :D
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Aljoscha
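
To make the key-scoped timer idea from the original proposal a bit more concrete, here is a minimal sketch of how it could behave. Everything in it is hypothetical: the class KeyedTimerSketch and its methods (setCurrentKey, value, update, clear, registerTimer, advanceTo) are invented for illustration and are not Flink interfaces. It assumes a single in-memory map for partitioned state and leaves out checkpointing as well as the window/namespace scoping mentioned at the end of the proposal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Hypothetical sketch of key-scoped state and timers; not Flink's API. */
final class KeyedTimerSketch<K, V> {

    /** One value per key, similar in spirit to the ValueState idea in the thread. */
    private final Map<K, V> stateByKey = new HashMap<>();

    /** Timers ordered by timestamp; each timer remembers the key that registered it. */
    private final TreeMap<Long, List<K>> timers = new TreeMap<>();

    /** Key of the element (or timer) that is currently being processed. */
    private K currentKey;

    void setCurrentKey(K key) {
        currentKey = key;
    }

    /** Reads the state of the current key, or null if none was set. */
    V value() {
        return stateByKey.get(currentKey);
    }

    void update(V value) {
        stateByKey.put(currentKey, value);
    }

    /** Drops the state of the current key, e.g. to expire it from a timer callback. */
    void clear() {
        stateByKey.remove(currentKey);
    }

    /** Registers a timer tagged with the key that is active right now. */
    void registerTimer(long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(currentKey);
    }

    /**
     * Fires every timer with a timestamp up to and including {@code time}, which stands in
     * for a watermark or a processing-time clock. Before each callback the current key is
     * set back to the key that registered the timer, so value()/update()/clear() inside the
     * callback operate on that key's state.
     */
    void advanceTo(long time, BiConsumer<Long, KeyedTimerSketch<K, V>> onTimer) {
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            Map.Entry<Long, List<K>> due = timers.pollFirstEntry();
            for (K key : due.getValue()) {
                currentKey = key;
                onTimer.accept(due.getKey(), this);
            }
        }
    }

    public static void main(String[] args) {
        KeyedTimerSketch<String, Integer> ctx = new KeyedTimerSketch<>();

        ctx.setCurrentKey("a");
        ctx.update(1);
        ctx.registerTimer(10L);

        ctx.setCurrentKey("b");
        ctx.update(2);
        ctx.registerTimer(20L);

        // Only the timer at 10 is due; its callback sees (and expires) the state of key "a".
        ctx.advanceTo(15L, (timestamp, c) -> {
            System.out.println("timer " + timestamp + " sees state " + c.value());
            c.clear();
        });
    }
}
```

In this sketch the timer registered while key "a" was active fires with "a" as the current key, even though "b" was processed afterwards, which is the property the proposal asks for. Scoping by window in addition to key would presumably need a second component in the map key, which is not shown here.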