[ https://issues.apache.org/jira/browse/KAFKA-10844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17249971#comment-17249971 ]

Matthias J. Sax commented on KAFKA-10844:
-----------------------------------------

For (1) (or KAFKA-4835) it's actually an easy thing to fix – the demand was 
just not big enough and nobody picked it up. Btw: I would not change 
`groupBy()` for this case, but rather let people rewrite to 
`selectKey(...).reinterpretAsKeyedStream().groupByKey()` – this keeps the API 
surface area smaller.
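
To make that concrete, here is a rough sketch of the two shapes, assuming a 
placeholder `KStream<String, Metric> metrics` with `name()`/`value()` accessors. 
Note that `reinterpretAsKeyedStream()` is only the proposed operator from 
KAFKA-4835 and does not exist in the API today:

{code:java}
// Today: selectKey() marks the stream for repartitioning, so the subsequent
// groupByKey() inserts a repartition topic (the shuffle the ticket wants to avoid).
metrics
    .selectKey((deviceId, metric) -> deviceId + "/" + metric.name())
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(() -> 0.0, (key, metric, sum) -> sum + metric.value());

// Proposed shape (hypothetical operator, not available yet): the user asserts
// that the new key is co-partitioned with the old one, so no repartition topic
// is created and each (device, metric) group stays in its original partition.
metrics
    .selectKey((deviceId, metric) -> deviceId + "/" + metric.name())
    .reinterpretAsKeyedStream()
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .aggregate(() -> 0.0, (key, metric, sum) -> sum + metric.value());
{code}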

For (2), as we process data in timestamp order, the skew should not become too 
big, and setting a larger grace-period and retention-time should help to 
mitigate the issue. It's a more complex problem to address, but we have some 
ideas and might tackle it in 2021...
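
For reference, increasing the grace period and retention on the existing DSL 
aggregation looks roughly like this (reusing the placeholder `metrics` stream 
from the sketch above; store name, durations, and the String/Double types are 
just placeholders):

{code:java}
// Accept out-of-order records up to 6 hours past window end before closing the window.
TimeWindows windows = TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofHours(6));

// Keep window state around long enough to cover window size plus grace period.
Materialized<String, Double, WindowStore<Bytes, byte[]>> store =
    Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("per-metric-agg")
        .withRetention(Duration.ofHours(7));

KTable<Windowed<String>, Double> aggregated = metrics
    .selectKey((deviceId, metric) -> deviceId + "/" + metric.name())
    .groupByKey()
    .windowedBy(windows)
    .aggregate(() -> 0.0, (key, metric, sum) -> sum + metric.value(), store);
{code}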

> groupBy without shuffling
> -------------------------
>
>                 Key: KAFKA-10844
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10844
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Mathieu DESPRIEE
>            Priority: Major
>              Labels: needs-kip
>
> The idea is to give a way to keep the current partitioning while doing a 
> groupBy.
> Our use-case is the following:
> We process device data (the stream is partitioned by device-id), and each 
> device produces several metrics. We want to aggregate by metric, so currently 
> we do a
> {code:java}
> selectKey(... => (device, metric)).groupByKey.windowedBy(...).aggregate(...)
> {code}
> This shuffles the data around, but it's not necessary: each (device, metric) 
> group could stay in its original partition.
> This is not only an optimization question. We are experiencing invalid 
> aggregations when reprocessing history. During these reprocessing runs, we 
> frequently see some tasks moving faster on some partitions. This causes 
> problems with event-time: let's say data for device d1 is in partition p1 at 
> stream-time t1, and device d2 is in partition p2 at time t2.
> Now, if I re-key by (device, metric), records from both devices could hash to 
> the same partition. And if t2 is far ahead of t1, then all time-windows for 
> t1 get expired at once.
> Maybe I'm missing some way of doing this with the existing API; please let me 
> know. Currently, I manually repartition and specify a custom partitioner, but 
> it's tedious.
> If I were to rewrite the aggregations manually with the Transformer API, I 
> would use (device, metric) for my state-store key, without changing the 
> record key.
>  
> _(poke_ [~vvcephei] _following our discussion on users ml)_
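
For completeness, the Transformer-style rewrite mentioned at the end of the 
description (state store keyed by a (device, metric) compound key while the 
record key stays the device id, so nothing is repartitioned) could look roughly 
like the following sketch; it omits the windowing/expiry logic and uses 
placeholder store, stream, and type names:

{code:java}
// Explicitly registered store, keyed by the compound "device/metric" string.
StoreBuilder<KeyValueStore<String, Double>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("per-device-metric-agg"),
        Serdes.String(), Serdes.Double());
builder.addStateStore(storeBuilder);

// transformValues() leaves the record key (device id) unchanged, so no
// repartitioning is triggered; the compound key exists only inside the store.
KStream<String, Double> aggregated = metrics.transformValues(
    () -> new ValueTransformerWithKey<String, Metric, Double>() {
        private KeyValueStore<String, Double> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            store = (KeyValueStore<String, Double>) context.getStateStore("per-device-metric-agg");
        }

        @Override
        public Double transform(final String deviceId, final Metric metric) {
            final String storeKey = deviceId + "/" + metric.name();
            final Double current = store.get(storeKey);
            final double updated = (current == null ? 0.0 : current) + metric.value();
            store.put(storeKey, updated);
            return updated;
        }

        @Override
        public void close() { }
    },
    "per-device-metric-agg");
{code}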



