[ https://issues.apache.org/jira/browse/KAFKA-12540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Antony Stubbs updated KAFKA-12540:
----------------------------------
    Description: 
If, for example, I want to aggregate by account and by metric, and the input 
topic is keyed by account (and let's say there's a massive amount of traffic), 
I have to rekey on account+metric, which causes a repartition topic, then 
group by and aggregate.

However, because we know that all the metrics for an account already exist on 
the same partition, we ideally don't want to have to repartition, which causes 
a large, unneeded overhead.
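
For context, this is roughly what it looks like with the DSL today - a sketch 
only, where the {{Metric}} value type, its {{getField()}} accessor and the 
{{MetricSerde}} are placeholders for illustration:

{code:java}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

// Status quo: selecting a compound key before grouping marks the stream for
// repartitioning, so a "...-repartition" topic is created and every record is
// written out and read back before the aggregation runs.
StreamsBuilder builder = new StreamsBuilder();
Serde<Metric> metricSerde = new MetricSerde(); // placeholder serde for the assumed Metric type

KStream<String, Metric> metrics =
    builder.stream("metrics-by-account", Consumed.with(Serdes.String(), metricSerde));

KTable<String, Long> perMetricCounts = metrics
    .groupBy((account, value) -> account + "|" + value.getField("metric"),
             Grouped.with(Serdes.String(), metricSerde))
    .count();
{code}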

 

Ideally, a new method along the lines of `#selectSubKey` could be introduced, 
which would build a compound key combining the original key with the selected 
sub-field.

 

{{var subKeyStream = stream.selectSubKey((k, v) -> v.getField("metric"))}} - 
under the hood this appends the returned sub-key to the existing key, so the 
actual key of the next stream in memory will be (account+metric).
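
A possible shape for such an operator - purely a sketch, since neither the 
method nor the {{CompoundKey}} type exist in the current DSL:

{code:java}
// Hypothetical KStream method, not part of the current DSL: the returned
// stream keeps its original partition assignment, so the topology can remember
// that it is still co-partitioned by the outer key K and skip the
// repartition-required flag.
<KS> KStream<CompoundKey<K, KS>, V> selectSubKey(
        KeyValueMapper<? super K, ? super V, ? extends KS> subKeySelector);
{code}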

 

Although this might break the key->partition strategy, the topology shouldn't 
be marked dirty at this stage, since we know we're still co-partitioned. What 
can happen next in the topology may need to be restricted, however. In this 
case we would then do:

 

{{subKeyStream.groupByKey().aggregate(<snip>)}}
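
Put together, the pipeline might then read something like this (again 
hypothetical; {{MetricSummary}} is an assumed aggregate type):

{code:java}
// Hypothetical end-to-end usage (selectSubKey, CompoundKey and MetricSummary
// are illustrative only): groupByKey() can skip the repartition because
// selectSubKey() leaves the partition assignment of the outer account key
// unchanged.
KTable<CompoundKey<String, String>, MetricSummary> perMetric = metrics
    .selectSubKey((account, value) -> value.getField("metric"))
    .groupByKey()
    .aggregate(MetricSummary::new,
               (compoundKey, value, summary) -> summary.add(value));
{code}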

 

Functions other than aggregate may still need a repartition - or maybe not; 
I'm not sure.

 

A similar idea is described quite well in this forum thread: 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-keying-sub-keying-a-stream-without-repartitioning-td12745.html]

 

I can achieve what I want with a custom processor and state store, but this 
seems like something that might be common and useful to have supported at the 
DSL level.
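
For reference, the workaround I mean looks roughly like this - a sketch only, 
with the store name, the {{Metric}} type and the simple counting logic as 
placeholders; no repartition topic is needed because every metric for an 
account already lives on the same partition:

{code:java}
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Workaround sketch: keep the per-(account, metric) aggregate ourselves in a
// state store, keyed by a manually built compound key.
StoreBuilder<KeyValueStore<String, Long>> countsStore =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("per-metric-counts"),
        Serdes.String(), Serdes.Long());
builder.addStateStore(countsStore);

KStream<String, Long> runningCounts = metrics.transformValues(
    () -> new ValueTransformerWithKey<String, Metric, Long>() {
        private KeyValueStore<String, Long> store;

        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, Long>) context.getStateStore("per-metric-counts");
        }

        @Override
        public Long transform(String account, Metric value) {
            String compoundKey = account + "|" + value.getField("metric");
            Long previous = store.get(compoundKey);
            long updated = (previous == null ? 0L : previous) + 1;
            store.put(compoundKey, updated);
            return updated; // running count per (account, metric)
        }

        @Override
        public void close() { }
    },
    "per-metric-counts");
{code}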


> Sub-key support to avoid unnecessary rekey operations where the new key is a 
> compound key of the original key + sub-field
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12540
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12540
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Antony Stubbs
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
