[ https://issues.apache.org/jira/browse/KAFKA-10844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17249971#comment-17249971 ]
Matthias J. Sax commented on KAFKA-10844:
-----------------------------------------

For (1) (or KAFKA-4835) it's actually an easy thing to fix – the demand was just not big and nobody picked it up. – Btw: I would not change `groupBy()` for this case, but rather let people rewrite to `selectKey(...).reinterpretAsKeyedStream().groupByKey()` – this keeps the API surface area smaller.

For (2), as we process data in timestamp order, the skew should not become too big, and setting a larger grace period and retention time should help to mitigate the issue. It's a more complex problem to address, but we have some ideas and might tackle it in 2021...

> groupBy without shuffling
> -------------------------
>
>                 Key: KAFKA-10844
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10844
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Mathieu DESPRIEE
>            Priority: Major
>              Labels: needs-kip
>
> The idea is to give a way to keep the current partitioning while doing a
> groupBy.
> Our use case is the following:
> We process device data (the stream is partitioned by device-id), and each
> device produces several metrics. We want to aggregate by metric, so currently
> we do:
> {code:java}
> selectKey( ... => (device, metric)).groupByKey.windowedBy(...).aggregate(...) {code}
> This shuffles the data around, but it's not necessary: each (device, metric)
> group could stay in the original partition.
> This is not only an optimization question. We are experiencing invalid
> aggregations when reprocessing history. During reprocessing, we frequently
> see some tasks moving faster on some partitions. This causes problems with
> event-time: let's say data for device d1 is in partition p1 at stream-time
> t1, and device d2 in partition p2 at time t2.
> Now, if I re-key by (device, metric), records from both devices could have
> the same hash-key and land in the same partition.
> And if t2 is far ahead of t1, then all time-windows for t1 get expired at
> once.
> Maybe I'm missing some way of doing this with the existing API; please let me
> know. Currently, I manually repartition and specify a custom partitioner, but
> it's tedious.
> If I were to rewrite the aggregations manually with the Transformer API, I
> would use (device, metric) for my state store key, without changing the
> record key.
>
> _(poke_ [~vvcephei] _following our discussion on users ml)_

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
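[Editor's sketch] The shuffle the reporter describes, and the "custom partitioner" workaround he mentions, can be illustrated with plain Java. This is a minimal sketch, not Kafka code: it uses `String.hashCode()` as a stand-in for Kafka's murmur2-based default partitioner, a `"device|metric"` string as a stand-in for the composite key, and 4 partitions; the names `defaultPartition` and `devicePartition` are hypothetical. The point it demonstrates is structural: hashing the full composite key scatters one device's metrics across partitions, while hashing only the device part keeps every (device, metric) group in the device's original partition.

```java
public class PartitionSketch {
    static final int NUM_PARTITIONS = 4; // assumption: example topic with 4 partitions

    // Default-partitioner behavior: hash the whole record key.
    // (Stand-in: real Kafka uses murmur2, but any hash of the full key
    // scatters composite keys the same way.)
    static int defaultPartition(String key) {
        return Math.floorMod(key.hashCode(), NUM_PARTITIONS);
    }

    // Custom-partitioner idea: hash only the device part of a
    // "device|metric" composite key, so all metrics of one device
    // stay in that device's original partition.
    static int devicePartition(String compositeKey) {
        String device = compositeKey.split("\\|", 2)[0];
        return Math.floorMod(device.hashCode(), NUM_PARTITIONS);
    }

    public static void main(String[] args) {
        System.out.println("d1             -> partition " + defaultPartition("d1"));
        // Re-keyed records generally land elsewhere, forcing a repartition:
        System.out.println("d1|m1 default  -> partition " + defaultPartition("d1|m1"));
        System.out.println("d1|m2 default  -> partition " + defaultPartition("d1|m2"));
        // Device-only hashing keeps them co-located with the device:
        System.out.println("d1|m1 device   -> partition " + devicePartition("d1|m1"));
        System.out.println("d1|m2 device   -> partition " + devicePartition("d1|m2"));
    }
}
```

In real Kafka Streams terms, the same device-only hashing would go into the `StreamPartitioner` passed when manually repartitioning, which is exactly the tedious setup the reporter wants the DSL to make unnecessary.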