Michael Viamari created KAFKA-9222:
--------------------------------------

             Summary: StreamPartitioner for internal repartition topics does not match defaults for to() operation
                 Key: KAFKA-9222
                 URL: https://issues.apache.org/jira/browse/KAFKA-9222
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.3.1
            Reporter: Michael Viamari

When a KStream has a Windowed key, a different StreamPartitioner is selected depending on how the stream sink is generated.

When using `KStream#to()`, the topology uses a `StreamSinkNode`, which chooses a `WindowedStreamPartitioner` if no partitioner is provided when creating the `SinkNode` for the topology.

{code:java}
KTable<> aggResult = inputStream.groupByKey().windowedBy(...).aggregate(...);
aggResult.toStream().to(aggStreamTopic);
{code}

When an internal repartition topic is created before a stateful operation, an `OptimizableRepartitionNode` is used, which results in a `SinkNode` being added to the topology. This node is created with a null partitioner, so it always uses the Producer default partitioner.

This becomes an issue when attempting to join a windowed stream/KTable with a stream that was mapped into a windowed key, because the two sides of the join can end up partitioned differently.

{code:java}
KTable<> windowedAgg = inputStream.groupByKey().windowedBy(...).aggregate(...);
windowedAgg.toStream().to(aggStreamTopic);

KStream<> windowedStream = inputStream.map((k, v) -> {
    Map<Long, TimeWindow> w = windows.windowsFor(v.getTimestamp());
    Window minW = getMinWindow(w.values());
    return KeyValue.pair(new Windowed<>(k, minW), v);
});

windowedStream.leftJoin(windowedAgg, ....);
{code}

The only workaround I've found is either to use the default partitioner for the `KStream#to()` operation, or to use `KStream#through()` for the repartition operation.
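
To illustrate why the two sinks can disagree, here is a minimal, dependency-free sketch. It assumes that `WindowedStreamPartitioner` hashes only the serialized inner key, while the Producer default partitioner hashes the full serialized windowed key (inner key bytes followed by the window start timestamp). `Arrays.hashCode` is only a stand-in for Kafka's murmur2 hash, and the class and method names are hypothetical, so the partition numbers will not match a real cluster.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionerMismatchSketch {

    // Stand-in for Kafka's murmur2-based hashing (assumption: NOT the real algorithm).
    static int toPartition(byte[] bytes, int numPartitions) {
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    // Serialized inner key only, without any window information.
    static byte[] serializeBaseKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Assumed windowed key layout: inner key bytes followed by the 8-byte window start.
    static byte[] serializeWindowedKey(String key, long windowStartMs) {
        byte[] base = serializeBaseKey(key);
        return ByteBuffer.allocate(base.length + Long.BYTES)
                .put(base)
                .putLong(windowStartMs)
                .array();
    }

    // Models the KStream#to() path: the window is ignored when choosing a partition.
    static int windowedPartition(String key, long windowStartMs, int numPartitions) {
        return toPartition(serializeBaseKey(key), numPartitions);
    }

    // Models the internal repartition path: the whole serialized key is hashed.
    static int defaultPartition(String key, long windowStartMs, int numPartitions) {
        return toPartition(serializeWindowedKey(key, windowStartMs), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String key = "user-1";
        for (long start = 0L; start <= 180_000L; start += 60_000L) {
            System.out.printf("window=%d to()=%d repartition=%d%n",
                    start,
                    windowedPartition(key, start, numPartitions),
                    defaultPartition(key, start, numPartitions));
        }
    }
}
```

In this model the `to()` column stays constant for a given inner key, while the repartition column depends on the window start, so records with equal windowed keys are not guaranteed to land on the same partition number in both topics.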