Re: [DISCUSS] KIP-362: Dynamic Session Window Support

2018-12-15 Thread Lei Chen
Sorry for the late reply Matthias. Have been busy with other work recently.
I'll restart the discussion and update the KIP accordingly.

Lei

On Tue, Nov 6, 2018 at 3:11 PM Matthias J. Sax 
wrote:

> Any update on this KIP?
>
> On 9/20/18 3:30 PM, Matthias J. Sax wrote:
> > Thanks for following up. Very nice examples!
> >
> > I think that the window definition for Flink is semantically
> > questionable. If there is only a single record, why is the window
> > defined as [ts, ts+gap]? To me, this definition is not sound and seems
> > arbitrary. To define the windows as [ts-gap, ts+gap] as you mention
> > would be semantically more useful -- still, I think that defining the
> > window as [ts,ts], as we do currently in Kafka Streams, is semantically
> > the best.
> >
> > I have the impression that Flink only defines them differently because
> > it solves an issue in the implementation. (I.e., an implementation
> > detail leaks into the semantics, which is usually not desirable.)
> >
> > However, I believe that we could change the implementation accordingly.
> > We could store the windowed keys as [ts-gap, ts+gap] (or [ts, ts+gap]) in
> > RocksDB, but at the API level we return [ts,ts]. This way, we can still
> > find all the windows we need, provide the same deterministic behavior,
> > and keep the current window boundaries at the semantic level (there is
> > no need to store the window start and/or end time). With this technique,
> > we can also implement dynamic session gaps. I think we would need to
> > store the used "gap" for each window, too. But again, this would be an
> > implementation detail.
> >
> > Let's see what others think.
> >
> > One tricky question we would need to address is how we can remain
> > backward compatible. I am currently working on KIP-258, which should
> > help to address this backward-compatibility issue though.
> >
> >
> > -Matthias
> >
> >
> >
> > On 9/19/18 5:17 PM, Lei Chen wrote:
> >> Thanks Matthias. That makes sense.
> >>
> >> You're right that symmetric merge is necessary to ensure consistency.
> >> On the other hand, I kinda feel it defeats the purpose of the dynamic
> >> gap, which is to update the gap from the old value to the new value.
> >> The symmetric merge always honors the larger gap in both directions,
> >> rather than honoring the gap carried by the record with the larger
> >> timestamp. I wasn't able to find any semantic definitions w.r.t. this
> >> particular aspect online, but spent some time looking into other
> >> streaming engines like Apache Flink.
> >>
> >> Apache Flink defines the window differently: it uses [start time,
> >> start time + gap].
> >>
> >> So our previous example (10,10), (19,5), (15,3) in Flink's case will be:
> >> [10,20]
> >> [19,24] => merged to [10,24]
> >> [15,18] => merged to [10,24]
> >>
> >> while the example (15,3), (19,5), (10,10) will be
> >> [15,18]
> >> [19,24] => no merge
> >> [10,20] => merged to [10,24]
> >>
> >> However, since it only records the gap in the future direction, not
> >> the past, a late record might not trigger any merge where symmetric
> >> merge would. For (7,2), (10,10), (19,5), (15,3):
> >> [7,9]
> >> [10,20]
> >> [19,24] => merged to [10,24]
> >> [15,18] => merged to [10,24]
> >> So at the end, two windows [7,9] and [10,24] remain.
> >>
> >> As you can see, in Flink the gap semantic is such that a gap carried
> >> by one record only affects how this record merges with future records.
> >> E.g., a later event (T2, G2) will only be merged with (T1, G1) if T2
> >> is less than T1+G1, but not when T1 is less than T2-G2. Let's call
> >> this the "forward-merge" way of handling it. I just went through some
> >> source code, so if my understanding of Flink's implementation is
> >> incorrect, please correct me.
> >>
> >> On the other hand, if we want to do symmetric merge in Kafka Streams,
> >> we can change the window definition to [start time - gap, start time + gap].
> >> This way the example (7,2), (10,10), (19,5), (15,3) will be
> >> [5,9]
> >> [0,20] => merged to [0,20]
> >> [14,24] => merged to [0,24]
> >> [12,18] => merged to [0,24]
> >>
> >> (19,5), (15,3), (7,2), (10,10) will generate the same result:
> >> [14,24]
> >> [12,

access for KIP

2018-06-20 Thread Lei Chen
Hi, there,

I'd like to request permission to the kafka space in our ASF cwiki, to be able
to create KIPs.

Username: leyncl
email: ley...@gmail.com

Thanks!


[DISCUSS] KIP-362: Dynamic Session Window Support

2018-08-22 Thread Lei Chen
Hi All,

I created a KIP to add dynamic gap session window support to Kafka Streams
DSL.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-362%3A+Support+dynamic+gap+session+window

Please take a look,

Thanks,
Lei


Re: [DISCUSS] KIP-362: Dynamic Session Window Support

2018-08-30 Thread Lei Chen
Hi Guozhang,

Thanks for reviewing the proposal. I didn't think of out-of-order events
and I'm glad that you brought it up.

In the example you gave,

(10, 10), (19, 5), (15, 3) ...

my understanding is that the correct result window should be the same as
for the in-order events

(10, 10), (15, 3), (19, 5) ...

when (15, 3) is received, [15,15] is created
when (19, 5) is received, [15, 15] and [19, 19] are merged and [15, 19] is
created, meanwhile [15,15] is removed

Back to the out-of-order case:

when (19, 5) is received, [19, 19] is created
when (15, 3) is received, in order to generate the same result,
1. if the late event is older than the retention period, it will be dropped
2. otherwise, adjacent session windows within the gap should be retrieved and
merged accordingly, in this case [19, 19], creating a new session [15, 19]
I'm a little confused when you said "the window [15, 15] SHOULD actually be
expired at 18 and hence the next record (19, 5) should be for a new session
already." If I understand it correctly, the expiration of the window is
only checked when the next event (19, 5) comes, and then it should be merged
into it. [15, 15] will then be closed. Is that also what you meant?
I cannot think of a case where a window would be split by a late event,
because if events A and C fall into the same session window, a late event B
in the middle will definitely fall into C's gap as well. IOW, a late event
will only cause window extension, not a split.
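To make the merge behavior under discussion concrete, here is a toy sketch (hypothetical, NOT the actual Kafka Streams session store) of a symmetric merge in which each window remembers the largest gap seen so far, and a new record (ts, gap) merges with any window whose gap-extended range overlaps [ts - gap, ts + gap]:

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of symmetric merge with per-window gaps. All names here are
// made up for illustration; this does not model grace periods or stores.
public class SessionMergeSketch {

    static final class Window {
        long start, end, gap;  // earliest/latest event timestamps and gap
        Window(long start, long end, long gap) {
            this.start = start; this.end = end; this.gap = gap;
        }
    }

    static List<Window> onRecord(List<Window> windows, long ts, long gap) {
        Window merged = new Window(ts, ts, gap);
        List<Window> rest = new ArrayList<>(windows);
        boolean changed = true;
        while (changed) {          // repeat until no further merge applies
            changed = false;
            for (int i = 0; i < rest.size(); i++) {
                Window w = rest.get(i);
                // symmetric check: gap-extended ranges of BOTH sides overlap
                if (w.end + w.gap >= merged.start - merged.gap
                        && w.start - w.gap <= merged.end + merged.gap) {
                    merged.start = Math.min(merged.start, w.start);
                    merged.end = Math.max(merged.end, w.end);
                    merged.gap = Math.max(merged.gap, w.gap); // larger gap wins
                    rest.remove(i);
                    changed = true;
                    break;
                }
            }
        }
        rest.add(merged);
        return rest;
    }

    public static void main(String[] args) {
        // out-of-order stream from the thread: (19, 5) then late (15, 3)
        List<Window> ws = new ArrayList<>();
        ws = onRecord(ws, 19, 5);
        ws = onRecord(ws, 15, 3);
        for (Window w : ws) {
            System.out.println("[" + w.start + "," + w.end + "] gap=" + w.gap);
        } // prints [15,19] gap=5
    }
}
```

With this rule, (19,5) followed by the late (15,3) ends up as the single session [15,19], matching the in-order result. Note that "larger gap wins" is just one possible policy; which gap should win is exactly the semantic question debated in this thread.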

Take a look at another example,
(13, 3),  (19, 5), (15, 3) ...

in this case, when (15, 3) is received, [13,13] should be retrieved and
merged into a new window [13, 15], then [19,19] should be updated to [13,
19]. Correct?

To be able to achieve that, like you said, the gap needs to be stored for
sessions. We don't need to save the gap with each event, but only for each
session window. To avoid upgrading the existing session window format, how
about creating a new Window type extending SessionWindow, along with a new
KeySchema?
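A minimal sketch of what such a window type could look like (hypothetical; the real SessionWindow lives in an internal Kafka Streams package, so a local stand-in class is used here to keep the sketch self-contained):

```java
// Hypothetical sketch of a session window type that carries its gap.
public class DynamicSessionWindowSketch {

    // stand-in for Kafka Streams' internal SessionWindow (start/end are the
    // earliest and latest event timestamps observed in the session)
    static class SessionWindow {
        final long start, end;
        SessionWindow(long start, long end) { this.start = start; this.end = end; }
    }

    // proposed subtype: additionally remembers the gap in effect for the
    // window, so out-of-order records can find and merge with it later
    static final class DynamicSessionWindow extends SessionWindow {
        final long gap;
        DynamicSessionWindow(long start, long end, long gap) {
            super(start, end);
            this.gap = gap;
        }

        // a record at ts falls into this session if it lies within `gap`
        // of the window's observed range
        boolean withinGap(long ts) {
            return ts >= start - gap && ts <= end + gap;
        }
    }

    public static void main(String[] args) {
        DynamicSessionWindow w = new DynamicSessionWindow(19, 19, 5);
        System.out.println(w.withinGap(15)); // prints true  (15 >= 19 - 5)
        System.out.println(w.withinGap(13)); // prints false (13 <  19 - 5)
    }
}
```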

What do you think?

Lei


On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang  wrote:

> Hello Lei,
>
> Thanks for the proposal. I've just made a quick pass over it and there is a
> question I have:
>
> The session windows are defined per key, i.e. does that mean that each
> incoming record of the key can dynamically change the gap of the window?
> For example, say you have the following record for the same key coming in
> order, where the first time is the timestamp of the record, and the second
> value is the extracted gap value:
>
> (10, 10), (19, 5), ...
>
>
> When we receive the first record at time 10, the gap is extracted as 10,
> and hence the window will be expired at 20 if no other record is received.
> When we receive the second record at time 19, the gap is modified to 5, and
> hence the window will be expired at 24 if no other record is received.
>
>
> If that's the case, I'm wondering how out-of-order data can be handled
> then, consider this stream:
>
> (10, 10), (19, 5), (15, 3) ...
>
> I.e. you received a late record at timestamp 15, which shortens
> the gap to 3. It means that the window SHOULD actually be expired at 18,
> and hence the next record (19, 5) should be for a new session already.
> Today Streams session window implementation does not do "window split", so
> have you thought about how this can be extended?
>
> Also since in your proposal each session window's gap value would be
> different, we need to store this value along with each record then, how
> would we store it, and what would be the upgrade path if it is not a
> compatible change on disk storage etc?
>
>
>
> Guozhang
>
>
>
> On Wed, Aug 22, 2018 at 10:05 AM, Lei Chen  wrote:
>
> > Hi All,
> >
> > I created a KIP to add dynamic gap session window support to Kafka
> Streams
> > DSL.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 362%3A+Support+dynamic+gap+session+window
> >
> > Please take a look,
> >
> > Thanks,
> > Lei
> >
>
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-362: Dynamic Session Window Support

2018-09-11 Thread Lei Chen
ke sense to you?
>
>
> Guozhang
>
>
> On Thu, Sep 6, 2018 at 9:50 PM, Matthias J. Sax 
> wrote:
>
> > I cannot follow the example:
> >
> > >> (10, 10), (15, 3), (19, 5) ...
> >
> > First, [10,10] is created, second the window is extended to [10,15], and
> > third [19,19] is created. Why would there be a [15,15]? And why would
> > (19,5) be merged into [15,15] -- the gap was set to 3 via (15,3) and
> > thus [19,19] should be its own window?
> >
> > > Take a look at another example,
> > > (13, 3),  (19, 5), (15, 3) ...
> > >
> > > in this case when (15, 3) is received, [13,13] should be retrieved and
> > > merged to a new window [13, 15], then [19,19] should be updated to [13,
> > > 19]. Correct?
> >
> > This example makes sense. However, Guozhang's example was different. The
> > late event _reduces_ the gap, and this can lead to a window split.
> > Guozhang's example was
> >
> > >>> (10, 10), (19, 5), (15, 3) ...
> >
> > First [10,10] is created, second [10,19] is created (gap is 10, so 10 and
> > 19 merge). Last, (15,3) reduces the gap from 10 to 3, thus [10,15] and
> > [19,19] must be two windows, i.e., the original window [10,19] must be split.
> >
> >
> > Or maybe you have different semantics in mind for how gaps are
> > dynamically modified? It's a little unclear from the KIP itself what
> > semantics dynamic session windows should have.
> >
> >
> > What is also unclear to me atm is what use cases you have in mind. The
> > KIP only says
> >
> > > the statistical aggregation result, liquidity of the records,
> >
> >
> > I am not sure what this means. Can you elaborate?
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 8/30/18 3:32 PM, Lei Chen wrote:
> > > Hi Guozhang,
> > >
> > > Thanks for reviewing the proposal. I didn't think of out of order
> events
> > > and glad that you brought it up.
> > >
> > > In the example you gave,
> > >
> > > (10, 10), (19, 5), (15, 3) ...
> > >
> > > my understanding is that the correct result window should be the same
> as
> > in
> > > order events
> > >
> > > (10, 10), (15, 3), (19, 5) ...
> > >
> > > when (15, 3) is received, [15,15] is created
> > > when (19, 5) is received, [15, 15] and [19, 19] are merged and [15, 19]
> > is
> > > created, meanwhile [15,15] is removed
> > >
> > > back to out of order case,
> > >
> > > when (19 ,5) is received, [19, 19] is created
> > > when (15, 3) is received, in order to generate the same result,
> > > 1. if late event is later than retention period, it will be dropped
> > > 2. otherwise, adjacent session windows within gap should be retrieved
> and
> > > merged accordingly, in this case [19, 19], and create a new session
> [15,
> > 19]
> > > I'm little confused when you said "the window [15, 15] SHOULD actually
> be
> > > expired at 18 and hence the next record (19, 5) should be for a new
> > session
> > > already.". If i understand it correctly, the expiration of the window
> is
> > > only checked when next event (19,5) comes and then it should be merged
> to
> > > it. [15, 15] will then be closed. Is that also what you meant?
> > > I cannot think of a case where a window will be split by a late event,
> > > because if event A and C fall into the same session window, a late
> event
> > B
> > > in middle will definitely fall into C's gap as well. IOW, late event
> will
> > > only cause window extension, not split.
> > >
> > > Take a look at another example,
> > > (13, 3),  (19, 5), (15, 3) ...
> > >
> > > in this case when (15, 3) is received, [13,13] should be retrieved and
> > > merged to a new window [13, 15], then [19,19] should be updated to [13,
> > > 19]. Correct?
> > >
> > > To be able to achieve that, like you said, the gap needs to be stored
> for
> > > sessions. We don't need to save the gap with each event, but only for
> > each
> > > session window. To avoid upgrading existing session window, how about
> > > create a new Window type extended from SessionWindow along with a new
> > > KeySchema?
> > >
> > > What do you think?
> > >
> > > Lei
> > >
> > >
> > > On Fri, Aug 24, 2018 at 9:42 AM Guozhang Wang 
> > wr

Re: [DISCUSS] KIP-362: Dynamic Session Window Support

2018-09-19 Thread Lei Chen
t
> > created in current kafka streams
> > implementation doesn't have gap info, it has start and end, which is the
> > earliest and latest event timestamp
> > in that window interval, i.e for (10,10), the session window gets created
> > is [10,10], rather than [10,20]. Just to clarify
> > so that it's clear why (10,10) cannot be fetched when looking for gap of
> > (15,3), it's because the end boundary 10 of
> > [10,10] is smaller than search boundary [12,18].
>
> We don't need to store the gap, because the gap is known from the window
> definition. The created window size depends on the data that is
> contained in the window. I guess one could define it differently, too,
> i.e., for the (10,10) record, we create a window [0,20] -- not sure if it
> makes a big difference in practice though. Note that creating window
> [10,20] would not be correct, because the gap must be applied in both
> directions, not just into the future.
>
> About the second part: the search would not be applied from (15,3) in
> range [12,18], but from the existing window [10,10] into range [0,20],
> and 15 is contained there. This example also shows that we would need to
> come up with a clever way to identify window [10,10] when processing
> (15,3) -- not sure atm how to do this. However, only considering (15,3)
> would result in inconsistencies for out-of-order data as pointed out
> above and would not be sufficient.
>
>
> Does this make sense?
>
>
> Or is there another way to define dynamic session gap semantics in a
> deterministic way with regard to out-of-order data?
>
>
>
> -Matthias
>
>
> On 9/11/18 4:28 PM, Lei Chen wrote:
> > Thanks Matthias and Guozhang for the response.
> >
> > Seems like our understanding mainly differs in the semantics of gap in
> > session windows.
> >
> > My understanding is that the gap is used to merge nearby records
> > together such that no record in the merged window is farther than the
> > gap from its neighbor. In Kafka Streams's implementation it's mainly
> > used to find neighboring records/windows in the session store so that
> > nearby records can be merged. It is NOT used to determine when a window
> > should be closed, which is in fact determined by the window's grace
> > period.
> >
> > Guozhang you said "b. When later we received (15, 3), it means that this
> > record ** changed **
> > the window gap interval from 10 to 3, and hence we received a new record
> at
> > 15, with the new window gap of 3, it means that by timestamp 18 (15 + 3)
> if
> > we have not received any new data, the window should be closed, i.e. the
> > window is now [10, 18) which includes two records at 10 and 15."
> >
> > This is different from what i thought will happen.
> >
> > I thought when (15,3) is received, Kafka Streams looks up a neighboring
> > record/window that is within the gap of [15-3, 15+3], and merges if any.
> > The previous record (10, 10) created its own window [10, 10], which is
> > out of the gap, so nothing will be found and no merge occurs. Hence we
> > now have two windows in the session store, [10, 10] and [15, 15]
> > respectively.
> >
> > Also, another thing worth mentioning is that the session window object
> > created in the current Kafka Streams implementation doesn't have gap
> > info; it has start and end, which are the earliest and latest event
> > timestamps in that window interval. I.e., for (10,10), the session
> > window that gets created is [10,10], rather than [10,20]. Just to
> > clarify why (10,10) cannot be fetched when looking for the gap of
> > (15,3): it's because the end boundary 10 of [10,10] is smaller than the
> > search boundary [12,18].
> >
> > Please correct me if my understanding is wrong here.
> >
> > @Matthias, to answer your use case question, we have a use case where
> > asynchronous time series data are received in the stream, from
> > different contributors, with different quality and at different paces.
> > Inside Kafka Streams, we use state to maintain statistical aggregations
> > and other mathematical models to track the liquidity and calculate the
> > time decay rate and dynamic gap, so that at runtime, for each
> > contributor we can
> > 1. determine how many historical records we should maintain in state.
> > 2. for each incoming record, output a record using aggregations from
> > *nearby* records from that contributor.
> > Why doesn't a fixed-gap session window work here? Because the definition of
> > "nearby" here 

Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2018-09-21 Thread Lei Chen
Hi,

Just want to know if anyone is actively working on this and also on
KAFKA-4835? Seems like the JIRA has been inactive for a couple of months.
We want this feature and would like to move it forward if no one else is
working on it.

Lei

On Wed, Jun 20, 2018 at 7:27 PM Matthias J. Sax 
wrote:

> No worries. It's just good to know. It seems that some other people are
> interested in driving this further, so we will just "reassign" it to them.
>
> Thanks for letting us know.
>
>
> -Matthias
>
> On 6/20/18 2:51 PM, Jeyhun Karimov wrote:
> > Hi Matthias, all,
> >
> > Currently, I am not able to complete this KIP. Please accept my
> > apologies for that.
> >
> >
> > Cheers,
> > Jeyhun
> >
> > On Mon, Jun 11, 2018 at 2:25 AM Matthias J. Sax  wrote:
> >
> > What is the status of this KIP?
> >
> > -Matthias
> >
> >
> > On 2/13/18 1:43 PM, Matthias J. Sax wrote:
> > > Is there any update for this KIP?
> > >
> > >
> > > -Matthias
> > >
> > > On 12/4/17 2:08 PM, Matthias J. Sax wrote:
> > >> Jeyhun,
> > >>
> > >> thanks for updating the KIP.
> > >>
> > >> I am wondering if you intend to add a new class `Produced`? There is
> > >> already `org.apache.kafka.streams.kstream.Produced`. So if we want to
> > >> add a new class, it must have a different name -- or we might be able
> > >> to merge both into one?
> > >>
> > >> Also, for the KStream overloads of `through()` and `to()`, can you
> > >> add the different behavior using different overloads? It's not clear
> > >> from the KIP what the semantics are.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 11/17/17 3:27 PM, Jeyhun Karimov wrote:
> > >>> Hi,
> > >>>
> > >>> Thanks for your comments. I agree with Matthias partially.
> > >>> I think we should relax some requirements related to the to() and
> > >>> through() methods.
> > >>> IMHO, the Produced class can cover (existing/to-be-created) topic
> > >>> information, which will ease our effort:
> > >>>
> > >>> KStream.to(Produced topicInfo)
> > >>> KStream.through(Produced topicInfo)
> > >>>
> > >>> This will decrease the number of overloads, but we will need to
> > >>> deprecate the existing to() and through() methods, perhaps.
> > >>> I updated the KIP accordingly.
> > >>>
> > >>>
> > >>> Cheers,
> > >>> Jeyhun
> > >>>
> > >>> On Thu, Nov 16, 2017 at 10:21 PM Matthias J. Sax
> > >>>  wrote:
> > >>>
> >  @Jan:
> > 
> >  The `Produced` class was introduced in 1.0 to specify key and value
> >  Serdes (and the partitioner) if data is written into a topic.
> > 
> >  Old API:
> > 
> >  KStream#to("topic", keySerde, valueSerde);
> > 
> >  New API:
> > 
> >  KStream#to("topic", Produced.with(keySerde, valueSerde));
> > 
> > 
> >  This allows us to reduce the number of overloads for `to()` (and
> >  `through()`, which follows the same pattern) -- the second parameter is
> >  used to cover all the different variations of optional parameters users
> >  can specify, while we only have 2 overloads for `to()` itself.
> > 
> >  What is still unclear to me is what you mean by this topic prefix
> >  thing. Either a user cares about the topic name and thus must create
> >  and manage it manually, or the user does not care and Streams creates
> >  it. How would this prefix idea fit in here?
> > 
> > 
> > 
> >  @Guozhang:
> > 
> >  My idea was to extend `Produced` with the hint we want to give for
> >  creating the internal topic and pass an optional `Produced` parameter.
> >  There are multiple things we can do here:
> > 
> >  1) stream.through(null, Produced...).groupBy().aggregate()
> >  -> just allow for a `null` topic name, indicating that Streams should
> >  create an internal topic
> > 
> >  2) stream.through(Produced...).groupBy().aggregate()
> >  -> add one overload taking an mandatory `Produced`
> > 
> >  We use `Serialized` to piggyback the information
> > 
> >  3) stream.groupBy(Serialized...).aggregate()
> >  and stream.groupByKey(Serialized...).aggregate()
> >  -> we don't need new top level overloads
> > 
> > 
> >  There are different trade-offs for those alternatives and maybe there
> >  are other ways to change the API. It's just to push the discussion
> >  further.
> > 
> > 
> >  -Matthias
> > 
> >  On 11/12/17 1:22 PM, Jan 

Re: [ANNOUNCE] New Kafka PMC member: Matthias J. Sax

2019-04-19 Thread Lei Chen
Congratulations Matthias! Well deserved!

-Lei

On Fri, Apr 19, 2019 at 2:55 PM James Cheng  wrote:

> Congrats!!
>
> -James
>
> Sent from my iPhone
>
> > On Apr 18, 2019, at 2:35 PM, Guozhang Wang  wrote:
> >
> > Hello Everyone,
> >
> > I'm glad to announce that Matthias J. Sax is now a member of Kafka PMC.
> >
> > Matthias has been a committer since Jan. 2018, and since then he has
> > continued to be active in the community and made significant
> > contributions to the project.
> >
> >
> > Congratulations to Matthias!
> >
> > -- Guozhang
>


[jira] [Created] (KAFKA-7325) Support dynamic gap session window

2018-08-22 Thread Lei Chen (JIRA)
Lei Chen created KAFKA-7325:
---

 Summary: Support dynamic gap session window
 Key: KAFKA-7325
 URL: https://issues.apache.org/jira/browse/KAFKA-7325
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Lei Chen


Currently, Kafka Streams DSL only supports fixed-gap session window. However, 
in some circumstances, the gap is more dynamic and can vary depending on other 
factors: the statistical aggregation result, liquidity of the records, etc. In 
such cases, allowing the user to define a dynamic-gap session is important. 
[KIP-362|https://cwiki.apache.org/confluence/display/KAFKA/KIP-362%3A+Support+dynamic+gap+session+window]
 is created to address this.
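As an illustration only (this is not necessarily the exact API proposed in KIP-362), a dynamic gap could be supplied via a per-record extractor function instead of a fixed value; the interface and example below are hypothetical:

```java
// Illustration of the dynamic-gap idea: the user supplies a function that
// derives the session gap from each incoming record. Names are made up.
public class DynamicGapSketch {

    // user-supplied: extract a gap (in ms) from the incoming key/value
    interface SessionGapExtractor<K, V> {
        long extractGap(K key, V value);
    }

    public static void main(String[] args) {
        // hypothetical policy: gap is twice the observed inter-arrival time
        SessionGapExtractor<String, Long> extractor =
            (key, interArrivalMs) -> 2 * interArrivalMs;
        System.out.println(extractor.extractGap("sensor-1", 500L)); // prints 1000
    }
}
```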



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)