Mathieu DESPRIEE created KAFKA-10844:
----------------------------------------

             Summary: groupBy without shuffling
                 Key: KAFKA-10844
                 URL: https://issues.apache.org/jira/browse/KAFKA-10844
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 2.6.0
            Reporter: Mathieu DESPRIEE


The idea is to provide a way to keep the current partitioning when doing a 
groupBy.

Our use case is the following:
 We process device data (the stream is partitioned by device-id), and each 
device produces several metrics. We want to aggregate by metric, so currently 
we do:
{code:java}
 selectKey( ... => (device, metric)).groupByKey.windowedBy(...).aggregate(...)  
{code}
This shuffles the data around, but that is not necessary: each (device, metric) 
group could stay in its original partition.

This is not only an optimization question. We are experiencing invalid 
aggregations when reprocessing history. During such reprocessing, we frequently 
see some tasks moving faster on some partitions. This causes problems with 
event-time. Let's say data for device d1 is in partition p1 at stream-time t1, 
and data for device d2 is in partition p2 at stream-time t2.
 Now, if I re-key by (device, metric), records from both devices can hash to 
the same partition. And if t2 is far ahead of t1, then all time windows for t1 
expire at once.
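
To make the expiry effect concrete, here is a minimal sketch of one task's stream-time in plain Java. It is not Kafka Streams code. The names (StreamTimeSketch, Event, droppedEvents) are hypothetical, and the model assumes non-negative timestamps, tumbling windows, and a window that closes once stream-time reaches window end plus grace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of event-time windowing inside a single task (illustration only).
class StreamTimeSketch {

    /** Hypothetical record type: only the device id and the event timestamp matter here. */
    static final class Event {
        final String device;
        final long timestamp;

        Event(String device, long timestamp) {
            this.device = device;
            this.timestamp = timestamp;
        }
    }

    /**
     * Feeds events in arrival order into one partition and returns the events whose
     * window was already closed when they arrived.
     * Assumption: tumbling windows of windowSize ms, closed once
     * streamTime >= windowEnd + grace, with streamTime being the highest timestamp seen.
     */
    static List<Event> droppedEvents(List<Event> arrivals, long windowSize, long grace) {
        long streamTime = Long.MIN_VALUE;
        List<Event> dropped = new ArrayList<>();
        for (Event e : arrivals) {
            // Stream-time only moves forward, whichever device the record belongs to.
            streamTime = Math.max(streamTime, e.timestamp);
            long windowStart = e.timestamp - (e.timestamp % windowSize);
            long windowEnd = windowStart + windowSize;
            if (streamTime >= windowEnd + grace) {
                dropped.add(e);
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // d2 is far ahead of d1; after re-keying both end up in the same partition.
        List<Event> mixed = Arrays.asList(
                new Event("d1", 1_000L),
                new Event("d2", 1_000_000L),
                new Event("d1", 2_000L));
        System.out.println("dropped: " + droppedEvents(mixed, 60_000L, 0L).size());
    }
}
```

In this model, the second d1 record arrives after d2 has pushed stream-time past the end of d1's window, so it is discarded. The same two d1 records processed alone are both kept.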

Maybe I am missing a way of doing this with the existing API; please let me 
know. Currently, I manually repartition and specify a custom partitioner, but 
it's tedious.
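
The core of such a custom partitioner can be sketched in plain Java as below: the partition is derived from the device part of the composite key only, so every (device, metric) group stays with its device. All names (DevicePartitionSketch, DeviceMetric, partitionForDevice, partitionForKey) are hypothetical, and the hash is a stand-in. A real version would have to reproduce whatever partitioning wrote the source topic (the default producer partitioner uses murmur2 over the serialized key bytes), and in Kafka Streams this logic would presumably sit inside a StreamPartitioner implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: partition by the device part of a composite (device, metric) key.
class DevicePartitionSketch {

    /** Hypothetical composite key, as produced by the selectKey step. */
    static final class DeviceMetric {
        final String device;
        final String metric;

        DeviceMetric(String device, String metric) {
            this.device = device;
            this.metric = metric;
        }
    }

    /**
     * Stand-in for the partitioning of the source topic.
     * Assumption: a simple hash of the UTF-8 bytes; replace it with the function
     * that actually partitioned the input data by device-id.
     */
    static int partitionForDevice(String device, int numPartitions) {
        int hash = Arrays.hashCode(device.getBytes(StandardCharsets.UTF_8));
        // Mask the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    /** Ignores the metric, so the re-keyed record keeps its device's partition. */
    static int partitionForKey(DeviceMetric key, int numPartitions) {
        return partitionForDevice(key.device, numPartitions);
    }

    public static void main(String[] args) {
        DeviceMetric key = new DeviceMetric("d1", "temperature");
        System.out.println(partitionForKey(key, 12) == partitionForDevice("d1", 12));
    }
}
```

Because the metric never enters the hash, two devices share a partition after re-keying only if they already shared one before.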

If I were to rewrite the aggregations manually with the Transformer API, I 
would use (device, key) for my state store key, without changing the record key.

 

_(poke_ [~vvcephei] _following our discussion on users ml)_



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
