Hi Flávio,

Sure thing. And apologies in advance if I missed the point.
Below is some more-or-less realistic Java code to demonstrate how, given a high-volume (heavily partitioned) stream of purchases, we can "step down" the update rate with rate-limited intermediate aggregations. Please bear in mind that the suppression API itself is still under debate, so this is just for illustration purposes.

Basically, the "suppress" operator creates a processor whose job is just to store the latest value for each key and not emit it until the configured time has elapsed. So if key "X" gets updated 1000x/sec, we can use suppress to make sure it doesn't get emitted to the next processor more than once per second.

Does this make sense?

Thanks,
-John

public class KTableSuppressProcessorTest {

    private static class Purchase {
        final long customerId;
        final int value;

        private Purchase(final long customerId, final int value) {
            this.customerId = customerId;
            this.value = value;
        }
    }

    private static class PurchaseSerde implements Serde<Purchase> {...}

    public Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        final String purchases = "purchases";

        final KTable<Long, Purchase> input = builder.table(
            purchases,
            Consumed.with(Serdes.Long(), new PurchaseSerde())
        );

        // Fairly sloppy, but the idea is to "split" each customer id into one id per partition.
        // This way, we can first total their purchases inside each partition before aggregating them
        // across partitions.
        final KTable<Long, Purchase> purchasesWithPartitionedCustomers =
            input.transformValues(() -> new ValueTransformerWithKey<Long, Purchase, Purchase>() {
                private ProcessorContext context;

                @Override
                public void init(final ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public Purchase transform(final Long readOnlyKey, final Purchase purchase) {
                    final int partition = context.partition();
                    return new Purchase(
                        purchase.customerId * 1000 + partition, // Assuming we have < 1k partitions...
                        purchase.value
                    );
                }

                @Override
                public void close() {}
            });

        final KGroupedTable<Long, Integer> purchaseValueByPartitionedCustomer =
            purchasesWithPartitionedCustomers.groupBy(
                (id, purchase) -> new KeyValue<>(purchase.customerId, purchase.value)
            );

        final Suppression<Long, Integer> oncePerKeyPerSecond =
            Suppression.suppressIntermediateEvents(
                IntermediateSuppression
                    .emitAfter(Duration.ofSeconds(1))
                    .bufferKeys(5000)
                    .bufferFullStrategy(EMIT)
            );

        // First level of aggregation. Each customer gets their purchases aggregated *just within each partition*.
        // The result of this aggregation is emitted at most once per second per customer per purchase-partition.
        final KTable<Long, Integer> totalValueByPartitionedCustomer =
            purchaseValueByPartitionedCustomer
                .reduce((l, r) -> l + r, (l, r) -> l - r)
                .suppress(oncePerKeyPerSecond);

        // This is where we reverse the partitioning of each customer and then aggregate
        // each customer's purchases across partitions.
        // The result of this aggregation is emitted at most once per second per customer.
        final KTable<Long, Integer> aggregatedTotalValueByPartitionedCustomer =
            totalValueByPartitionedCustomer
                .groupBy((key, value) -> new KeyValue<>(key / 1000, value))
                .reduce((l, r) -> l + r, (l, r) -> l - r)
                .suppress(oncePerKeyPerSecond);

        // Sending all the intermediate totals to a single key to get the final aggregation.
        // The result of this aggregation is emitted at most once per second.
        final KTable<String, Integer> total =
            aggregatedTotalValueByPartitionedCustomer
                .groupBy((key, value) -> new KeyValue<>("ALL", value))
                .reduce((l, r) -> l + r, (l, r) -> l - r)
                .suppress(Suppression.suppressIntermediateEvents(
                    IntermediateSuppression.emitAfter(Duration.ofSeconds(1))
                ));

        // This topic will contain just one key ("ALL"), and the value will be
        // the ever-updating all-time purchase value.
        // Note that it'll be updated at most once per second.
        total.toStream().to("total-purchases-value");

        return builder.build();
    }
}
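In case it's helpful to see the mechanics, here is a very rough sketch of what such a suppression processor could do internally, written against the existing Processor API. To be clear, this is not the proposed KIP-328 API or implementation; the class and field names are made up, and a real version would need a proper (fault-tolerant) state store rather than an in-memory map. The point is just "remember only the latest value per key, and flush on a timer":

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Hypothetical illustration only.
public class SuppressProcessorSketch<K, V> extends AbstractProcessor<K, V> {
    private final Duration emitInterval;
    // Latest value seen for each key since the last flush.
    private final Map<K, V> buffer = new HashMap<>();

    public SuppressProcessorSketch(final Duration emitInterval) {
        this.emitInterval = emitInterval;
    }

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // Once per interval, forward whatever we've buffered downstream and start over.
        context.schedule(emitInterval.toMillis(), PunctuationType.STREAM_TIME, timestamp -> {
            for (final Map.Entry<K, V> entry : buffer.entrySet()) {
                context().forward(entry.getKey(), entry.getValue());
            }
            buffer.clear();
        });
    }

    @Override
    public void process(final K key, final V value) {
        // However many updates arrive for this key within the interval, only the latest survives.
        buffer.put(key, value);
    }
}

So if key "X" is updated 1000 times within the interval, only the final value is forwarded when the punctuator fires.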
Sax" <matth...@confluent.io> > wrote: > > > > Flavio, > > > > > > > > thanks for cleaning up the KIP number collision. > > > > > > > > With regard to KIP-328 > > > > ( > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables > > > ) > > > > I am wondering how both relate to each other? > > > > > > > > Any thoughts? > > > > > > > > > > > > -Matthias > > > > > > > > On 6/29/18 10:23 AM, flaviost...@gmail.com wrote: > > > > > Just copying a follow up from another thread to here (sorry about > the > > > mess): > > > > > > > > > > From: Guozhang Wang <wangg...@gmail.com> > > > > > Subject: Re: [DISCUSS] KIP-323: Schedulable KTable as Graph source > > > > > Date: 2018/06/25 22:24:17 > > > > > List: dev@kafka.apache.org > > > > > > > > > > Flávio, thanks for creating this KIP. > > > > > > > > > > I think this "single-aggregation" use case is common enough that we > > > should > > > > > consider how to efficiently supports it: for example, for KSQL > that's > > > built > > > > > on top of Streams, we've seen lots of query statements whose > return is > > > > > expected a single row indicating the "total aggregate" etc. See > > > > > https://github.com/confluentinc/ksql/issues/430 for details. > > > > > > > > > > I've not read through > https://issues.apache.org/jira/browse/KAFKA-6953, > > > but > > > > > I'm wondering if we have discussed the option of supporting it in a > > > > > "pre-aggregate" manner: that is we do partial aggregates on > parallel > > > tasks, > > > > > and then sends the partial aggregated value via a single topic > > > partition > > > > > for the final aggregate, to reduce the traffic on that single > > > partition and > > > > > hence the final aggregate workload. > > > > > Of course, for non-commutative aggregates we'd probably need to > provide > > > > > another API in addition to aggregate, like the `merge` function for > > > > > session-based aggregates, to let users customize the operations of > > > merging > > > > > two partial aggregates into a single partial aggregate. What's its > > > pros and > > > > > cons compared with the current proposal? > > > > > > > > > > > > > > > Guozhang > > > > > On 2018/06/26 18:22:27, Flávio Stutz <flaviost...@gmail.com> > wrote: > > > > >> Hey, guys, I've just created a new KIP about creating a new DSL > graph > > > > >> source for realtime partitioned consolidations. > > > > >> > > > > >> We have faced the following scenario/problem in a lot of > situations > > > with > > > > >> KStreams: > > > > >> - Huge incoming data being processed by numerous application > > > instances > > > > >> - Need to aggregate different fields whose records span all > topic > > > > >> partitions (something like “total amount spent by people aged > 30 > > > yrs” > > > > >> when processing a topic partitioned by userid). > > > > >> > > > > >> The challenge here is to manage this kind of situation without any > > > > >> bottlenecks. We don't need the “global aggregation” to be > processed > > > at each > > > > >> incoming message. On a scenario of 500 instances, each handling 1k > > > > >> messages/s, any single point of aggregation (single partitioned > > > topics, > > > > >> global tables or external databases) would create a bottleneck of > 500k > > > > >> messages/s for single threaded/CPU elements. 
> > > > >> For this scenario, it is possible to store the partial aggregations on local stores and, from time to time, query those states and aggregate them as a single value, avoiding bottlenecks. This is a way to create a "timed aggregation barrier".
> > > > >>
> > > > >> If we leverage this kind of built-in feature, we could greatly enhance the ability of KStreams to better handle the CAP Theorem characteristics, so that one could choose to have Consistency over Availability when needed.
> > > > >>
> > > > >> We started this discussion with Matthias J. Sax here: https://issues.apache.org/jira/browse/KAFKA-6953
> > > > >>
> > > > >> If you want to see more, go to KIP-326 at: https://cwiki.apache.org/confluence/display/KAFKA/KIP-326%3A+Schedulable+KTable+as+Graph+source
> > > > >>
> > > > >> -Flávio Stutz
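P.S. In case a concrete sketch helps the "timed aggregation barrier" discussion above: something roughly like this is already expressible with the existing Processor API. All names here are hypothetical; the "partial-aggregates" store would have to be registered on the topology, and the forwarded partials would go to a single-partition topic where one final task sums them up:

import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical illustration only.
public class TimedAggregationBarrier extends AbstractProcessor<String, Integer> {
    private final Duration emitInterval;
    private KeyValueStore<String, Integer> partials; // local, per-task partial sums

    public TimedAggregationBarrier(final Duration emitInterval) {
        this.emitInterval = emitInterval;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        partials = (KeyValueStore<String, Integer>) context.getStateStore("partial-aggregates");
        // Forward each partial total once per interval, instead of once per incoming record.
        context.schedule(emitInterval.toMillis(), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (final KeyValueIterator<String, Integer> it = partials.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, Integer> partial = it.next();
                    context().forward(partial.key, partial.value);
                }
            }
        });
    }

    @Override
    public void process(final String key, final Integer value) {
        // Fold each record into the local partial aggregate; nothing is forwarded here.
        final Integer current = partials.get(key);
        partials.put(key, current == null ? value : current + value);
    }
}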