That's a lot of email exchanges to catch up on :) My originally proposed alternative solution indeed relies on pre-aggregating before sending to the single-partition topic, so that the traffic on that single-partition topic would not be huge (I called it partial-aggregate, but the intent was the same).
What I was thinking is that, given such a scenario could be common, if we've decided to go down this route, should we provide a new API that wraps John's proposed topology? (Right now, with KIP-328, users would still need to apply this trick manually.)

----------
final KStream<String, String> siteEvents = builder.stream("/site-events");
final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(/* generate KeyValue(key, 1) for the pre-aggregate */);
final KTable<Integer, Long> countsByPartition = keyedByPartition.groupByKey().count(); /* pre-aggregate */
final KGroupedTable<String, Long> singlePartition = countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value)); /* send the suppressed pre-aggregate values to the single-partition topic */
final KTable<String, Long> totalCount = singlePartition.reduce((l, r) -> l + r, (l, r) -> l - r); /* read from the single-partition topic and reduce the data */
----------

Note that if we wrap all of this into a new operator, users would need to provide two functions: one for the aggregate and one for the final "reduce" (in my previous email I called it the merger function, but the intent was the same).

Guozhang
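For illustration, here is a minimal sketch of what such a wrapping operator could look like. The "aggregateToOne" name and its signature are hypothetical (no such operator exists in the Streams API); the point is only to make the two user-supplied functions concrete:

----------
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Reducer;

/* Hypothetical operator: pre-aggregate per task/partition, funnel the
   partials through a single partition, and merge them into one value. */
public interface GlobalAggregateExample<K, V> {
    <VR> KTable<String, VR> aggregateToOne(
        final Initializer<VR> initializer,                   /* initial partial value */
        final Aggregator<? super K, ? super V, VR> aggregator, /* per-partition aggregate */
        final Reducer<VR> merger);                           /* merge two partials */
}

/* The total-count example might then collapse to:
   final KTable<String, Long> totalCount = siteEvents.groupByKey()
       .aggregateToOne(() -> 0L,
                       (key, value, partial) -> partial + 1L,
                       (left, right) -> left + right); */
----------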
On Thu, Jul 5, 2018 at 3:38 PM, John Roesler <j...@confluent.io> wrote:

> Ok, I didn't get quite as far as I hoped, and several things are far from
> ready, but here's what I have so far:
> https://github.com/apache/kafka/pull/5337
>
> The "unit" test works, and is a good example of how you should expect it
> to behave:
> https://github.com/apache/kafka/pull/5337/files#diff-2fdec52b9cc3d0e564f0c12a199bed77
>
> I have one working integration test, but it's slow going getting the
> timing right, so no promises of any kind ;)
>
> Let me know what you think!
>
> Thanks,
> -John
>
> On Thu, Jul 5, 2018 at 8:39 AM John Roesler <j...@confluent.io> wrote:
>
> > Hey Flávio,
> >
> > Thanks! I haven't got anything usable yet, but I'm working on it now.
> > I'm hoping to push up my branch by the end of the day.
> >
> > I don't know if you've seen it, but Streams actually already has
> > something like this, in the form of caching on materialized stores. If
> > you pass in a "Materialized.withCachingEnabled()", you should be able
> > to get a POC working by setting the max cache size pretty high and
> > setting the commit interval for your desired rate:
> > https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html#streams-developer-guide-memory-management
> >
> > There are a couple of cases in joins and whatnot where it doesn't work,
> > but for the aggregations we discussed, it should. The reason for
> > KIP-328 is to provide finer control and hopefully a more
> > straightforward API.
> >
> > Let me know if that works, and I'll drop a message in here when I
> > create the draft PR for KIP-328. I'd really appreciate your feedback.
> >
> > Thanks,
> > -John
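To make the caching POC concrete, here is a minimal sketch under stated assumptions: it reuses the "keyedByPartition" stream from the example in this thread, and the "partial-counts" store name is made up; the cache size and commit interval are arbitrary.

----------
import java.util.Properties;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "count-poc");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
/* A large cache plus a long commit interval means the store absorbs most
   updates and only emits a record downstream roughly once per interval. */
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100L * 1024 * 1024);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);

/* Materialize the count with caching explicitly enabled. */
final KTable<Integer, Long> countsByPartition = keyedByPartition
    .groupByKey()
    .count(Materialized.<Integer, Long, KeyValueStore<Bytes, byte[]>>as("partial-counts")
                       .withCachingEnabled());
----------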
> > On Wed, Jul 4, 2018 at 10:17 PM flaviost...@gmail.com
> > <flaviost...@gmail.com> wrote:
> >
> >> John, that was fantastic, man!
> >> Have you built any custom implementation of your KIP on your machine
> >> that I could test out here? I wish I could test it out.
> >> If you need any help implementing this feature, please tell me.
> >>
> >> Thanks.
> >>
> >> -Flávio Stutz
> >>
> >> On 2018/07/03 18:04:52, John Roesler <j...@confluent.io> wrote:
> >> > Hi Flávio,
> >> > Thanks! I think that we can actually do this, but the API could be
> >> > better. I've included Java code below, but I'll copy and modify your
> >> > example so we're on the same page.
> >> >
> >> > EXERCISE 1:
> >> > - The case is "total counting of events for a huge website"
> >> > - Tasks from Application A will have something like:
> >> >     .stream(/site-events)
> >> >     .transform( re-key s.t. the new key is the partition id)
> >> >     .groupByKey() // you have to do this before count
> >> >     .count()
> >> >     // you explicitly published to a one-partition topic here, but
> >> >     // it's actually sufficient just to re-group onto one key. You
> >> >     // could name and pre-create the intermediate topic here, but
> >> >     // you don't need a separate application for the final
> >> >     // aggregation.
> >> >     .groupBy((partitionId, partialCount) -> new KeyValue("ALL", partialCount))
> >> >     .aggregate(sum up the partialCounts)
> >> >     .publish(/counter-total)
> >> >
> >> > I've left out the suppressions, but they would go right after the
> >> > count() and the aggregate().
> >> >
> >> > With this program, you don't have to worry about the
> >> > double-aggregation you mentioned in the last email. The KTable
> >> > produced by the first count() will maintain the correct count per
> >> > partition. If the value changes for any partition, it'll emit a
> >> > retraction of the old value and then the new value downstream, so
> >> > that the final aggregation can update itself properly.
> >> >
> >> > I think we can optimize both the execution and the programmability
> >> > by adding a "global aggregation" concept. But in principle, it seems
> >> > like this usage of the current API will support your use case.
> >> >
> >> > Once again, though, this is just to present an alternative. I
> >> > haven't done the math on whether your proposal would be more
> >> > efficient.
> >> >
> >> > Thanks,
> >> > -John
> >> >
> >> > Here's the same algorithm written in Java:
> >> >
> >> > final KStream<String, String> siteEvents = builder.stream("/site-events");
> >> >
> >> > // Here we re-key the events so that the key is actually the
> >> > // partition id. We don't need the value to do a count, so I just
> >> > // set it to "1".
> >> > final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(() ->
> >> >     new Transformer<String, String, KeyValue<Integer, Integer>>() {
> >> >         private ProcessorContext context;
> >> >
> >> >         @Override
> >> >         public void init(final ProcessorContext context) {
> >> >             this.context = context;
> >> >         }
> >> >
> >> >         @Override
> >> >         public KeyValue<Integer, Integer> transform(final String key, final String value) {
> >> >             return new KeyValue<>(context.partition(), 1);
> >> >         }
> >> >     });
> >> >
> >> > // Note that we can't do "count()" on a KStream, we have to group it
> >> > // first. I'm grouping by the key, so it will produce the count for
> >> > // each key. Since the key is actually the partition id, it will
> >> > // produce the pre-aggregated count per partition.
> >> > // Note that the result is a KTable<PartitionId, Count>. It'll
> >> > // always contain the most recent count for each partition.
> >> > final KTable<Integer, Long> countsByPartition = keyedByPartition.groupByKey().count();
> >> >
> >> > // Now we get ready for the final roll-up: we re-group all the
> >> > // constituent counts onto a single key.
> >> > final KGroupedTable<String, Long> singlePartition =
> >> >     countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
> >> >
> >> > final KTable<String, Long> totalCount =
> >> >     singlePartition.reduce((l, r) -> l + r, (l, r) -> l - r);
> >> >
> >> > totalCount.toStream().foreach((k, v) -> {
> >> >     // k is always "ALL"
> >> >     // v is always the most recent total value
> >> >     System.out.println("The total event count is: " + v);
> >> > });
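For concreteness, here is a sketch of where those suppressions would go, written against the Suppressed API proposed in KIP-328 (the names were still under discussion at this point, so treat it as illustrative; the 30-second limit and record caps are arbitrary):

----------
import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;

/* Rate-limit the per-partition counts before the single-key regroup... */
final KTable<Integer, Long> countsByPartition = keyedByPartition
    .groupByKey()
    .count()
    .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(30), maxRecords(1000)));

/* ...and rate-limit the final total before it is emitted downstream. */
final KTable<String, Long> totalCount = countsByPartition
    .groupBy((key, value) -> new KeyValue<>("ALL", value))
    .reduce((l, r) -> l + r, (l, r) -> l - r)
    .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(30), maxRecords(1)));
----------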
> >> > On Tue, Jul 3, 2018 at 9:21 AM flaviost...@gmail.com
> >> > <flaviost...@gmail.com> wrote:
> >> >
> >> > > Great feature you have there!
> >> > >
> >> > > I'll try to exercise here how we would achieve the same functional
> >> > > objectives using your KIP:
> >> > >
> >> > > EXERCISE 1:
> >> > > - The case is "total counting of events for a huge website"
> >> > > - Tasks from Application A will have something like:
> >> > >     .stream(/site-events)
> >> > >     .count()
> >> > >     .publish(/single-partitioned-topic-with-count-partials)
> >> > > - The published messages will be, for example:
> >> > >     ["counter-task1", 2345]
> >> > >     ["counter-task2", 8495]
> >> > >     ["counter-task3", 4839]
> >> > > - A single task from Application B will have something like:
> >> > >     .stream(/single-partitioned-topic-with-count-partials)
> >> > >     .aggregate(by messages whose key starts with "counter")
> >> > >     .publish(/counter-total)
> >> > > - FAIL HERE. How would I know the overall number of partitions?
> >> > >   Two partials from the same task may arrive before those of other
> >> > >   tasks, and a partial may be aggregated twice.
> >> > >
> >> > > I tried to think about using GlobalKTables, but I didn't find an
> >> > > easy way to aggregate the keys from that table. Do you have any
> >> > > clue?
> >> > >
> >> > > Thanks.
> >> > >
> >> > > -Flávio Stutz
> >> > >
> >> > > On 2018/07/02 20:03:57, John Roesler <j...@confluent.io> wrote:
> >> > > > Hi Flávio,
> >> > > >
> >> > > > Thanks for the KIP. I apologize that I'm arriving late to the
> >> > > > discussion. I've tried to catch up, but I might have missed some
> >> > > > nuances.
> >> > > >
> >> > > > Regarding KIP-328, the idea is to add the ability to suppress
> >> > > > intermediate results from all KTables, not just windowed ones. I
> >> > > > think this could support your use case in combination with the
> >> > > > strategy that Guozhang proposed of having one or more
> >> > > > pre-aggregation steps that ultimately push into a
> >> > > > single-partition topic for final aggregation. Suppressing
> >> > > > intermediate results would solve the problem you noted, that
> >> > > > today pre-aggregating doesn't do much to staunch the flow of
> >> > > > updates.
> >> > > >
> >> > > > I'm not sure if this would be good enough for you overall; I
> >> > > > just wanted to clarify the role of KIP-328.
> >> > > > In particular, the solution you mentioned is to have the
> >> > > > downstream KTables actually query the upstream ones to compute
> >> > > > their results. I'm not sure whether it's more efficient to do
> >> > > > these queries on a schedule, or to have the upstream tables emit
> >> > > > their results on the same schedule.
> >> > > >
> >> > > > What do you think?
> >> > > >
> >> > > > Thanks,
> >> > > > -John
> >> > > >
> >> > > > On Sun, Jul 1, 2018 at 10:03 PM flaviost...@gmail.com
> >> > > > <flaviost...@gmail.com> wrote:
> >> > > >
> >> > > > > From what I understood, that KIP is related to how KStreams
> >> > > > > will handle KTable updates in windowed scenarios to optimize
> >> > > > > resource usage. I couldn't see any specific relation to this
> >> > > > > KIP. Did you?
> >> > > > >
> >> > > > > -Flávio Stutz
> >> > > > >
> >> > > > > On 2018/06/29 18:14:46, "Matthias J. Sax"
> >> > > > > <matth...@confluent.io> wrote:
> >> > > > > > Flavio,
> >> > > > > >
> >> > > > > > thanks for cleaning up the KIP number collision.
> >> > > > > >
> >> > > > > > With regard to KIP-328
> >> > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables)
> >> > > > > > I am wondering how both relate to each other?
> >> > > > > >
> >> > > > > > Any thoughts?
> >> > > > > >
> >> > > > > > -Matthias
> >> > > > > >
> >> > > > > > On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
> >> > > > > > > Just copying a follow-up from another thread to here
> >> > > > > > > (sorry about the mess):
> >> > > > > > >
> >> > > > > > > From: Guozhang Wang <wangg...@gmail.com>
> >> > > > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
> >> > > > > > > Date: 2018/06/25 22:24:17
> >> > > > > > > List: dev@kafka.apache.org
> >> > > > > > >
> >> > > > > > > Flávio, thanks for creating this KIP.
> >> > > > > > >
> >> > > > > > > I think this "single-aggregation" use case is common
> >> > > > > > > enough that we should consider how to support it
> >> > > > > > > efficiently: for example, for KSQL, which is built on top
> >> > > > > > > of Streams, we've seen lots of query statements whose
> >> > > > > > > expected return is a single row indicating the "total
> >> > > > > > > aggregate", etc. See
> >> > > > > > > https://github.com/confluentinc/ksql/issues/430 for
> >> > > > > > > details.
> >> > > > > > >
> >> > > > > > > I've not read through
> >> > > > > > > https://issues.apache.org/jira/browse/KAFKA-6953, but I'm
> >> > > > > > > wondering if we have discussed the option of supporting it
> >> > > > > > > in a "pre-aggregate" manner: that is, we do partial
> >> > > > > > > aggregates on parallel tasks, and then send the partially
> >> > > > > > > aggregated values via a single topic partition for the
> >> > > > > > > final aggregate, to reduce the traffic on that single
> >> > > > > > > partition and hence the final-aggregate workload.
> >> > > > > > > Of course, for non-commutative aggregates we'd probably
> >> > > > > > > need to provide another API in addition to aggregate, like
> >> > > > > > > the `merge` function for session-based aggregates, to let
> >> > > > > > > users customize the operation of merging two partial
> >> > > > > > > aggregates into a single partial aggregate. What are its
> >> > > > > > > pros and cons compared with the current proposal?
> >> > > > > > >
> >> > > > > > > Guozhang
> >> > > > > > >
> >> > > > > > > On 2018/06/26 18:22:27, Flávio Stutz
> >> > > > > > > <flaviost...@gmail.com> wrote:
> >> > > > > > >> Hey guys, I've just created a new KIP proposing a new DSL
> >> > > > > > >> graph source for realtime partitioned consolidations.
> >> > > > > > >>
> >> > > > > > >> We have faced the following scenario/problem in a lot of
> >> > > > > > >> situations with KStreams:
> >> > > > > > >>   - Huge incoming data being processed by numerous
> >> > > > > > >>     application instances
> >> > > > > > >>   - Need to aggregate different fields whose records span
> >> > > > > > >>     all topic partitions (something like "total amount
> >> > > > > > >>     spent by people aged > 30 yrs" when processing a topic
> >> > > > > > >>     partitioned by userid).
> >> > > > > > >>
> >> > > > > > >> The challenge here is to manage this kind of situation
> >> > > > > > >> without any bottlenecks. We don't need the "global
> >> > > > > > >> aggregation" to be processed on each incoming message. In
> >> > > > > > >> a scenario of 500 instances, each handling 1k messages/s,
> >> > > > > > >> any single point of aggregation (single-partition topics,
> >> > > > > > >> global tables, or external databases) would create a
> >> > > > > > >> bottleneck of 500k messages/s for single-threaded/CPU
> >> > > > > > >> elements.
> >> > > > > > >>
> >> > > > > > >> For this scenario, it is possible to store the partial
> >> > > > > > >> aggregations in local stores and, from time to time, query
> >> > > > > > >> those states and aggregate them into a single value,
> >> > > > > > >> avoiding bottlenecks. This is a way to create a "timed
> >> > > > > > >> aggregation barrier".
> >> > > > > > >>
> >> > > > > > >> If we had this kind of built-in feature, we could greatly
> >> > > > > > >> enhance the ability of KStreams to handle the CAP-theorem
> >> > > > > > >> trade-offs, so that one could choose to have Consistency
> >> > > > > > >> over Availability when needed.
> >> > > > > > >>
> >> > > > > > >> We started this discussion with Matthias J. Sax here:
> >> > > > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
> >> > > > > > >>
> >> > > > > > >> If you want to see more, go to KIP-326 at:
> >> > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> >> > > > > > >>
> >> > > > > > >> -Flávio Stutz

--
-- Guozhang
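As a closing illustration of the "timed aggregation barrier" described in KIP-326 above, here is a minimal sketch using the existing Processor API, under stated assumptions: the "partial-counts" store name and 60-second interval are made up, and the store must be registered and connected to this processor when building the topology.

----------
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueStore;

/* Each task keeps its partial count in a local store; a wall-clock
   punctuation emits the partial downstream once per interval instead of
   once per record. A downstream single-partition stage can then merge the
   partials into the global total. */
public class TimedPartialCounter extends AbstractProcessor<String, String> {
    private KeyValueStore<String, Long> partials;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        partials = (KeyValueStore<String, Long>) context.getStateStore("partial-counts");
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            final Long partial = partials.get("ALL");
            if (partial != null) {
                /* Forward this task's partial aggregate on the schedule. */
                context().forward("ALL", partial);
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        /* Update the local partial count on every record; nothing is
           emitted downstream here. */
        final Long current = partials.get("ALL");
        partials.put("ALL", current == null ? 1L : current + 1L);
    }
}
----------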