[ https://issues.apache.org/jira/browse/KAFKA-10844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922176#comment-17922176 ]
Matthias J. Sax commented on KAFKA-10844:
-----------------------------------------

Yes, nobody is working on this. It's up for grabs.

> groupBy without shuffling
> -------------------------
>
>                 Key: KAFKA-10844
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10844
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Mathieu DESPRIEE
>            Assignee: Shay Lin
>            Priority: Major
>              Labels: kip
>
> The idea is to provide a way to keep the current partitioning while doing a
> groupBy.
> Our use case is the following:
> We process device data (the stream is partitioned by device-id), and each
> device produces several metrics. We want to aggregate by metric, so currently
> we do a
> {code:java}
> selectKey( ... => (device, metric)).groupByKey.windowedBy(...).aggregate(...)
> {code}
> This shuffles the data around, but that isn't necessary: each (device, metric)
> group could stay in its original partition.
> This is not only an optimization question. We are seeing invalid
> aggregations when reprocessing history. During these reprocessing runs, we
> frequently see tasks on some partitions moving faster than on others. This
> causes problems with event-time. Let's say data for device d1 is in partition
> p1 at stream-time t1, and device d2 is in partition p2 at stream-time t2.
> Now, if I re-key by (device, metric), records from both devices could hash to
> the same partition. And if t2 is far ahead of t1, then all time windows for
> t1 expire at once.
> Maybe I'm missing a way to do this with the existing API; please let me
> know. Currently, I manually repartition and specify a custom partitioner, but
> it's tedious.
> If I were to rewrite the aggregations manually with the Transformer API, I
> would use (device, key) for my state store key, without changing the record
> key.
>
> _(poke_ [~vvcephei] _following our discussion on users ml)_
> KIP-759:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling]
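The event-time problem described in the issue can be reproduced with a small, stdlib-only Java sketch. It is a simplified model under stated assumptions, not the Kafka Streams implementation. Each task keeps its own stream-time (the largest timestamp it has seen) and drops a record whose tumbling window ended more than a grace period before that stream-time. State inside a task is keyed by (device, metric, window start), loosely following the reporter's idea of a composite store key. The names `WindowExpiryDemo`, `Event`, `WindowedSumTask` and `totalDropped`, as well as the window size, grace period and timestamps, are hypothetical and chosen only for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a windowed-aggregation task. This is NOT the
// Kafka Streams implementation, only an illustration of stream-time and grace.
final class WindowExpiryDemo {

    // Hypothetical input record: device id, metric name, event timestamp (ms), value.
    record Event(String device, String metric, long timestamp, double value) {}

    static final long WINDOW_SIZE_MS = 1_000; // tumbling window size (illustrative)
    static final long GRACE_MS = 500;         // grace period (illustrative)

    // One task models one partition's processor, with its own stream-time.
    static final class WindowedSumTask {
        private long streamTime = Long.MIN_VALUE; // max timestamp seen by this task
        private int dropped = 0;
        // Store keyed by (device, metric, windowStart); events are never re-keyed.
        private final Map<String, Double> sums = new HashMap<>();

        void process(Event e) {
            streamTime = Math.max(streamTime, e.timestamp());
            long windowStart = e.timestamp() - (e.timestamp() % WINDOW_SIZE_MS);
            long windowEnd = windowStart + WINDOW_SIZE_MS;
            // Closed once stream-time has moved past the window end plus the grace period.
            if (windowEnd <= streamTime - GRACE_MS) {
                dropped++;
                return;
            }
            String storeKey = e.device() + "|" + e.metric() + "|" + windowStart;
            sums.merge(storeKey, e.value(), Double::sum);
        }

        int dropped() {
            return dropped;
        }
    }

    // Illustrative data: d1 lives on p1, d2 on p2, and p2 is far ahead in event-time.
    static final List<Event> P1 = List.of(
            new Event("d1", "temp", 1_000, 1.0),
            new Event("d1", "temp", 1_500, 1.0),
            new Event("d1", "temp", 2_000, 1.0),
            new Event("d1", "temp", 2_500, 1.0));
    static final List<Event> P2 = List.of(
            new Event("d2", "temp", 100_000, 1.0),
            new Event("d2", "temp", 100_500, 1.0));

    static int totalDropped(boolean mergedIntoOneTask) {
        if (mergedIntoOneTask) {
            // After re-keying, both devices share one task; p2 is further ahead,
            // so its records arrive first and push the shared stream-time forward.
            WindowedSumTask merged = new WindowedSumTask();
            P2.forEach(merged::process);
            P1.forEach(merged::process);
            return merged.dropped();
        }
        // Partitioning kept: each partition's task advances its own stream-time.
        WindowedSumTask t1 = new WindowedSumTask();
        WindowedSumTask t2 = new WindowedSumTask();
        P1.forEach(t1::process);
        P2.forEach(t2::process);
        return t1.dropped() + t2.dropped();
    }

    public static void main(String[] args) {
        System.out.println("dropped with partitioning kept: " + totalDropped(false));
        System.out.println("dropped after re-keying into one task: " + totalDropped(true));
    }
}
```

Running `main` reports 0 dropped records when each partition keeps its own task, and 4 when both devices share one task. Once d2's records advance the shared stream-time to 100,500 ms, every d1 window (ending at 2,000 or 3,000 ms) is already past its grace period, so all of d1's records are discarded.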