blueedgenick created KAFKA-10659:
------------------------------------

             Summary: Cogroup topology generation fails if input streams are 
repartitioned
                 Key: KAFKA-10659
                 URL: https://issues.apache.org/jira/browse/KAFKA-10659
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.5.1, 2.6.0
            Reporter: blueedgenick


Example to reproduce:

 
{code:java}
KGroupedStream<String, A> groupedA = builder
  .stream(topicA, Consumed.with(Serdes.String(), serdeA))
  .selectKey((aKey, aVal) -> aVal.someId)
  .groupByKey();
KGroupedStream<String, B> groupedB = builder
  .stream(topicB, Consumed.with(Serdes.String(), serdeB))
  .selectKey((bKey, bVal) -> bVal.someId)
  .groupByKey();
KGroupedStream<String, C> groupedC = builder
  .stream(topicC, Consumed.with(Serdes.String(), serdeC))
  .selectKey((cKey, cVal) -> cVal.someId)
  .groupByKey();
CogroupedKStream<String, ABC> cogroup = groupedA.cogroup(AggregatorA)
  .cogroup(groupedB, AggregatorB)
 .  cogroup(groupedC, AggregatorC);
// Aggregate all streams of the cogroup
 KTable<String, ABC> agg = cogroup.aggregate(
  () -> new ABC(),
  Named.as("my-agg-proc-name"),
  Materialized.<String, ABC, KeyValueStore<Bytes, byte[]>>as(
 "abc-agg-store") 
 .withKeySerde(Serdes.String())
 .withValueSerde(serdeABC)
 );
{code}
 

 

This throws an exception during topology generation: 

 
{code:java}
org.apache.kafka.streams.errors.TopologyException: Invalid topology: Processor 
abc-agg-store-repartition-filter is already added. at 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(Inter
nalTopologyBuilder.java:485)`
 at 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
 at 
org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
 at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
 at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
 at ...
{code}
 

The same exception is observed if the `selectKey(...).groupByKey()`  pattern is 
replaced with `groupBy(...)`.

This behavior is observed with topology optimization at default state, 
explicitly set off, or explicitly set on.

Interestingly the problem is avoided, and a workable topology produced,, if the 
grouping step is named by passing a `Grouped.with(...)` expression to either 
`groupByKey`` or `groupBy`.
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to