[ https://issues.apache.org/jira/browse/KAFKA-8413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck resolved KAFKA-8413.
--------------------------------
    Resolution: Not A Problem

> Add possibility to do repartitioning on KStream
> -----------------------------------------------
>
>                 Key: KAFKA-8413
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8413
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Levani Kokhreidze
>            Priority: Minor
>         Attachments: topology-1.png, topology-2.png
>
>
> Consider the following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>    .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>    .selectKey((key, value) -> value);
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-1")
>    );
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-2")
>    );
> {code}
>
> This code generates the following topology:
> {code:java}
> Topologies:
>  Sub-topology: 0
>    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>      --> KSTREAM-KEY-SELECT-0000000001
>    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>      --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
>      <-- KSTREAM-SOURCE-0000000000
>    Processor: KSTREAM-FILTER-0000000004 (stores: [])
>      --> KSTREAM-SINK-0000000003
>      <-- KSTREAM-KEY-SELECT-0000000001
>    Processor: KSTREAM-FILTER-0000000008 (stores: [])
>      --> KSTREAM-SINK-0000000007
>      <-- KSTREAM-KEY-SELECT-0000000001
>    Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
>      <-- KSTREAM-FILTER-0000000004
>    Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
>      <-- KSTREAM-FILTER-0000000008
>  Sub-topology: 1
>    Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
>      --> KSTREAM-AGGREGATE-0000000002
>    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
>      --> none
>      <-- KSTREAM-SOURCE-0000000005
>  Sub-topology: 2
>    Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
>      --> KSTREAM-AGGREGATE-0000000006
>    Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
>      --> none
>      <-- KSTREAM-SOURCE-0000000009
>
> {code}
> Kafka Streams creates a separate repartition topic for each `groupByKey`
> operation, two in total. In this example, two repartition topics are not
> really necessary: a single repartition topic is enough, and both
> aggregations can run in one downstream sub-topology.
>
> In the DSL, a Kafka Streams user can specify the repartition topic manually
> using the *KStream#through* method:
> {code:java}
> final KStream<Object, Object> streamByProfileId = streamsBuilder
>    .stream("input-topic")
>    .selectKey((key, value) -> value)
>    .through("repartition-topic");
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-1")
>    );
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-2")
>    );
> {code}
>
> This code generates the following topology:
> {code:java}
> Topologies:
>  Sub-topology: 0
>    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>      --> KSTREAM-KEY-SELECT-0000000001
>    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>      --> KSTREAM-SINK-0000000002
>      <-- KSTREAM-SOURCE-0000000000
>    Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
>      <-- KSTREAM-KEY-SELECT-0000000001
>  Sub-topology: 1
>    Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
>      --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
>    Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
>      --> none
>      <-- KSTREAM-SOURCE-0000000003
>    Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
>      --> none
>      <-- KSTREAM-SOURCE-0000000003
> {code}
>
> While this makes it possible to optimize a Kafka Streams application, the
> user still has to create the repartition topic manually, with the correct
> number of partitions based on the input topic.
> It would be great if the DSL had something like a *repartition()*
> operation on *KStream* that generates the repartition topic on the
> user's request.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
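
For anyone reading this thread without the background on why a repartition step is needed after `selectKey` at all, here is a rough, dependency-free sketch in plain Java. It does not use Kafka Streams: `RepartitionSketch`, `Rec`, and every method name below are hypothetical, and `partitionFor` uses `String.hashCode()` purely as a stand-in for a real partitioner (Kafka's default partitioner hashes the serialized key with murmur2). It only illustrates that re-keying leaves records where they are, so one key can end up in several partitions, and that a single repartition pass is enough to feed more than one per-key aggregation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class RepartitionSketch {

    /** A keyed record; stands in for a Kafka record in this sketch. */
    public static final class Rec {
        public final String key;
        public final String value;
        public Rec(String key, String value) { this.key = key; this.value = value; }
    }

    /** Stand-in partitioner (assumption): key hash modulo the partition count. */
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Writes every record to the partition chosen by its current key. */
    public static List<List<Rec>> repartition(List<List<Rec>> partitions, int numPartitions) {
        List<List<Rec>> out = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) out.add(new ArrayList<>());
        for (List<Rec> partition : partitions)
            for (Rec r : partition)
                out.get(partitionFor(r.key, numPartitions)).add(r);
        return out;
    }

    /** Mimics selectKey((key, value) -> value): the key changes, the partition does not. */
    public static List<List<Rec>> selectValueAsKey(List<List<Rec>> partitions) {
        List<List<Rec>> out = new ArrayList<>();
        for (List<Rec> partition : partitions) {
            List<Rec> rekeyed = new ArrayList<>();
            for (Rec r : partition) rekeyed.add(new Rec(r.value, r.value));
            out.add(rekeyed);
        }
        return out;
    }

    /** True if no key occurs in more than one partition. */
    public static boolean isCoPartitioned(List<List<Rec>> partitions) {
        Set<String> seenInEarlierPartitions = new HashSet<>();
        for (List<Rec> partition : partitions) {
            Set<String> keysHere = new HashSet<>();
            for (Rec r : partition) keysHere.add(r.key);
            for (String k : keysHere)
                if (!seenInEarlierPartitions.add(k)) return false;
        }
        return true;
    }

    /** A simple per-key aggregation (a count) over all partitions. */
    public static Map<String, Integer> countPerKey(List<List<Rec>> partitions) {
        Map<String, Integer> counts = new TreeMap<>();
        for (List<Rec> partition : partitions)
            for (Rec r : partition) counts.merge(r.key, 1, Integer::sum);
        return counts;
    }

    /** Made-up sample input, partitioned by its original key. */
    public static List<List<Rec>> sampleInput(int numPartitions) {
        List<List<Rec>> unpartitioned = new ArrayList<>();
        unpartitioned.add(Arrays.asList(
                new Rec("k0", "p1"), new Rec("k1", "p1"),
                new Rec("k2", "p2"), new Rec("k3", "p1")));
        return repartition(unpartitioned, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 2;
        List<List<Rec>> rekeyed = selectValueAsKey(sampleInput(numPartitions));
        System.out.println("co-partitioned after selectKey:   " + isCoPartitioned(rekeyed));

        // One repartition pass; both "aggregations" below read the same result.
        List<List<Rec>> repartitioned = repartition(rekeyed, numPartitions);
        System.out.println("co-partitioned after repartition: " + isCoPartitioned(repartitioned));
        System.out.println("aggregation 1: " + countPerKey(repartitioned));
        System.out.println("aggregation 2: " + countPerKey(repartitioned));
    }
}
```

In this toy model the number of partitions is passed in by the caller, which mirrors the point raised in the issue: with a manually managed topic, somebody has to pick and maintain that number.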