Repasting my comment from the other email thread:

--------------------------

Flávio, thanks for creating this KIP.

I think this "single-aggregation" use case is common enough that we should
consider how to support it efficiently: for example, for KSQL, which is built
on top of Streams, we've seen lots of query statements that are expected to
return a single row indicating the "total aggregate", etc. See
https://github.com/confluentinc/ksql/issues/430 for details.

I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but
I'm wondering if we have discussed the option of supporting it in a
"pre-aggregate" manner: that is, we do partial aggregates on parallel tasks,
and then send the partial aggregated values via a single topic partition
for the final aggregate, to reduce the traffic on that single partition and
hence the final aggregate workload.

Of course, for non-commutative aggregates we'd probably need to provide
another API in addition to aggregate, like the `merge` function for
session-based aggregates, to let users customize the operation of merging
two partial aggregates into a single partial aggregate. What are its pros and
cons compared with the current proposal?


Guozhang

On Fri, Jun 29, 2018 at 12:53 PM, Bill Bejeck <bbej...@gmail.com> wrote:

> Hi Flávio,
>
> Thanks for creating the KIP.
>
> I agree with Guozhang on comparing the pros and cons of the approach he
> outlined vs the one in the proposed KIP.
>
> I also have a few clarification questions on the current KIP
>
> Will the triggering mechanism always be time, or would it make sense to
> expand to use other mechanisms such as the number of records, or some value
> present in one of the records?
> When setting "all instances" to true, how is the leader chosen?
> If "all instances" is set to false are all the partial aggregates forwarded
> to a single output topic?
>
> Thanks again,
> Bill
>
>
> On Fri, Jun 29, 2018 at 2:15 PM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Flavio,
> >
> > thanks for cleaning up the KIP number collision.
> >
> > With regard to KIP-328
> > (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> > )
> > I am wondering how both relate to each other?
> >
> > Any thoughts?
> >
> >
> > -Matthias
> >
> > On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
> > > Just copying a follow up from another thread to here (sorry about the
> > > mess):
> > >
> > > From: Guozhang Wang <wangg...@gmail.com>
> > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> > > Date: 2018/06/25 22:24:17
> > > List: dev@kafka.apache.org
> > >
> > > Flávio, thanks for creating this KIP.
> > >
> > > I think this "single-aggregation" use case is common enough that we
> > > should consider how to support it efficiently: for example, for KSQL,
> > > which is built on top of Streams, we've seen lots of query statements
> > > that are expected to return a single row indicating the "total
> > > aggregate", etc. See
> > > https://github.com/confluentinc/ksql/issues/430 for details.
> > >
> > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953,
> > > but I'm wondering if we have discussed the option of supporting it in a
> > > "pre-aggregate" manner: that is, we do partial aggregates on parallel
> > > tasks, and then send the partial aggregated values via a single topic
> > > partition for the final aggregate, to reduce the traffic on that single
> > > partition and hence the final aggregate workload.
> > > Of course, for non-commutative aggregates we'd probably need to provide
> > > another API in addition to aggregate, like the `merge` function for
> > > session-based aggregates, to let users customize the operation of
> > > merging two partial aggregates into a single partial aggregate. What
> > > are its pros and cons compared with the current proposal?
> > >
> > >
> > > Guozhang
> > > On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com> wrote:
> > >> Hey, guys, I've just created a new KIP about creating a new DSL graph
> > >> source for realtime partitioned consolidations.
> > >>
> > >> We have faced the following scenario/problem in a lot of situations
> > >> with KStreams:
> > >>    - Huge incoming data being processed by numerous application
> > >>    instances
> > >>    - Need to aggregate different fields whose records span all topic
> > >>    partitions (something like “total amount spent by people aged > 30
> > >>    yrs” when processing a topic partitioned by userid).
> > >>
> > >> The challenge here is to manage this kind of situation without any
> > >> bottlenecks. We don't need the “global aggregation” to be processed at
> > >> each incoming message. In a scenario with 500 instances, each handling
> > >> 1k messages/s, any single point of aggregation (single partitioned
> > >> topics, global tables or external databases) would create a bottleneck
> > >> of 500k messages/s for single threaded/CPU elements.
> > >>
> > >> For this scenario, it is possible to store the partial aggregations on
> > >> local stores and, from time to time, query those states and aggregate
> > >> them as a single value, avoiding bottlenecks. This is a way to create a
> > >> “timed aggregation barrier”.
> > >>
> > >> If we leverage this kind of built-in feature we could greatly enhance
> > >> the ability of KStreams to better handle the CAP Theorem
> > >> characteristics, so that one could choose to have Consistency over
> > >> Availability when needed.
> > >>
> > >> We started this discussion with Matthias J. Sax here:
> > >> https://issues.apache.org/jira/browse/KAFKA-6953
> > >>
> > >> If you want to see more, go to KIP-326 at:
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > >>
> > >> -Flávio Stutz
> > >>
> > >
> >

--
-- Guozhang
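
To make the "pre-aggregate" idea from my comment a bit more concrete, here is
a minimal sketch in plain Java. It deliberately uses no Kafka Streams APIs:
the class PreAggregateSketch, the Partial type and the methods add, merge,
aggregatePartition and finalAggregate are all hypothetical names made up for
illustration, and in-memory lists stand in for topic partitions. It only
shows why a separate merge step may be needed: folding one record into an
aggregate (add) is a different operation from combining two partial
aggregates (merge).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class PreAggregateSketch {

    // Hypothetical partial aggregate: keeps enough state to derive a mean later.
    static final class Partial {
        final long sum;
        final long count;

        Partial(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // "Adder": folds a single record value into a partial aggregate.
    static Partial add(Partial agg, long value) {
        return new Partial(agg.sum + value, agg.count + 1);
    }

    // "Merger": combines two partial aggregates into one partial aggregate.
    static Partial merge(Partial a, Partial b) {
        return new Partial(a.sum + b.sum, a.count + b.count);
    }

    // Phase 1: each parallel task aggregates only the records of its own partition.
    static Partial aggregatePartition(List<Long> records) {
        Partial agg = new Partial(0, 0);
        for (long value : records) {
            agg = add(agg, value);
        }
        return agg;
    }

    // Phase 2: a single task merges the partials it receives into the final aggregate.
    static Partial finalAggregate(List<Partial> partials, BinaryOperator<Partial> merger) {
        Partial total = new Partial(0, 0);
        for (Partial partial : partials) {
            total = merger.apply(total, partial);
        }
        return total;
    }

    public static void main(String[] args) {
        // Three in-memory lists stand in for three topic partitions (an assumption).
        List<List<Long>> partitions = Arrays.asList(
                Arrays.asList(1L, 2L, 3L),
                Arrays.asList(4L, 5L),
                Arrays.asList(6L));

        List<Partial> partials = new ArrayList<>();
        for (List<Long> partition : partitions) {
            partials.add(aggregatePartition(partition));
        }

        Partial total = finalAggregate(partials, PreAggregateSketch::merge);
        // Prints: sum=21 count=6 mean=3.5
        System.out.println("sum=" + total.sum + " count=" + total.count
                + " mean=" + (double) total.sum / total.count);
    }
}
```

In this toy run only three partial values reach the final step instead of six
raw records, which is the traffic reduction on the single partition I had in
mind. How often the partials would be forwarded is left open here.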