[ https://issues.apache.org/jira/browse/KAFKA-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
John Roesler resolved KAFKA-9503.
---------------------------------
    Resolution: Fixed

> TopologyTestDriver processes intermediate results in the wrong order
> --------------------------------------------------------------------
>
>                 Key: KAFKA-9503
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9503
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams-test-utils
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> TopologyTestDriver has the feature that it processes each input
> synchronously, resolving one of the most significant challenges with
> verifying the correctness of streaming applications.
> When processing an input, it feeds that record to the source node, which then
> synchronously (processing is always synchronous within a task) passes it
> through the subtopology via Context#forward calls. Ultimately, outputs from
> that input are forwarded into the RecordCollector, which converts them to
> Producer.send calls. In TopologyTestDriver, this Producer is a special one
> that simply captures the records.
> Some output topics from one subtopology are inputs to another subtopology,
> for example, repartition topics. Immediately after the synchronous
> subtopology process() invocation, TopologyTestDriver iterates over the
> collected outputs from the special Producer. If they are purely output
> records, it just enqueues them for later retrieval by testing code. If they
> are records for internal topics, though, TopologyTestDriver immediately
> processes them as inputs to the relevant subtopology.
> The problem, and this is very subtle, is that TopologyTestDriver does this
> recursively, which for some (apparently rare) programs can cause the output
> to be observed in an invalid order.
> One such program is the one I wrote to test the fix for KAFKA-9487. It
> involves a foreign-key join whose result is joined back to one of its inputs.
> {noformat}
> Here's a simplified version:
>
> // foreign-key join
> J = A.join(B, (extractor) a -> a.b, (joiner) (a, b) -> new Pair(a, b))
> // equi-join
> OUT = A.join(J, (joiner) (a, j) -> new Pair(a, j))
>
> Let's say we have the following initial condition:
> A:
>   a1 = {v: X, b: b1}
> B:
>   b1 = {v: Y}
> J:
>   a1 = Pair({v: X, b: b1}, {v: Y})
> OUT:
>   a1 = Pair({v: X, b: b1}, Pair({v: X, b: b1}, {v: Y}))
>
> Now, piping an update:
>   a1: {v: Z, b: b1}
> immediately results in two buffered records in the Producer:
>   (FK join subscription): b1: {a1}
>   (OUT): a1 = Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
>
> Note that the FK join result isn't updated synchronously, since it's an async
> operation, so the RHS lookup is temporarily incorrect, yielding the nonsense
> intermediate result in which the outer pair has the updated value for a1 but
> the inner (FK result) one still has the old value for a1.
> However, we don't buffer that output record for consumption by testing code
> yet; we leave it in the internal Producer while we process the first
> intermediate record (the FK subscription).
> Processing that internal record means that we have a new internal record to
> process:
>   (FK join subscription response): a1: {b1: {v: Y}}
> so right now, our internal-records-to-process stack looks like:
>   (FK join subscription response): a1: {b1: {v: Y}}
>   (OUT): a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
>
> Again, we start by processing the first thing, the FK join response, which
> results in an updated FK join result:
>   (J): a1: Pair({v: Z, b: b1}, {v: Y})
> and output:
>   (OUT): a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
> And, since we still haven't handled the earlier output, our
> internal-records-to-process stack now looks like:
>   (J): a1: Pair({v: Z, b: b1}, {v: Y})
>   (OUT): a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
>   (OUT): a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
>
> At this point, there's nothing else to process in internal topics, so we just
> copy the records one by one to the "output" collection for later handling by
> testing code, but this yields the wrong final state of:
>   (OUT): a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
> That was an incorrect intermediate result, but because we process internal
> records recursively (as a stack), it winds up emitted at the end instead of
> in the middle.
> If we change the processing model from a stack to a queue, the correct order
> is preserved, and the final state is:
>   (OUT): a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
> {noformat}
> This is what I did in https://github.com/apache/kafka/pull/8015

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
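The stale intermediate result described in the report can be reproduced with a plain-Java model of the two tables and joins. This is a hypothetical sketch, not Streams code: FkJoinModel, simulate, and the string-based Pair rendering are illustrative names, and the joins are applied by hand in the order the driver would observe them.

```java
import java.util.HashMap;
import java.util.Map;

public class FkJoinModel {
    // An A record: a value plus a foreign key into table B.
    static final class A {
        final String v, b;
        A(String v, String b) { this.v = v; this.b = b; }
        @Override public String toString() { return "{v: " + v + ", b: " + b + "}"; }
    }

    static String pair(Object left, Object right) {
        return "Pair(" + left + ", " + right + ")";
    }

    // Returns {intermediate OUT, final OUT} for the a1 update in the report.
    static String[] simulate() {
        Map<String, A> tableA = new HashMap<>();
        Map<String, String> tableB = new HashMap<>();
        Map<String, String> tableJ = new HashMap<>();

        // Initial condition from the report.
        tableA.put("a1", new A("X", "b1"));
        tableB.put("b1", "{v: Y}");
        tableJ.put("a1", pair(tableA.get("a1"), tableB.get("b1")));

        // Pipe the update a1 = {v: Z, b: b1}.
        tableA.put("a1", new A("Z", "b1"));

        // The equi-join fires synchronously, but J has not been refreshed yet,
        // so the inner pair still holds the old a1 value: the nonsense result.
        String intermediateOut = pair(tableA.get("a1"), tableJ.get("a1"));

        // The FK subscription round-trip later refreshes J, and OUT again.
        tableJ.put("a1", pair(tableA.get("a1"), tableB.get("b1")));
        String finalOut = pair(tableA.get("a1"), tableJ.get("a1"));

        return new String[] { intermediateOut, finalOut };
    }

    public static void main(String[] args) {
        String[] outs = simulate();
        System.out.println("intermediate OUT: " + outs[0]);
        System.out.println("final OUT:        " + outs[1]);
    }
}
```

Running this prints the two OUT updates in the order a correct driver should emit them: the temporarily inconsistent pair first, the fully updated pair last.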
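The stack-vs-queue difference itself can be demonstrated with a small standalone simulation of the draining loop. This is a hypothetical model, not the driver's actual code: DrainOrderDemo and the record names (fk-subscription, fk-response, OUT:stale, OUT:updated) are illustrative stand-ins for the records in the walkthrough above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class DrainOrderDemo {
    // Model of processing one captured record: internal records trigger
    // further captured records; OUT records go to the test-visible output.
    static List<String> process(String rec, List<String> output) {
        switch (rec) {
            case "fk-subscription": return List.of("fk-response");
            case "fk-response":     return List.of("OUT:updated");
            default:                output.add(rec); return List.of();
        }
    }

    // Drain pending records either depth-first (the buggy recursive/stack
    // behavior) or in FIFO order (the queue-based fix).
    static List<String> drain(boolean depthFirst) {
        Deque<String> pending =
            new ArrayDeque<>(List.of("fk-subscription", "OUT:stale"));
        List<String> output = new ArrayList<>();
        while (!pending.isEmpty()) {
            List<String> produced = process(pending.pollFirst(), output);
            if (depthFirst) {
                // Stack: newly captured records jump ahead of older pending ones.
                for (int i = produced.size() - 1; i >= 0; i--) {
                    pending.addFirst(produced.get(i));
                }
            } else {
                // Queue: newly captured records preserve capture order.
                produced.forEach(pending::addLast);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println("stack (buggy) emission order: " + drain(true));
        System.out.println("queue (fixed) emission order: " + drain(false));
    }
}
```

With the stack, the stale OUT record is emitted last, so a test would read it as the final state; with the queue, it is emitted first and the updated record becomes the final state, matching the fix in the linked PR.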