Hi Guozhang,

Thanks for reviewing the proposal. I hadn't thought about out-of-order events,
and I'm glad that you brought them up.

In the example you gave,

(10, 10), (19, 5), (15, 3) ...

My understanding is that the resulting windows should be the same as for the
in-order sequence:

(10, 10), (15, 3), (19, 5) ...

When (15, 3) is received, [15, 15] is created.
When (19, 5) is received, [15, 15] and (19, 5) are merged into a new window
[15, 19], and [15, 15] is removed.

Back to the out-of-order case:

When (19, 5) is received, [19, 19] is created.
When (15, 3) is received, in order to generate the same result:
1. if the late event arrives after the retention period has passed, it is
dropped;
2. otherwise, the adjacent session windows within the gap are retrieved and
merged accordingly; in this case [19, 19] is retrieved and a new session
[15, 19] is created.
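
The two steps above can be sketched as a small in-memory, single-key Java
program. This is not Kafka Streams code: DynamicGapSessionSketch, Session,
process() and the retention value of 100 are hypothetical, and the merge rule
is an assumption chosen so that both orderings of the example yield the same
windows. A record's own gap reaches back to an earlier session, and each
session stores the gap of its earliest record so that a late record can reach
forward into it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class DynamicGapSessionSketch {

    /** A session window that also remembers the gap of its earliest record (assumption). */
    static final class Session {
        final long start;
        final long end;
        final long startGap;

        Session(long start, long end, long startGap) {
            this.start = start;
            this.end = end;
            this.startGap = startGap;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    private final List<Session> sessions = new ArrayList<>(); // sessions of one key, sorted by start
    private final long retention;                              // hypothetical retention period
    private long streamTime = Long.MIN_VALUE;                  // highest timestamp seen so far

    DynamicGapSessionSketch(long retention) {
        this.retention = retention;
    }

    /** Processes one (timestamp, gap) record; returns false if it was dropped as too late. */
    boolean process(long ts, long gap) {
        streamTime = Math.max(streamTime, ts);
        // Step 1: drop a record that is older than the retention period.
        if (ts < streamTime - retention) {
            return false;
        }
        // Step 2: find the adjacent sessions this record connects to and merge them.
        long start = ts;
        long end = ts;
        long startGap = gap;
        List<Session> merged = new ArrayList<>();
        for (Session s : sessions) {
            boolean inside = s.start <= ts && ts <= s.end;
            boolean follows = ts > s.end && ts - s.end <= gap;             // record's own gap looks back
            boolean precedes = ts < s.start && s.start - ts <= s.startGap; // session's stored gap looks back
            if (inside || follows || precedes) {
                merged.add(s);
                if (s.start < start) {
                    start = s.start;
                    startGap = s.startGap;
                }
                end = Math.max(end, s.end);
            }
        }
        sessions.removeAll(merged);
        sessions.add(new Session(start, end, startGap));
        sessions.sort(Comparator.comparingLong(s -> s.start));
        return true;
    }

    List<Session> snapshot() {
        return new ArrayList<>(sessions);
    }

    /** Feeds (timestamp, gap) pairs into a fresh store and returns the resulting sessions. */
    static List<Session> run(long[][] records) {
        DynamicGapSessionSketch store = new DynamicGapSessionSketch(100);
        for (long[] r : records) {
            store.process(r[0], r[1]);
        }
        return store.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(run(new long[][] {{10, 10}, {15, 3}, {19, 5}})); // prints [[10, 10], [15, 19]]
        System.out.println(run(new long[][] {{10, 10}, {19, 5}, {15, 3}})); // prints [[10, 10], [15, 19]]
    }
}
```

Both orderings end with [10, 10] and [15, 19]. Storing the gap per session,
rather than per event, is what lets the late (15, 3) find [19, 19].
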
I'm a little confused by your statement that "the window [15, 15] SHOULD
actually be expired at 18 and hence the next record (19, 5) should be for a
new session already." If I understand correctly, the expiration of the window
is only checked when the next event (19, 5) arrives, and that event should
then be merged into it. [15, 15] will then be closed. Is that also what you
meant?
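
The disagreement seems to come down to which gap decides whether (19, 5) joins
[15, 15]. The sketch below contrasts the two readings as I understand them;
the method names are hypothetical, and treating the boundaries as inclusive is
an assumption.

```java
public class ExpiryReadings {

    /** Reading A: a session expires at (last timestamp + last gap); later records start a new session. */
    static boolean joinsByExpiry(long sessionEnd, long sessionGap, long ts) {
        return ts <= sessionEnd + sessionGap; // inclusive boundary is an assumption
    }

    /** Reading B: the incoming record's own gap looks back to find the session. */
    static boolean joinsByIncomingGap(long sessionEnd, long ts, long gap) {
        return ts - gap <= sessionEnd; // inclusive boundary is an assumption
    }

    public static void main(String[] args) {
        // Session [15, 15] was created by (15, 3); the next record is (19, 5).
        System.out.println(joinsByExpiry(15, 3, 19));      // prints false: expired at 18
        System.out.println(joinsByIncomingGap(15, 19, 5)); // prints true: 19 - 5 = 14 <= 15
    }
}
```
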
I cannot think of a case where a window would be split by a late event:
if events A and C fall into the same session window, a late event B between
them will definitely fall within C's gap as well. In other words, a late
event can only extend a window, never split it.
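
Written as an inequality, assuming A and C are adjacent events in the session
so that C's gap reaches back to A (the notation t_X for a timestamp and g_C for
the gap extracted from C is introduced here):

```latex
t_C - t_A \le g_C \quad\text{and}\quad t_A < t_B < t_C
\;\Longrightarrow\; t_C - t_B < t_C - t_A \le g_C
```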

Take a look at another example:
(13, 3), (19, 5), (15, 3) ...

In this case, when (15, 3) is received, [13, 13] should be retrieved and
merged into a new window [13, 15], and then [19, 19] should be updated to
[13, 19]. Correct?
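
Assuming that a later event's gap is what reaches back to an earlier one, the
merges in this example check out as follows. The last step only works if the
gap of 5 is still known for [19, 19].

```latex
\begin{aligned}
19 - 13 &= 6 > 5   &&\Rightarrow [13,13] \text{ and } [19,19] \text{ stay separate} \\
15 - 13 &= 2 \le 3 &&\Rightarrow [13,13] \cup \{15\} = [13,15] \\
19 - 15 &= 4 \le 5 &&\Rightarrow [13,15] \cup [19,19] = [13,19]
\end{aligned}
```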

To achieve that, as you said, the gap needs to be stored for sessions. We
don't need to save the gap with each event, only with each session window.
To avoid upgrading the existing session window, how about creating a new
Window type that extends SessionWindow, along with a new KeySchema?
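
A rough sketch of what storing one gap per session could look like at the byte
level. To keep it self-contained it uses a standalone class rather than
extending SessionWindow; DynamicGapSessionWindow, the method names and the
[key][end][start][gap] layout are all hypothetical, and a real KeySchema would
also need the range-scan helpers that this omits.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DynamicGapKeySketch {

    /** Hypothetical window type: session bounds plus a gap stored once per session. */
    static final class DynamicGapSessionWindow {
        final long start;
        final long end;
        final long gapMs;

        DynamicGapSessionWindow(long start, long end, long gapMs) {
            this.start = start;
            this.end = end;
            this.gapMs = gapMs;
        }
    }

    private static final int SUFFIX = 3 * Long.BYTES; // end + start + gap

    /** Serializes key and window as [key bytes][end][start][gap] (hypothetical layout). */
    static byte[] toBinary(byte[] key, DynamicGapSessionWindow w) {
        return ByteBuffer.allocate(key.length + SUFFIX)
                .put(key)
                .putLong(w.end)
                .putLong(w.start)
                .putLong(w.gapMs)
                .array();
    }

    static DynamicGapSessionWindow windowFromBinary(byte[] binary) {
        ByteBuffer buf = ByteBuffer.wrap(binary);
        int keyLen = binary.length - SUFFIX;
        long end = buf.getLong(keyLen);
        long start = buf.getLong(keyLen + Long.BYTES);
        long gap = buf.getLong(keyLen + 2 * Long.BYTES);
        return new DynamicGapSessionWindow(start, end, gap);
    }

    static byte[] keyFromBinary(byte[] binary) {
        return Arrays.copyOfRange(binary, 0, binary.length - SUFFIX);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] stored = toBinary(key, new DynamicGapSessionWindow(13, 19, 3));
        DynamicGapSessionWindow w = windowFromBinary(stored);
        System.out.println(new String(keyFromBinary(stored), StandardCharsets.UTF_8)
                + " " + w.start + " " + w.end + " " + w.gapMs); // prints user-42 13 19 3
    }
}
```

Keeping the gap in the key means it is rewritten whenever a session is merged,
which already happens for start and end; putting it in the value instead would
be an equally reasonable choice.
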

What do you think?

Lei


On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Lei,
>
> Thanks for the proposal. I've just made a quick pass over it and there is a
> question I have:
>
> The session windows are defined per key, i.e. does that mean that each
> incoming record of the key can dynamically change the gap of the window?
> For example, say you have the following records for the same key arriving
> in order, where the first value is the timestamp of the record and the
> second value is the extracted gap:
>
> (10, 10), (19, 5), ...
>
>
> When we receive the first record at time 10, the gap is extracted as 10,
> and hence the window will be expired at 20 if no other record is received.
> When we receive the second record at time 19, the gap is modified to 5, and
> hence the window will be expired at 24 if no other record is received.
>
>
> If that's the case, I'm wondering how out-of-order data can be handled
> then, consider this stream:
>
> (10, 10), (19, 5), (15, 3) ...
>
> I.e. you received a late record with timestamp 15, which shortens
> the gap to 3. It means that the window SHOULD actually be expired at 18,
> and hence the next record (19, 5) should be for a new session already.
> Today the Streams session window implementation does not do "window split",
> so have you thought about how it could be extended?
>
> Also, since in your proposal each session window's gap value could be
> different, we would need to store this value along with each record. How
> would we store it, and what would the upgrade path be if it is not a
> compatible change to the on-disk storage?
>
>
>
> Guozhang
>
>
>
> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <ley...@gmail.com> wrote:
>
> > Hi All,
> >
> > I created a KIP to add dynamic gap session window support to the Kafka
> > Streams DSL.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-362%3A+Support+dynamic+gap+session+window
> >
> > Please take a look,
> >
> > Thanks,
> > Lei
> >
>
>
