[ https://issues.apache.org/jira/browse/KAFKA-12540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Antony Stubbs updated KAFKA-12540: ---------------------------------- Description: If I am, for example, wanting to aggregate by an account, and by a metric, and the input topic is keyed by account (and let’s say there’s massive amount of traffic), this will have have to rekey on account+metric, which will cause a repartition topic, then group by and aggregate. However because we know that all the metrics for an account will already exist on the same partition, we ideally don’t want to have to repartition - causing a large unneeded overhead. Ideally a new `#selectSubkey` sort of method could be introduced, which would force a compound key with the original. {{var subKeyStream = stream#selectSubKey(x,v->v.getField(“metric”)) <— under the hood this appends the returned key to the existing key}} Although this might break key->partition strategy, the topology shouldn’t be dirty at this stage still as we know we’re still co-partitioned. What can happen next in the topology may need to be restricted however. In this case we would then do a: {{subKeyStream.groupByKey().aggregate(<snip>)}} Functions other than aggregate, may need a repartition still, or maybe not - not sure. Similarly described quite well in this forum here: [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-keying-sub-keying-a-stream-without-repartitioning-td12745.html] I can achieve what I want with a custom processor and state store, but this seems something that might be common and useful to have supported at DSL level. was: If I am, for example, wanting to aggregate by an account, and by a metric, and the input topic is keyed by account (and let’s say there’s massive amount of traffic), this will have have to rekey on account+metric, which will cause a repartition topic, then group by and aggregate. However because we know that all the metrics for an account will already exist on the same partition, we ideally don’t want to have to repartition - causing a large unneeded overhead. Ideally a new `#selectSubkey` sort of method could be introduced, which would force a compound key with the original. {{var subKeyStream = stream#selectSubKey(x,v->v.getField(“metric”)) <— under the hood this appends the returned key to the existing key}} Although this might break key->partition strategy, the topology shouldn’t be dirty at this stage still as we know we’re still co-partitioned. What can happen next in the topology may need to be restricted however. In this case we would then do a: {{subKeyStream.groupByKey().aggregate(<snip>)}} Functions other than aggregate, may need a repartition still, or maybe not - not sure. Similarly described quite well in this forum here: [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-keying-sub-keying-a-stream-without-repartitioning-td12745.html] > Sub-key support to avoid unnecessary rekey operations with new key is a > compound key of the original key + sub-field > -------------------------------------------------------------------------------------------------------------------- > > Key: KAFKA-12540 > URL: https://issues.apache.org/jira/browse/KAFKA-12540 > Project: Kafka > Issue Type: New Feature > Components: streams > Affects Versions: 2.8.0 > Reporter: Antony Stubbs > Priority: Major > > If I am, for example, wanting to aggregate by an account, and by a metric, > and the input topic is keyed by account (and let’s say there’s massive amount > of traffic), this will have have to rekey on account+metric, which will cause > a repartition topic, then group by and aggregate. > However because we know that all the metrics for an account will already > exist on the same partition, we ideally don’t want to have to repartition - > causing a large unneeded overhead. > > Ideally a new `#selectSubkey` sort of method could be introduced, which would > force a compound key with the original. > > {{var subKeyStream = stream#selectSubKey(x,v->v.getField(“metric”)) <— under > the hood this appends the returned key to the existing key}} > > Although this might break key->partition strategy, the topology shouldn’t be > dirty at this stage still as we know we’re still co-partitioned. What can > happen next in the topology may need to be restricted however. In this case > we would then do a: > > {{subKeyStream.groupByKey().aggregate(<snip>)}} > > Functions other than aggregate, may need a repartition still, or maybe not - > not sure. > > Similarly described quite well in this forum here: > [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-keying-sub-keying-a-stream-without-repartitioning-td12745.html] > > I can achieve what I want with a custom processor and state store, but this > seems something that might be common and useful to have supported at DSL > level. -- This message was sent by Atlassian Jira (v8.3.4#803005)