Any update on this KIP?

On 9/20/18 3:30 PM, Matthias J. Sax wrote:
> Thanks for following up. Very nice examples!
>
> I think that the window definition for Flink is semantically
> questionable. If there is only a single record, why is the window
> defined as [ts, ts+gap]? To me, this definition is not sound and seems
> to be arbitrary. To define the windows as [ts-gap,ts+gap] as you mention
> would be semantically more useful -- still, I think that defining the
> window as [ts,ts] as we do currently in Kafka Streams is semantically
> the best.
>
> I have the impression that Flink only defines them differently because
> it solves the issues in the implementation. (I.e., an implementation
> detail leaks into the semantics, which is usually not desired.)
>
> However, I believe that we could change the implementation accordingly.
> We could store the windowed keys as [ts-gap,ts+gap] (or [ts,ts+gap]) in
> RocksDB, but at API level we return [ts,ts]. This way, we can still find
> all windows we need and provide the same deterministic behavior and keep
> the current window boundaries on the semantic level (there is no need to
> store the window start and/or end time). With this technique, we can
> also implement dynamic session gaps. I think we would need to store the
> used "gap" for each window, too. But again, this would be an
> implementation detail.
>
> Let's see what others think.
>
> One tricky question we would need to address is how we can be backward
> compatible. I am currently working on KIP-258 that should help to
> address this backward compatibility issue though.
>
>
> -Matthias
>
>
>
> On 9/19/18 5:17 PM, Lei Chen wrote:
>> Thanks Matthias. That makes sense.
>>
>> You're right that symmetric merge is necessary to ensure consistency. On
>> the other hand, I kinda feel it defeats the purpose of dynamic gap, which
>> is to update the gap from old value to new value.
>> The symmetric merge always honors the larger gap in both directions,
>> rather than honoring the gap carried by the record with the larger
>> timestamp. I wasn't able to find any semantic definitions w.r.t. this
>> particular aspect online, but spent some time looking into other
>> streaming engines like Apache Flink.
>>
>> Apache Flink defines the window differently: it uses (start time, start
>> time + gap).
>>
>> So our previous example (10,10), (19,5), (15,3) in Flink's case will be:
>> [10,20]
>> [19,24] => merged to [10,24]
>> [15,18] => merged to [10,24]
>>
>> while example (15,3), (19,5), (10,10) will be
>> [15,18]
>> [19,24] => no merge
>> [10,20] => merged to [10,24]
>>
>> However, since it only records the gap in the future direction, not the
>> past, a late record might not trigger any merge where in symmetric merge
>> it would.
>> (7,2), (10,10), (19,5), (15,3)
>> [7,9]
>> [10,20]
>> [19,24] => merged to [10,24]
>> [15,18] => merged to [10,24]
>> so at the end
>> two windows [7,9][10,24] are there.
>>
>> As you can see, in Flink, the gap semantics lean more toward the idea
>> that a gap carried by one record only affects how this record merges with
>> future records, e.g. a later event (T2, G2) will only be merged with
>> (T1, G1) if T2 is less than T1+G1, but not when T1 is less than T2 - G2.
>> Let's call this the "forward-merge" way of handling this. I just went
>> through some source code and if my understanding is incorrect about
>> Flink's implementation, please correct me.
>>
>> On the other hand, if we want to do symmetric merge in Kafka Streams, we
>> can change the window definition to [start time - gap, start time + gap].
>> This way the example (7,2), (10,10), (19,5), (15,3) will be
>> [5,9]
>> [0,20] => merged to [0,20]
>> [14,24] => merged to [0,24]
>> [12,18] => merged to [0,24]
>>
>> (19,5), (15,3), (7,2), (10,10) will generate the same result
>> [14,24]
>> [12,18] => merged to [12,24]
>> [5,9] => no merge
>> [0,20] => merged to [0,24]
>>
>> Note that symmetric-merge would require us to change the way Kafka
>> Streams fetches windows now: instead of fetching the range from
>> timestamp-gap to timestamp+gap, we will need to fetch all windows that
>> are not expired yet.
>> On the other hand, I'm not sure how this will impact the current logic of
>> how a window is considered closed, because the window doesn't carry the
>> end timestamp anymore, but end timestamp + gap.
>>
>> So do you guys think the forward-merge approach used by Flink makes more
>> sense in Kafka Streams, or does symmetric-merge make more sense? Both of
>> them seem to me to give deterministic results.
>>
>> BTW I'll add the use case into the original KIP.
>>
>> Lei
>>
>>
>> On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Thanks for explaining your understanding. And thanks for providing more
>>> details about the use-case. Maybe you can add this to the KIP?
>>>
>>>
>>> First one general comment. I guess that my and Guozhang's understanding
>>> about gap/close/gracePeriod is the same as yours -- we might not have
>>> used the terms precisely correctly in previous emails.
>>>
>>>
>>> To your semantics of gap in detail:
>>>
>>>> I thought when (15,3) is received, Kafka Streams looks up neighbor
>>>> records/windows within the gap of [15-3, 15+3], and merges if any.
>>>> Previous record (10,10) created its own window [10,10], which is
>>>> out of the gap, so nothing will be found and no merge occurs. Hence we
>>>> have two windows now in session store, [10,10] and [15,15]
>>>> respectively.
>>>
>>> If you have record (10,10), we currently create a window of size
>>> [10,10].
>>> When record (15,3) arrives, your observation is that the gap 3 is
>>> too small to be merged into the [10,10] window -- however, merging is a
>>> symmetric operation and the existing window of [10,10] has a gap of 10
>>> defined: thus, 15 is close enough to fall into the gap, and (15,3) is
>>> merged into the existing window resulting in window [10,15].
>>>
>>> If we don't respect the gap both ways, we end up with inconsistencies if
>>> data is out-of-order. For example, if we use the same input records
>>> (10,10) and (15,3) from above, and it happens that (15,3) is processed
>>> first, when processing out-of-order record (10,10) we would want to
>>> merge both into a single window, too?
>>>
>>> Does this make sense?
>>>
>>> Now the question remains, if two records with different gap parameters
>>> are merged, which gap should we apply for processing/merging future
>>> records into the window? It seems that we should use the gap parameter
>>> from the record with the largest timestamp. In the example above, (15,3).
>>> We would use gap 3 after merging independent of the order of processing.
>>>
>>>
>>>> Also another thing worth mentioning is that the session window object
>>>> created in the current Kafka Streams implementation doesn't have gap
>>>> info; it has start and end, which are the earliest and latest event
>>>> timestamps in that window interval, i.e. for (10,10), the session
>>>> window that gets created is [10,10], rather than [10,20]. Just to
>>>> clarify why (10,10) cannot be fetched when looking for the gap of
>>>> (15,3): it's because the end boundary 10 of [10,10] is smaller than
>>>> the search boundary [12,18].
>>>
>>> We don't need to store the gap, because the gap is known from the window
>>> definition. The created window size depends on the data that is
>>> contained in the window.
>>> I guess one could define it differently, too,
>>> i.e., for the (10,10) record, we create a window [0,20] -- not sure if it
>>> makes a big difference in practice though. Note that creating window
>>> [10,20] would not be correct, because the gap must be applied in both
>>> directions, not just into the future.
>>>
>>> About the second part: the search would not be applied from (15,3) in
>>> range [12,18], but from existing window [10,10] into range [0,20] and 15
>>> is contained there. This example also shows that we would need to come
>>> up with a clever way to identify window [10,10] when processing (15,3)
>>> -- not sure atm how to do this. However, only considering (15,3) would
>>> result in inconsistencies for out-of-order data as pointed out above and
>>> would not be sufficient.
>>>
>>>
>>> Does this make sense?
>>>
>>>
>>> Or is there another way to define dynamic session gap semantics in a
>>> deterministic way with regard to out-of-order data?
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 9/11/18 4:28 PM, Lei Chen wrote:
>>>> Thanks Matthias and Guozhang for the response.
>>>>
>>>> Seems like our understanding mainly differs in the semantics of gap in
>>>> session windows.
>>>>
>>>> My understanding is that gap is used to merge nearby records together
>>>> such that no record in the merged window has distance larger than gap.
>>>> In Kafka Streams's implementation it's mainly used to find neighbor
>>>> records/windows in session store so that nearby records can be merged.
>>>> It is NOT used to determine when a window should be closed, which is in
>>>> fact determined by window's grace period.
>>>>
>>>> Guozhang you said "b. When later we received (15, 3), it means that this
>>>> record ** changed ** the window gap interval from 10 to 3, and hence we
>>>> received a new record at 15, with the new window gap of 3, it means that
>>>> by timestamp 18 (15 + 3) if we have not received any new data, the
>>>> window should be closed, i.e.
>>>> the window is now [10, 18) which includes two records at 10 and 15."
>>>>
>>>> This is different from what I thought would happen.
>>>>
>>>> I thought when (15,3) is received, Kafka Streams looks up neighbor
>>>> records/windows within the gap of [15-3, 15+3], and merges if any.
>>>> Previous record (10,10) created its own window [10,10], which is
>>>> out of the gap, so nothing will be found and no merge occurs. Hence we
>>>> have two windows now in session store, [10,10] and [15,15]
>>>> respectively.
>>>>
>>>> Also another thing worth mentioning is that the session window object
>>>> created in the current Kafka Streams implementation doesn't have gap
>>>> info; it has start and end, which are the earliest and latest event
>>>> timestamps in that window interval, i.e. for (10,10), the session
>>>> window that gets created is [10,10], rather than [10,20]. Just to
>>>> clarify why (10,10) cannot be fetched when looking for the gap of
>>>> (15,3): it's because the end boundary 10 of [10,10] is smaller than
>>>> the search boundary [12,18].
>>>>
>>>> Please correct me if my understanding is wrong here.
>>>>
>>>> @Matthias, to answer your use case question, we have a use case where
>>>> asynchronous time series data are received in the stream, from
>>>> different contributors, with different quality and at different pace.
>>>> Inside Kafka Streams, we use state to maintain statistical aggregations
>>>> and other mathematical models to track the liquidity and calculate time
>>>> decay rate and dynamic gap, so that at runtime, for each contributor we
>>>> can
>>>> 1. determine how many historical records we should maintain in state.
>>>> 2. for each incoming record, output a record using aggregations from
>>>> *nearby* records from that contributor.
>>>> Why doesn't a fixed gap session window work here?
>>>> Because the definition of "nearby" here is determined by several very
>>>> dynamic factors in our case; it changes not only with different hours
>>>> in a day, but is also related to other contributors.
>>>> The purpose of this KIP is to suggest a dynamic session window
>>>> implementation so that we can embed such dynamic "nearby" calculation
>>>> capability into Kafka Streams session window semantics. Hope it makes
>>>> sense to you.
>>>>
>>>> Lei
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Lei,
>>>>>
>>>>> As Matthias mentioned, the key question here is that because of the
>>>>> late arrivals of records which may indicate a shorter session gap
>>>>> interval, some session windows may be "mistakenly" merged and hence
>>>>> the merge needs to be undone, i.e. to split them again.
>>>>>
>>>>> Back to my example, you are right that the processing result of
>>>>>
>>>>> (10, 10), (19, 5), (15, 3) ..
>>>>>
>>>>> should be the same as the processing result of
>>>>>
>>>>> (10, 10), (15, 3), (19, 5) ..
>>>>>
>>>>> Note that the second value is NOT the window end time, but the
>>>>> extracted window gap interval; as you suggested in the KIP this value
>>>>> can be dynamically changed.
>>>>>
>>>>> a. If you take a look at the second ordering, when we receive (10, 10)
>>>>> it means a window starting at 10 is created, and its gap interval is
>>>>> 10, which means that if by the timestamp of 20 we do not receive any
>>>>> new data, then the window should be closed, i.e. the window [10, 20).
>>>>>
>>>>> b. When later we received (15, 3), it means that this record
>>>>> ** changed ** the window gap interval from 10 to 3, and hence we
>>>>> received a new record at 15, with the new window gap of 3, it means
>>>>> that by timestamp 18 (15 + 3) if we have not received any new data,
>>>>> the window should be closed, i.e.
>>>>> the window is now [10, 18) which includes two records at 10 and 15.
>>>>>
>>>>> c. The third record is received at 19, which is after the window close
>>>>> time 18, it means that we should now start a new window starting at
>>>>> 19, i.e. the window is [19, 24),
>>>>>
>>>>>
>>>>> BUT, because of the out of ordering, we did not receive (15, 3) in time,
>>>>> but received (19, 5), it will cause us to mistakenly merge the window of
>>>>> [10, 20) with [19, 24) to [10, 24), and only when later we received
>>>>> (15, 3) we realized that the previous window should have been ended at
>>>>> 18.
>>>>>
>>>>> Does that make sense to you?
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> I cannot follow the example:
>>>>>>
>>>>>>>> (10, 10), (15, 3), (19, 5) ...
>>>>>>
>>>>>> First, [10,10] is created, second the window is extended to [10,15],
>>>>>> and third [19,19] is created. Why would there be a [15,15]? And why
>>>>>> would (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3)
>>>>>> and thus [19,19] should be its own window?
>>>>>>
>>>>>>> Take a look at another example,
>>>>>>> (13, 3), (19, 5), (15, 3) ...
>>>>>>>
>>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved and
>>>>>>> merged to a new window [13, 15], then [19,19] should be updated to
>>>>>>> [13, 19]. Correct?
>>>>>>
>>>>>> This example makes sense. However, Guozhang's example was different.
>>>>>> The late event _reduces_ the gap and this can lead to a window split.
>>>>>> Guozhang's example was
>>>>>>
>>>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>
>>>>>> First [10,10] is created, second [10,19] is created (gap is 10, so 10
>>>>>> and 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15]
>>>>>> and [19,19] must be two windows, i.e., original window [10,19] must be
>>>>>> split.
>>>>>>
>>>>>>
>>>>>> Or maybe you have different semantics in mind about how gaps are
>>>>>> dynamically modified? It's a little unclear from the KIP itself what
>>>>>> semantics dynamic session windows should have.
>>>>>>
>>>>>>
>>>>>> What is also unclear to me atm is, what use cases you have in mind? The
>>>>>> KIP only says
>>>>>>
>>>>>>> the statistical aggregation result, liquidity of the records,
>>>>>>
>>>>>>
>>>>>> I am not sure what this means. Can you elaborate?
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 8/30/18 3:32 PM, Lei Chen wrote:
>>>>>>> Hi Guozhang,
>>>>>>>
>>>>>>> Thanks for reviewing the proposal. I didn't think of out of order
>>>>>>> events and glad that you brought it up.
>>>>>>>
>>>>>>> In the example you gave,
>>>>>>>
>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>>
>>>>>>> my understanding is that the correct result window should be the same
>>>>>>> as in order events
>>>>>>>
>>>>>>> (10, 10), (15, 3), (19, 5) ...
>>>>>>>
>>>>>>> when (15, 3) is received, [15,15] is created
>>>>>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and
>>>>>>> [15, 19] is created, meanwhile [15,15] is removed
>>>>>>>
>>>>>>> back to out of order case,
>>>>>>>
>>>>>>> when (19, 5) is received, [19, 19] is created
>>>>>>> when (15, 3) is received, in order to generate the same result,
>>>>>>> 1. if late event is later than retention period, it will be dropped
>>>>>>> 2. otherwise, adjacent session windows within gap should be retrieved
>>>>>>> and merged accordingly, in this case [19, 19], and create a new
>>>>>>> session [15, 19]
>>>>>>> I'm a little confused when you said "the window [15, 15] SHOULD
>>>>>>> actually be expired at 18 and hence the next record (19, 5) should be
>>>>>>> for a new session already.".
>>>>>>> If I understand it correctly, the expiration of the window is
>>>>>>> only checked when next event (19,5) comes and then it should be merged
>>>>>>> to it. [15, 15] will then be closed. Is that also what you meant?
>>>>>>> I cannot think of a case where a window will be split by a late event,
>>>>>>> because if event A and C fall into the same session window, a late
>>>>>>> event B in the middle will definitely fall into C's gap as well. IOW,
>>>>>>> a late event will only cause window extension, not split.
>>>>>>>
>>>>>>> Take a look at another example,
>>>>>>> (13, 3), (19, 5), (15, 3) ...
>>>>>>>
>>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved and
>>>>>>> merged to a new window [13, 15], then [19,19] should be updated to
>>>>>>> [13, 19]. Correct?
>>>>>>>
>>>>>>> To be able to achieve that, like you said, the gap needs to be stored
>>>>>>> for sessions. We don't need to save the gap with each event, but only
>>>>>>> for each session window. To avoid upgrading existing session window,
>>>>>>> how about creating a new Window type extended from SessionWindow along
>>>>>>> with a new KeySchema?
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Lei
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <wangg...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Lei,
>>>>>>>>
>>>>>>>> Thanks for the proposal. I've just made a quick pass over it and
>>>>>>>> there is a question I have:
>>>>>>>>
>>>>>>>> The session windows are defined per key, i.e. does that mean that
>>>>>>>> each incoming record of the key can dynamically change the gap of the
>>>>>>>> window? For example, say you have the following records for the same
>>>>>>>> key coming in order, where the first time is the timestamp of the
>>>>>>>> record, and the second value is the extracted gap value:
>>>>>>>>
>>>>>>>> (10, 10), (19, 5), ...
>>>>>>>>
>>>>>>>>
>>>>>>>> When we receive the first record at time 10, the gap is extracted as
>>>>>>>> 10, and hence the window will be expired at 20 if no other record is
>>>>>>>> received. When we receive the second record at time 19, the gap is
>>>>>>>> modified to 5, and hence the window will be expired at 24 if no other
>>>>>>>> record is received.
>>>>>>>>
>>>>>>>>
>>>>>>>> If that's the case, I'm wondering how out-of-order data can be
>>>>>>>> handled then, consider this stream:
>>>>>>>>
>>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>>>
>>>>>>>> I.e. you received a late record at timestamp 15, which shortens
>>>>>>>> the gap to 3. It means that the window SHOULD actually be expired at
>>>>>>>> 18, and hence the next record (19, 5) should be for a new session
>>>>>>>> already. Today the Streams session window implementation does not do
>>>>>>>> "window split", so have you thought about how this can be extended?
>>>>>>>>
>>>>>>>> Also since in your proposal each session window's gap value would be
>>>>>>>> different, we need to store this value along with each record then,
>>>>>>>> how would we store it, and what would be the upgrade path if it is
>>>>>>>> not a compatible change on disk storage etc?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <ley...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I created a KIP to add dynamic gap session window support to Kafka
>>>>>>>>> Streams DSL.
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-362%3A+Support+dynamic+gap+session+window
>>>>>>>>>
>>>>>>>>> Please take a look,
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Lei
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>
>>>
>>
>
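
For reference, the two merge definitions compared in this thread can be reproduced with a small sketch. It is not taken from Kafka Streams or Flink; the function name merge_sessions and the rule that intervals merge when they overlap or touch are assumptions made only for illustration. Each record is (timestamp, gap). Forward-merge gives a record the interval [ts, ts+gap], symmetric-merge gives it [ts-gap, ts+gap], and the intervals printed are the search intervals, not the [min ts, max ts] boundaries that Kafka Streams returns at API level.

```python
def merge_sessions(records, symmetric):
    """Merge (timestamp, gap) records into disjoint session intervals.

    symmetric=False: a record covers [ts, ts + gap]       (forward-merge)
    symmetric=True:  a record covers [ts - gap, ts + gap] (symmetric-merge)
    Assumption of this sketch: intervals that overlap or touch are merged.
    """
    windows = []  # disjoint (start, end) intervals seen so far
    for ts, gap in records:
        start = ts - gap if symmetric else ts
        end = ts + gap
        kept = []
        for w_start, w_end in windows:
            if w_start <= end and start <= w_end:
                # Overlap: absorb the existing window into the new interval.
                start, end = min(start, w_start), max(end, w_end)
            else:
                kept.append((w_start, w_end))
        kept.append((start, end))
        windows = kept
    return sorted(windows)


# Examples taken from the thread.
print(merge_sessions([(10, 10), (19, 5), (15, 3)], symmetric=False))
print(merge_sessions([(7, 2), (10, 10), (19, 5), (15, 3)], symmetric=False))
print(merge_sessions([(7, 2), (10, 10), (19, 5), (15, 3)], symmetric=True))
print(merge_sessions([(19, 5), (15, 3), (7, 2), (10, 10)], symmetric=True))
```

Both variants give the same result for any arrival order of the same records, because the final windows depend only on the set of intervals. They differ in the outcome itself: with (7,2) included, forward-merge leaves [7,9] as a separate window while symmetric-merge folds everything into [0,24].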