blueedgenick created KAFKA-10659:
------------------------------------

             Summary: Cogroup topology generation fails if input streams are repartitioned
                 Key: KAFKA-10659
                 URL: https://issues.apache.org/jira/browse/KAFKA-10659
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.5.1, 2.6.0
            Reporter: blueedgenick

Example to reproduce:

{code:java}
KGroupedStream<String, A> groupedA = builder
    .stream(topicA, Consumed.with(Serdes.String(), serdeA))
    .selectKey((aKey, aVal) -> aVal.someId)
    .groupByKey();
KGroupedStream<String, B> groupedB = builder
    .stream(topicB, Consumed.with(Serdes.String(), serdeB))
    .selectKey((bKey, bVal) -> bVal.someId)
    .groupByKey();
KGroupedStream<String, C> groupedC = builder
    .stream(topicC, Consumed.with(Serdes.String(), serdeC))
    .selectKey((cKey, cVal) -> cVal.someId)
    .groupByKey();

CogroupedKStream<String, ABC> cogroup = groupedA.cogroup(AggregatorA)
    .cogroup(groupedB, AggregatorB)
    .cogroup(groupedC, AggregatorC);

// Aggregate all streams of the cogroup
KTable<String, ABC> agg = cogroup.aggregate(
    () -> new ABC(),
    Named.as("my-agg-proc-name"),
    Materialized.<String, ABC, KeyValueStore<Bytes, byte[]>>as("abc-agg-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(serdeABC)
);
{code}

This throws an exception during topology generation:

{code:java}
org.apache.kafka.streams.errors.TopologyException: Invalid topology: Processor abc-agg-store-repartition-filter is already added.
    at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(InternalTopologyBuilder.java:485)
    at org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
    at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
    at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
    at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
    at ...
{code}

The same exception is observed if the `selectKey(...).groupByKey()` pattern is replaced with `groupBy(...)`. This behavior is observed with topology optimization at default state, explicitly set off, or explicitly set on.

Interestingly, the problem is avoided and a workable topology is produced if the grouping step is named by passing a `Grouped.with(...)` expression to either `groupByKey` or `groupBy`.
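
For illustration, the workaround described above might look roughly like the sketch below. It is not taken from the original report: the class name, the helper `rekeyAndGroup`, the topic names, the grouping names (`group-a`, `group-b`, `group-c`), and the use of plain `String` values with a concatenating aggregator are all hypothetical stand-ins for the reporter's `A`, `B`, `C`, and `ABC` types. It assumes a Kafka Streams version that includes the cogroup API (2.5 or later) on the classpath.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.CogroupedKStream;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical example class; not part of Kafka or of the original report.
class NamedGroupingCogroup {

    // Re-keys a stream (which marks it for repartitioning) and groups it
    // under an explicit, per-stream name supplied through Grouped.with(...).
    private static KGroupedStream<String, String> rekeyAndGroup(
            StreamsBuilder builder, String topic, String groupName) {
        return builder
            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            .selectKey((key, value) -> value)
            .groupByKey(Grouped.with(groupName, Serdes.String(), Serdes.String()));
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Each grouping step gets its own distinct name (assumed names).
        KGroupedStream<String, String> groupedA = rekeyAndGroup(builder, "topic-a", "group-a");
        KGroupedStream<String, String> groupedB = rekeyAndGroup(builder, "topic-b", "group-b");
        KGroupedStream<String, String> groupedC = rekeyAndGroup(builder, "topic-c", "group-c");

        // Stand-in aggregator: appends each incoming value to the aggregate.
        Aggregator<String, String, String> concat =
            (key, value, aggregate) -> aggregate + value;

        CogroupedKStream<String, String> cogroup = groupedA.cogroup(concat)
            .cogroup(groupedB, concat)
            .cogroup(groupedC, concat);

        // Aggregate all streams of the cogroup, as in the report.
        KTable<String, String> agg = cogroup.aggregate(
            () -> "",
            Named.as("my-agg-proc-name"),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("abc-agg-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        return builder.build();
    }
}
```

Printing `NamedGroupingCogroup.buildTopology().describe()` should show whether each input stream ends up with its own repartition nodes. Swapping the `Grouped.with(...)` argument for a no-argument `groupByKey()` gives a variant close to the failing case on the affected versions.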