Hi Guozhang,

I've been working around this issue by dropping down to the Processor API, but I was hoping you might be able to point out if there is a flaw in this proposed change:

https://github.com/apache/kafka/compare/trunk...mfenniak:suppress-duplicate-repartition-output

This adjusts KTableRepartitionMap so that if there's no change in the group-by key, the repartition processor just forwards the changed value onwards. (This breaks a couple of tests that anticipate the exact existing output, so don't consider this a complete patch...)
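
The core of the change is roughly this (a simplified sketch of the repartition processor's process() method, not the literal patch; type and field names are approximate):

    // "selector" is the user's groupBy KeyValueMapper; Change carries the
    // old and new value of the updated KTable row.
    public void process(K key, Change<V> change) {
        KeyValue<K1, V1> newPair = change.newValue == null ? null : selector.apply(key, change.newValue);
        KeyValue<K1, V1> oldPair = change.oldValue == null ? null : selector.apply(key, change.oldValue);

        if (oldPair != null && newPair != null && oldPair.key.equals(newPair.key)) {
            // group-by key unchanged: forward one combined update instead of
            // a subtract (old value) followed by an add (new value)
            context().forward(newPair.key, new Change<>(newPair.value, oldPair.value));
        } else {
            // group-by key changed (or one side is null): keep the existing behavior
            if (oldPair != null)
                context().forward(oldPair.key, new Change<>(null, oldPair.value));
            if (newPair != null)
                context().forward(newPair.key, new Change<>(newPair.value, null));
        }
    }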

Mathieu

On Fri, Aug 19, 2016 at 12:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hi Mathieu,
>
> If you are only interested in the aggregate result "snapshot" but not its change stream (note that KTable itself is not actually a "table" as in an RDBMS, but still a stream), you can try to use the queryable state feature that is available in trunk and will be included in the 0.10.1.0 release.
>
> In sum, it allows you to query the "snapshot" of any state used in aggregation operators in real time, with state-store-provided APIs such as get-by-key, range queries on windows, etc. Details can be found in the KIP (we are working on more docs / blog posts at the moment):
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams
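>
> For example, once the application is running, a lookup against the aggregate's store could look roughly like this (a sketch only; the store name and key/value types are made up for illustration, and the classes come from org.apache.kafka.streams.state):
>
>     // "streams" is the running KafkaStreams instance
>     ReadOnlyKeyValueStore<String, Long> counts =
>         streams.store("category-word-counts", QueryableStoreTypes.<String, Long>keyValueStore());
>     Long kafkaCount = counts.get("kafka");              // point lookup on the current snapshot
>     KeyValueIterator<String, Long> all = counts.all();  // or iterate the whole snapshot
>     all.close();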
>
> Guozhang
>
>
> On Thu, Aug 18, 2016 at 6:40 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
>
>> Hi Guozhang,
>>
>> Hm... I hadn't thought of the repartitioning involvement.
>>
>> I'm not confident I'm understanding completely, but I believe you're saying the decision to process data in this way is made before the data being processed is available, because the partition *may* change, because the groupBy key *may* change.
>>
>> I'm still feeling that I'm stuck getting corrupted output in the middle of an aggregation.
>>
>> It's especially problematic for me if the updates to the source KTable don't actually affect the results of the aggregation. In the word-count example in my original e-mail, this might be similar to editing an unrelated field "author" in any article; it doesn't actually affect the groupBy, doesn't affect the aggregation, but still results in the wrong output occurring temporarily (and in inefficient processing).
>>
>> Are there any tools in Kafka Streams that might help me prevent downstream calculations if the relevant inputs haven't changed? I was thinking I'd be able to use mapValues to pluck only relevant fields out of a KTable, materialize a new KTable (.through) from that, and then there'd be some state from which KS would be able to only invoke downstream nodes if data has changed... but it doesn't seem to work like that.
>>
>> Thanks so much for your responses, Guozhang; I really appreciate your time to help me out.
>>
>> Mathieu
>>
>>
>> On Wed, Aug 17, 2016 at 5:51 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > The problem is that Kafka Streams needs to repartition the streams based on the groupBy keys when doing aggregations. For your case, the original stream may be read from a topic that is partitioned on "K", and you need to first repartition on "category" on an intermediate topic before the aggregation can be executed.
>> >
>> > Hence the old and new value may be sent to two different partitions of the intermediate topic, and hence be processed by two different processes (it won't be the case in your application, since you mentioned the "category" will never change). Since the library cannot tell if the groupBy key will never change, it has to be conservative and do this subtract / add process while receiving the old / new value.
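>> >
>> > Roughly speaking (just an illustration, not Kafka's exact code), the partition in the intermediate topic is derived from a hash of the serialized group-by key, something like:
>> >
>> >     // a record whose "category" changes from "kafka" to "kafka2" can hash to a
>> >     // different partition, and so be picked up by a different task / thread
>> >     int partition = toPositive(murmur2(serializedGroupKey)) % numPartitions;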
>> >
>> > Guozhang
>> >
>> >
>> > On Wed, Aug 17, 2016 at 1:45 PM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
>> >
>> >> Hi Guozhang,
>> >>
>> >> Thanks for responding. Ah, I see what you're saying... in the case of an update to the KTable, the aggregator's subtractor result would be necessary if the group-by key changes in the update.
>> >>
>> >> It makes sense, but unfortunately the behavior leaves me feeling a little sketchy... when the group-by key doesn't change (which is guaranteed in my case), I'm outputting results that don't correspond at all to the inputs, temporarily. It's immediately followed by a corrected result.
>> >>
>> >> Would it be a feasible optimization to not send the subtractor's result out of the aggregate, only in the case where the groupBy key does not change between the old record and the new record?
>> >>
>> >> Mathieu
>> >>
>> >> On Wed, Aug 17, 2016 at 2:12 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >> > Hello Mathieu,
>> >> >
>> >> > Note that the semantics of KTable aggregations (i.e. "KTable.groupBy.aggregate" as in 0.10.0) and KStream aggregations (i.e. "KStream.aggregateByKey" as in 0.10.0) are different, in the sense that when the table is updated (i.e. a new record with the same key "K1" is received), the old record's effect on the aggregation needs to first be subtracted before the new record's effect on the aggregation can be added; whereas in the latter case there are no "old values" being overridden, hence only the "adder" aggregator is needed.
>> >> >
>> >> > So suppose your updated record on K1 is on a different "category", say:
>> >> >
>> >> > K1, {"category": "kafka2", "text": "word1, word2, word3, word4"}
>> >> >
>> >> > Then the aggregated result should be:
>> >> >
>> >> > {key: "kafka", value: 2}
>> >> > {key: "kafka2", value: 4}
>> >> >
>> >> > Does this make sense now?
>> >> >
>> >> > Guozhang
>> >> >
>> >> >
>> >> > On Wed, Aug 17, 2016 at 7:59 AM, Mathieu Fenniak <mathieu.fenn...@replicon.com> wrote:
>> >> >
>> >> >> Hello again, kafka-users,
>> >> >>
>> >> >> When I aggregate a KTable, a future input that updates a KTable's value for a specific key causes the aggregate's subtractor to be invoked, and then its adder. This part is great, completely as-expected.
>> >> >>
>> >> >> But what I didn't expect is that the intermediate result of the subtractor would be sent downstream. This value doesn't reflect the reality of the inputs to the aggregator, so sending it downstream is effectively sending "corrupt" data to the next processing node. Is this the expected behavior, or is this a bug?
>> >> >>
>> >> >> Take, for example, a table of blog articles and an aggregator that counts the number of words in each category of the blog:
>> >> >>
>> >> >> topic: articles
>> >> >> K1, {"category": "kafka", "text": "word1, word2, word3"}
>> >> >> K2, {"category": "kafka", "text": "word1, word2"}
>> >> >>
>> >> >> articles.groupBy((k,v) -> v.category)
>> >> >>     .aggregate(() -> 0,
>> >> >>         (k,v,t) -> t + v.text.split(" ").length,
>> >> >>         (k,v,t) -> t - v.text.split(" ").length
>> >> >>     )
>> >> >>
>> >> >> This aggregator will produce {key: "kafka", value: 3}, then {key: "kafka", value: 5}. If I update one of the blog articles and send a new message to the articles topic:
>> >> >>
>> >> >> K1, {"category": "kafka", "text": "word1, word2, word3, word4"}
>> >> >>
>> >> >> The aggregator will first produce {key: "kafka", value: 2} when the subtractor is called, then will produce {key: "kafka", value: 6} when the adder is called. The subtractor's calculation does not actually match reality; K1 was never deleted, it was just updated.
>> >> >>
>> >> >> Mathieu
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > -- Guozhang
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>
>
>
> --
> -- Guozhang