[ https://issues.apache.org/jira/browse/KAFKA-8846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16921757#comment-16921757 ]
Matthias J. Sax commented on KAFKA-8846:
----------------------------------------

`TopologyTestDriver` executes topologies slightly differently: repartition topics are replaced with "in-memory connections" between sub-topologies, so a single input record is processed by all sub-topologies before the next input record is processed. In a real deployment there is no such guarantee, because writing a record into a repartition topic and reading it back means that multiple input records are processed at the same time, or at least concurrently, in different sub-topologies.

Given your topology, the issue may be that you get two repartition topics instead of the one you might expect (you can verify this via `Topology#describe()`). The data is then no longer aligned in the second sub-topology, because each record is written and re-read twice. This issue is already fixed (https://issues.apache.org/jira/browse/KAFKA-7201), but you need to enable topology optimization to get both repartition topics merged into one: set the config `topology.optimization="all"`, and note that you also need to call `StreamsBuilder#build(Properties)` for the optimization to take effect. This might resolve your issue.

However, instead of doing a `join()`, it might be simpler to merge the join into the aggregation itself: change the aggregation to emit pairs <last-record,new-aggregate> instead of just <new-aggregate>, so that the latest value is added immediately rather than joined in a second step. This way, it should work even without enabling optimization.

Hence, from my current understanding, I don't think that there is a bug in Kafka Streams (besides the one that is already fixed via KAFKA-7201).
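To make the two suggestions concrete, here is a minimal plain-Java sketch. It deliberately does not use the Kafka Streams API: `PairAggregateSketch`, `Pair`, `aggregate`, `process`, and `optimizationConfig` are hypothetical names, the per-key `HashMap` only stands in for the aggregation's state store, and a running sum is assumed as the aggregation because the reporter's actual aggregation is not shown here. In a real topology, the logic of `aggregate` would roughly correspond to the aggregator passed to `KGroupedStream#aggregate`, and the properties would be passed to `StreamsBuilder#build(Properties)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

final class PairAggregateSketch {

    /** Hypothetical value type: the latest input record plus the updated aggregate. */
    static final class Pair {
        final long lastRecord;
        final long aggregate;

        Pair(long lastRecord, long aggregate) {
            this.lastRecord = lastRecord;
            this.aggregate = aggregate;
        }
    }

    /**
     * Aggregation step that emits <last-record,new-aggregate> instead of only
     * <new-aggregate>. A running sum is an assumption made for illustration.
     * `previous` is null for the first record of a key.
     */
    static Pair aggregate(long newRecord, Pair previous) {
        long oldSum = (previous == null) ? 0L : previous.aggregate;
        return new Pair(newRecord, oldSum + newRecord);
    }

    /** Plain-Java stand-in for a keyed aggregation: one output pair per input record. */
    static List<Pair> process(List<Map.Entry<String, Long>> records) {
        Map<String, Pair> store = new HashMap<>(); // models the per-key state
        List<Pair> output = new ArrayList<>();
        for (Map.Entry<String, Long> record : records) {
            Pair updated = aggregate(record.getValue(), store.get(record.getKey()));
            store.put(record.getKey(), updated);
            output.add(updated); // the original record travels with its aggregate, so no join is needed
        }
        return output;
    }

    /** The optimization setting named in the comment, as plain string key and value. */
    static Properties optimizationConfig() {
        Properties props = new Properties();
        props.setProperty("topology.optimization", "all");
        return props;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> input =
            List.of(Map.entry("a", 1L), Map.entry("a", 2L), Map.entry("b", 5L));
        for (Pair p : process(input)) {
            System.out.println("last=" + p.lastRecord + " aggregate=" + p.aggregate);
        }
        System.out.println(optimizationConfig());
    }
}
```

Because each emitted value already carries the record that triggered the update, the downstream step no longer depends on how records from two separate repartition topics happen to interleave.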
> Unexpected results joining a KStream to a KTable after repartitioning
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-8846
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8846
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Thomas Crowley
>            Priority: Major
>
> We seem to have come across a bug with Kafka Streams (or at least unexpected
> behavior) when joining a KStream to a KTable after re-partitioning our data
> (via `selectKey`).
> Our use case is as follows: we want to aggregate some values and join them with
> the original message, so that we emit the original message with the current
> value of the aggregation.
> Currently, without re-partitioning, we get the correct behavior as expected,
> but rekeying the input of the stream gives us incorrect results.
> What's stranger is that the `TopologyTestDriver` gives us the
> correct/expected results in our re-partitioned topology.
> Apologies if Clojure is foreign to anyone, but I have an example of the
> problematic topology here:
> https://github.com/VerrencyOpenSource/repartition-bug/blob/master/src/repartition_bug/with_repartition.clj#L27
> If you have `lein` installed on your machine, I have instructions on how you
> can run the above topology against both the test topology driver and a Kafka
> cluster:
> https://github.com/VerrencyOpenSource/repartition-bug

--
This message was sent by Atlassian Jira
(v8.3.2#803003)