[ 
https://issues.apache.org/jira/browse/KAFKA-8846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16921757#comment-16921757
 ] 

Matthias J. Sax commented on KAFKA-8846:
----------------------------------------

`TopologyTestDriver` executes topologies slightly differently, because 
repartition topics are replaced with "in-memory connections" between 
sub-topologies and thus a single input record is processed by all 
sub-topologies before the next input record is processed. In a real deployment, 
there is no such guarantee because writing into a repartition topic and reading 
the record back implies that multiple input records are processed at the same 
time—or at least concurrently (in different sub-topologies).
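
To make the difference in processing order concrete, here is a minimal plain-Java sketch. It does not use Kafka at all: the class, the method names, and the queue standing in for a repartition topic are all hypothetical, and the second method shows only one possible interleaving, not what a broker is guaranteed to do.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

class ProcessingOrderSketch {

    // Test-driver style: the "in-memory connection" hands each record to the
    // second sub-topology before the next input record is read.
    static List<String> inMemoryConnection(List<String> inputs) {
        List<String> events = new ArrayList<>();
        for (String record : inputs) {
            events.add("sub-topology-1:" + record);
            events.add("sub-topology-2:" + record);
        }
        return events;
    }

    // Deployment style (one possible interleaving): the first sub-topology
    // writes several records to the repartition topic before the second
    // sub-topology reads any of them back.
    static List<String> viaRepartitionTopic(List<String> inputs) {
        List<String> events = new ArrayList<>();
        Queue<String> repartitionTopic = new ArrayDeque<>(); // stand-in for the topic
        for (String record : inputs) {
            events.add("sub-topology-1:" + record);
            repartitionTopic.add(record);
        }
        while (!repartitionTopic.isEmpty()) {
            events.add("sub-topology-2:" + repartitionTopic.poll());
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("a", "b");
        System.out.println(inMemoryConnection(inputs));
        System.out.println(viaRepartitionTopic(inputs));
    }
}
```

In the first listing of events, record "a" is fully processed before "b" is touched; in the second, "b" has already passed the first sub-topology by the time "a" reaches the second one.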

Given your topology, it seems that the issue may be that you get two 
repartition topics instead of the one you might expect (you can verify this via 
`Topology#describe()`), and hence data is no longer aligned in the second 
sub-topology, because each record is written and re-read twice. This issue is 
already fixed (https://issues.apache.org/jira/browse/KAFKA-7201), but you need 
to enable topology optimization to get both repartition topics merged into one: 
set the config `topology.optimization="all"` and pass the same config to 
`StreamsBuilder#build(Properties)`, because the optimization is applied when the 
topology is built. This might resolve your issue.
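
As a rough sketch of that configuration: the block below only assembles the `Properties` object, so it runs without Kafka on the classpath. The class name, the application id, and the bootstrap address are hypothetical placeholders; the Kafka Streams calls appear only as comments and assume a `StreamsBuilder` named `builder` that already holds your topology.

```java
import java.util.Properties;

class OptimizationConfigSketch {

    // Builds a Streams configuration with topology optimization switched on.
    static Properties optimizedConfig() {
        Properties props = new Properties();
        // Hypothetical placeholder values; substitute your own.
        props.put("application.id", "repartition-example");
        props.put("bootstrap.servers", "localhost:9092");
        // The setting discussed above. In the 2.3 line, the constants
        // StreamsConfig.TOPOLOGY_OPTIMIZATION and StreamsConfig.OPTIMIZE
        // should hold these two strings.
        props.put("topology.optimization", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties props = optimizedConfig();
        System.out.println(props.getProperty("topology.optimization"));
        // With kafka-streams on the classpath, the same Properties object would
        // be passed to both the build step and the runtime:
        //   Topology topology = builder.build(props);
        //   System.out.println(topology.describe()); // lists the repartition topics
        //   KafkaStreams streams = new KafkaStreams(topology, props);
    }
}
```

Comparing the `describe()` output with and without the property should show whether the two repartition topics collapse into one.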

However, instead of doing a `join()` it might be simpler to just merge the join 
into the aggregation itself: you can just change the aggregation to emit pairs 
<last-record,new-aggregate> instead of just <new-aggregate> to add the latest 
value immediately (instead of joining it in a second step). This way, it should 
even work without enabling optimization.
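
A minimal sketch of that idea, again in plain Java without Kafka. It assumes the aggregation is a running sum over `long` values, which your actual aggregation may not be, and the class and type names are hypothetical. The `apply` method has the same argument order as the Kafka Streams `Aggregator#apply(key, value, aggregate)`, so something of this shape could be passed to `aggregate()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PairAggregationSketch {

    // Hypothetical value type: the latest input record plus the running aggregate.
    static final class LastAndAggregate {
        final long last;
        final long aggregate;

        LastAndAggregate(long last, long aggregate) {
            this.last = last;
            this.aggregate = aggregate;
        }

        @Override
        public String toString() {
            return "<" + last + "," + aggregate + ">";
        }
    }

    // Folds the new value into the sum and carries the value along in the
    // result, so no separate join step is needed afterwards.
    static LastAndAggregate apply(String key, long value, LastAndAggregate previous) {
        return new LastAndAggregate(value, previous.aggregate + value);
    }

    // Feeds the values of one key through the aggregator and collects each emitted pair.
    static List<String> run(String key, List<Long> values) {
        LastAndAggregate state = new LastAndAggregate(0L, 0L); // plays the role of the initializer
        List<String> emitted = new ArrayList<>();
        for (long value : values) {
            state = apply(key, value, state);
            emitted.add(state.toString());
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run("k", Arrays.asList(1L, 2L, 3L)));
    }
}
```

Because the record and its aggregate are produced in the same step, they cannot drift apart the way two separately repartitioned inputs can.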

Hence, from my current understanding, I don't think that there is a bug in 
Kafka Streams (besides the one that is already fixed via KAFKA-7201).

> Unexpected results joining a KStream to a KTable after repartitioning 
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-8846
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8846
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Thomas Crowley
>            Priority: Major
>
> We seem to have come across a bug with Kafka Streams (or at least unexpected 
> behavior) when joining a KStream to a KTable after re-partitioning our data 
> (via `selectKey`)
> Our use case is as follows: we want to aggregate some values and join it with 
> the original message, so that we emit the original message with the current 
> value of the aggregation.
> Currently, without re-partitioning, we get the correct behavior as expected, 
> but rekeying the input of the stream gives us incorrect results.
> What's stranger, is that the `TestTopologyDriver` gives us the 
> correct/expected results in our re-partitioned topology.
> Apologies if Clojure is foreign to anyone, but I have an example of the 
> problematic topology here:
> https://github.com/VerrencyOpenSource/repartition-bug/blob/master/src/repartition_bug/with_repartition.clj#L27
> If you have `lein` installed on your machine, I have instructions on how you 
> can run the above topology against both the test topology driver and a Kafka 
> cluster: 
> https://github.com/VerrencyOpenSource/repartition-bug 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
