Hello Lei,

Just checking on the current status of this KIP. We have a KIP deadline for 2.2 on the 24th and are wondering whether this one may be able to make it.
Guozhang

On Sat, Dec 15, 2018 at 1:01 PM Lei Chen <ley...@gmail.com> wrote:

> Sorry for the late reply, Matthias. I have been busy with other work recently.
> I'll restart the discussion and update the KIP accordingly.
>
> Lei
>
> On Tue, Nov 6, 2018 at 3:11 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
> > Any update on this KIP?
> >
> > On 9/20/18 3:30 PM, Matthias J. Sax wrote:
> > > Thanks for following up. Very nice examples!
> > >
> > > I think that the window definition for Flink is semantically questionable. If there is only a single record, why is the window defined as [ts, ts+gap]? To me, this definition is not sound and seems to be arbitrary. Defining the windows as [ts-gap,ts+gap], as you mention, would be semantically more useful -- still, I think that defining the window as [ts,ts], as we do currently in Kafka Streams, is semantically the best.
> > >
> > > I have the impression that Flink only defines them differently because it solves the issues in the implementation. (I.e., an implementation detail leaks into the semantics, which is usually not desired.)
> > >
> > > However, I believe that we could change the implementation accordingly. We could store the windowed keys as [ts-gap,ts+gap] (or [ts,ts+gap]) in RocksDB, but at the API level we return [ts,ts]. This way, we can still find all windows we need, provide the same deterministic behavior, and keep the current window boundaries on the semantic level (there is no need to store the window start and/or end time). With this technique, we can also implement dynamic session gaps. I think we would need to store the used "gap" for each window, too. But again, this would be an implementation detail.
> > >
> > > Let's see what others think.
> > >
> > > One tricky question we would need to address is how we can be backward compatible.
> > > I am currently working on KIP-258, which should help to address this backward compatibility issue though.
> > >
> > > -Matthias
> > >
> > > On 9/19/18 5:17 PM, Lei Chen wrote:
> > >> Thanks Matthias. That makes sense.
> > >>
> > >> You're right that symmetric merge is necessary to ensure consistency. On the other hand, I kinda feel it defeats the purpose of dynamic gap, which is to update the gap from the old value to the new value. The symmetric merge always honors the larger gap in both directions, rather than honoring the gap carried by the record with the larger timestamp. I wasn't able to find any semantic definitions w.r.t. this particular aspect online, but spent some time looking into other streaming engines like Apache Flink.
> > >>
> > >> Apache Flink defines the window differently: it uses (start time, start time + gap).
> > >>
> > >> So our previous example (10,10), (19,5), (15,3) in Flink's case will be:
> > >> [10,20]
> > >> [19,24] => merged to [10,24]
> > >> [15,18] => merged to [10,24]
> > >>
> > >> while example (15,3), (19,5), (10,10) will be
> > >> [15,18]
> > >> [19,24] => no merge
> > >> [10,20] => merged to [10,24]
> > >>
> > >> However, since it only records the gap in the future direction, not the past, a late record might not trigger any merge where in symmetric merge it would.
> > >> (7,2), (10,10), (19,5), (15,3)
> > >> [7,9]
> > >> [10,20]
> > >> [19,24] => merged to [10,24]
> > >> [15,18] => merged to [10,24]
> > >> so at the end two windows [7,9][10,24] are there.
> > >>
> > >> As you can see, in Flink, the gap semantic leans toward the way that a gap carried by one record only affects how this record merges with future records, e.g. a later event (T2, G2) will only be merged with (T1, G1) if T2 is less than T1+G1, but not when T1 is less than T2 - G2. Let's call this the "forward-merge" way of handling this.
> > >> I just went through some source code, and if my understanding of Flink's implementation is incorrect, please correct me.
> > >>
> > >> On the other hand, if we want to do symmetric merge in Kafka Streams, we can change the window definition to [start time - gap, start time + gap]. This way the example (7,2), (10,10), (19,5), (15,3) will be
> > >> [5,9]
> > >> [0,20] => merged to [0,20]
> > >> [14,24] => merged to [0,24]
> > >> [12,18] => merged to [0,24]
> > >>
> > >> (19,5), (15,3), (7,2), (10,10) will generate the same result
> > >> [14,24]
> > >> [12,18] => merged to [12,24]
> > >> [5,9] => no merge
> > >> [0,20] => merged to [0,24]
> > >>
> > >> Note that symmetric-merge would require us to change the way Kafka Streams fetches windows now: instead of fetching the range from timestamp-gap to timestamp+gap, we will need to fetch all windows that are not expired yet. On the other hand, I'm not sure how this will impact the current logic of how a window is considered closed, because the window doesn't carry the end timestamp anymore, but end timestamp + gap.
> > >>
> > >> So do you guys think the forward-merge approach used by Flink makes more sense in Kafka Streams, or does symmetric-merge make more sense? Both of them seem to me to give deterministic results.
> > >>
> > >> BTW I'll add the use case into the original KIP.
> > >>
> > >> Lei
> > >>
> > >> On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > >>
> > >>> Thanks for explaining your understanding. And thanks for providing more details about the use-case. Maybe you can add this to the KIP?
> > >>>
> > >>> First, one general comment. I guess that my and Guozhang's understanding about gap/close/gracePeriod is the same as yours -- we might not have used the terms precisely correctly in previous emails.
> > >>>
> > >>> To your semantics of gap in detail:
> > >>>
> > >>>> I thought when (15,3) is received, Kafka Streams looks up the neighbor record/window that is within the gap of [15-3, 15+3], and merges if any. Previous record (10, 10) created its own window [10, 10], which is out of the gap, so nothing will be found and no merge occurs. Hence we have two windows now in the session store, [10, 10] and [15, 15] respectively.
> > >>>
> > >>> If you have record (10,10), we currently create a window of size [10,10]. When record (15,3) arrives, your observation is that the gap 3 is too small to be merged into the [10,10] window -- however, merging is a symmetric operation and the existing window of [10,10] has a gap of 10 defined: thus, 15 is close enough to fall into the gap, and (15,3) is merged into the existing window, resulting in window [10,15].
> > >>>
> > >>> If we don't respect the gap both ways, we end up with inconsistencies if data is out-of-order. For example, if we use the same input records (10,10) and (15,3) from above, and it happens that (15,3) is processed first, when processing the out-of-order record (10,10) we would want to merge both into a single window, too?
> > >>>
> > >>> Does this make sense?
> > >>>
> > >>> Now the question remains: if two records with different gap parameters are merged, which gap should we apply for processing/merging future records into the window? It seems that we should use the gap parameter from the record with the largest timestamp, in the example above (15,3). We would use gap 3 after merging, independent of the order of processing.
> > >>>
> > >>>> Also another thing worth mentioning is that the session window object created in the current Kafka Streams implementation doesn't have gap info; it has start and end, which are the earliest and latest event timestamps in that window interval, i.e. for (10,10), the session window that gets created is [10,10], rather than [10,20]. Just to clarify, so that it's clear why (10,10) cannot be fetched when looking for the gap of (15,3): it's because the end boundary 10 of [10,10] is smaller than the search boundary [12,18].
> > >>>
> > >>> We don't need to store the gap, because the gap is known from the window definition. The created window size depends on the data that is contained in the window. I guess one could define it differently, too, i.e., for the (10,10) record, we create a window [0,20] -- not sure if it makes a big difference in practice though. Note that creating window [10,20] would not be correct, because the gap must be applied in both directions, not just into the future.
> > >>>
> > >>> About the second part: the search would not be applied from (15,3) in range [12,18], but from existing window [10,10] into range [0,20], and 15 is contained there. This example also shows that we would need to come up with a clever way to identify window [10,10] when processing (15,3) -- not sure atm how to do this. However, only considering (15,3) would result in inconsistencies for out-of-order data, as pointed out above, and would not be sufficient.
> > >>>
> > >>> Does this make sense?
> > >>>
> > >>> Or is there another way to define dynamic session gap semantics in a deterministic way with regard to out-of-order data?
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 9/11/18 4:28 PM, Lei Chen wrote:
> > >>>> Thanks Matthias and Guozhang for the response.
> > >>>>
> > >>>> Seems like our understanding mainly differs in the semantics of gap in session windows.
> > >>>>
> > >>>> My understanding is that gap is used to merge nearby records together such that no record in the merged window has a distance larger than the gap. In Kafka Streams's implementation it's mainly used to find neighbor records/windows in the session store so that nearby records can be merged. It is NOT used to determine when a window should be closed, which is in fact determined by the window's grace period.
> > >>>>
> > >>>> Guozhang, you said "b. When later we received (15, 3), it means that this record ** changed ** the window gap interval from 10 to 3, and hence we received a new record at 15, with the new window gap of 3, it means that by timestamp 18 (15 + 3) if we have not received any new data, the window should be closed, i.e. the window is now [10, 18) which includes two records at 10 and 15."
> > >>>>
> > >>>> This is different from what I thought would happen.
> > >>>>
> > >>>> I thought when (15,3) is received, Kafka Streams looks up the neighbor record/window that is within the gap of [15-3, 15+3], and merges if any. Previous record (10, 10) created its own window [10, 10], which is out of the gap, so nothing will be found and no merge occurs. Hence we have two windows now in the session store, [10, 10] and [15, 15] respectively.
> > >>>>
> > >>>> Also another thing worth mentioning is that the session window object created in the current Kafka Streams implementation doesn't have gap info; it has start and end, which are the earliest and latest event timestamps in that window interval, i.e. for (10,10), the session window that gets created is [10,10], rather than [10,20]. Just to clarify, so that it's clear why (10,10) cannot be fetched when looking for the gap of (15,3): it's because the end boundary 10 of [10,10] is smaller than the search boundary [12,18].
> > >>>>
> > >>>> Please correct me if my understanding is wrong here.
> > >>>>
> > >>>> @Matthias, to answer your use case question, we have a use case where asynchronous time series data are received in the stream, from different contributors, with different quality and at different pace. Inside Kafka Streams, we use state to maintain statistical aggregations and other mathematical models to track the liquidity and calculate the time decay rate and dynamic gap, so that at runtime, for each contributor we can
> > >>>> 1. determine how many historical records we should maintain in state.
> > >>>> 2. for each incoming record, output a record using aggregations from *nearby* records from that contributor.
> > >>>> Why doesn't a fixed gap session window work here? Because the definition of "nearby" here is determined by several very dynamic factors in our case; it changes not only with different hours in a day, but is also related to other contributors.
> > >>>> The purpose of this KIP is to suggest a dynamic session window implementation so that we can embed such dynamic "nearby" calculation capability into Kafka Streams session window semantics. Hope it makes sense to you.
> > >>>>
> > >>>> Lei
> > >>>>
> > >>>> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>
> > >>>>> Hello Lei,
> > >>>>>
> > >>>>> As Matthias mentioned, the key question here is that because of the late arrivals of records which may indicate a shorter session gap interval, some session windows may be "mistakenly" merged and hence the merge needs to be undone, i.e. they need to be split again.
> > >>>>>
> > >>>>> Back to my example, you are right that the processing result of
> > >>>>>
> > >>>>> (10, 10), (19, 5), (15, 3) ..
> > >>>>>
> > >>>>> should be the same as the processing result of
> > >>>>>
> > >>>>> (10, 10), (15, 3), (19, 5) ..
> > >>>>>
> > >>>>> Note that the second value is NOT the window end time, but the extracted window gap interval; as you suggested in the KIP, this value can be dynamically changed.
> > >>>>>
> > >>>>> a. If you take a look at the second ordering, when we receive (10, 10) it means a window starting at 10 is created, and its gap interval is 10, which means that if by the timestamp of 20 we do not receive any new data, then the window should be closed, i.e. the window [10, 20).
> > >>>>>
> > >>>>> b. When later we received (15, 3), it means that this record ** changed ** the window gap interval from 10 to 3, and hence we received a new record at 15, with the new window gap of 3, it means that by timestamp 18 (15 + 3) if we have not received any new data, the window should be closed, i.e. the window is now [10, 18) which includes two records at 10 and 15.
> > >>>>>
> > >>>>> c. The third record is received at 19, which is after the window close time 18; it means that we should now start a new window starting at 19, i.e. the window is [19, 24).
> > >>>>>
> > >>>>> BUT, because of the out of ordering, we did not receive (15, 3) in time, but received (19, 5); it will cause us to mistakenly merge the window of [10, 20) with [19, 24) to [10, 24), and only when later we received (15, 3) we realized that the previous window should have been ended at 18.
> > >>>>>
> > >>>>> Does that make sense to you?
> > >>>>>
> > >>>>> Guozhang
> > >>>>>
> > >>>>> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>>
> > >>>>>> I cannot follow the example:
> > >>>>>>
> > >>>>>>>> (10, 10), (15, 3), (19, 5) ...
> > >>>>>>
> > >>>>>> First, [10,10] is created, second the window is extended to [10,15], and third [19,19] is created. Why would there be a [15,15]? And why would (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and thus [19,19] should be its own window?
> > >>>>>>
> > >>>>>>> Take a look at another example,
> > >>>>>>> (13, 3), (19, 5), (15, 3) ...
> > >>>>>>>
> > >>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved and merged to a new window [13, 15], then [19,19] should be updated to [13, 19]. Correct?
> > >>>>>>
> > >>>>>> This example makes sense. However, Guozhang's example was different. The late event _reduces_ the gap and this can lead to a window split. Guozhang's example was
> > >>>>>>
> > >>>>>>>>> (10, 10), (19, 5), (15, 3) ...
> > >>>>>>
> > >>>>>> First [10,10] is created, second [10,19] is created (gap is 10, so 10 and 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15] and [19,19] must be two windows, i.e., the original window [10,19] must be split.
> > >>>>>>
> > >>>>>> Or maybe you have different semantics in mind for how gaps are dynamically modified? It's a little unclear from the KIP itself what semantics dynamic session windows should have.
> > >>>>>>
> > >>>>>> What is also unclear to me atm is what use cases you have in mind. The KIP only says
> > >>>>>>
> > >>>>>>> the statistical aggregation result, liquidity of the records,
> > >>>>>>
> > >>>>>> I am not sure what this means. Can you elaborate?
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>> On 8/30/18 3:32 PM, Lei Chen wrote:
> > >>>>>>> Hi Guozhang,
> > >>>>>>>
> > >>>>>>> Thanks for reviewing the proposal. I didn't think of out-of-order events and am glad that you brought it up.
> > >>>>>>>
> > >>>>>>> In the example you gave,
> > >>>>>>>
> > >>>>>>> (10, 10), (19, 5), (15, 3) ...
> > >>>>>>>
> > >>>>>>> my understanding is that the correct result window should be the same as for in-order events
> > >>>>>>>
> > >>>>>>> (10, 10), (15, 3), (19, 5) ...
> > >>>>>>>
> > >>>>>>> when (15, 3) is received, [15,15] is created
> > >>>>>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and [15, 19] is created, meanwhile [15,15] is removed
> > >>>>>>>
> > >>>>>>> back to the out-of-order case,
> > >>>>>>>
> > >>>>>>> when (19, 5) is received, [19, 19] is created
> > >>>>>>> when (15, 3) is received, in order to generate the same result,
> > >>>>>>> 1. if the late event is later than the retention period, it will be dropped
> > >>>>>>> 2. otherwise, adjacent session windows within the gap should be retrieved and merged accordingly, in this case [19, 19], and a new session [15, 19] created
> > >>>>>>> I'm a little confused when you said "the window [15, 15] SHOULD actually be expired at 18 and hence the next record (19, 5) should be for a new session already.". If I understand it correctly, the expiration of the window is only checked when the next event (19,5) comes and then it should be merged to it. [15, 15] will then be closed. Is that also what you meant?
> > >>>>>>> I cannot think of a case where a window will be split by a late event, because if events A and C fall into the same session window, a late event B in the middle will definitely fall into C's gap as well. IOW, a late event will only cause window extension, not split.
> > >>>>>>>
> > >>>>>>> Take a look at another example,
> > >>>>>>> (13, 3), (19, 5), (15, 3) ...
> > >>>>>>>
> > >>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved and merged to a new window [13, 15], then [19,19] should be updated to [13, 19]. Correct?
> > >>>>>>>
> > >>>>>>> To be able to achieve that, like you said, the gap needs to be stored for sessions. We don't need to save the gap with each event, but only for each session window. To avoid upgrading the existing session window, how about creating a new Window type extended from SessionWindow along with a new KeySchema?
> > >>>>>>>
> > >>>>>>> What do you think?
> > >>>>>>>
> > >>>>>>> Lei
> > >>>>>>>
> > >>>>>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> Hello Lei,
> > >>>>>>>>
> > >>>>>>>> Thanks for the proposal. I've just made a quick pass over it and there is a question I have:
> > >>>>>>>>
> > >>>>>>>> The session windows are defined per key, i.e. does that mean that each incoming record of the key can dynamically change the gap of the window? For example, say you have the following records for the same key coming in order, where the first value is the timestamp of the record, and the second value is the extracted gap value:
> > >>>>>>>>
> > >>>>>>>> (10, 10), (19, 5), ...
> > >>>>>>>>
> > >>>>>>>> When we receive the first record at time 10, the gap is extracted as 10, and hence the window will be expired at 20 if no other record is received. When we receive the second record at time 19, the gap is modified to 5, and hence the window will be expired at 24 if no other record is received.
> > >>>>>>>>
> > >>>>>>>> If that's the case, I'm wondering how out-of-order data can be handled then. Consider this stream:
> > >>>>>>>>
> > >>>>>>>> (10, 10), (19, 5), (15, 3) ...
> > >>>>>>>>
> > >>>>>>>> I.e. you received a late record at timestamp 15, which shortens the gap to 3. It means that the window SHOULD actually be expired at 18, and hence the next record (19, 5) should be for a new session already. Today the Streams session window implementation does not do "window split", so have you thought about how this can be extended?
> > >>>>>>>>
> > >>>>>>>> Also, since in your proposal each session window's gap value would be different, we need to store this value along with each record then; how would we store it, and what would be the upgrade path if it is not a compatible change on disk storage etc?
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <ley...@gmail.com> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi All,
> > >>>>>>>>>
> > >>>>>>>>> I created a KIP to add dynamic gap session window support to Kafka Streams DSL.
> > >>>>>>>>>
> > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-362%3A+Support+dynamic+gap+session+window
> > >>>>>>>>>
> > >>>>>>>>> Please take a look,
> > >>>>>>>>>
> > >>>>>>>>> Thanks,
> > >>>>>>>>> Lei
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> -- Guozhang
> > >>>>>>>>
> > >>>>>
> > >>>>> --
> > >>>>> -- Guozhang

--
-- Guozhang
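
For readers who want to replay the examples from this thread, here is a minimal Python sketch of the two merge rules discussed above: the "forward-merge" window [ts, ts+gap] attributed to Flink and the "symmetric-merge" window [ts-gap, ts+gap]. It is only an illustration under stated assumptions: the names `forward_window`, `symmetric_window` and `merge_sessions` are hypothetical, records are plain (timestamp, gap) pairs for a single key, and nothing here reflects actual Kafka Streams or Flink code, retention, or grace-period handling.

```python
from typing import Callable, List, Tuple

Record = Tuple[int, int]   # (timestamp, gap), as written in the thread
Window = Tuple[int, int]   # inclusive [start, end]


def forward_window(ts: int, gap: int) -> Window:
    """Window covering only the future direction: [ts, ts + gap]."""
    return (ts, ts + gap)


def symmetric_window(ts: int, gap: int) -> Window:
    """Window padded by the gap in both directions: [ts - gap, ts + gap]."""
    return (ts - gap, ts + gap)


def merge_sessions(records: List[Record],
                   to_window: Callable[[int, int], Window]) -> List[Window]:
    """Process records in arrival order, merging overlapping windows.

    Hypothetical helper: each record becomes a window via `to_window`,
    and any stored window that overlaps it is absorbed into it.
    """
    windows: List[Window] = []
    for ts, gap in records:
        start, end = to_window(ts, gap)
        kept: List[Window] = []
        for s, e in windows:
            if s <= end and start <= e:          # inclusive overlap
                start, end = min(start, s), max(end, e)
            else:
                kept.append((s, e))
        kept.append((start, end))
        windows = sorted(kept)
    return windows


if __name__ == "__main__":
    late = [(7, 2), (10, 10), (19, 5), (15, 3)]
    reordered = [(19, 5), (15, 3), (7, 2), (10, 10)]
    print(merge_sessions(late, forward_window))         # [(7, 9), (10, 24)]
    print(merge_sessions(late, symmetric_window))       # [(0, 24)]
    print(merge_sessions(reordered, symmetric_window))  # [(0, 24)]
```

Both rules end up computing the connected components of a set of intervals, so in this sketch each is independent of arrival order; they differ only in which records end up sharing a window. Note that the printed boundaries are the padded windows from the examples, not the [earliest timestamp, latest timestamp] boundaries that Kafka Streams exposes today.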