Moved this KIP into status "inactive". Feel free to resume and any time.

-Matthias


On 1/9/19 10:18 PM, Guozhang Wang wrote:
> Hello Lei,
> 
> Just checking what's the current status of this KIP. We have a KIP deadline
> for 2.2 on 24th and wondering if this one may be able to make it.
> 
> 
> Guozhang
> 
> On Sat, Dec 15, 2018 at 1:01 PM Lei Chen <ley...@gmail.com> wrote:
> 
>> Sorry for the late reply Matthias. Have been busy with other work recently.
>> I'll restart the discussion and update the KIP accordingly.
>>
>> Lei
>>
>> On Tue, Nov 6, 2018 at 3:11 PM Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Any update on this KIP?
>>>
>>> On 9/20/18 3:30 PM, Matthias J. Sax wrote:
>>>> Thanks for following up. Very nice examples!
>>>>
>>>> I think, that the window definition for Flink is semantically
>>>> questionable. If there is only a single record, why is the window
>>>> defined as [ts, ts+gap]? To me, this definition is not sound and seems
>>>> to be arbitrary. To define the windows as [ts-gap,ts+gap] as you
>> mention
>>>> would be semantically more useful -- still, I think that defining the
>>>> window as [ts,ts] as we do currently in Kafka Streams is semantically
>>>> the best.
>>>>
>>>> I have the impression, that Flink only defines them differently,
>> because
>>>> it solves the issues in the implementation. (Ie, an implementation
>>>> details leaks into the semantics, what is usually not desired.)
>>>>
>>>> However, I believe that we could change the implementation accordingly.
>>>> We could store the windowed keys, as [ts-gap,ts+gap] (or [ts,ts+gap])
>> in
>>>> RocksDB, but at API level we return [ts,ts]. This way, we can still
>> find
>>>> all windows we need and provide the same deterministic behavior and
>> keep
>>>> the current window boundaries on the semantic level (there is no need
>> to
>>>> store the window start and/or end time). With this technique, we can
>>>> also implement dynamic session gaps. I think, we would need to store
>> the
>>>> used "gap" for each window, too. But again, this would be an
>>>> implementation detail.
>>>>
>>>> Let's see what others think.
>>>>
>>>> One tricky question we would need to address is, how we can be backward
>>>> compatible. I am currently working on KIP-258 that should help to
>>>> address this backward compatibility issue though.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>> On 9/19/18 5:17 PM, Lei Chen wrote:
>>>>> Thanks Matthias. That makes sense.
>>>>>
>>>>> You're right that symmetric merge is necessary to ensure consistency.
>> On
>>>>> the other hand, I kinda feel it defeats the purpose of dynamic gap,
>>> which
>>>>> is to update the gap from old value to new value. The symmetric merge
>>>>> always honor the larger gap in both direction, rather than honor the
>> gap
>>>>> carried by record with larger timestamp. I wasn't able to find any
>>> semantic
>>>>> definitions w.r.t this particular aspect online, but spent some time
>>>>> looking into other streaming engines like Apache Flink.
>>>>>
>>>>> Apache Flink defines the window differently, that uses (start time,
>>> start
>>>>> time + gap).
>>>>>
>>>>> so our previous example (10, 10), (19,5),(15,3) in Flink's case will
>> be:
>>>>> [10,20]
>>>>> [19,24] => merged to [10,24]
>>>>> [15,18] => merged to [10,24]
>>>>>
>>>>> while example (15,3)(19,5)(10,10) will be
>>>>> [15,18]
>>>>> [19,24] => no merge
>>>>> [10,20] => merged to [10,24]
>>>>>
>>>>> however, since it only records gap in future direction, not past, a
>> late
>>>>> record might not trigger any merge where in symmetric merge it would.
>>>>> (7,2),(10, 10), (19,5),(15,3)
>>>>> [7,9]
>>>>> [10,20]
>>>>> [19,24] => merged to [10,24]
>>>>> [15,18] => merged to [10,24]
>>>>> so at the end
>>>>> two windows [7,9][10,24] are there.
>>>>>
>>>>> As you can see, in Flink, the gap semantic is more toward to the way
>>> that,
>>>>> a gap carried by one record only affects how this record merges with
>>> future
>>>>> records. e.g. a later event (T2, G2) will only be merged with (T1, G1)
>>> is
>>>>> T2 is less than T1+G1, but not when T1 is less than T2 - G2. Let's
>> call
>>>>> this "forward-merge" way of handling this. I just went thought some
>>> source
>>>>> code and if my understanding is incorrect about Flink's
>> implementation,
>>>>> please correct me.
>>>>>
>>>>> On the other hand, if we want to do symmetric merge in Kafka Streams,
>> we
>>>>> can change the window definition to [start time - gap, start time +
>>> gap].
>>>>> This way the example (7,2),(10, 10), (19,5),(15,3) will be
>>>>> [5,9]
>>>>> [0,20] => merged to [0,20]
>>>>> [14,24] => merged to [0,24]
>>>>> [12,18] => merged to [0,24]
>>>>>
>>>>>  (19,5),(15,3)(7,2),(10, 10) will generate same result
>>>>> [14,24]
>>>>> [12,18] => merged to [12,24]
>>>>> [5,9] => no merge
>>>>> [0,20] => merged to [0,24]
>>>>>
>>>>> Note that symmetric-merge would require us to change the way how Kafka
>>>>> Steams fetch windows now, instead of fetching range from timestamp-gap
>>> to
>>>>> timestamp+gap, we will need to fetch all windows that are not expired
>>> yet.
>>>>> On the other hand, I'm not sure how this will impact the current logic
>>> of
>>>>> how a window is considered as closed, because the window doesn't carry
>>> end
>>>>> timestamp anymore, but end timestamp + gap.
>>>>>
>>>>> So do you guys think forward-merge approach used by Flink makes more
>>> sense
>>>>> in Kafka Streams, or symmetric-merge makes more sense? Both of them
>>> seems
>>>>> to me can give deterministic result.
>>>>>
>>>>> BTW I'll add the use case into original KIP.
>>>>>
>>>>> Lei
>>>>>
>>>>>
>>>>> On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <
>> matth...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Thanks for explaining your understanding. And thanks for providing
>> more
>>>>>> details about the use-case. Maybe you can add this to the KIP?
>>>>>>
>>>>>>
>>>>>> First one general comment. I guess that my and Guozhangs
>> understanding
>>>>>> about gap/close/gracePeriod is the same as yours -- we might not have
>>>>>> use the term precisely correct in previous email.
>>>>>>
>>>>>>
>>>>>> To you semantics of gap in detail:
>>>>>>
>>>>>>> I thought when (15,3) is received, kafka streams look up for
>> neighbor
>>>>>>> record/window that is within the gap
>>>>>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created
>>> its
>>>>>> own
>>>>>>> window [10, 10], which is
>>>>>>> out of the gap, so nothing will be found and no merge occurs. Hence
>> we
>>>>>> have
>>>>>>> two windows now in session store,
>>>>>>> [10, 10] and [15, 15] respectively.
>>>>>>
>>>>>> If you have record (10,10), we currently create a window of size
>>>>>> [10,10]. When record (15,3) arrives, your observation that the gap 3
>> is
>>>>>> too small to be merged into [10,10] window -- however, merging is a
>>>>>> symmetric operation and the existing window of [10,10] has a gap of
>> 10
>>>>>> defined: thus, 15 is close enough to fall into the gap, and (15,3) is
>>>>>> merged into the existing window resulting in window [10,15].
>>>>>>
>>>>>> If we don't respect the gap both ways, we end up with inconsistencies
>>> if
>>>>>> data is out-of-order. For example, if we use the same input record
>>>>>> (10,10) and (15,3) from above, and it happens that (15,3) is
>> processed
>>>>>> first, when processing out-of-order record (10,10) we would want to
>>>>>> merge both into a single window, too?
>>>>>>
>>>>>> Does this make sense?
>>>>>>
>>>>>> Now the question remains, if two records with different gap parameter
>>>>>> are merged, which gap should we apply for processing/merging future
>>>>>> records into the window? It seems, that we should use the gap
>> parameter
>>>>>> from the record with this larges timestamp. In the example above
>>> (15,3).
>>>>>> We would use gap 3 after merging independent of the order of
>>> processing.
>>>>>>
>>>>>>
>>>>>>> Also another thing worth mentioning is that, the session window
>> object
>>>>>>> created in current kafka streams
>>>>>>> implementation doesn't have gap info, it has start and end, which is
>>> the
>>>>>>> earliest and latest event timestamp
>>>>>>> in that window interval, i.e for (10,10), the session window gets
>>> created
>>>>>>> is [10,10], rather than [10,20]. Just to clarify
>>>>>>> so that it's clear why (10,10) cannot be fetched when looking for
>> gap
>>> of
>>>>>>> (15,3), it's because the end boundary 10 of
>>>>>>> [10,10] is smaller than search boundary [12,18].
>>>>>>
>>>>>> We don't need to store the gap, because the gap is know from the
>> window
>>>>>> definition. The created window size depends on the data that is
>>>>>> contained in the window. I guess one could define it differently,
>> too,
>>>>>> ie, for the (10,10) record, we create a window [0,20] -- not sure if
>> it
>>>>>> makes a big difference in practice though. Note, that creating window
>>>>>> [10,20] would not be correct, because the gap must be applied in both
>>>>>> directions, not just into the future.
>>>>>>
>>>>>> About the second part: the search would not be applied from (15,3) in
>>>>>> range [12,18], but from existing window [10,10] into range [0,20] and
>>> 15
>>>>>> is contained there. This example also shows, that we would need to
>> come
>>>>>> up with a clever way, to identify window [10,10] when processing
>> (15,3)
>>>>>> -- not sure atm how to do this. However, only consider (15,3) would
>>>>>> result in inconsistencies for out-of-order data as pointed out above
>>> and
>>>>>> would not be sufficient.
>>>>>>
>>>>>>
>>>>>> Does this make sense?
>>>>>>
>>>>>>
>>>>>> Or is there another way to define dynamic session gap semantics in a
>>>>>> deterministic way with regard to out-of-order data?
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>> On 9/11/18 4:28 PM, Lei Chen wrote:
>>>>>>> Thanks Matthias and Guozhang for the response.
>>>>>>>
>>>>>>> Seems like our understanding mainly differs in the semantics of gap
>> in
>>>>>>> session windows.
>>>>>>>
>>>>>>> My understanding is that gap is used to merge nearby records
>> together
>>>>>> such
>>>>>>> that no record
>>>>>>> in the merged window has distance later than gap. In Kafka Streams's
>>>>>>> implementation it's
>>>>>>> mainly used to find neighbor records/windows in session store so
>> that
>>>>>>> nearby records can
>>>>>>> be merge. It is NOT used to determine when a window should be
>> closed,
>>>>>> which
>>>>>>> is in
>>>>>>> fact determined by window's grace period.
>>>>>>>
>>>>>>> Guozhang you said "b. When later we received (15, 3), it means that
>>> this
>>>>>>> record ** changed **
>>>>>>> the window gap interval from 10 to 3, and hence we received a new
>>> record
>>>>>> at
>>>>>>> 15, with the new window gap of 3, it means that by timestamp 18 (15
>> +
>>> 3)
>>>>>> if
>>>>>>> we have not received any new data, the window should be closed, i.e.
>>> the
>>>>>>> window is now [10, 18) which includes two records at 10 and 15."
>>>>>>>
>>>>>>> This is different from what i thought will happen.
>>>>>>>
>>>>>>> I thought when (15,3) is received, kafka streams look up for
>> neighbor
>>>>>>> record/window that is within the gap
>>>>>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created
>>> its
>>>>>> own
>>>>>>> window [10, 10], which is
>>>>>>> out of the gap, so nothing will be found and no merge occurs. Hence
>> we
>>>>>> have
>>>>>>> two windows now in session store,
>>>>>>> [10, 10] and [15, 15] respectively.
>>>>>>>
>>>>>>> Also another thing worth mentioning is that, the session window
>> object
>>>>>>> created in current kafka streams
>>>>>>> implementation doesn't have gap info, it has start and end, which is
>>> the
>>>>>>> earliest and latest event timestamp
>>>>>>> in that window interval, i.e for (10,10), the session window gets
>>> created
>>>>>>> is [10,10], rather than [10,20]. Just to clarify
>>>>>>> so that it's clear why (10,10) cannot be fetched when looking for
>> gap
>>> of
>>>>>>> (15,3), it's because the end boundary 10 of
>>>>>>> [10,10] is smaller than search boundary [12,18].
>>>>>>>
>>>>>>> Please correct me if my understanding is wrong here.
>>>>>>>
>>>>>>> @Matthias, to answer your use case question, we have an use case
>> where
>>>>>>> asynchronous time series data
>>>>>>> are received in the stream, from different contributors, with
>>> different
>>>>>>> quality and at different pace.
>>>>>>> Inside Kafka Streams, we use state to maintain statistic
>> aggregations
>>> and
>>>>>>> other mathematics model to track
>>>>>>> the liquidity and calculate time decay rate and dynamic gap, so that
>>> at
>>>>>>> runtime, for each contributor we can
>>>>>>> 1. determine how many historical records we should maintain in
>> state.
>>>>>>> 2. for each incoming record, output a record using aggregations from
>>>>>>> *nearby* records from that contributor.
>>>>>>> Why fixed gap session window doesn't work here? Because the
>>> definition of
>>>>>>> "nearby" here is determined by
>>>>>>> several very dynamic factors in our case, it changes not only with
>>>>>>> different hours in a day, but also related to
>>>>>>> other contributors.
>>>>>>> The purpose of this KIP is to suggest a dynamic session window
>>>>>>> implementation so that we can embed such
>>>>>>> dynamic "nearby" calculation capability into kafka streams session
>>>>>> windows
>>>>>>> semantics. Hope it makes sense to you.
>>>>>>>
>>>>>>> Lei
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wangg...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Lei,
>>>>>>>>
>>>>>>>> As Matthias mentioned, the key question here is that because of the
>>> late
>>>>>>>> arrivals of records which may indicate a shorter session gap
>>> interval,
>>>>>> some
>>>>>>>> session windows may be "mistakenly" merged and hence need to be
>>> undone
>>>>>> the
>>>>>>>> merge, i.e. to split them again.
>>>>>>>>
>>>>>>>> Back to my example, you are right that the processing result of
>>>>>>>>
>>>>>>>> (10, 10), (19, 5), (15, 3) ..
>>>>>>>>
>>>>>>>> should be the same as the processing result of
>>>>>>>>
>>>>>>>> (10, 10), (15, 3), (19, 5) ..
>>>>>>>>
>>>>>>>> Note that the second value is NOT the window end time, but the
>>> extracted
>>>>>>>> window gap interval, as you suggested in the KIP this value can be
>>>>>>>> dynamically changed
>>>>>>>>
>>>>>>>> a. If you take a look at the second ordering, when we receive (10,
>>> 10)
>>>>>> it
>>>>>>>> means a window starting at 10 is created, and its gap interval is
>> 10,
>>>>>> which
>>>>>>>> means that if by the timestamp of 20 we do not receive any new
>> data,
>>>>>> then
>>>>>>>> the window should be closed, i.e. the window [10, 20).
>>>>>>>>
>>>>>>>> b. When later we received (15, 3), it means that this record **
>>> changed
>>>>>> **
>>>>>>>> the window gap interval from 10 to 3, and hence we received a new
>>>>>> record at
>>>>>>>> 15, with the new window gap of 3, it means that by timestamp 18
>> (15 +
>>>>>> 3) if
>>>>>>>> we have not received any new data, the window should be closed,
>> i.e.
>>> the
>>>>>>>> window is now [10, 18) which includes two records at 10 and 15.
>>>>>>>>
>>>>>>>> c. The third record is received at 19, which is after the window
>>> close
>>>>>> time
>>>>>>>> 18, it means that we should now start a new window starting at 19,
>>> i.e.
>>>>>> the
>>>>>>>> window is [19, 24),
>>>>>>>>
>>>>>>>>
>>>>>>>> BUT, because of the out of ordering, we did not receive (15, 3) in
>>> time,
>>>>>>>> but received (19, 5), it will cause us to mistakenly merge the
>>> window of
>>>>>>>> [10, 20) with [19, 24) to [10, 24), and only when later we received
>>>>>> (15, 3)
>>>>>>>> we realized that the previous window should have been ended at 18.
>>>>>>>>
>>>>>>>> Does that make sense to you?
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <
>>> matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I cannot follow the example:
>>>>>>>>>
>>>>>>>>>>> (10, 10), (15, 3), (19, 5) ...
>>>>>>>>>
>>>>>>>>> First, [10,10] is created, second the window is extended to
>> [10,15],
>>>>>> and
>>>>>>>>> third [19,19] is created. Why would there be a [15,15]? And why
>>> would
>>>>>>>>> (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3)
>> and
>>>>>>>>> thus [19,19] should be its own window?
>>>>>>>>>
>>>>>>>>>> Take a look at another example,
>>>>>>>>>> (13, 3),  (19, 5), (15, 3) ...
>>>>>>>>>>
>>>>>>>>>> in this case when (15, 3) is received, [13,13] should be
>> retrieved
>>> and
>>>>>>>>>> merged to a new window [13, 15], then [19,19] should be updated
>> to
>>>>>> [13,
>>>>>>>>>> 19]. Correct?
>>>>>>>>>
>>>>>>>>> This example makes sense. However, Guozhang's example was
>> different.
>>>>>> The
>>>>>>>>> late even, _reduces_ the gap and this can lead to a window split.
>>>>>>>>> Guozhang's example was
>>>>>>>>>
>>>>>>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>>>>
>>>>>>>>> First [10,10] is created, second [10,19] is create (gap is 10, so
>> 10
>>>>>> and
>>>>>>>>> 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15]
>>> and
>>>>>>>>> [19,19] must be two windows, ie, original window [10,19] must be
>>> split.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Or maybe you have different semantic about gaps are dynamically
>>>>>> modified
>>>>>>>>> in mind? It's a little unclear for the KIP itself what semantics
>>>>>> dynamic
>>>>>>>>> sessions windows should have.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What is also unclear to me atm is, what use cases you have in
>> mind?
>>> The
>>>>>>>>> KIP only says
>>>>>>>>>
>>>>>>>>>> the statistical aggregation result, liquidity of the records,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I am not sure what this means. Can you elaborate?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 8/30/18 3:32 PM, Lei Chen wrote:
>>>>>>>>>> Hi Guozhang,
>>>>>>>>>>
>>>>>>>>>> Thanks for reviewing the proposal. I didn't think of out of order
>>>>>>>> events
>>>>>>>>>> and glad that you brought it up.
>>>>>>>>>>
>>>>>>>>>> In the example you gave,
>>>>>>>>>>
>>>>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>>>>>
>>>>>>>>>> my understanding is that the correct result window should be the
>>> same
>>>>>>>> as
>>>>>>>>> in
>>>>>>>>>> order events
>>>>>>>>>>
>>>>>>>>>> (10, 10), (15, 3), (19, 5) ...
>>>>>>>>>>
>>>>>>>>>> when (15, 3) is received, [15,15] is creatd
>>>>>>>>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and
>> [15,
>>>>>> 19]
>>>>>>>>> is
>>>>>>>>>> created, meanwhile [15,15] is removed
>>>>>>>>>>
>>>>>>>>>> back to out of order case,
>>>>>>>>>>
>>>>>>>>>> when (19 ,5) is received, [19, 19] is created
>>>>>>>>>> when (15, 3) is received, in order to generate the same result,
>>>>>>>>>> 1. if late event is later than retention period, it will be
>> dropped
>>>>>>>>>> 2. otherwise, adjacent session windows within gap should be
>>> retrieved
>>>>>>>> and
>>>>>>>>>> merged accordingly, in this case [19, 19], and create a new
>> session
>>>>>>>> [15,
>>>>>>>>> 19]
>>>>>>>>>> I'm little confused when you said "the window [15, 15] SHOULD
>>> actually
>>>>>>>> be
>>>>>>>>>> expired at 18 and hence the next record (19, 5) should be for a
>> new
>>>>>>>>> session
>>>>>>>>>> already.". If i understand it correctly, the expiration of the
>>> window
>>>>>>>> is
>>>>>>>>>> only checked when next event (19,5) comes and then it should be
>>> merged
>>>>>>>> to
>>>>>>>>>> it. [15, 15] will then be closed. Is that also what you meant?
>>>>>>>>>> I cannot think of a case where a window will be split by a late
>>> event,
>>>>>>>>>> because if event A and C fall into the same session window, a
>> late
>>>>>>>> event
>>>>>>>>> B
>>>>>>>>>> in middle will definitely fall into C's gap as well. IOW, late
>>> event
>>>>>>>> will
>>>>>>>>>> only cause window extension, not split.
>>>>>>>>>>
>>>>>>>>>> Take a look at another example,
>>>>>>>>>> (13, 3),  (19, 5), (15, 3) ...
>>>>>>>>>>
>>>>>>>>>> in this case when (15, 3) is received, [13,13] should be
>> retrieved
>>> and
>>>>>>>>>> merged to a new window [13, 15], then [19,19] should be updated
>> to
>>>>>> [13,
>>>>>>>>>> 19]. Correct?
>>>>>>>>>>
>>>>>>>>>> To be able to achieve that, like you said, the gap needs to be
>>> stored
>>>>>>>> for
>>>>>>>>>> sessions. We don't need to save the gap with each event, but only
>>> for
>>>>>>>>> each
>>>>>>>>>> session window. To avoid upgrading existing session window, how
>>> about
>>>>>>>>>> create a new Window type extended from SessionWindow along with a
>>> new
>>>>>>>>>> KeySchema?
>>>>>>>>>>
>>>>>>>>>> What do you think?
>>>>>>>>>>
>>>>>>>>>> Lei
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <
>> wangg...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello Lei,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the proposal. I've just made a quick pass over it and
>>>>>> there
>>>>>>>>> is a
>>>>>>>>>>> question I have:
>>>>>>>>>>>
>>>>>>>>>>> The session windows are defined per key, i.e. does that mean
>> that
>>>>>> each
>>>>>>>>>>> incoming record of the key can dynamically change the gap of the
>>>>>>>> window?
>>>>>>>>>>> For example, say you have the following record for the same key
>>>>>> coming
>>>>>>>>> in
>>>>>>>>>>> order, where the first time is the timestamp of the record, and
>>> the
>>>>>>>>> second
>>>>>>>>>>> value is the extracted gap value:
>>>>>>>>>>>
>>>>>>>>>>> (10, 10), (19, 5), ...
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> When we receive the first record at time 10, the gap is
>> extracted
>>> as
>>>>>>>> 10,
>>>>>>>>>>> and hence the window will be expired at 20 if no other record is
>>>>>>>>> received.
>>>>>>>>>>> When we receive the second record at time 19, the gap is
>> modified
>>> to
>>>>>>>> 5,
>>>>>>>>> and
>>>>>>>>>>> hence the window will be expired at 24 if no other record is
>>>>>> received.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> If that's the case, I'm wondering how out-of-order data can be
>>>>>> handled
>>>>>>>>>>> then, consider this stream:
>>>>>>>>>>>
>>>>>>>>>>> (10, 10), (19, 5), (15, 3) ...
>>>>>>>>>>>
>>>>>>>>>>> I.e. you received a late record indicating at timestamp 15,
>> which
>>>>>>>>> shorten
>>>>>>>>>>> the gap to 3. It means that the window SHOULD actually be
>> expired
>>> at
>>>>>>>> 18,
>>>>>>>>>>> and hence the next record (19, 5) should be for a new session
>>>>>> already.
>>>>>>>>>>> Today Streams session window implementation does not do "window
>>>>>>>> split",
>>>>>>>>> so
>>>>>>>>>>> have you thought about how this can be extended?
>>>>>>>>>>>
>>>>>>>>>>> Also since in your proposal each session window's gap value
>> would
>>> be
>>>>>>>>>>> different, we need to store this value along with each record
>>> then,
>>>>>>>> how
>>>>>>>>>>> would we store it, and what would be the upgrade path if it is
>>> not a
>>>>>>>>>>> compatible change on disk storage etc?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <ley...@gmail.com>
>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> I created a KIP to add dynamic gap session window support to
>>> Kafka
>>>>>>>>>>> Streams
>>>>>>>>>>>> DSL.
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>>>>>>>>>> 362%3A+Support+dynamic+gap+session+window
>>>>>>>>>>>>
>>>>>>>>>>>> Please take a look,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Lei
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to