Levani Kokhreidze updated KAFKA-8413:
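Consider the following code:

final KStream<String, String> streamByProfileId = streamsBuilder
   .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
   .selectKey((key, value) -> value);
streamByProfileId
   .groupByKey()
   .aggregate(
      () -> 0d,
      (key, value, aggregate) -> aggregate,
      Materialized.as("store-1")
   );
streamByProfileId
   .groupByKey()
   .aggregate(
      () -> 0d,
      (key, value, aggregate) -> aggregate,
      Materialized.as("store-2")
   );
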
This code generates the following topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
      --> KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000004 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-FILTER-0000000008 (stores: [])
      --> KSTREAM-SINK-0000000007
      <-- KSTREAM-KEY-SELECT-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
      <-- KSTREAM-FILTER-0000000004
    Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
      <-- KSTREAM-FILTER-0000000008
   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
      --> KSTREAM-AGGREGATE-0000000002
    Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
      --> none
      <-- KSTREAM-SOURCE-0000000005
   Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
      --> KSTREAM-AGGREGATE-0000000006
    Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
      --> none
      <-- KSTREAM-SOURCE-0000000009
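Kafka Streams creates a separate repartition topic for each `groupByKey`
operation, so this example produces two repartition topics and three
sub-topologies. Two repartition topics are not necessary here: both
aggregations group by the same key, so they could read from a single
repartition topic and be processed in one sub-topology.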
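Why does each `groupByKey` after `selectKey` need a repartition step at all? The toy model below is a minimal sketch that assumes a two-partition input topic. `selectKey` changes a record's key but not its partition, so records for one new key can end up spread over several partitions. A task aggregating a single partition would then see only some of them. Producing every record to the partition derived from its new key fixes this, and both aggregations can rely on the same redistributed copy. Everything here is illustrative and not Kafka code: `RepartitionToy`, `Rec`, and the sample user/profile ids are made up, and `String.hashCode()` stands in for Kafka's murmur2-based default partitioner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model (not Kafka code) of why grouping by a new key needs a repartition step.
class RepartitionToy {

    // A key/value record.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for Kafka's default partitioner, which hashes the serialized key with murmur2.
    // Only the "same key -> same partition" property matters for this sketch.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // selectKey((key, value) -> value): records get a new key but stay in their original partition.
    static List<List<Rec>> selectKey(List<List<Rec>> partitions) {
        List<List<Rec>> out = new ArrayList<>();
        for (List<Rec> partition : partitions) {
            List<Rec> rekeyed = new ArrayList<>();
            for (Rec r : partition) {
                rekeyed.add(new Rec(r.value, r.value));
            }
            out.add(rekeyed);
        }
        return out;
    }

    // Producing to a repartition topic: each record moves to the partition chosen by its current key.
    static List<List<Rec>> repartition(List<List<Rec>> partitions) {
        int n = partitions.size();
        List<List<Rec>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(new ArrayList<>());
        }
        for (List<Rec> partition : partitions) {
            for (Rec r : partition) {
                out.get(partitionFor(r.key, n)).add(r);
            }
        }
        return out;
    }

    // For each key, the partitions that hold at least one of its records.
    static Map<String, Set<Integer>> partitionsPerKey(List<List<Rec>> partitions) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        for (int p = 0; p < partitions.size(); p++) {
            for (Rec r : partitions.get(p)) {
                result.computeIfAbsent(r.key, k -> new TreeSet<>()).add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two input partitions keyed by user id; the value is a profile id.
        List<List<Rec>> input = List.of(
            List.of(new Rec("user-1", "profile-A"), new Rec("user-2", "profile-B")),
            List.of(new Rec("user-3", "profile-A")));

        List<List<Rec>> rekeyed = selectKey(input);
        // profile-A is split across partitions 0 and 1: {profile-A=[0, 1], profile-B=[0]}
        System.out.println(partitionsPerKey(rekeyed));
        // After repartitioning, every key lives in exactly one partition.
        System.out.println(partitionsPerKey(repartition(rekeyed)));
    }
}
```

The guarantee the aggregations need, "all records for a key are in one partition", is established by a single redistribution. A second copy of the same data in a second topic adds nothing to it.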


A Kafka Streams user can specify the repartition topic manually in the DSL
using the *KStream#through* method:
final KStream<Object, Object> streamByProfileId = streamsBuilder
   .stream("input-topic")
   .selectKey((key, value) -> value)
   .through("repartition-topic");
streamByProfileId
   .groupByKey()
   .aggregate(
      () -> 0d,
      (key, value, aggregate) -> aggregate,
      Materialized.as("store-1")
   );
streamByProfileId
   .groupByKey()
   .aggregate(
      () -> 0d,
      (key, value, aggregate) -> aggregate,
      Materialized.as("store-2")
   );

This generates the following topology, in which both aggregations share one
sub-topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
      --> KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
      --> KSTREAM-SINK-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
      <-- KSTREAM-KEY-SELECT-0000000001
   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
      --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
    Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
      --> none
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
      --> none
      <-- KSTREAM-SOURCE-0000000003
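A rough way to compare the two topologies is to count what each one writes and how many sub-topologies it needs. The sketch below is a back-of-the-envelope model, not Kafka code. It assumes every record has a non-null new key, so the KSTREAM-FILTER nodes in the first topology drop nothing. It also generalizes the two listings to any number of aggregations. `RepartitionCost` and its methods are hypothetical names.

```java
// Back-of-the-envelope model of the two topologies (not Kafka code).
// Assumes every input record has a non-null new key, so no record is filtered out.
class RepartitionCost {

    // Default topology: each groupByKey() gets its own repartition topic,
    // so every record is produced once per aggregation.
    static long recordsProducedSeparate(long inputRecords, int aggregations) {
        return inputRecords * aggregations;
    }

    // through() topology: all aggregations read one shared repartition topic,
    // so every record is produced once, however many aggregations follow.
    static long recordsProducedShared(long inputRecords, int aggregations) {
        return aggregations > 0 ? inputRecords : 0;
    }

    // One upstream sub-topology plus one downstream sub-topology per repartition topic.
    static int subTopologiesSeparate(int aggregations) {
        return 1 + aggregations;
    }

    // One upstream sub-topology plus a single shared downstream sub-topology.
    static int subTopologiesShared(int aggregations) {
        return aggregations > 0 ? 2 : 1;
    }

    public static void main(String[] args) {
        long records = 1_000_000L;
        int aggregations = 2; // store-1 and store-2
        System.out.println("separate: " + recordsProducedSeparate(records, aggregations)
            + " records, " + subTopologiesSeparate(aggregations) + " sub-topologies");
        System.out.println("shared:   " + recordsProducedShared(records, aggregations)
            + " records, " + subTopologiesShared(aggregations) + " sub-topologies");
    }
}
```

With two aggregations and one million input records, `main` prints 2000000 records and 3 sub-topologies for the default layout, versus 1000000 records and 2 sub-topologies with the shared topic. Each repartition topic is also consumed once by its downstream sub-topology, so reads from the repartition topics scale the same way.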

While *KStream#through* makes it possible to optimize the Kafka Streams
application, the user still has to create the repartition topic manually, with
the correct number of partitions based on the input topic. It would be great if
the DSL offered something like a *repartition()* operation on *KStream* that
creates the repartition topic automatically when the user requests it.
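To make the proposal concrete, here is a hypothetical sketch of the information a *repartition()* operation would need in order to create the topic itself. It carries an optional name and an optional partition count, and the count falls back to the upstream source topic's partition count. `RepartitionRequest`, `topicName`, `resolvePartitions`, and the sample values (`my-app`, `by-profile`, `generated-0001`) are all invented for illustration and do not describe any existing Kafka Streams API. The one real convention it relies on is that Kafka Streams prefixes internal topic names with the `application.id`.

```java
import java.util.Optional;

// Hypothetical sketch only: this is NOT a Kafka Streams class. It models the information a
// repartition() operation would need in order to create the topic on the user's behalf.
final class RepartitionRequest {

    private final Optional<String> name;                // user-supplied name, if any
    private final Optional<Integer> numberOfPartitions; // user-supplied partition count, if any

    RepartitionRequest(Optional<String> name, Optional<Integer> numberOfPartitions) {
        this.name = name;
        this.numberOfPartitions = numberOfPartitions;
    }

    // Internal topics are prefixed with the application.id; the "-repartition" suffix
    // mirrors the topic names shown in the topology listings.
    String topicName(String applicationId, String generatedName) {
        return applicationId + "-" + name.orElse(generatedName) + "-repartition";
    }

    // Without an explicit count, default to the partition count of the upstream source topic,
    // which is what the user currently has to look up by hand before using through().
    int resolvePartitions(int sourceTopicPartitions) {
        int n = numberOfPartitions.orElse(sourceTopicPartitions);
        if (n <= 0) {
            throw new IllegalArgumentException("partition count must be positive, got " + n);
        }
        return n;
    }

    public static void main(String[] args) {
        RepartitionRequest byDefault = new RepartitionRequest(Optional.of("by-profile"), Optional.empty());
        RepartitionRequest explicit = new RepartitionRequest(Optional.empty(), Optional.of(12));

        System.out.println(byDefault.topicName("my-app", "generated-0001")); // my-app-by-profile-repartition
        System.out.println(byDefault.resolvePartitions(6));                   // 6
        System.out.println(explicit.resolvePartitions(6));                    // 12
    }
}
```

Defaulting to the source topic's partition count keeps the common case free of configuration, while an explicit count lets the user scale the downstream sub-topology independently of the input topic.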


> Add possibility to do repartitioning on KStream
> -----------------------------------------------
>                 Key: KAFKA-8413
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8413
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Levani Kokhreidze
>            Priority: Minor
>         Attachments: topology-1.png, topology-2.png
> Consider the following code:
> {code:java}
> final KStream<String, String> streamByProfileId = streamsBuilder
>    .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
>    .selectKey((key, value) -> value);
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-1")
>    );
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-2")
>    );
> {code}
> This code generates the following topology:
> {code:java}
> Topologies:
>  Sub-topology: 0
>  Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>  --> KSTREAM-KEY-SELECT-0000000001
>  Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>  --> KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000008
>  <-- KSTREAM-SOURCE-0000000000
>  Processor: KSTREAM-FILTER-0000000004 (stores: [])
>  --> KSTREAM-SINK-0000000003
>  <-- KSTREAM-KEY-SELECT-0000000001
>  Processor: KSTREAM-FILTER-0000000008 (stores: [])
>  --> KSTREAM-SINK-0000000007
>  <-- KSTREAM-KEY-SELECT-0000000001
>  Sink: KSTREAM-SINK-0000000003 (topic: store-1-repartition)
>  <-- KSTREAM-FILTER-0000000004
>  Sink: KSTREAM-SINK-0000000007 (topic: store-2-repartition)
>  <-- KSTREAM-FILTER-0000000008
> Sub-topology: 1
>  Source: KSTREAM-SOURCE-0000000005 (topics: [store-1-repartition])
>  --> KSTREAM-AGGREGATE-0000000002
>  Processor: KSTREAM-AGGREGATE-0000000002 (stores: [store-1])
>  --> none
>  <-- KSTREAM-SOURCE-0000000005
> Sub-topology: 2
>  Source: KSTREAM-SOURCE-0000000009 (topics: [store-2-repartition])
>  --> KSTREAM-AGGREGATE-0000000006
>  Processor: KSTREAM-AGGREGATE-0000000006 (stores: [store-2])
>  --> none
>  <-- KSTREAM-SOURCE-0000000009
> {code}
> Kafka Streams creates a separate repartition topic for each `groupByKey`
> operation. In this example, two repartition topics are not necessary: both
> aggregations could read from one repartition topic and be processed in one
> sub-topology.
> A Kafka Streams user can specify the repartition topic manually in the DSL
> using the *KStream#through* method:
> {code:java}
> final KStream<Object, Object> streamByProfileId = streamsBuilder
>    .stream("input-topic")
>    .selectKey((key, value) -> value)
>    .through("repartition-topic");
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-1")
>    );
> streamByProfileId
>    .groupByKey()
>    .aggregate(
>       () -> 0d,
>       (key, value, aggregate) -> aggregate,
>       Materialized.as("store-2")
>    );
> {code}
> {code:java}
> Topologies:
> Sub-topology: 0
> Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
> --> KSTREAM-KEY-SELECT-0000000001
> Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
> --> KSTREAM-SINK-0000000002
> <-- KSTREAM-SOURCE-0000000000
> Sink: KSTREAM-SINK-0000000002 (topic: repartition-topic)
> <-- KSTREAM-KEY-SELECT-0000000001
> Sub-topology: 1
> Source: KSTREAM-SOURCE-0000000003 (topics: [repartition-topic])
> --> KSTREAM-AGGREGATE-0000000004, KSTREAM-AGGREGATE-0000000005
> Processor: KSTREAM-AGGREGATE-0000000004 (stores: [store-1])
> --> none
> <-- KSTREAM-SOURCE-0000000003
> Processor: KSTREAM-AGGREGATE-0000000005 (stores: [store-2])
> --> none
> <-- KSTREAM-SOURCE-0000000003
> {code}
> While *KStream#through* makes it possible to optimize the Kafka Streams
> application, the user still has to create the repartition topic manually,
> with the correct number of partitions based on the input topic. It would be
> great if the DSL offered something like a *repartition()* operation on
> *KStream* that creates the repartition topic automatically when the user
> requests it.

This message was sent by Atlassian JIRA
