Any update on this KIP?

On 9/20/18 3:30 PM, Matthias J. Sax wrote:
> Thanks for following up. Very nice examples!
> I think that the window definition for Flink is semantically
> questionable. If there is only a single record, why is the window
> defined as [ts, ts+gap]? To me, this definition is not sound and seems
> to be arbitrary. To define the windows as [ts-gap,ts+gap] as you mention
> would be semantically more useful -- still, I think that defining the
> window as [ts,ts] as we do currently in Kafka Streams is semantically
> the best.
> I have the impression that Flink only defines them differently because
> it solves the issues in the implementation. (I.e., an implementation
> detail leaks into the semantics, which is usually not desired.)
> However, I believe that we could change the implementation accordingly.
> We could store the windowed keys, as [ts-gap,ts+gap] (or [ts,ts+gap]) in
> RocksDB, but at API level we return [ts,ts]. This way, we can still find
> all windows we need and provide the same deterministic behavior and keep
> the current window boundaries on the semantic level (there is no need to
> store the window start and/or end time). With this technique, we can
> also implement dynamic session gaps. I think, we would need to store the
> used "gap" for each window, too. But again, this would be an
> implementation detail.
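A minimal Python sketch of the idea above (not Kafka Streams code): each stored session carries padded bounds that are used only for lookups, while the API exposes the plain [first_ts, last_ts] window. `StoredSession`, `add_record`, `api_windows` and `run` are hypothetical names, and keeping the event-time bounds explicitly next to the padded ones is an assumption of this sketch, not something the proposal prescribes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StoredSession:
    first_ts: int  # earliest event timestamp, exposed as window start
    last_ts: int   # latest event timestamp, exposed as window end
    lo: int        # padded lower bound (min of ts - gap), lookup only
    hi: int        # padded upper bound (max of ts + gap), lookup only


def add_record(store, ts, gap):
    """Return a new store with record (ts, gap) merged into overlapping sessions."""
    merged = StoredSession(ts, ts, ts - gap, ts + gap)
    kept = []
    for s in store:
        if s.lo <= merged.hi and merged.lo <= s.hi:  # padded ranges overlap
            merged = StoredSession(
                min(merged.first_ts, s.first_ts),
                max(merged.last_ts, s.last_ts),
                min(merged.lo, s.lo),
                max(merged.hi, s.hi),
            )
        else:
            kept.append(s)
    return sorted(kept + [merged], key=lambda s: s.lo)


def api_windows(store):
    """What a caller would see: plain [first_ts, last_ts] windows."""
    return [(s.first_ts, s.last_ts) for s in store]


def run(records):
    """Feed (timestamp, gap) records in arrival order and return the API view."""
    store = []
    for ts, gap in records:
        store = add_record(store, ts, gap)
    return api_windows(store)
```

With the records (10,10), (19,5), (15,3) in either arrival order, `run` yields a single API-level window [10,19], while the padded range kept for lookups is [0,24].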
> Let's see what others think.
> One tricky question we would need to address is, how we can be backward
> compatible. I am currently working on KIP-258 that should help to
> address this backward compatibility issue though.
> -Matthias
> On 9/19/18 5:17 PM, Lei Chen wrote:
>> Thanks Matthias. That makes sense.
>> You're right that symmetric merge is necessary to ensure consistency. On
>> the other hand, I kinda feel it defeats the purpose of dynamic gap, which
>> is to update the gap from old value to new value. The symmetric merge
>> always honors the larger gap in both directions, rather than honoring the gap
>> carried by the record with the larger timestamp. I wasn't able to find any semantic
>> definitions w.r.t this particular aspect online, but spent some time
>> looking into other streaming engines like Apache Flink.
>> Apache Flink defines the window differently, using (start time, start
>> time + gap).
>> So our previous example (10, 10), (19,5),(15,3) in Flink's case will be:
>> [10,20]
>> [19,24] => merged to [10,24]
>> [15,18] => merged to [10,24]
>> while example (15,3)(19,5)(10,10) will be
>> [15,18]
>> [19,24] => no merge
>> [10,20] => merged to [10,24]
>> However, since it only records the gap in the future direction, not the past, a late
>> record might not trigger any merge where in symmetric merge it would.
>> (7,2),(10, 10), (19,5),(15,3)
>> [7,9]
>> [10,20]
>> [19,24] => merged to [10,24]
>> [15,18] => merged to [10,24]
>> so at the end
>> two windows [7,9][10,24] are there.
>> As you can see, in Flink, the gap semantics lean toward the view that
>> a gap carried by one record only affects how this record merges with future
>> records, e.g. a later event (T2, G2) will only be merged with (T1, G1) if
>> T2 is less than T1+G1, but not when T1 is less than T2 - G2. Let's call
>> this the "forward-merge" way of handling this. I just went through some source
>> code, and if my understanding of Flink's implementation is incorrect,
>> please correct me.
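The forward-merge examples above can be reproduced with a small Python sketch. It assumes each record (ts, gap) opens the closed interval [ts, ts + gap] and that intervals that overlap or touch are merged; `merge_intervals` and `forward_merge` are hypothetical helper names, and this is not Flink code.

```python
def merge_intervals(intervals):
    """Merge closed intervals that overlap or touch; result is sorted by start."""
    merged = []
    for start, end in sorted(intervals):
        if merged and start <= merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    return [tuple(w) for w in merged]


def forward_merge(records):
    """Each (ts, gap) record only reaches forward in time: [ts, ts + gap]."""
    return merge_intervals((ts, ts + gap) for ts, gap in records)
```

Both arrival orders of (10,10), (19,5), (15,3) give [10,24], and adding (7,2) leaves [7,9] as a separate window because [7,9] does not reach 10.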
>> On the other hand, if we want to do symmetric merge in Kafka Streams, we
>> can change the window definition to [start time - gap, start time + gap].
>> This way the example (7,2),(10, 10), (19,5),(15,3) will be
>> [5,9]
>> [0,20] => merged to [0,20]
>> [14,24] => merged to [0,24]
>> [12,18] => merged to [0,24]
>>  (19,5),(15,3)(7,2),(10, 10) will generate same result
>> [14,24]
>> [12,18] => merged to [12,24]
>> [5,9] => no merge
>> [0,20] => merged to [0,24]
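For comparison, here is a hedged Python sketch of the symmetric variant, processing records one at a time in arrival order. It assumes each record (ts, gap) contributes the closed interval [ts - gap, ts + gap]; `add_record` and `symmetric_merge` are hypothetical names, not Kafka Streams APIs.

```python
def add_record(windows, ts, gap):
    """Insert [ts - gap, ts + gap] and merge it with every overlapping window."""
    lo, hi = ts - gap, ts + gap
    kept = []
    for start, end in windows:
        if start <= hi and lo <= end:  # closed intervals overlap
            lo, hi = min(lo, start), max(hi, end)
        else:
            kept.append((start, end))
    kept.append((lo, hi))
    return sorted(kept)


def symmetric_merge(records):
    """Apply add_record to each (ts, gap) record in arrival order."""
    windows = []
    for ts, gap in records:
        windows = add_record(windows, ts, gap)
    return windows
```

Both orderings from the example end in the single window [0,24], even though the second ordering temporarily holds [5,9] and [12,24] as separate windows.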
>> Note that symmetric-merge would require us to change the way Kafka
>> Streams fetches windows now: instead of fetching the range from timestamp-gap to
>> timestamp+gap, we will need to fetch all windows that are not expired yet.
>> On the other hand, I'm not sure how this will impact the current logic of
>> how a window is considered as closed, because the window doesn't carry end
>> timestamp anymore, but end timestamp + gap.
>> So do you guys think the forward-merge approach used by Flink makes more sense
>> in Kafka Streams, or does symmetric-merge make more sense? Both of them seem
>> to me to give deterministic results.
>> BTW I'll add the use case into original KIP.
>> Lei
>> On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <>
>> wrote:
>>> Thanks for explaining your understanding. And thanks for providing more
>>> details about the use-case. Maybe you can add this to the KIP?
>>> First one general comment. I guess that my and Guozhang's understanding
>>> about gap/close/gracePeriod is the same as yours -- we might not have
>>> used the terms precisely in previous emails.
>>> To your semantics of gap in detail:
>>>> I thought when (15,3) is received, Kafka Streams looks up neighbor
>>>> records/windows within the gap of [15-3, 15+3], and merges them if any.
>>>> The previous record (10, 10) created its own window [10, 10], which is
>>>> out of the gap, so nothing will be found and no merge occurs. Hence we
>>>> have two windows now in the session store, [10, 10] and [15, 15]
>>>> respectively.
>>> If you have record (10,10), we currently create a window of size
>>> [10,10]. When record (15,3) arrives, you are right that the gap 3 is
>>> too small to merge it into the [10,10] window -- however, merging is a
>>> symmetric operation and the existing window of [10,10] has a gap of 10
>>> defined: thus, 15 is close enough to fall into the gap, and (15,3) is
>>> merged into the existing window resulting in window [10,15].
>>> If we don't respect the gap both ways, we end up with inconsistencies if
>>> data is out-of-order. For example, if we use the same input record
>>> (10,10) and (15,3) from above, and it happens that (15,3) is processed
>>> first, when processing out-of-order record (10,10) we would want to
>>> merge both into a single window, too?
>>> Does this make sense?
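The order dependence described above can be illustrated with a tiny Python sketch. `one_sided` consults only the incoming record's gap, while `symmetric` honors the larger of the two gaps; both function names are hypothetical and exist only for illustration.

```python
def one_sided(new, existing):
    """Merge test that only consults the gap of the incoming record."""
    (ts, gap), (ets, _egap) = new, existing
    return abs(ts - ets) <= gap


def symmetric(new, existing):
    """Merge test that respects the gap of both records."""
    (ts, gap), (ets, egap) = new, existing
    return abs(ts - ets) <= max(gap, egap)
```

With records (10,10) and (15,3), `one_sided` merges when (10,10) arrives second but not when (15,3) arrives second, whereas `symmetric` merges in both orders.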
>>> Now the question remains, if two records with different gap parameter
>>> are merged, which gap should we apply for processing/merging future
>>> records into the window? It seems that we should use the gap parameter
>>> from the record with the largest timestamp, (15,3) in the example above.
>>> We would use gap 3 after merging independent of the order of processing.
>>>> Also another thing worth mentioning is that the session window object
>>>> created in the current Kafka Streams implementation doesn't have gap info;
>>>> it has start and end, which are the earliest and latest event timestamps
>>>> in that window interval, i.e. for (10,10), the session window that gets
>>>> created is [10,10], rather than [10,20]. Just to clarify why (10,10)
>>>> cannot be fetched when looking for the gap of (15,3): it's because the
>>>> end boundary 10 of [10,10] is smaller than the search boundary [12,18].
>>> We don't need to store the gap, because the gap is known from the window
>>> definition. The created window size depends on the data that is
>>> contained in the window. I guess one could define it differently, too,
>>> ie, for the (10,10) record, we create a window [0,20] -- not sure if it
>>> makes a big difference in practice though. Note, that creating window
>>> [10,20] would not be correct, because the gap must be applied in both
>>> directions, not just into the future.
>>> About the second part: the search would not be applied from (15,3) in
>>> range [12,18], but from existing window [10,10] into range [0,20] and 15
>>> is contained there. This example also shows that we would need to come
>>> up with a clever way to identify window [10,10] when processing (15,3)
>>> -- not sure atm how to do this. However, only considering (15,3) would
>>> result in inconsistencies for out-of-order data as pointed out above and
>>> would not be sufficient.
>>> Does this make sense?
>>> Or is there another way to define dynamic session gap semantics in a
>>> deterministic way with regard to out-of-order data?
>>> -Matthias
>>> On 9/11/18 4:28 PM, Lei Chen wrote:
>>>> Thanks Matthias and Guozhang for the response.
>>>> Seems like our understanding mainly differs in the semantics of gap in
>>>> session windows.
>>>> My understanding is that gap is used to merge nearby records together such
>>>> that no record in the merged window has distance larger than gap. In Kafka
>>>> Streams's implementation it's mainly used to find neighbor records/windows
>>>> in session store so that nearby records can be merged. It is NOT used to
>>>> determine when a window should be closed, which is in fact determined by
>>>> window's grace period.
>>>> Guozhang you said "b. When later we received (15, 3), it means that this
>>>> record ** changed ** the window gap interval from 10 to 3, and hence we
>>>> received a new record at 15, with the new window gap of 3, it means that by
>>>> timestamp 18 (15 + 3) if we have not received any new data, the window
>>>> should be closed, i.e. the window is now [10, 18) which includes two
>>>> records at 10 and 15."
>>>> This is different from what I thought will happen.
>>>> I thought when (15,3) is received, Kafka Streams looks up neighbor
>>>> records/windows within the gap of [15-3, 15+3], and merges them if any.
>>>> The previous record (10, 10) created its own window [10, 10], which is
>>>> out of the gap, so nothing will be found and no merge occurs. Hence we
>>>> have two windows now in the session store, [10, 10] and [15, 15]
>>>> respectively.
>>>> Also another thing worth mentioning is that the session window object
>>>> created in the current Kafka Streams implementation doesn't have gap info;
>>>> it has start and end, which are the earliest and latest event timestamps
>>>> in that window interval, i.e. for (10,10), the session window that gets
>>>> created is [10,10], rather than [10,20]. Just to clarify why (10,10)
>>>> cannot be fetched when looking for the gap of (15,3): it's because the
>>>> end boundary 10 of [10,10] is smaller than the search boundary [12,18].
>>>> Please correct me if my understanding is wrong here.
>>>> @Matthias, to answer your use case question, we have a use case where
>>>> asynchronous time series data are received in the stream, from different
>>>> contributors, with different quality and at different pace.
>>>> Inside Kafka Streams, we use state to maintain statistical aggregations and
>>>> other mathematical models to track the liquidity and calculate the time
>>>> decay rate and dynamic gap, so that at runtime, for each contributor we can
>>>> 1. determine how many historical records we should maintain in state.
>>>> 2. for each incoming record, output a record using aggregations from
>>>> *nearby* records from that contributor.
>>>> Why doesn't a fixed gap session window work here? Because the definition of
>>>> "nearby" here is determined by several very dynamic factors in our case; it
>>>> changes not only with different hours in a day, but also in relation to
>>>> other contributors.
>>>> The purpose of this KIP is to suggest a dynamic session window
>>>> implementation so that we can embed such dynamic "nearby" calculation
>>>> capability into Kafka Streams session window semantics. Hope it makes
>>>> sense to you.
>>>> Lei
>>>> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <>
>>> wrote:
>>>>> Hello Lei,
>>>>> As Matthias mentioned, the key question here is that because of the late
>>>>> arrival of records which may indicate a shorter session gap interval, some
>>>>> session windows may be "mistakenly" merged and hence the merge needs to be
>>>>> undone, i.e. the windows need to be split again.
>>>>> Back to my example, you are right that the processing result of
>>>>> (10, 10), (19, 5), (15, 3) ..
>>>>> should be the same as the processing result of
>>>>> (10, 10), (15, 3), (19, 5) ..
>>>>> Note that the second value is NOT the window end time, but the extracted
>>>>> window gap interval; as you suggested in the KIP, this value can be
>>>>> dynamically changed.
>>>>> a. If you take a look at the second ordering, when we receive (10, 10) it
>>>>> means a window starting at 10 is created, and its gap interval is 10, which
>>>>> means that if by the timestamp of 20 we do not receive any new data, then
>>>>> the window should be closed, i.e. the window is [10, 20).
>>>>> b. When later we receive (15, 3), it means that this record ** changed **
>>>>> the window gap interval from 10 to 3, and hence we received a new record at
>>>>> 15, with the new window gap of 3; it means that by timestamp 18 (15 + 3) if
>>>>> we have not received any new data, the window should be closed, i.e. the
>>>>> window is now [10, 18) which includes two records at 10 and 15.
>>>>> c. The third record is received at 19, which is after the window close time
>>>>> 18; it means that we should now start a new window starting at 19, i.e. the
>>>>> window is [19, 24).
>>>>> BUT, because of the out-of-order arrival, we did not receive (15, 3) in time,
>>>>> but received (19, 5) first; this will cause us to mistakenly merge the window of
>>>>> [10, 20) with [19, 24) to [10, 24), and only when we later receive (15, 3)
>>>>> do we realize that the previous window should have ended at 18.
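The semantics in steps a to c can be sketched in Python under one explicit assumption: records are replayed in timestamp order, and the most recent record's gap decides when the half-open window [start, close) closes. `sessions_by_latest_gap` is a hypothetical name; this is not how Kafka Streams currently behaves.

```python
def sessions_by_latest_gap(records):
    """Build [start, close) sessions by replaying records in event-time order."""
    windows = []
    for ts, gap in sorted(records):
        if windows and ts < windows[-1][1]:
            # Record falls before the current close time: it joins the window
            # and its own gap resets the close time.
            windows[-1][1] = ts + gap
        else:
            # Record arrives at or after the close time: start a new window.
            windows.append([ts, ts + gap])
    return [tuple(w) for w in windows]
```

Given only (10,10) and (19,5) the result is [10,24), which is the "mistaken" merge; once (15,3) is included the result becomes [10,18) and [19,24), i.e. the earlier window has to be split.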
>>>>> Does that make sense to you?
>>>>> Guozhang
>>>>> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <>
>>>>> wrote:
>>>>>> I cannot follow the example:
>>>>>>>> (10, 10), (15, 3), (19, 5) ...
>>>>>> First, [10,10] is created, second the window is extended to [10,15], and
>>>>>> third [19,19] is created. Why would there be a [15,15]? And why would
>>>>>> (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and
>>>>>> thus [19,19] should be its own window?
>>>>>>> Take a look at another example,
>>>>>>> (13, 3),  (19, 5), (15, 3) ...
>>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved and
>>>>>>> merged to a new window [13, 15], then [19,19] should be updated to
>>>>>>> [13, 19]. Correct?
>>>>>> This example makes sense. However, Guozhang's example was different. The
>>>>>> late event _reduces_ the gap and this can lead to a window split.
>>>>>> Guozhang's example was
>>>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>> First [10,10] is created, second [10,19] is created (gap is 10, so 10 and
>>>>>> 19 merge). Last, (15,3) reduces the gap from 10 to 3, thus [10,15] and
>>>>>> [19,19] must be two windows, ie, original window [10,19] must be split.
>>>>>> Or maybe you have different semantics in mind for how gaps are dynamically
>>>>>> modified? It's a little unclear from the KIP itself what semantics dynamic
>>>>>> session windows should have.
>>>>>> What is also unclear to me atm is, what use cases you have in mind? The
>>>>>> KIP only says
>>>>>>> the statistical aggregation result, liquidity of the records,
>>>>>> I am not sure what this means. Can you elaborate?
>>>>>> -Matthias
>>>>>> On 8/30/18 3:32 PM, Lei Chen wrote:
>>>>>>> Hi Guozhang,
>>>>>>> Thanks for reviewing the proposal. I didn't think of out-of-order events
>>>>>>> and am glad that you brought it up.
>>>>>>> In the example you gave,
>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>> my understanding is that the correct result window should be the same as
>>>>>>> for the in-order events
>>>>>>> (10, 10), (15, 3), (19, 5) ...
>>>>>>> when (15, 3) is received, [15,15] is created
>>>>>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and [15, 19]
>>>>>>> is created, meanwhile [15,15] is removed
>>>>>>> back to out of order case,
>>>>>>> when (19 ,5) is received, [19, 19] is created
>>>>>>> when (15, 3) is received, in order to generate the same result,
>>>>>>> 1. if late event is later than retention period, it will be dropped
>>>>>>> 2. otherwise, adjacent session windows within gap should be retrieved and
>>>>>>> merged accordingly, in this case [19, 19], and create a new session
>>>>>>> [15, 19]
>>>>>>> I'm a little confused when you said "the window [15, 15] SHOULD actually be
>>>>>>> expired at 18 and hence the next record (19, 5) should be for a new session
>>>>>>> already.". If I understand it correctly, the expiration of the window is
>>>>>>> only checked when the next event (19,5) comes and then it should be merged to
>>>>>>> it. [15, 15] will then be closed. Is that also what you meant?
>>>>>>> I cannot think of a case where a window will be split by a late event,
>>>>>>> because if events A and C fall into the same session window, a late event B
>>>>>>> in the middle will definitely fall into C's gap as well. IOW, a late event
>>>>>>> will only cause window extension, not split.
>>>>>>> Take a look at another example,
>>>>>>> (13, 3),  (19, 5), (15, 3) ...
>>>>>>> in this case when (15, 3) is received, [13,13] should be retrieved and
>>>>>>> merged to a new window [13, 15], then [19,19] should be updated to
>>>>>>> [13, 19]. Correct?
>>>>>>> To be able to achieve that, like you said, the gap needs to be stored for
>>>>>>> sessions. We don't need to save the gap with each event, but only for each
>>>>>>> session window. To avoid upgrading existing session windows, how about
>>>>>>> creating a new Window type extended from SessionWindow along with a new
>>>>>>> KeySchema?
>>>>>>> What do you think?
>>>>>>> Lei
>>>>>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <>
>>>>>> wrote:
>>>>>>>> Hello Lei,
>>>>>>>> Thanks for the proposal. I've just made a quick pass over it and there is a
>>>>>>>> question I have:
>>>>>>>> The session windows are defined per key, i.e. does that mean that each
>>>>>>>> incoming record of the key can dynamically change the gap of the window?
>>>>>>>> For example, say you have the following records for the same key coming in
>>>>>>>> order, where the first value is the timestamp of the record, and the second
>>>>>>>> value is the extracted gap value:
>>>>>>>> (10, 10), (19, 5), ...
>>>>>>>> When we receive the first record at time 10, the gap is extracted as 10,
>>>>>>>> and hence the window will be expired at 20 if no other record is received.
>>>>>>>> When we receive the second record at time 19, the gap is modified to 5, and
>>>>>>>> hence the window will be expired at 24 if no other record is received.
>>>>>>>> If that's the case, I'm wondering how out-of-order data can be handled
>>>>>>>> then; consider this stream:
>>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>>> I.e. you received a late record at timestamp 15, which shortens
>>>>>>>> the gap to 3. It means that the window SHOULD actually be expired at 18,
>>>>>>>> and hence the next record (19, 5) should be for a new session already.
>>>>>>>> Today the Streams session window implementation does not do "window split", so
>>>>>>>> have you thought about how this can be extended?
>>>>>>>> Also since in your proposal each session window's gap value would be
>>>>>>>> different, we need to store this value along with each record then; how
>>>>>>>> would we store it, and what would be the upgrade path if it is not a
>>>>>>>> compatible change on disk storage etc?
>>>>>>>> Guozhang
>>>>>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <> wrote:
>>>>>>>>> Hi All,
>>>>>>>>> I created a KIP to add dynamic gap session window support to Kafka Streams
>>>>>>>>> DSL.
>>>>>>>>> 362%3A+Support+dynamic+gap+session+window
>>>>>>>>> Please take a look,
>>>>>>>>> Thanks,
>>>>>>>>> Lei
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>> --
>>>>> -- Guozhang
