Hello Lei,

Just checking what's the current status of this KIP. We have a KIP deadline
for 2.2 on 24th and wondering if this one may be able to make it.


Guozhang

On Sat, Dec 15, 2018 at 1:01 PM Lei Chen <ley...@gmail.com> wrote:

> Sorry for the late reply Matthias. Have been busy with other work recently.
> I'll restart the discussion and update the KIP accordingly.
>
> Lei
>
> On Tue, Nov 6, 2018 at 3:11 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Any update on this KIP?
> >
> > On 9/20/18 3:30 PM, Matthias J. Sax wrote:
> > > Thanks for following up. Very nice examples!
> > >
> > > I think, that the window definition for Flink is semantically
> > > questionable. If there is only a single record, why is the window
> > > defined as [ts, ts+gap]? To me, this definition is not sound and seems
> > > to be arbitrary. To define the windows as [ts-gap,ts+gap] as you
> mention
> > > would be semantically more useful -- still, I think that defining the
> > > window as [ts,ts] as we do currently in Kafka Streams is semantically
> > > the best.
> > >
> > > I have the impression, that Flink only defines them differently,
> because
> > > it solves the issues in the implementation. (Ie, an implementation
> > > details leaks into the semantics, what is usually not desired.)
> > >
> > > However, I believe that we could change the implementation accordingly.
> > > We could store the windowed keys, as [ts-gap,ts+gap] (or [ts,ts+gap])
> in
> > > RocksDB, but at API level we return [ts,ts]. This way, we can still
> find
> > > all windows we need and provide the same deterministic behavior and
> keep
> > > the current window boundaries on the semantic level (there is no need
> to
> > > store the window start and/or end time). With this technique, we can
> > > also implement dynamic session gaps. I think, we would need to store
> the
> > > used "gap" for each window, too. But again, this would be an
> > > implementation detail.
> > >
> > > Let's see what others think.
> > >
> > > One tricky question we would need to address is, how we can be backward
> > > compatible. I am currently working on KIP-258 that should help to
> > > address this backward compatibility issue though.
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 9/19/18 5:17 PM, Lei Chen wrote:
> > >> Thanks Matthias. That makes sense.
> > >>
> > >> You're right that symmetric merge is necessary to ensure consistency.
> On
> > >> the other hand, I kinda feel it defeats the purpose of dynamic gap,
> > which
> > >> is to update the gap from old value to new value. The symmetric merge
> > >> always honor the larger gap in both direction, rather than honor the
> gap
> > >> carried by record with larger timestamp. I wasn't able to find any
> > semantic
> > >> definitions w.r.t this particular aspect online, but spent some time
> > >> looking into other streaming engines like Apache Flink.
> > >>
> > >> Apache Flink defines the window differently, that uses (start time,
> > start
> > >> time + gap).
> > >>
> > >> so our previous example (10, 10), (19,5),(15,3) in Flink's case will
> be:
> > >> [10,20]
> > >> [19,24] => merged to [10,24]
> > >> [15,18] => merged to [10,24]
> > >>
> > >> while example (15,3)(19,5)(10,10) will be
> > >> [15,18]
> > >> [19,24] => no merge
> > >> [10,20] => merged to [10,24]
> > >>
> > >> however, since it only records gap in future direction, not past, a
> late
> > >> record might not trigger any merge where in symmetric merge it would.
> > >> (7,2),(10, 10), (19,5),(15,3)
> > >> [7,9]
> > >> [10,20]
> > >> [19,24] => merged to [10,24]
> > >> [15,18] => merged to [10,24]
> > >> so at the end
> > >> two windows [7,9][10,24] are there.
> > >>
> > >> As you can see, in Flink, the gap semantic is more toward to the way
> > that,
> > >> a gap carried by one record only affects how this record merges with
> > future
> > >> records. e.g. a later event (T2, G2) will only be merged with (T1, G1)
> > is
> > >> T2 is less than T1+G1, but not when T1 is less than T2 - G2. Let's
> call
> > >> this "forward-merge" way of handling this. I just went thought some
> > source
> > >> code and if my understanding is incorrect about Flink's
> implementation,
> > >> please correct me.
> > >>
> > >> On the other hand, if we want to do symmetric merge in Kafka Streams,
> we
> > >> can change the window definition to [start time - gap, start time +
> > gap].
> > >> This way the example (7,2),(10, 10), (19,5),(15,3) will be
> > >> [5,9]
> > >> [0,20] => merged to [0,20]
> > >> [14,24] => merged to [0,24]
> > >> [12,18] => merged to [0,24]
> > >>
> > >>  (19,5),(15,3)(7,2),(10, 10) will generate same result
> > >> [14,24]
> > >> [12,18] => merged to [12,24]
> > >> [5,9] => no merge
> > >> [0,20] => merged to [0,24]
> > >>
> > >> Note that symmetric-merge would require us to change the way how Kafka
> > >> Steams fetch windows now, instead of fetching range from timestamp-gap
> > to
> > >> timestamp+gap, we will need to fetch all windows that are not expired
> > yet.
> > >> On the other hand, I'm not sure how this will impact the current logic
> > of
> > >> how a window is considered as closed, because the window doesn't carry
> > end
> > >> timestamp anymore, but end timestamp + gap.
> > >>
> > >> So do you guys think forward-merge approach used by Flink makes more
> > sense
> > >> in Kafka Streams, or symmetric-merge makes more sense? Both of them
> > seems
> > >> to me can give deterministic result.
> > >>
> > >> BTW I'll add the use case into original KIP.
> > >>
> > >> Lei
> > >>
> > >>
> > >> On Tue, Sep 11, 2018 at 5:45 PM Matthias J. Sax <
> matth...@confluent.io>
> > >> wrote:
> > >>
> > >>> Thanks for explaining your understanding. And thanks for providing
> more
> > >>> details about the use-case. Maybe you can add this to the KIP?
> > >>>
> > >>>
> > >>> First one general comment. I guess that my and Guozhangs
> understanding
> > >>> about gap/close/gracePeriod is the same as yours -- we might not have
> > >>> use the term precisely correct in previous email.
> > >>>
> > >>>
> > >>> To you semantics of gap in detail:
> > >>>
> > >>>> I thought when (15,3) is received, kafka streams look up for
> neighbor
> > >>>> record/window that is within the gap
> > >>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created
> > its
> > >>> own
> > >>>> window [10, 10], which is
> > >>>> out of the gap, so nothing will be found and no merge occurs. Hence
> we
> > >>> have
> > >>>> two windows now in session store,
> > >>>> [10, 10] and [15, 15] respectively.
> > >>>
> > >>> If you have record (10,10), we currently create a window of size
> > >>> [10,10]. When record (15,3) arrives, your observation that the gap 3
> is
> > >>> too small to be merged into [10,10] window -- however, merging is a
> > >>> symmetric operation and the existing window of [10,10] has a gap of
> 10
> > >>> defined: thus, 15 is close enough to fall into the gap, and (15,3) is
> > >>> merged into the existing window resulting in window [10,15].
> > >>>
> > >>> If we don't respect the gap both ways, we end up with inconsistencies
> > if
> > >>> data is out-of-order. For example, if we use the same input record
> > >>> (10,10) and (15,3) from above, and it happens that (15,3) is
> processed
> > >>> first, when processing out-of-order record (10,10) we would want to
> > >>> merge both into a single window, too?
> > >>>
> > >>> Does this make sense?
> > >>>
> > >>> Now the question remains, if two records with different gap parameter
> > >>> are merged, which gap should we apply for processing/merging future
> > >>> records into the window? It seems, that we should use the gap
> parameter
> > >>> from the record with this larges timestamp. In the example above
> > (15,3).
> > >>> We would use gap 3 after merging independent of the order of
> > processing.
> > >>>
> > >>>
> > >>>> Also another thing worth mentioning is that, the session window
> object
> > >>>> created in current kafka streams
> > >>>> implementation doesn't have gap info, it has start and end, which is
> > the
> > >>>> earliest and latest event timestamp
> > >>>> in that window interval, i.e for (10,10), the session window gets
> > created
> > >>>> is [10,10], rather than [10,20]. Just to clarify
> > >>>> so that it's clear why (10,10) cannot be fetched when looking for
> gap
> > of
> > >>>> (15,3), it's because the end boundary 10 of
> > >>>> [10,10] is smaller than search boundary [12,18].
> > >>>
> > >>> We don't need to store the gap, because the gap is know from the
> window
> > >>> definition. The created window size depends on the data that is
> > >>> contained in the window. I guess one could define it differently,
> too,
> > >>> ie, for the (10,10) record, we create a window [0,20] -- not sure if
> it
> > >>> makes a big difference in practice though. Note, that creating window
> > >>> [10,20] would not be correct, because the gap must be applied in both
> > >>> directions, not just into the future.
> > >>>
> > >>> About the second part: the search would not be applied from (15,3) in
> > >>> range [12,18], but from existing window [10,10] into range [0,20] and
> > 15
> > >>> is contained there. This example also shows, that we would need to
> come
> > >>> up with a clever way, to identify window [10,10] when processing
> (15,3)
> > >>> -- not sure atm how to do this. However, only consider (15,3) would
> > >>> result in inconsistencies for out-of-order data as pointed out above
> > and
> > >>> would not be sufficient.
> > >>>
> > >>>
> > >>> Does this make sense?
> > >>>
> > >>>
> > >>> Or is there another way to define dynamic session gap semantics in a
> > >>> deterministic way with regard to out-of-order data?
> > >>>
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>>
> > >>> On 9/11/18 4:28 PM, Lei Chen wrote:
> > >>>> Thanks Matthias and Guozhang for the response.
> > >>>>
> > >>>> Seems like our understanding mainly differs in the semantics of gap
> in
> > >>>> session windows.
> > >>>>
> > >>>> My understanding is that gap is used to merge nearby records
> together
> > >>> such
> > >>>> that no record
> > >>>> in the merged window has distance later than gap. In Kafka Streams's
> > >>>> implementation it's
> > >>>> mainly used to find neighbor records/windows in session store so
> that
> > >>>> nearby records can
> > >>>> be merge. It is NOT used to determine when a window should be
> closed,
> > >>> which
> > >>>> is in
> > >>>> fact determined by window's grace period.
> > >>>>
> > >>>> Guozhang you said "b. When later we received (15, 3), it means that
> > this
> > >>>> record ** changed **
> > >>>> the window gap interval from 10 to 3, and hence we received a new
> > record
> > >>> at
> > >>>> 15, with the new window gap of 3, it means that by timestamp 18 (15
> +
> > 3)
> > >>> if
> > >>>> we have not received any new data, the window should be closed, i.e.
> > the
> > >>>> window is now [10, 18) which includes two records at 10 and 15."
> > >>>>
> > >>>> This is different from what i thought will happen.
> > >>>>
> > >>>> I thought when (15,3) is received, kafka streams look up for
> neighbor
> > >>>> record/window that is within the gap
> > >>>> of [15-3, 15+3], and merge if any. Previous record (10, 10) created
> > its
> > >>> own
> > >>>> window [10, 10], which is
> > >>>> out of the gap, so nothing will be found and no merge occurs. Hence
> we
> > >>> have
> > >>>> two windows now in session store,
> > >>>> [10, 10] and [15, 15] respectively.
> > >>>>
> > >>>> Also another thing worth mentioning is that, the session window
> object
> > >>>> created in current kafka streams
> > >>>> implementation doesn't have gap info, it has start and end, which is
> > the
> > >>>> earliest and latest event timestamp
> > >>>> in that window interval, i.e for (10,10), the session window gets
> > created
> > >>>> is [10,10], rather than [10,20]. Just to clarify
> > >>>> so that it's clear why (10,10) cannot be fetched when looking for
> gap
> > of
> > >>>> (15,3), it's because the end boundary 10 of
> > >>>> [10,10] is smaller than search boundary [12,18].
> > >>>>
> > >>>> Please correct me if my understanding is wrong here.
> > >>>>
> > >>>> @Matthias, to answer your use case question, we have an use case
> where
> > >>>> asynchronous time series data
> > >>>> are received in the stream, from different contributors, with
> > different
> > >>>> quality and at different pace.
> > >>>> Inside Kafka Streams, we use state to maintain statistic
> aggregations
> > and
> > >>>> other mathematics model to track
> > >>>> the liquidity and calculate time decay rate and dynamic gap, so that
> > at
> > >>>> runtime, for each contributor we can
> > >>>> 1. determine how many historical records we should maintain in
> state.
> > >>>> 2. for each incoming record, output a record using aggregations from
> > >>>> *nearby* records from that contributor.
> > >>>> Why fixed gap session window doesn't work here? Because the
> > definition of
> > >>>> "nearby" here is determined by
> > >>>> several very dynamic factors in our case, it changes not only with
> > >>>> different hours in a day, but also related to
> > >>>> other contributors.
> > >>>> The purpose of this KIP is to suggest a dynamic session window
> > >>>> implementation so that we can embed such
> > >>>> dynamic "nearby" calculation capability into kafka streams session
> > >>> windows
> > >>>> semantics. Hope it makes sense to you.
> > >>>>
> > >>>> Lei
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Mon, Sep 10, 2018 at 5:27 PM Guozhang Wang <wangg...@gmail.com>
> > >>> wrote:
> > >>>>
> > >>>>> Hello Lei,
> > >>>>>
> > >>>>> As Matthias mentioned, the key question here is that because of the
> > late
> > >>>>> arrivals of records which may indicate a shorter session gap
> > interval,
> > >>> some
> > >>>>> session windows may be "mistakenly" merged and hence need to be
> > undone
> > >>> the
> > >>>>> merge, i.e. to split them again.
> > >>>>>
> > >>>>> Back to my example, you are right that the processing result of
> > >>>>>
> > >>>>> (10, 10), (19, 5), (15, 3) ..
> > >>>>>
> > >>>>> should be the same as the processing result of
> > >>>>>
> > >>>>> (10, 10), (15, 3), (19, 5) ..
> > >>>>>
> > >>>>> Note that the second value is NOT the window end time, but the
> > extracted
> > >>>>> window gap interval, as you suggested in the KIP this value can be
> > >>>>> dynamically changed
> > >>>>>
> > >>>>> a. If you take a look at the second ordering, when we receive (10,
> > 10)
> > >>> it
> > >>>>> means a window starting at 10 is created, and its gap interval is
> 10,
> > >>> which
> > >>>>> means that if by the timestamp of 20 we do not receive any new
> data,
> > >>> then
> > >>>>> the window should be closed, i.e. the window [10, 20).
> > >>>>>
> > >>>>> b. When later we received (15, 3), it means that this record **
> > changed
> > >>> **
> > >>>>> the window gap interval from 10 to 3, and hence we received a new
> > >>> record at
> > >>>>> 15, with the new window gap of 3, it means that by timestamp 18
> (15 +
> > >>> 3) if
> > >>>>> we have not received any new data, the window should be closed,
> i.e.
> > the
> > >>>>> window is now [10, 18) which includes two records at 10 and 15.
> > >>>>>
> > >>>>> c. The third record is received at 19, which is after the window
> > close
> > >>> time
> > >>>>> 18, it means that we should now start a new window starting at 19,
> > i.e.
> > >>> the
> > >>>>> window is [19, 24),
> > >>>>>
> > >>>>>
> > >>>>> BUT, because of the out of ordering, we did not receive (15, 3) in
> > time,
> > >>>>> but received (19, 5), it will cause us to mistakenly merge the
> > window of
> > >>>>> [10, 20) with [19, 24) to [10, 24), and only when later we received
> > >>> (15, 3)
> > >>>>> we realized that the previous window should have been ended at 18.
> > >>>>>
> > >>>>> Does that make sense to you?
> > >>>>>
> > >>>>>
> > >>>>> Guozhang
> > >>>>>
> > >>>>>
> > >>>>> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax <
> > matth...@confluent.io>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> I cannot follow the example:
> > >>>>>>
> > >>>>>>>> (10, 10), (15, 3), (19, 5) ...
> > >>>>>>
> > >>>>>> First, [10,10] is created, second the window is extended to
> [10,15],
> > >>> and
> > >>>>>> third [19,19] is created. Why would there be a [15,15]? And why
> > would
> > >>>>>> (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3)
> and
> > >>>>>> thus [19,19] should be its own window?
> > >>>>>>
> > >>>>>>> Take a look at another example,
> > >>>>>>> (13, 3),  (19, 5), (15, 3) ...
> > >>>>>>>
> > >>>>>>> in this case when (15, 3) is received, [13,13] should be
> retrieved
> > and
> > >>>>>>> merged to a new window [13, 15], then [19,19] should be updated
> to
> > >>> [13,
> > >>>>>>> 19]. Correct?
> > >>>>>>
> > >>>>>> This example makes sense. However, Guozhang's example was
> different.
> > >>> The
> > >>>>>> late even, _reduces_ the gap and this can lead to a window split.
> > >>>>>> Guozhang's example was
> > >>>>>>
> > >>>>>>>>> (10, 10), (19, 5), (15, 3) ...
> > >>>>>>
> > >>>>>> First [10,10] is created, second [10,19] is create (gap is 10, so
> 10
> > >>> and
> > >>>>>> 19 merge). Last, (15,3) reduced the gap from 10 to 3, thus [10,15]
> > and
> > >>>>>> [19,19] must be two windows, ie, original window [10,19] must be
> > split.
> > >>>>>>
> > >>>>>>
> > >>>>>> Or maybe you have different semantic about gaps are dynamically
> > >>> modified
> > >>>>>> in mind? It's a little unclear for the KIP itself what semantics
> > >>> dynamic
> > >>>>>> sessions windows should have.
> > >>>>>>
> > >>>>>>
> > >>>>>> What is also unclear to me atm is, what use cases you have in
> mind?
> > The
> > >>>>>> KIP only says
> > >>>>>>
> > >>>>>>> the statistical aggregation result, liquidity of the records,
> > >>>>>>
> > >>>>>>
> > >>>>>> I am not sure what this means. Can you elaborate?
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> -Matthias
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On 8/30/18 3:32 PM, Lei Chen wrote:
> > >>>>>>> Hi Guozhang,
> > >>>>>>>
> > >>>>>>> Thanks for reviewing the proposal. I didn't think of out of order
> > >>>>> events
> > >>>>>>> and glad that you brought it up.
> > >>>>>>>
> > >>>>>>> In the example you gave,
> > >>>>>>>
> > >>>>>>> (10, 10), (19, 5), (15, 3) ...
> > >>>>>>>
> > >>>>>>> my understanding is that the correct result window should be the
> > same
> > >>>>> as
> > >>>>>> in
> > >>>>>>> order events
> > >>>>>>>
> > >>>>>>> (10, 10), (15, 3), (19, 5) ...
> > >>>>>>>
> > >>>>>>> when (15, 3) is received, [15,15] is creatd
> > >>>>>>> when (19, 5) is received, [15, 15] and [19, 19] are merged and
> [15,
> > >>> 19]
> > >>>>>> is
> > >>>>>>> created, meanwhile [15,15] is removed
> > >>>>>>>
> > >>>>>>> back to out of order case,
> > >>>>>>>
> > >>>>>>> when (19 ,5) is received, [19, 19] is created
> > >>>>>>> when (15, 3) is received, in order to generate the same result,
> > >>>>>>> 1. if late event is later than retention period, it will be
> dropped
> > >>>>>>> 2. otherwise, adjacent session windows within gap should be
> > retrieved
> > >>>>> and
> > >>>>>>> merged accordingly, in this case [19, 19], and create a new
> session
> > >>>>> [15,
> > >>>>>> 19]
> > >>>>>>> I'm little confused when you said "the window [15, 15] SHOULD
> > actually
> > >>>>> be
> > >>>>>>> expired at 18 and hence the next record (19, 5) should be for a
> new
> > >>>>>> session
> > >>>>>>> already.". If i understand it correctly, the expiration of the
> > window
> > >>>>> is
> > >>>>>>> only checked when next event (19,5) comes and then it should be
> > merged
> > >>>>> to
> > >>>>>>> it. [15, 15] will then be closed. Is that also what you meant?
> > >>>>>>> I cannot think of a case where a window will be split by a late
> > event,
> > >>>>>>> because if event A and C fall into the same session window, a
> late
> > >>>>> event
> > >>>>>> B
> > >>>>>>> in middle will definitely fall into C's gap as well. IOW, late
> > event
> > >>>>> will
> > >>>>>>> only cause window extension, not split.
> > >>>>>>>
> > >>>>>>> Take a look at another example,
> > >>>>>>> (13, 3),  (19, 5), (15, 3) ...
> > >>>>>>>
> > >>>>>>> in this case when (15, 3) is received, [13,13] should be
> retrieved
> > and
> > >>>>>>> merged to a new window [13, 15], then [19,19] should be updated
> to
> > >>> [13,
> > >>>>>>> 19]. Correct?
> > >>>>>>>
> > >>>>>>> To be able to achieve that, like you said, the gap needs to be
> > stored
> > >>>>> for
> > >>>>>>> sessions. We don't need to save the gap with each event, but only
> > for
> > >>>>>> each
> > >>>>>>> session window. To avoid upgrading existing session window, how
> > about
> > >>>>>>> create a new Window type extended from SessionWindow along with a
> > new
> > >>>>>>> KeySchema?
> > >>>>>>>
> > >>>>>>> What do you think?
> > >>>>>>>
> > >>>>>>> Lei
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang <
> wangg...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Hello Lei,
> > >>>>>>>>
> > >>>>>>>> Thanks for the proposal. I've just made a quick pass over it and
> > >>> there
> > >>>>>> is a
> > >>>>>>>> question I have:
> > >>>>>>>>
> > >>>>>>>> The session windows are defined per key, i.e. does that mean
> that
> > >>> each
> > >>>>>>>> incoming record of the key can dynamically change the gap of the
> > >>>>> window?
> > >>>>>>>> For example, say you have the following record for the same key
> > >>> coming
> > >>>>>> in
> > >>>>>>>> order, where the first time is the timestamp of the record, and
> > the
> > >>>>>> second
> > >>>>>>>> value is the extracted gap value:
> > >>>>>>>>
> > >>>>>>>> (10, 10), (19, 5), ...
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> When we receive the first record at time 10, the gap is
> extracted
> > as
> > >>>>> 10,
> > >>>>>>>> and hence the window will be expired at 20 if no other record is
> > >>>>>> received.
> > >>>>>>>> When we receive the second record at time 19, the gap is
> modified
> > to
> > >>>>> 5,
> > >>>>>> and
> > >>>>>>>> hence the window will be expired at 24 if no other record is
> > >>> received.
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> If that's the case, I'm wondering how out-of-order data can be
> > >>> handled
> > >>>>>>>> then, consider this stream:
> > >>>>>>>>
> > >>>>>>>> (10, 10), (19, 5), (15, 3) ...
> > >>>>>>>>
> > >>>>>>>> I.e. you received a late record indicating at timestamp 15,
> which
> > >>>>>> shorten
> > >>>>>>>> the gap to 3. It means that the window SHOULD actually be
> expired
> > at
> > >>>>> 18,
> > >>>>>>>> and hence the next record (19, 5) should be for a new session
> > >>> already.
> > >>>>>>>> Today Streams session window implementation does not do "window
> > >>>>> split",
> > >>>>>> so
> > >>>>>>>> have you thought about how this can be extended?
> > >>>>>>>>
> > >>>>>>>> Also since in your proposal each session window's gap value
> would
> > be
> > >>>>>>>> different, we need to store this value along with each record
> > then,
> > >>>>> how
> > >>>>>>>> would we store it, and what would be the upgrade path if it is
> > not a
> > >>>>>>>> compatible change on disk storage etc?
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> Guozhang
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen <ley...@gmail.com>
> > wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi All,
> > >>>>>>>>>
> > >>>>>>>>> I created a KIP to add dynamic gap session window support to
> > Kafka
> > >>>>>>>> Streams
> > >>>>>>>>> DSL.
> > >>>>>>>>>
> > >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>>>>>>>> 362%3A+Support+dynamic+gap+session+window
> > >>>>>>>>>
> > >>>>>>>>> Please take a look,
> > >>>>>>>>>
> > >>>>>>>>> Thanks,
> > >>>>>>>>> Lei
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>> --
> > >>>>>>>> -- Guozhang
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>>
> > >>>>> --
> > >>>>> -- Guozhang
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>
> > >
> >
> >
>


-- 
-- Guozhang

Reply via email to