[ 
https://issues.apache.org/jira/browse/KAFKA-10844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922176#comment-17922176
 ] 

Matthias J. Sax commented on KAFKA-10844:
-----------------------------------------

Yes, nobody is working on this. It's up for grabs.

> groupBy without shuffling
> -------------------------
>
>                 Key: KAFKA-10844
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10844
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Mathieu DESPRIEE
>            Assignee: Shay Lin
>            Priority: Major
>              Labels: kip
>
> The idea is to give a way to keep the current partitioning while doing a 
> groupBy.
> Our use-case is the following:
> We process device data (stream is partitioned by device-id), each device 
> produces several metrics. We want to aggregate by metric, so currently we do a
> {code:java}
> selectKey( ... => (device, metric)).groupByKey.windowedBy(...).aggregate(...)
> {code}
> This shuffles the data around, which is unnecessary: each (device, metric) 
> group could stay in its original partition.
> This is not only an optimization question. We are seeing invalid 
> aggregations when reprocessing history. During reprocessing, we frequently 
> see tasks advance faster on some partitions than on others. This causes 
> problems with event-time: let's say data for device d1 is in partition p1 at 
> stream-time t1, and data for device d2 is in partition p2 at stream-time t2.
> Now, if I re-key by (device, metric), records from both devices could hash 
> to the same partition. If t2 is far ahead of t1, then all time windows 
> around t1 expire at once.
> Maybe I am missing some way of doing this with the existing API; please let 
> me know. Currently, I manually repartition and specify a custom partitioner, 
> but it's tedious.
> If I were to rewrite the aggregations manually with the Transformer API, I 
> would use (device, key) for my state store key, without changing the record 
> key.
>  
> _(poke_ [~vvcephei] _following our discussion on users ml)_
> KIP-759: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling]
>  
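
For readers following the issue, here is a minimal, library-free sketch of the idea behind the "custom partitioner" workaround described above: after re-keying by (device, metric), the partition is still derived from the device part of the key only, so every (device, metric) group stays with its device. All names here (`DevicePartitioningSketch`, `DeviceMetricKey`, `partitionFor`) are hypothetical and not taken from the issue. The hash is a plain `Arrays.hashCode` stand-in; it is an assumption for illustration and will not match the partition chosen by Kafka's default producer partitioner, which hashes the serialized key with murmur2. In a real topology this logic would presumably live in a `StreamPartitioner` implementation and reuse the same hash as the upstream topic.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only: no Kafka dependency, hypothetical names throughout.
final class DevicePartitioningSketch {

    /** Hypothetical composite key produced by re-keying to (device, metric). */
    static final class DeviceMetricKey {
        final String device;
        final String metric;

        DeviceMetricKey(String device, String metric) {
            this.device = device;
            this.metric = metric;
        }
    }

    /**
     * Chooses a partition from the device id alone, ignoring the metric.
     * Assumption: Arrays.hashCode stands in for the real key hash.
     */
    static int partitionFor(DeviceMetricKey key, int numPartitions) {
        byte[] deviceBytes = key.device.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result in [0, numPartitions) for negative hashes.
        return Math.floorMod(Arrays.hashCode(deviceBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 8;
        DeviceMetricKey temperature = new DeviceMetricKey("d1", "temperature");
        DeviceMetricKey humidity = new DeviceMetricKey("d1", "humidity");

        // Both keys share device d1, so both calls print the same partition.
        System.out.println(partitionFor(temperature, numPartitions));
        System.out.println(partitionFor(humidity, numPartitions));
    }
}
```

Because the metric never enters the hash, two devices can only share a partition after re-keying if they already shared one before, which is the property the issue asks for.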



