Hi Guozhang,

I've been working around this issue by dropping down to the Processor
API, but I was hoping you might be able to point out if there is a
flaw in this proposed change:

    
https://github.com/apache/kafka/compare/trunk...mfenniak:suppress-duplicate-repartition-output

This adjusts KTableRepartitionMap so that if there's no change in the
group-by key, the repartition processor just forwards the changed
value onwards.  (This breaks a couple of tests that anticipate the
exact existing output, so don't consider this a complete patch...)
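
The core of the idea, roughly (a simplified sketch, not the actual diff;
the real change lives in the internal KTableRepartitionMap processor, and
the names/details below may differ):

    public void process(K key, Change<V> change) {
        KeyValue<K1, V1> newPair = computeValue(key, change.newValue);
        KeyValue<K1, V1> oldPair = computeValue(key, change.oldValue);

        if (oldPair != null && newPair != null
                && oldPair.key.equals(newPair.key)) {
            // group-by key unchanged: forward one combined change so the
            // downstream aggregate subtracts and adds before emitting once
            context().forward(newPair.key,
                new Change<>(newPair.value, oldPair.value));
        } else {
            // group-by key (and therefore possibly the partition) changed:
            // keep the existing subtract-record-then-add-record behaviour
            if (oldPair != null)
                context().forward(oldPair.key, new Change<>(null, oldPair.value));
            if (newPair != null)
                context().forward(newPair.key, new Change<>(newPair.value, null));
        }
    }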

Mathieu


On Fri, Aug 19, 2016 at 12:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hi Mathieu,
>
> If you are only interested in the aggregate result "snapshot" and not its
> change stream (note that a KTable itself is not actually a "table" as in an
> RDBMS, but still a stream), you can try the queryable state feature that is
> available in trunk and will ship in the 0.10.1.0 release.
>
> In short, it allows you to query the "snapshot" of any state used in
> aggregation operators in real time, via state-store APIs such as
> get-by-key, range queries on windows, etc.  Details can be found in this
> KIP (we are working on more docs / blog posts at the moment):
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
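>
> As a rough sketch of what querying such a state could look like once this
> ships (the store name, value type, and the already-running KafkaStreams
> instance "streams" are placeholders; the store classes are in
> org.apache.kafka.streams.state):
>
>     ReadOnlyKeyValueStore<String, Integer> counts =
>         streams.store("word-counts-store",
>                       QueryableStoreTypes.keyValueStore());
>
>     Integer kafkaCount = counts.get("kafka");     // point lookup by key
>     KeyValueIterator<String, Integer> all = counts.all();  // snapshot scan
>     while (all.hasNext()) {
>         System.out.println(all.next());
>     }
>     all.close();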
>
> Guozhang
>
>
> On Thu, Aug 18, 2016 at 6:40 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
>
>> Hi Guozhang,
>>
>> Hm... I hadn't thought of the repartitioning involvement.
>>
>> I'm not confident I understand completely, but I believe you're
>> saying the decision to process data this way has to be made before the
>> data being processed is available, because the groupBy key, and
>> therefore the partition, *may* change.
>>
>> I still feel stuck getting corrupted output in the middle of an
>> aggregation.
>>
>> It's especially problematic for me when updates to the source KTable
>> don't actually affect the result of the aggregation.  In the
>> word-count example from my original e-mail, this would be like editing
>> an unrelated field such as "author" on an article: it doesn't affect
>> the groupBy key or the aggregation, but it still temporarily produces
>> wrong output (and causes inefficient processing).
>>
>> Are there any tools in Kafka Streams that might help me prevent
>> downstream calculations if the relevant inputs haven't changed?  I was
>> thinking I'd be able to use mapValues to pluck only relevant fields
>> out of a KTable, materialize a new KTable (.through) from that, and
>> then there'd be some state from which KS would be able to only invoke
>> downstream nodes if data has changed... but it doesn't seem to work
>> like that.
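>>
>> For reference, this is roughly the shape of what I tried (just a sketch;
>> "RelevantFields" and the topic name are made up for illustration):
>>
>>   KTable<String, RelevantFields> narrowed = articles
>>       // keep only the fields the aggregation depends on ("author" etc.
>>       // are dropped)
>>       .mapValues(a -> new RelevantFields(a.category, a.text))
>>       // re-materialize, hoping no-op updates would be swallowed here
>>       .through("articles-narrowed");
>>
>>   narrowed.groupBy((k, v) -> v.category)
>>       .aggregate(() -> 0,
>>         (k, v, t) -> t + v.text.split(" ").length,   // adder
>>         (k, v, t) -> t - v.text.split(" ").length);  // subtractor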
>>
>> Thanks so much for your responses Guozhang, I really appreciate your
>> time to help me out.
>>
>> Mathieu
>>
>>
>> On Wed, Aug 17, 2016 at 5:51 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > The problem is that Kafka Streams needs to repartition the streams
>> > based on the groupBy keys when doing aggregations.  In your case, the
>> > original stream may be read from a topic that is partitioned on "K",
>> > and you need to first repartition on "category" into an intermediate
>> > topic before the aggregation can be executed.
>> >
>> > Hence the old and new values may be sent to two different partitions
>> > of the intermediate topic, and hence be processed by two different
>> > processes (this won't be the case in your application, since you
>> > mentioned the "category" will never change).  Since the library cannot
>> > tell whether the groupBy key will never change, it has to be
>> > conservative and do this subtract / add process when receiving the
>> > old / new value.
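>> >
>> > To make that concrete, a rough sketch of how the default partitioner
>> > picks a partition (Utils is org.apache.kafka.common.utils.Utils; the
>> > partition count of the repartition topic is assumed to be 4 here):
>> >
>> >     byte[] oldKey = "kafka".getBytes(StandardCharsets.UTF_8);
>> >     byte[] newKey = "kafka2".getBytes(StandardCharsets.UTF_8);
>> >     int numPartitions = 4;
>> >     int oldPartition = Utils.toPositive(Utils.murmur2(oldKey)) % numPartitions;
>> >     int newPartition = Utils.toPositive(Utils.murmur2(newKey)) % numPartitions;
>> >     // oldPartition and newPartition can differ, so the subtract record
>> >     // and the add record may be handled by two different stream tasks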
>> >
>> >
>> > Guozhang
>> >
>> >
>> > On Wed, Aug 17, 2016 at 1:45 PM, Mathieu Fenniak <
>> > mathieu.fenn...@replicon.com> wrote:
>> >
>> >> Hi Guozhang,
>> >>
>> >> Thanks for responding.  Ah, I see what you're saying... in the case of
>> >> an update to the KTable, the aggregator's subtractor result would be
>> >> necessary if the group-by key changes in the update.
>> >>
>> >> It makes sense, but unfortunately the behavior still feels a little
>> >> sketchy to me... when the group-by key doesn't change (which is
>> >> guaranteed in my case), I'm temporarily outputting results that don't
>> >> correspond at all to the inputs, immediately followed by a corrected
>> >> result.
>> >>
>> >> Would it be a feasible optimization to not send the subtractor's
>> >> result out of the aggregate in the case where the groupBy key does
>> >> not change between the old record and the new record?
>> >>
>> >> Mathieu
>> >>
>> >> On Wed, Aug 17, 2016 at 2:12 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> >> > Hello Mathieu,
>> >> >
>> >> > Note that the semantics of KTable aggregations (i.e.
>> >> > "KTable.groupBy.aggregate" as of 0.10.0) and KStream aggregations
>> >> > (i.e. "KStream.aggregateByKey" as of 0.10.0) are different: when the
>> >> > table is updated (i.e. a new record with the same key "K1" is
>> >> > received), the old record's effect on the aggregation needs to first
>> >> > be subtracted before the new record's effect can be added; whereas in
>> >> > the latter case there are no "old values" being overridden, hence
>> >> > only the "adder" aggregator is needed.
>> >> >
>> >> > So suppose your updated record on K1 is on a different "category", say:
>> >> >
>> >> > K1, {"category": "kafka2", "text": "word1, word2, word3, word4"}
>> >> >
>> >> >
>> >> > Then the aggregated result should be:
>> >> >
>> >> > {key: "kafka", value: 2}
>> >> > {key: "kafka2", value: 4}
>> >> >
>> >> >
>> >> > Does this make sense now?
>> >> >
>> >> > Guozhang
>> >> >
>> >> >
>> >> > On Wed, Aug 17, 2016 at 7:59 AM, Mathieu Fenniak <
>> >> > mathieu.fenn...@replicon.com> wrote:
>> >> >
>> >> >> Hello again, kafka-users,
>> >> >>
>> >> >> When I aggregate a KTable, a future input that updates a KTable's
>> >> >> value for a specific key causes the aggregate's subtractor to be
>> >> >> invoked, and then its adder.  This part is great, completely
>> >> >> as-expected.
>> >> >>
>> >> >> But what I didn't expect is that the intermediate result of the
>> >> >> subtractor would be sent downstream.  This value doesn't reflect the
>> >> >> reality of the inputs to the aggregator, so sending it downstream is
>> >> >> effectively sending "corrupt" data to the next processing node.  Is
>> >> >> this the expected behavior, or is this a bug?
>> >> >>
>> >> >> Take, for example, a table of blog articles and an aggregator that
>> >> >> counts the number of words in each category of the blog:
>> >> >>
>> >> >> topic: articles
>> >> >>   K1, {"category": "kafka", "text": "word1, word2, word3"}
>> >> >>   K2, {"category": "kafka", "text": "word1, word2"}
>> >> >>
>> >> >> articles.groupBy((k, v) -> v.category)
>> >> >>   .aggregate(() -> 0,                              // initializer
>> >> >>     (k, v, t) -> t + v.text.split(" ").length,     // adder
>> >> >>     (k, v, t) -> t - v.text.split(" ").length      // subtractor
>> >> >>   )
>> >> >>
>> >> >> This aggregator will produce {key: "kafka", value: 3}, then {key:
>> >> >> "kafka", value: 5}.  If I update one of the blog articles and send a
>> >> >> new message to the articles topic:
>> >> >>
>> >> >>   K1, {"category": "kafka", "text": "word1, word2, word3, word4"}
>> >> >>
>> >> >> The aggregator will first produce {key: "kafka", value: 2} when the
>> >> >> subtractor is called, then will produce {key: "kafka", value: 6} when
>> >> >> the adder is called.  The subtractor's calculation does not actually
>> >> >> match the reality; K1 was never deleted, it was just updated.
>> >> >>
>> >> >> Mathieu
>> >> >>
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > -- Guozhang
>> >>
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>>
>
>
>
> --
> -- Guozhang
