Thanks Matthias and Guozhang for the response. It seems our understanding mainly differs on the semantics of the gap in session windows.
My understanding is that the gap is used to merge nearby records together, such that no two adjacent records in a merged window are more than the gap apart. In Kafka Streams's implementation it is mainly used to find neighboring records/windows in the session store so that nearby records can be merged. It is NOT used to determine when a window should be closed; that is in fact determined by the window's grace period.

Guozhang, you said "b. When later we received (15, 3), it means that this record ** changed ** the window gap interval from 10 to 3, and hence we received a new record at 15, with the new window gap of 3, it means that by timestamp 18 (15 + 3) if we have not received any new data, the window should be closed, i.e. the window is now [10, 18) which includes two records at 10 and 15." This is different from what I thought would happen. I thought that when (15, 3) is received, Kafka Streams looks up neighboring records/windows within the range [15-3, 15+3] and merges any that are found. The previous record (10, 10) created its own window [10, 10], which lies outside that range, so nothing is found and no merge occurs. Hence we now have two windows in the session store, [10, 10] and [15, 15].

Another thing worth mentioning is that the session window object created in the current Kafka Streams implementation doesn't carry any gap info; it has a start and an end, which are the earliest and latest event timestamps in that window interval. I.e., for (10, 10) the session window that gets created is [10, 10], not [10, 20]. Just to clarify why (10, 10) cannot be fetched when searching with the gap of (15, 3): the end boundary 10 of [10, 10] is smaller than the lower bound 12 of the search range [12, 18]. Please correct me if my understanding is wrong here.

@Matthias, to answer your use case question: we have a use case where asynchronous time series data are received in the stream, from different contributors, with different quality and at different paces.
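To make the lookup-and-merge semantics concrete, here is a rough Python sketch (illustrative only, not actual Kafka Streams code; the function and variable names are made up): each stored window keeps only [start, end], and the incoming record's extracted gap alone defines the search range.

```python
def process(records):
    """records: list of (timestamp, extracted_gap) for one key, in arrival
    order. Returns session windows as [start, end] pairs, where start/end
    are the earliest/latest event timestamps in the window (no gap stored)."""
    windows = []
    for ts, gap in records:
        lo, hi = ts - gap, ts + gap
        # fetch neighbor windows whose [start, end] intersects [lo, hi] ...
        hits = [w for w in windows if w[1] >= lo and w[0] <= hi]
        # ... remove them, and write back one merged window covering them + ts
        for w in hits:
            windows.remove(w)
        windows.append([min([ts] + [w[0] for w in hits]),
                        max([ts] + [w[1] for w in hits])])
    return sorted(windows)

# (10, 10) then (15, 3): the search range for (15, 3) is [12, 18], and the
# stored window [10, 10] ends at 10 < 12, so no merge -- two windows remain.
print(process([(10, 10), (15, 3)]))  # [[10, 10], [15, 15]]
```

Under these semantics, closing a window is the grace period's job, not the gap's, which is why (10, 10) and (15, 15) stay separate here.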
Inside Kafka Streams, we use state to maintain statistical aggregations and other mathematical models to track liquidity and to calculate the time decay rate and the dynamic gap, so that at runtime, for each contributor, we can 1. determine how many historical records we should maintain in state, and 2. for each incoming record, output a record using aggregations over *nearby* records from that contributor.

Why doesn't a fixed-gap session window work here? Because the definition of "nearby" here is determined by several very dynamic factors in our case; it changes not only with different hours of the day, but also in relation to other contributors. The purpose of this KIP is to propose a dynamic session window implementation so that we can embed such dynamic "nearby" calculation capability into the Kafka Streams session window semantics.

Hope it makes sense to you.

Lei

On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Lei,
>
> As Matthias mentioned, the key question here is that because of the
> late arrival of records, which may indicate a shorter session gap
> interval, some session windows may be "mistakenly" merged, and hence
> the merge needs to be undone, i.e. the windows need to be split again.
>
> Back to my example, you are right that the processing result of
>
> (10, 10), (19, 5), (15, 3) ..
>
> should be the same as the processing result of
>
> (10, 10), (15, 3), (19, 5) ..
>
> Note that the second value is NOT the window end time, but the
> extracted window gap interval; as you suggested in the KIP, this value
> can be dynamically changed.
>
> a. If you take a look at the second ordering, when we receive (10, 10)
> it means a window starting at 10 is created, and its gap interval is
> 10, which means that if by the timestamp of 20 we do not receive any
> new data, then the window should be closed, i.e. the window [10, 20).
>
> b. When later we received (15, 3), it means that this record
> ** changed ** the window gap interval from 10 to 3, and hence we
> received a new record at 15, with the new window gap of 3, it means
> that by timestamp 18 (15 + 3) if we have not received any new data,
> the window should be closed, i.e. the window is now [10, 18) which
> includes two records at 10 and 15.
>
> c. The third record is received at 19, which is after the window close
> time 18; it means that we should now start a new window starting at
> 19, i.e. the window is [19, 24).
>
>
> BUT, because of the out-of-order arrival, we did not receive (15, 3)
> in time but received (19, 5), which caused us to mistakenly merge the
> window [10, 20) with [19, 24) into [10, 24); only when we later
> received (15, 3) did we realize that the previous window should have
> ended at 18.
>
> Does that make sense to you?
>
>
> Guozhang
>
>
> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > I cannot follow the example:
> >
> > >> (10, 10), (15, 3), (19, 5) ...
> >
> > First, [10,10] is created, second the window is extended to [10,15],
> > and third [19,19] is created. Why would there be a [15,15]? And why
> > would (19,5) be merged into [15,15] -- the gap was set to 3 via
> > (15,3) and thus [19,19] should be its own window?
> >
> > > Take a look at another example,
> > > (13, 3), (19, 5), (15, 3) ...
> > >
> > > in this case when (15, 3) is received, [13,13] should be retrieved
> > > and merged to a new window [13, 15], then [19,19] should be
> > > updated to [13, 19]. Correct?
> >
> > This example makes sense. However, Guozhang's example was different.
> > The late event _reduces_ the gap, and this can lead to a window
> > split. Guozhang's example was
> >
> > >>> (10, 10), (19, 5), (15, 3) ...
> >
> > First [10,10] is created, second [10,19] is created (gap is 10, so
> > 10 and 19 merge).
> > Last, (15,3) reduces the gap from 10 to 3, thus [10,15] and [19,19]
> > must be two windows, i.e., the original window [10,19] must be split.
> >
> >
> > Or maybe you have different semantics in mind for how gaps are
> > dynamically modified? It's a little unclear from the KIP itself what
> > semantics dynamic session windows should have.
> >
> >
> > What is also unclear to me atm is, what use cases do you have in
> > mind? The KIP only says
> >
> > > the statistical aggregation result, liquidity of the records,
> >
> > I am not sure what this means. Can you elaborate?
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 8/30/18 3:32 PM, Lei Chen wrote:
> > > Hi Guozhang,
> > >
> > > Thanks for reviewing the proposal. I didn't think of out-of-order
> > > events, and I'm glad that you brought it up.
> > >
> > > In the example you gave,
> > >
> > > (10, 10), (19, 5), (15, 3) ...
> > >
> > > my understanding is that the correct result windows should be the
> > > same as for the in-order events
> > >
> > > (10, 10), (15, 3), (19, 5) ...
> > >
> > > when (15, 3) is received, [15,15] is created
> > > when (19, 5) is received, [15,15] and [19,19] are merged and
> > > [15,19] is created, meanwhile [15,15] is removed
> > >
> > > back to the out-of-order case,
> > >
> > > when (19, 5) is received, [19,19] is created
> > > when (15, 3) is received, in order to generate the same result,
> > > 1. if the late event is later than the retention period, it will
> > > be dropped
> > > 2. otherwise, adjacent session windows within the gap should be
> > > retrieved and merged accordingly, in this case [19,19], creating a
> > > new session [15,19]
> > >
> > > I'm a little confused when you said "the window [15, 15] SHOULD
> > > actually be expired at 18 and hence the next record (19, 5) should
> > > be for a new session already.". If I understand it correctly, the
> > > expiration of the window is only checked when the next event
> > > (19,5) comes, and then it should be merged into it. [15, 15] will
> > > then be closed.
> > > Is that also what you meant?
> > >
> > > I cannot think of a case where a window will be split by a late
> > > event, because if events A and C fall into the same session
> > > window, a late event B in the middle will definitely fall into C's
> > > gap as well. IOW, a late event will only cause a window extension,
> > > not a split.
> > >
> > > Take a look at another example,
> > > (13, 3), (19, 5), (15, 3) ...
> > >
> > > in this case when (15, 3) is received, [13,13] should be retrieved
> > > and merged to a new window [13, 15], then [19,19] should be
> > > updated to [13, 19]. Correct?
> > >
> > > To be able to achieve that, like you said, the gap needs to be
> > > stored for sessions. We don't need to save the gap with each
> > > event, but only for each session window. To avoid upgrading the
> > > existing session window, how about creating a new Window type
> > > extended from SessionWindow along with a new KeySchema?
> > >
> > > What do you think?
> > >
> > > Lei
> > >
> > >
> > > On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > >> Hello Lei,
> > >>
> > >> Thanks for the proposal. I've just made a quick pass over it and
> > >> there is a question I have:
> > >>
> > >> The session windows are defined per key, i.e. does that mean that
> > >> each incoming record of the key can dynamically change the gap of
> > >> the window? For example, say you have the following records for
> > >> the same key coming in order, where the first value is the
> > >> timestamp of the record, and the second value is the extracted
> > >> gap value:
> > >>
> > >> (10, 10), (19, 5), ...
> > >>
> > >>
> > >> When we receive the first record at time 10, the gap is extracted
> > >> as 10, and hence the window will be expired at 20 if no other
> > >> record is received. When we receive the second record at time 19,
> > >> the gap is modified to 5, and hence the window will be expired at
> > >> 24 if no other record is received.
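The per-record gap update described in the quoted thread, where the latest record's gap sets the window's close time, can be sketched as follows (illustrative Python, not Streams code; names are made up). Running the out-of-order stream in timestamp order also shows why an eagerly merged window would have to be split:

```python
def sessionize(records):
    """records: (timestamp, gap) pairs; processes them in timestamp order and
    closes a window whenever the next record arrives at/after the current
    close time (close = last timestamp + last gap). Returns a list of
    (member_timestamps, close_time) pairs."""
    windows = []
    cur, close = [], None
    for ts, gap in sorted(records):
        if cur and ts >= close:   # the current window already closed at `close`
            windows.append((cur, close))
            cur = []
        cur.append(ts)
        close = ts + gap          # the latest record's gap wins
    if cur:
        windows.append((cur, close))
    return windows

# In order: (10,10) sets close=20; (15,3) shrinks it to 18; (19,5) opens anew.
print(sessionize([(10, 10), (15, 3), (19, 5)]))  # [([10, 15], 18), ([19], 24)]
# Out of order, (19,5) arrives before (15,3): since 19 < 20, it is eagerly
# merged into the first window, so the late (15,3) forces a split back to
# the result above.
print(sessionize([(10, 10), (19, 5)]))           # [([10, 19], 24)]
```

This matches Guozhang's walkthrough: windows [10, 18) and [19, 24) in the in-order case, versus the mistaken merge to [10, 24) when (15, 3) is late.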
> > >>
> > >>
> > >> If that's the case, I'm wondering how out-of-order data can be
> > >> handled then; consider this stream:
> > >>
> > >> (10, 10), (19, 5), (15, 3) ...
> > >>
> > >> I.e. you received a late record indicating that at timestamp 15
> > >> the gap shortened to 3. It means that the window SHOULD actually
> > >> be expired at 18, and hence the next record (19, 5) should be for
> > >> a new session already. Today the Streams session window
> > >> implementation does not do a "window split", so have you thought
> > >> about how this can be extended?
> > >>
> > >> Also, since in your proposal each session window's gap value would
> > >> be different, we would need to store this value along with each
> > >> record. How would we store it, and what would be the upgrade path
> > >> if it is not a compatible change for the on-disk storage etc.?
> > >>
> > >>
> > >>
> > >> Guozhang
> > >>
> > >>
> > >>
> > >> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <ley...@gmail.com>
> > >> wrote:
> > >>
> > >>> Hi All,
> > >>>
> > >>> I created a KIP to add dynamic gap session window support to the
> > >>> Kafka Streams DSL.
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-362%3A+Support+dynamic+gap+session+window
> > >>>
> > >>> Please take a look,
> > >>>
> > >>> Thanks,
> > >>> Lei
> > >>>
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> >
>
>
> --
> -- Guozhang
>
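P.S. On the storage question raised in the thread (keeping a gap per session window rather than per record), one way to picture it is packing the gap next to the window bounds in the serialized session key. The layout below is purely hypothetical and is NOT the real Kafka Streams SessionKeySchema binary format; it only illustrates that one extra fixed-width field per window would be needed:

```python
import struct

def encode_session_key(key: bytes, start_ms: int, end_ms: int, gap_ms: int) -> bytes:
    # hypothetical layout: [key bytes][end][start][gap], three big-endian int64s
    return key + struct.pack(">qqq", end_ms, start_ms, gap_ms)

def decode_session_key(b: bytes):
    key, fixed = b[:-24], b[-24:]
    end_ms, start_ms, gap_ms = struct.unpack(">qqq", fixed)
    return key, start_ms, end_ms, gap_ms

print(decode_session_key(encode_session_key(b"user-1", 10, 15, 3)))
# (b'user-1', 10, 15, 3)
```

Because the gap field is appended after the existing fields, old keys without it would not decode with this schema, which is exactly the upgrade-path concern: a new KeySchema (paired with a new Window type extending SessionWindow) would have to coexist with or migrate the old on-disk format.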