Ok, I didn't get quite as far as I hoped, and several things are far from ready, but here's what I have so far: https://github.com/apache/kafka/pull/5337
The "unit" test works, and is a good example of how you should expect it to behave: https://github.com/apache/kafka/pull/5337/files#diff-2fdec52b9cc3d0e564f0c12a199bed77 I have one working integration test, but it's slow going getting the timing right, so no promises of any kind ;) Let me know what you think! Thanks, -John On Thu, Jul 5, 2018 at 8:39 AM John Roesler <j...@confluent.io> wrote: > Hey Flávio, > > Thanks! I haven't got anything usable yet, but I'm working on it now. I'm > hoping to push up my branch by the end of the day. > > I don't know if you've seen it but Streams actually already has something > like this, in the form of caching on materialized stores. If you pass in a > "Materialized.withCachingEnabled()", you should be able to get a POC > working by setting the max cache size pretty high and setting the commit > interval for your desired rate: > https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html#streams-developer-guide-memory-management > . > > There are a couple of cases in joins and whatnot where it doesn't work, > but for the aggregations we discussed, it should. The reason for KIP-328 is > to provide finer control and hopefully a more straightforward API. > > Let me know if that works, and I'll drop a message in here when I create > the draft PR for KIP-328. I'd really appreciate your feedback. > > Thanks, > -John > > On Wed, Jul 4, 2018 at 10:17 PM flaviost...@gmail.com < > flaviost...@gmail.com> wrote: > >> John, that was fantastic, man! >> Have you built any custom implementation of your KIP in your machine so >> that I could test it out here? I wish I could test it out. >> If you need any help implementing this feature, please tell me. >> >> Thanks. >> >> -Flávio Stutz >> >> >> >> >> On 2018/07/03 18:04:52, John Roesler <j...@confluent.io> wrote: >> > Hi Flávio, >> > Thanks! I think that we can actually do this, but the API could be >> better. 
>> > I've included Java code below, but I'll copy and modify your example so we're on the same page.
>> >
>> > EXERCISE 1:
>> > - The case is "total counting of events for a huge website"
>> > - Tasks from Application A will have something like:
>> >     .stream(/site-events)
>> >     .transform( re-key s.t. the new key is the partition id)
>> >     .groupByKey() // you have to do this before count
>> >     .count()
>> >     // you explicitly published to a one-partition topic here, but it's actually sufficient just to re-group onto one key.
>> >     // You could name and pre-create the intermediate topic here, but you don't need a separate application for the final aggregation.
>> >     .groupBy((partitionId, partialCount) -> new KeyValue("ALL", partialCount))
>> >     .aggregate(sum up the partialCounts)
>> >     .publish(/counter-total)
>> >
>> > I've left out the suppressions, but they would go right after the count() and the aggregate().
>> >
>> > With this program, you don't have to worry about the double-aggregation you mentioned in the last email. The KTable produced by the first count() will maintain the correct count per partition. If the value changes for any partition, it'll emit a retraction of the old value and then the new value downstream, so that the final aggregation can update itself properly.
>> >
>> > I think we can optimize both the execution and the programmability by adding a "global aggregation" concept. But in principle, it seems like this usage of the current API will support your use case.
>> >
>> > Once again, though, this is just to present an alternative. I haven't done the math on whether your proposal would be more efficient.
>> >
>> > Thanks,
>> > -John
>> >
>> > Here's the same algorithm written in Java:
>> >
>> > final KStream<String, String> siteEvents = builder.stream("/site-events");
>> >
>> > // here we re-key the events so that the key is actually the partition id.
>> > // we don't need the value to do a count, so I just set it to "1".
>> > final KStream<Integer, Integer> keyedByPartition = siteEvents.transform(() -> new Transformer<String, String, KeyValue<Integer, Integer>>() {
>> >     private ProcessorContext context;
>> >
>> >     @Override
>> >     public void init(final ProcessorContext context) {
>> >         this.context = context;
>> >     }
>> >
>> >     @Override
>> >     public KeyValue<Integer, Integer> transform(final String key, final String value) {
>> >         return new KeyValue<>(context.partition(), 1);
>> >     }
>> > });
>> >
>> > // Note that we can't do "count()" on a KStream; we have to group it first. I'm grouping by the key, so it will produce the count for each key.
>> > // Since the key is actually the partition id, it will produce the pre-aggregated count per partition.
>> > // Note that the result is a KTable<PartitionId, Count>. It'll always contain the most recent count for each partition.
>> > final KTable<Integer, Long> countsByPartition = keyedByPartition.groupByKey().count();
>> >
>> > // Now we get ready for the final roll-up. We re-group all the constituent counts.
>> > final KGroupedTable<String, Long> singlePartition = countsByPartition.groupBy((key, value) -> new KeyValue<>("ALL", value));
>> >
>> > final KTable<String, Long> totalCount = singlePartition.reduce((l, r) -> l + r, (l, r) -> l - r);
>> >
>> > totalCount.toStream().foreach((k, v) -> {
>> >     // k is always "ALL"
>> >     // v is always the most recent total value
>> >     System.out.println("The total event count is: " + v);
>> > });
>> >
>> > On Tue, Jul 3, 2018 at 9:21 AM flaviost...@gmail.com <flaviost...@gmail.com> wrote:
>> > >
>> > > Great feature you have there!
>> > >
>> > > I'll try to exercise here how we would achieve the same functional objectives using your KIP:
>> > >
>> > > EXERCISE 1:
>> > > - The case is "total counting of events for a huge website"
>> > > - Tasks from Application A will have something like:
>> > >     .stream(/site-events)
>> > >     .count()
>> > >     .publish(/single-partitioned-topic-with-count-partials)
>> > > - The published messages will be, for example:
>> > >     ["counter-task1", 2345]
>> > >     ["counter-task2", 8495]
>> > >     ["counter-task3", 4839]
>> > > - Single Task from Application B will have something like:
>> > >     .stream(/single-partitioned-topic-with-count-partials)
>> > >     .aggregate(by messages whose key starts with "counter")
>> > >     .publish(/counter-total)
>> > > - FAIL HERE. How would I know what the overall partitions are? Two partials from the same task may arrive before the other tasks' partials do, and they may be aggregated twice.
>> > >
>> > > I tried to think about using GlobalKTables, but I didn't find an easy way to aggregate the keys from that table. Do you have any clue?
>> > >
>> > > Thanks.
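The adder/subtractor pair in the reduce() above is what makes the roll-up safe against the double-counting worry: when a partition's count changes, the old value is retracted before the new one is added. A plain-Java simulation of that retraction flow (no Streams dependency; the class and method names here are purely illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class RetractionSketch {
    private final Map<Integer, Long> countsByPartition = new HashMap<>();
    private long total = 0L;

    // Mimics the KTable update semantics: when a partition's count changes,
    // the old value is retracted (the subtractor, (l, r) -> l - r) and the
    // new value is added (the adder, (l, r) -> l + r), so the grand total
    // never counts any partition twice.
    public void updatePartitionCount(int partition, long newCount) {
        Long old = countsByPartition.put(partition, newCount);
        if (old != null) {
            total -= old;  // subtractor
        }
        total += newCount; // adder
    }

    public long total() {
        return total;
    }

    public static void main(String[] args) {
        RetractionSketch sketch = new RetractionSketch();
        sketch.updatePartitionCount(0, 2345);
        sketch.updatePartitionCount(1, 8495);
        sketch.updatePartitionCount(0, 2400); // partition 0 advances: retract 2345, add 2400
        System.out.println("total = " + sketch.total()); // 2400 + 8495 = 10895
    }
}
```

Note that if the subtractor were omitted, the third update would push the total to 13240 instead of 10895, which is exactly the double-aggregation problem described in the exercise.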
>> > >
>> > > -Flávio Stutz
>> > >
>> > > /partial-counters-to-single-partitioned-topic
>> > >
>> > > On 2018/07/02 20:03:57, John Roesler <j...@confluent.io> wrote:
>> > > > Hi Flávio,
>> > > >
>> > > > Thanks for the KIP. I'll apologize that I'm arriving late to the discussion. I've tried to catch up, but I might have missed some nuances.
>> > > >
>> > > > Regarding KIP-328, the idea is to add the ability to suppress intermediate results from all KTables, not just windowed ones. I think this could support your use case in combination with the strategy that Guozhang proposed of having one or more pre-aggregation steps that ultimately push into a single-partition topic for final aggregation. Suppressing intermediate results would solve the problem you noted that, today, pre-aggregating doesn't do much to staunch the flow of updates.
>> > > >
>> > > > I'm not sure if this would be good enough for you overall; I just wanted to clarify the role of KIP-328.
>> > > > In particular, the solution you mentioned is to have the downstream KTables actually query the upstream ones to compute their results. I'm not sure whether it's more efficient to do these queries on a schedule, or to have the upstream tables emit their results on the same schedule.
>> > > >
>> > > > What do you think?
>> > > >
>> > > > Thanks,
>> > > > -John
>> > > >
>> > > > On Sun, Jul 1, 2018 at 10:03 PM flaviost...@gmail.com <flaviost...@gmail.com> wrote:
>> > > > >
>> > > > > From what I understood, that KIP is related to how KStreams will handle KTable updates in windowed scenarios to optimize resource usage.
>> > > > > I couldn't see any specific relation to this KIP. Had you?
>> > > > >
>> > > > > -Flávio Stutz
>> > > > >
>> > > > > On 2018/06/29 18:14:46, "Matthias J. Sax" <matth...@confluent.io> wrote:
>> > > > > > Flavio,
>> > > > > >
>> > > > > > thanks for cleaning up the KIP number collision.
>> > > > > >
>> > > > > > With regard to KIP-328
>> > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables)
>> > > > > > I am wondering how both relate to each other?
>> > > > > >
>> > > > > > Any thoughts?
>> > > > > >
>> > > > > > -Matthias
>> > > > > >
>> > > > > > On 6/29/18 10:23 AM, flaviost...@gmail.com wrote:
>> > > > > > > Just copying a follow-up from another thread to here (sorry about the mess):
>> > > > > > >
>> > > > > > > From: Guozhang Wang <wangg...@gmail.com>
>> > > > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source
>> > > > > > > Date: 2018/06/25 22:24:17
>> > > > > > > List: dev@kafka.apache.org
>> > > > > > >
>> > > > > > > Flávio, thanks for creating this KIP.
>> > > > > > >
>> > > > > > > I think this "single-aggregation" use case is common enough that we should consider how to support it efficiently: for example, for KSQL, which is built on top of Streams, we've seen lots of query statements whose return is expected to be a single row indicating the "total aggregate", etc. See https://github.com/confluentinc/ksql/issues/430 for details.
>> > > > > > > I've not read through https://issues.apache.org/jira/browse/KAFKA-6953, but I'm wondering if we have discussed the option of supporting it in a "pre-aggregate" manner: that is, we do partial aggregates on parallel tasks, and then send the partial aggregated values via a single topic partition for the final aggregate, to reduce the traffic on that single partition and hence the final-aggregate workload.
>> > > > > > > Of course, for non-commutative aggregates we'd probably need to provide another API in addition to aggregate, like the `merge` function for session-based aggregates, to let users customize the operation of merging two partial aggregates into a single partial aggregate. What are its pros and cons compared with the current proposal?
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > > On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com> wrote:
>> > > > > > >> Hey, guys, I've just created a new KIP about creating a new DSL graph source for realtime partitioned consolidations.
>> > > > > > >>
>> > > > > > >> We have faced the following scenario/problem in a lot of situations with KStreams:
>> > > > > > >>   - Huge incoming data being processed by numerous application instances
>> > > > > > >>   - Need to aggregate different fields whose records span all topic partitions (something like "total amount spent by people aged > 30 yrs" when processing a topic partitioned by userid).
>> > > > > > >>
>> > > > > > >> The challenge here is to manage this kind of situation without any bottlenecks.
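Guozhang's pre-aggregate-then-merge idea can be sketched in plain Java (no Streams dependency; the Partial shape and the merge signature are illustrative, not an actual Streams API): each parallel task produces a partial aggregate, and the single final-aggregate task only has to merge one compact partial per upstream task rather than see every input record.

```java
import java.util.Arrays;
import java.util.List;

public class PreAggregateSketch {
    // A partial aggregate computed by one parallel task (illustrative shape:
    // a count plus a sum, enough to later derive e.g. an average).
    static final class Partial {
        final long count;
        final long sum;
        Partial(long count, long sum) { this.count = count; this.sum = sum; }
    }

    // The "merge" hook Guozhang mentions: combine two partial aggregates
    // into one. For a non-commutative aggregate, this is the function the
    // user would have to supply in addition to the per-record aggregator.
    static Partial merge(Partial a, Partial b) {
        return new Partial(a.count + b.count, a.sum + b.sum);
    }

    // The single final-aggregate task folds together all partials it
    // receives over the single topic partition.
    static Partial finalAggregate(List<Partial> partials) {
        Partial acc = new Partial(0, 0);
        for (Partial p : partials) {
            acc = merge(acc, p);
        }
        return acc;
    }

    public static void main(String[] args) {
        // Three upstream tasks, each shipping one partial aggregate:
        List<Partial> partials = Arrays.asList(
            new Partial(2345, 100), new Partial(8495, 250), new Partial(4839, 75));
        Partial result = finalAggregate(partials);
        System.out.println("count=" + result.count + " sum=" + result.sum);
    }
}
```

The point of the sketch is the traffic shape: the single partition carries one Partial per upstream task per emission interval, not one record per input event.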
>> > > > > >> We don't need the "global aggregation" to be processed at each incoming message. In a scenario of 500 instances, each handling 1k messages/s, any single point of aggregation (single-partitioned topics, global tables, or external databases) would create a bottleneck of 500k messages/s for single-threaded/CPU elements.
>> > > > > >>
>> > > > > >> For this scenario, it is possible to store the partial aggregations on local stores and, from time to time, query those states and aggregate them as a single value, avoiding bottlenecks. This is a way to create a "timed aggregation barrier".
>> > > > > >>
>> > > > > >> If we leverage this kind of built-in feature, we could greatly enhance the ability of KStreams to better handle the CAP theorem characteristics, so that one could choose to have Consistency over Availability when needed.
>> > > > > >>
>> > > > > >> We started this discussion with Matthias J. Sax here:
>> > > > > >> https://issues.apache.org/jira/browse/KAFKA-6953
>> > > > > >>
>> > > > > >> If you want to see more, go to KIP-326 at:
>> > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
>> > > > > >>
>> > > > > >> -Flávio Stutz
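The "timed aggregation barrier" described above can be sketched in plain Java (no Streams dependency; all names are illustrative — in Streams this would roughly correspond to a scheduled punctuation that reads the local partial-aggregate stores): writes stay local and cheap, and the expensive global consolidation happens only once per interval.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TimedBarrierSketch {
    // Partial aggregations kept locally by each task (illustrative store).
    private final Map<String, Long> localPartials = new ConcurrentHashMap<>();

    // Hot path: called per message, touches only local state,
    // no global coordination and no single-partition traffic.
    public void recordPartial(String taskId, long partial) {
        localPartials.put(taskId, partial);
    }

    // The barrier: a scheduler invokes this, say, every 30 seconds.
    // The single point of aggregation then does one pass per interval
    // instead of absorbing 500k messages/s.
    public long consolidate() {
        return localPartials.values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        TimedBarrierSketch sketch = new TimedBarrierSketch();
        sketch.recordPartial("counter-task1", 2345);
        sketch.recordPartial("counter-task2", 8495);
        sketch.recordPartial("counter-task3", 4839);
        System.out.println("global total = " + sketch.consolidate()); // 15679
    }
}
```

The trade-off is exactly the one named in the thread: the global value is eventually consistent (stale by up to one interval) in exchange for removing the per-message bottleneck.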