Levani Kokhreidze created KAFKA-8413:
----------------------------------------

             Summary: Add possibility to do repartitioning on KStream
                 Key: KAFKA-8413
                 URL: https://issues.apache.org/jira/browse/KAFKA-8413
             Project: Kafka
          Issue Type: New Feature
          Components: streams
            Reporter: Levani Kokhreidze
         Attachments: topology-1.png, topology-2.png

Consider the following code:
{code:java}
// Re-key the input stream by its value.
final KStream<String, String> streamByProfileId = streamsBuilder
    .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .selectKey((key, value) -> value);

// First aggregation on the re-keyed stream.
streamByProfileId
    .groupByKey()
    .aggregate(
        () -> 0d,
        (key, value, aggregate) -> aggregate,
        Materialized.as("store-1")
    );

// Second aggregation on the same re-keyed stream.
streamByProfileId
    .groupByKey()
    .aggregate(
        () -> 0d,
        (key, value, aggregate) -> aggregate,
        Materialized.as("store-2")
    );
{code}

This code generates the following topology:
{code:java}
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
      --> KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-FILTER-0000000008 (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- KSTREAM-KEY-SELECT-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
      <-- KSTREAM-FILTER-0000000004
    Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
      <-- KSTREAM-FILTER-0000000008

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
      --> KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
      --> none
      <-- KSTREAM-SOURCE-0000000005

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
      --> KSTREAM-AGGREGATE-0000000006
    Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
      --> none
      <-- KSTREAM-SOURCE-0000000009
{code}

Kafka Streams creates a separate repartition topic for each `groupByKey` operation, so this topology ends up with two of them (`store-1-repartition` and `store-2-repartition`).
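
For context on why a repartition topic shows up after `selectKey` at all, here is a toy, dependency-free model of key-based partitioning. It is only a sketch: the class name, the sample records, and the use of `String.hashCode` as the partitioner are assumptions made for illustration and are not how Kafka Streams is implemented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class RepartitionSketch {

    // Stand-in partitioner (assumption): Kafka's default partitioner hashes the
    // serialized key with murmur2; String.hashCode keeps this sketch dependency-free.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical sample data as {original key, value}; the value plays the
    // role of the profile id that selectKey promotes to the new key.
    static List<String[]> sampleRecords() {
        return Arrays.asList(
            new String[] {"a", "p1"},
            new String[] {"b", "p1"},
            new String[] {"c", "p2"});
    }

    // Mimics selectKey((key, value) -> value) and reports, per new key, the set
    // of partitions that hold its records. Without repartitioning a record stays
    // in the partition chosen by its original key; with repartitioning it is
    // re-routed by the new key.
    static Map<String, Set<Integer>> partitionsByNewKey(
            List<String[]> records, int numPartitions, boolean repartition) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        for (String[] record : records) {
            String originalKey = record[0];
            String newKey = record[1];
            String routingKey = repartition ? newKey : originalKey;
            result.computeIfAbsent(newKey, k -> new TreeSet<>())
                  .add(partitionFor(routingKey, numPartitions));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> records = sampleRecords();
        System.out.println("after selectKey only: " + partitionsByNewKey(records, 2, false));
        System.out.println("after repartitioning: " + partitionsByNewKey(records, 2, true));
    }
}
```

With this sample data and two partitions, the records for `p1` are spread over both partitions right after the key change and end up in a single partition once they are re-routed by the new key. A per-key aggregation needs the second situation, which is why a repartition step precedes each aggregate in the topology above.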

In this example, two repartition topics are not really necessary: both aggregations group the same stream by the same key, so a single repartition topic feeding one downstream sub-topology would be enough.

In the DSL, a Kafka Streams user can specify the repartition topic manually using the *KStream#through* method:
{code:java}
// Re-key the stream, then write it through a manually created topic.
final KStream<Object, Object> streamByProfileId = streamsBuilder
    .stream("input-topic")
    .selectKey((key, value) -> value)
    .through("repartition-topic");

streamByProfileId
    .groupByKey()
    .aggregate(
        () -> 0d,
        (key, value, aggregate) -> aggregate,
        Materialized.as("store-1")
    );

streamByProfileId
    .groupByKey()
    .aggregate(
        () -> 0d,
        (key, value, aggregate) -> aggregate,
        Materialized.as("store-2")
    );
{code}

This generates the following topology:
{code:java}
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
      --> KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
      <-- KSTREAM-KEY-SELECT-0000000001

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
      --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
    Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
      --> none
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
      --> none
      <-- KSTREAM-SOURCE-0000000003
{code}

While this makes it possible to optimize a Kafka Streams application, the user still has to create the repartition topic manually, with the correct number of partitions based on the input topic. It would be great if the DSL had something like a *repartition()* operation on *KStream* that creates the repartition topic at the user's request.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)