Sorry for the late reply, Matthias. I have been busy with other work recently. I'll restart the discussion and update the KIP accordingly.
Lei On Tue, Nov 6, 2018 at 3:11 PM Matthias J. Sax <matth...@confluent.io> wrote: > Any update on this KIP? > > On 9/20/18 3:30 PM, Matthias J. Sax wrote: > > Thanks for following up. Very nice examples! > > > > I think, that the window definition for Flink is semantically > > questionable. If there is only a single record, why is the window > > defined as [ts, ts+gap]? To me, this definition is not sound and seems > > to be arbitrary. To define the windows as [ts-gap,ts+gap] as you mention > > would be semantically more useful -- still, I think that defining the > > window as [ts,ts] as we do currently in Kafka Streams is semantically > > the best. > > > > I have the impression, that Flink only defines them differently, because > > it solves the issues in the implementation. (Ie, an implementation > > details leaks into the semantics, what is usually not desired.) > > > > However, I believe that we could change the implementation accordingly. > > We could store the windowed keys, as [ts-gap,ts+gap] (or [ts,ts+gap]) in > > RocksDB, but at API level we return [ts,ts]. This way, we can still find > > all windows we need and provide the same deterministic behavior and keep > > the current window boundaries on the semantic level (there is no need to > > store the window start and/or end time). With this technique, we can > > also implement dynamic session gaps. I think, we would need to store the > > used "gap" for each window, too. But again, this would be an > > implementation detail. > > > > Let's see what others think. > > > > One tricky question we would need to address is, how we can be backward > > compatible. I am currently working on KIP-258 that should help to > > address this backward compatibility issue though. > > > > > > -Matthias > > > > > > > > On 9/19/18 5:17 PM, Lei Chen wrote: > >> Thanks Matthias. That makes sense. > >> > >> You're right that symmetric merge is necessary to ensure consistency. 
> >> On the other hand, I kinda feel it defeats the purpose of dynamic gap,
> >> which is to update the gap from the old value to the new value. The
> >> symmetric merge always honors the larger gap in both directions, rather
> >> than honoring the gap carried by the record with the larger timestamp. I
> >> wasn't able to find any semantic definitions w.r.t. this particular
> >> aspect online, but spent some time looking into other streaming engines
> >> like Apache Flink.
> >>
> >> Apache Flink defines the window differently: it uses (start time, start
> >> time + gap).
> >>
> >> So our previous example (10, 10), (19, 5), (15, 3) in Flink's case will be:
> >> [10,20]
> >> [19,24] => merged to [10,24]
> >> [15,18] => merged to [10,24]
> >>
> >> while example (15, 3), (19, 5), (10, 10) will be:
> >> [15,18]
> >> [19,24] => no merge
> >> [10,20] => merged to [10,24]
> >>
> >> However, since it only records the gap in the future direction, not the
> >> past, a late record might not trigger any merge where a symmetric merge
> >> would. For (7, 2), (10, 10), (19, 5), (15, 3):
> >> [7,9]
> >> [10,20]
> >> [19,24] => merged to [10,24]
> >> [15,18] => merged to [10,24]
> >> so at the end two windows, [7,9] and [10,24], are there.
> >>
> >> As you can see, in Flink the gap semantics lean toward the rule that a
> >> gap carried by one record only affects how this record merges with
> >> future records, e.g. a later event (T2, G2) will only be merged with
> >> (T1, G1) if T2 is less than T1+G1, but not when T1 is less than T2-G2.
> >> Let's call this the "forward-merge" way of handling this. I just went
> >> through some source code; if my understanding of Flink's implementation
> >> is incorrect, please correct me.
> >>
> >> On the other hand, if we want to do symmetric merge in Kafka Streams, we
> >> can change the window definition to [start time - gap, start time + gap].
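The forward-merge walkthroughs above can be written down in a few lines. This is a rough Python sketch of the semantics as described in this thread (the name `forward_merge` is made up, and this is not Flink's actual implementation): each record (ts, gap) opens a window [ts, ts + gap], and overlapping windows are merged.

```python
def forward_merge(records):
    """Forward-merge semantics: each record (ts, gap) opens a window
    [ts, ts + gap]; windows whose intervals overlap are merged.
    A record's gap only reaches forward in time, never backward."""
    windows = []  # list of [start, end], kept sorted by start
    for ts, gap in records:
        new = [ts, ts + gap]
        remaining = []
        for w in windows:
            if new[0] <= w[1] and w[0] <= new[1]:  # intervals overlap
                new = [min(new[0], w[0]), max(new[1], w[1])]
            else:
                remaining.append(w)
        remaining.append(new)
        windows = sorted(remaining)
    return windows

# The late-record example from above, (7,2),(10,10),(19,5),(15,3),
# leaves two windows, [7,9] and [10,24], because the gap of (7,2)
# does not reach forward far enough to touch [10,24].
```

Note that the first two examples above, (10,10),(19,5),(15,3) and (15,3),(19,5),(10,10), both collapse to a single window [10,24] under this sketch, matching the walkthrough, while the late record (7,2) stays isolated.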
> >> This way the example (7, 2), (10, 10), (19, 5), (15, 3) will be:
> >> [5,9]
> >> [0,20] => merged to [0,20]
> >> [14,24] => merged to [0,24]
> >> [12,18] => merged to [0,24]
> >>
> >> and (19, 5), (15, 3), (7, 2), (10, 10) will generate the same result:
> >> [14,24]
> >> [12,18] => merged to [12,24]
> >> [5,9] => no merge
> >> [0,20] => merged to [0,24]
> >>
> >> Note that symmetric merge would require us to change the way Kafka
> >> Streams fetches windows now: instead of fetching the range from
> >> timestamp-gap to timestamp+gap, we will need to fetch all windows that
> >> are not expired yet. On the other hand, I'm not sure how this will
> >> impact the current logic of how a window is considered closed, because
> >> the window doesn't carry the end timestamp anymore, but end timestamp +
> >> gap.
> >>
> >> So do you guys think the forward-merge approach used by Flink makes
> >> more sense in Kafka Streams, or does symmetric merge make more sense?
> >> Both of them, it seems to me, can give a deterministic result.
> >>
> >> BTW I'll add the use case into the original KIP.
> >>
> >> Lei
> >>
> >> On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >>
> >>> Thanks for explaining your understanding. And thanks for providing
> >>> more details about the use-case. Maybe you can add this to the KIP?
> >>>
> >>> First, one general comment. I guess that my and Guozhang's
> >>> understanding of gap/close/gracePeriod is the same as yours -- we
> >>> might not have used the terms precisely correctly in previous emails.
> >>>
> >>> To your semantics of gap in detail:
> >>>
> >>>> I thought when (15,3) is received, Kafka Streams looks up a neighbor
> >>>> record/window that is within the gap of [15-3, 15+3], and merges if
> >>>> any. Previous record (10, 10) created its own window [10, 10], which
> >>>> is out of the gap, so nothing will be found and no merge occurs.
> >>>> Hence we have two windows now in the session store, [10, 10] and
> >>>> [15, 15] respectively.
> >>>
> >>> If you have record (10,10), we currently create a window of size
> >>> [10,10]. When record (15,3) arrives, your observation is that the gap
> >>> 3 is too small for it to be merged into the [10,10] window --
> >>> however, merging is a symmetric operation and the existing window
> >>> [10,10] has a gap of 10 defined: thus, 15 is close enough to fall
> >>> into the gap, and (15,3) is merged into the existing window,
> >>> resulting in window [10,15].
> >>>
> >>> If we don't respect the gap both ways, we end up with inconsistencies
> >>> if data is out-of-order. For example, if we use the same input
> >>> records (10,10) and (15,3) from above, and it happens that (15,3) is
> >>> processed first, wouldn't we want to merge both into a single window,
> >>> too, when processing the out-of-order record (10,10)?
> >>>
> >>> Does this make sense?
> >>>
> >>> Now the question remains: if two records with different gap
> >>> parameters are merged, which gap should we apply for
> >>> processing/merging future records into the window? It seems that we
> >>> should use the gap parameter from the record with the largest
> >>> timestamp. In the example above, (15,3): we would use gap 3 after
> >>> merging, independent of the order of processing.
> >>>
> >>>> Also, another thing worth mentioning is that the session window
> >>>> object created in the current Kafka Streams implementation doesn't
> >>>> have gap info; it has start and end, which are the earliest and
> >>>> latest event timestamps in that window interval, i.e. for (10,10)
> >>>> the session window that gets created is [10,10], rather than
> >>>> [10,20]. Just to clarify why (10,10) cannot be fetched when looking
> >>>> for the gap of (15,3): it's because the end boundary 10 of [10,10]
> >>>> is smaller than the search boundary [12,18].
> >>> > >>> We don't need to store the gap, because the gap is know from the window > >>> definition. The created window size depends on the data that is > >>> contained in the window. I guess one could define it differently, too, > >>> ie, for the (10,10) record, we create a window [0,20] -- not sure if it > >>> makes a big difference in practice though. Note, that creating window > >>> [10,20] would not be correct, because the gap must be applied in both > >>> directions, not just into the future. > >>> > >>> About the second part: the search would not be applied from (15,3) in > >>> range [12,18], but from existing window [10,10] into range [0,20] and > 15 > >>> is contained there. This example also shows, that we would need to come > >>> up with a clever way, to identify window [10,10] when processing (15,3) > >>> -- not sure atm how to do this. However, only consider (15,3) would > >>> result in inconsistencies for out-of-order data as pointed out above > and > >>> would not be sufficient. > >>> > >>> > >>> Does this make sense? > >>> > >>> > >>> Or is there another way to define dynamic session gap semantics in a > >>> deterministic way with regard to out-of-order data? > >>> > >>> > >>> > >>> -Matthias > >>> > >>> > >>> On 9/11/18 4:28 PM, Lei Chen wrote: > >>>> Thanks Matthias and Guozhang for the response. > >>>> > >>>> Seems like our understanding mainly differs in the semantics of gap in > >>>> session windows. > >>>> > >>>> My understanding is that gap is used to merge nearby records together > >>> such > >>>> that no record > >>>> in the merged window has distance later than gap. In Kafka Streams's > >>>> implementation it's > >>>> mainly used to find neighbor records/windows in session store so that > >>>> nearby records can > >>>> be merge. It is NOT used to determine when a window should be closed, > >>> which > >>>> is in > >>>> fact determined by window's grace period. > >>>> > >>>> Guozhang you said "b. 
When later we received (15, 3), it means that > this > >>>> record ** changed ** > >>>> the window gap interval from 10 to 3, and hence we received a new > record > >>> at > >>>> 15, with the new window gap of 3, it means that by timestamp 18 (15 + > 3) > >>> if > >>>> we have not received any new data, the window should be closed, i.e. > the > >>>> window is now [10, 18) which includes two records at 10 and 15." > >>>> > >>>> This is different from what i thought will happen. > >>>> > >>>> I thought when (15,3) is received, kafka streams look up for neighbor > >>>> record/window that is within the gap > >>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created > its > >>> own > >>>> window [10, 10], which is > >>>> out of the gap, so nothing will be found and no merge occurs. Hence we > >>> have > >>>> two windows now in session store, > >>>> [10, 10] and [15, 15] respectively. > >>>> > >>>> Also another thing worth mentioning is that, the session window object > >>>> created in current kafka streams > >>>> implementation doesn't have gap info, it has start and end, which is > the > >>>> earliest and latest event timestamp > >>>> in that window interval, i.e for (10,10), the session window gets > created > >>>> is [10,10], rather than [10,20]. Just to clarify > >>>> so that it's clear why (10,10) cannot be fetched when looking for gap > of > >>>> (15,3), it's because the end boundary 10 of > >>>> [10,10] is smaller than search boundary [12,18]. > >>>> > >>>> Please correct me if my understanding is wrong here. > >>>> > >>>> @Matthias, to answer your use case question, we have an use case where > >>>> asynchronous time series data > >>>> are received in the stream, from different contributors, with > different > >>>> quality and at different pace. 
> >>>> Inside Kafka Streams, we use state to maintain statistic aggregations > and > >>>> other mathematics model to track > >>>> the liquidity and calculate time decay rate and dynamic gap, so that > at > >>>> runtime, for each contributor we can > >>>> 1. determine how many historical records we should maintain in state. > >>>> 2. for each incoming record, output a record using aggregations from > >>>> *nearby* records from that contributor. > >>>> Why fixed gap session window doesn't work here? Because the > definition of > >>>> "nearby" here is determined by > >>>> several very dynamic factors in our case, it changes not only with > >>>> different hours in a day, but also related to > >>>> other contributors. > >>>> The purpose of this KIP is to suggest a dynamic session window > >>>> implementation so that we can embed such > >>>> dynamic "nearby" calculation capability into kafka streams session > >>> windows > >>>> semantics. Hope it makes sense to you. > >>>> > >>>> Lei > >>>> > >>>> > >>>> > >>>> > >>>> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wangg...@gmail.com> > >>> wrote: > >>>> > >>>>> Hello Lei, > >>>>> > >>>>> As Matthias mentioned, the key question here is that because of the > late > >>>>> arrivals of records which may indicate a shorter session gap > interval, > >>> some > >>>>> session windows may be "mistakenly" merged and hence need to be > undone > >>> the > >>>>> merge, i.e. to split them again. > >>>>> > >>>>> Back to my example, you are right that the processing result of > >>>>> > >>>>> (10, 10), (19, 5), (15, 3) .. > >>>>> > >>>>> should be the same as the processing result of > >>>>> > >>>>> (10, 10), (15, 3), (19, 5) .. > >>>>> > >>>>> Note that the second value is NOT the window end time, but the > extracted > >>>>> window gap interval, as you suggested in the KIP this value can be > >>>>> dynamically changed > >>>>> > >>>>> a. 
If you take a look at the second ordering, when we receive (10, > 10) > >>> it > >>>>> means a window starting at 10 is created, and its gap interval is 10, > >>> which > >>>>> means that if by the timestamp of 20 we do not receive any new data, > >>> then > >>>>> the window should be closed, i.e. the window [10, 20). > >>>>> > >>>>> b. When later we received (15, 3), it means that this record ** > changed > >>> ** > >>>>> the window gap interval from 10 to 3, and hence we received a new > >>> record at > >>>>> 15, with the new window gap of 3, it means that by timestamp 18 (15 + > >>> 3) if > >>>>> we have not received any new data, the window should be closed, i.e. > the > >>>>> window is now [10, 18) which includes two records at 10 and 15. > >>>>> > >>>>> c. The third record is received at 19, which is after the window > close > >>> time > >>>>> 18, it means that we should now start a new window starting at 19, > i.e. > >>> the > >>>>> window is [19, 24), > >>>>> > >>>>> > >>>>> BUT, because of the out of ordering, we did not receive (15, 3) in > time, > >>>>> but received (19, 5), it will cause us to mistakenly merge the > window of > >>>>> [10, 20) with [19, 24) to [10, 24), and only when later we received > >>> (15, 3) > >>>>> we realized that the previous window should have been ended at 18. > >>>>> > >>>>> Does that make sense to you? > >>>>> > >>>>> > >>>>> Guozhang > >>>>> > >>>>> > >>>>> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax < > matth...@confluent.io> > >>>>> wrote: > >>>>> > >>>>>> I cannot follow the example: > >>>>>> > >>>>>>>> (10, 10), (15, 3), (19, 5) ... > >>>>>> > >>>>>> First, [10,10] is created, second the window is extended to [10,15], > >>> and > >>>>>> third [19,19] is created. Why would there be a [15,15]? And why > would > >>>>>> (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and > >>>>>> thus [19,19] should be its own window? 
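Guozhang's a/b/c walkthrough above effectively defines the intended result by replay in timestamp order, with the latest record's gap winning. A hedged Python sketch of that reference semantics (illustrative only, not the Streams implementation; the half-open close-time bound is an assumption matching windows written as [10, 18)):

```python
def sessions_by_replay(records):
    """Reference semantics: process records in timestamp order; a record
    joins the open session if it arrives before the session's current
    close time (last_ts + last_gap), and the latest record's gap always
    wins. Returns (start, close) pairs for half-open windows."""
    sessions, cur = [], None  # cur = [start, last_ts, last_gap]
    for ts, gap in sorted(records):
        if cur is not None and ts < cur[1] + cur[2]:
            cur[1], cur[2] = ts, gap  # extend session; latest gap wins
        else:
            if cur is not None:
                sessions.append((cur[0], cur[1] + cur[2]))
            cur = [ts, ts, gap]
    if cur is not None:
        sessions.append((cur[0], cur[1] + cur[2]))
    return sessions
```

For (10,10),(19,5),(15,3) this yields sessions [10, 18) and [19, 24) in any arrival order, matching the walkthrough; the implementation question in the thread is how an incremental processor can reach this result when (19,5) arrives before the late (15,3), i.e. how to undo the mistaken merge.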
> >>>>>> > >>>>>>> Take a look at another example, > >>>>>>> (13, 3), (19, 5), (15, 3) ... > >>>>>>> > >>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved > and > >>>>>>> merged to a new window [13, 15], then [19,19] should be updated to > >>> [13, > >>>>>>> 19]. Correct? > >>>>>> > >>>>>> This example makes sense. However, Guozhang's example was different. > >>> The > >>>>>> late even, _reduces_ the gap and this can lead to a window split. > >>>>>> Guozhang's example was > >>>>>> > >>>>>>>>> (10, 10), (19, 5), (15, 3) ... > >>>>>> > >>>>>> First [10,10] is created, second [10,19] is create (gap is 10, so 10 > >>> and > >>>>>> 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15] > and > >>>>>> [19,19] must be two windows, ie, original window [10,19] must be > split. > >>>>>> > >>>>>> > >>>>>> Or maybe you have different semantic about gaps are dynamically > >>> modified > >>>>>> in mind? It's a little unclear for the KIP itself what semantics > >>> dynamic > >>>>>> sessions windows should have. > >>>>>> > >>>>>> > >>>>>> What is also unclear to me atm is, what use cases you have in mind? > The > >>>>>> KIP only says > >>>>>> > >>>>>>> the statistical aggregation result, liquidity of the records, > >>>>>> > >>>>>> > >>>>>> I am not sure what this means. Can you elaborate? > >>>>>> > >>>>>> > >>>>>> > >>>>>> -Matthias > >>>>>> > >>>>>> > >>>>>> > >>>>>> On 8/30/18 3:32 PM, Lei Chen wrote: > >>>>>>> Hi Guozhang, > >>>>>>> > >>>>>>> Thanks for reviewing the proposal. I didn't think of out of order > >>>>> events > >>>>>>> and glad that you brought it up. > >>>>>>> > >>>>>>> In the example you gave, > >>>>>>> > >>>>>>> (10, 10), (19, 5), (15, 3) ... > >>>>>>> > >>>>>>> my understanding is that the correct result window should be the > same > >>>>> as > >>>>>> in > >>>>>>> order events > >>>>>>> > >>>>>>> (10, 10), (15, 3), (19, 5) ... 
> >>>>>>> > >>>>>>> when (15, 3) is received, [15,15] is creatd > >>>>>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and [15, > >>> 19] > >>>>>> is > >>>>>>> created, meanwhile [15,15] is removed > >>>>>>> > >>>>>>> back to out of order case, > >>>>>>> > >>>>>>> when (19 ,5) is received, [19, 19] is created > >>>>>>> when (15, 3) is received, in order to generate the same result, > >>>>>>> 1. if late event is later than retention period, it will be dropped > >>>>>>> 2. otherwise, adjacent session windows within gap should be > retrieved > >>>>> and > >>>>>>> merged accordingly, in this case [19, 19], and create a new session > >>>>> [15, > >>>>>> 19] > >>>>>>> I'm little confused when you said "the window [15, 15] SHOULD > actually > >>>>> be > >>>>>>> expired at 18 and hence the next record (19, 5) should be for a new > >>>>>> session > >>>>>>> already.". If i understand it correctly, the expiration of the > window > >>>>> is > >>>>>>> only checked when next event (19,5) comes and then it should be > merged > >>>>> to > >>>>>>> it. [15, 15] will then be closed. Is that also what you meant? > >>>>>>> I cannot think of a case where a window will be split by a late > event, > >>>>>>> because if event A and C fall into the same session window, a late > >>>>> event > >>>>>> B > >>>>>>> in middle will definitely fall into C's gap as well. IOW, late > event > >>>>> will > >>>>>>> only cause window extension, not split. > >>>>>>> > >>>>>>> Take a look at another example, > >>>>>>> (13, 3), (19, 5), (15, 3) ... > >>>>>>> > >>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved > and > >>>>>>> merged to a new window [13, 15], then [19,19] should be updated to > >>> [13, > >>>>>>> 19]. Correct? > >>>>>>> > >>>>>>> To be able to achieve that, like you said, the gap needs to be > stored > >>>>> for > >>>>>>> sessions. We don't need to save the gap with each event, but only > for > >>>>>> each > >>>>>>> session window. 
To avoid upgrading existing session window, how > about > >>>>>>> create a new Window type extended from SessionWindow along with a > new > >>>>>>> KeySchema? > >>>>>>> > >>>>>>> What do you think? > >>>>>>> > >>>>>>> Lei > >>>>>>> > >>>>>>> > >>>>>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wangg...@gmail.com> > >>>>>> wrote: > >>>>>>> > >>>>>>>> Hello Lei, > >>>>>>>> > >>>>>>>> Thanks for the proposal. I've just made a quick pass over it and > >>> there > >>>>>> is a > >>>>>>>> question I have: > >>>>>>>> > >>>>>>>> The session windows are defined per key, i.e. does that mean that > >>> each > >>>>>>>> incoming record of the key can dynamically change the gap of the > >>>>> window? > >>>>>>>> For example, say you have the following record for the same key > >>> coming > >>>>>> in > >>>>>>>> order, where the first time is the timestamp of the record, and > the > >>>>>> second > >>>>>>>> value is the extracted gap value: > >>>>>>>> > >>>>>>>> (10, 10), (19, 5), ... > >>>>>>>> > >>>>>>>> > >>>>>>>> When we receive the first record at time 10, the gap is extracted > as > >>>>> 10, > >>>>>>>> and hence the window will be expired at 20 if no other record is > >>>>>> received. > >>>>>>>> When we receive the second record at time 19, the gap is modified > to > >>>>> 5, > >>>>>> and > >>>>>>>> hence the window will be expired at 24 if no other record is > >>> received. > >>>>>>>> > >>>>>>>> > >>>>>>>> If that's the case, I'm wondering how out-of-order data can be > >>> handled > >>>>>>>> then, consider this stream: > >>>>>>>> > >>>>>>>> (10, 10), (19, 5), (15, 3) ... > >>>>>>>> > >>>>>>>> I.e. you received a late record indicating at timestamp 15, which > >>>>>> shorten > >>>>>>>> the gap to 3. It means that the window SHOULD actually be expired > at > >>>>> 18, > >>>>>>>> and hence the next record (19, 5) should be for a new session > >>> already. 
> >>>>>>>> Today Streams session window implementation does not do "window > >>>>> split", > >>>>>> so > >>>>>>>> have you thought about how this can be extended? > >>>>>>>> > >>>>>>>> Also since in your proposal each session window's gap value would > be > >>>>>>>> different, we need to store this value along with each record > then, > >>>>> how > >>>>>>>> would we store it, and what would be the upgrade path if it is > not a > >>>>>>>> compatible change on disk storage etc? > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> Guozhang > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <ley...@gmail.com> > wrote: > >>>>>>>> > >>>>>>>>> Hi All, > >>>>>>>>> > >>>>>>>>> I created a KIP to add dynamic gap session window support to > Kafka > >>>>>>>> Streams > >>>>>>>>> DSL. > >>>>>>>>> > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP- > >>>>>>>>> 362%3A+Support+dynamic+gap+session+window > >>>>>>>>> > >>>>>>>>> Please take a look, > >>>>>>>>> > >>>>>>>>> Thanks, > >>>>>>>>> Lei > >>>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> -- > >>>>>>>> -- Guozhang > >>>>>>>> > >>>>>>> > >>>>>> > >>>>>> > >>>>> > >>>>> > >>>>> -- > >>>>> -- Guozhang > >>>>> > >>>> > >>> > >>> > >> > > > >