[ https://issues.apache.org/jira/browse/KAFKA-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Roesler resolved KAFKA-9503.
---------------------------------
    Resolution: Fixed

> TopologyTestDriver processes intermediate results in the wrong order
> --------------------------------------------------------------------
>
>                 Key: KAFKA-9503
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9503
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams-test-utils
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> A key feature of TopologyTestDriver is that it processes each input
> synchronously, which removes one of the most significant challenges in
> verifying the correctness of streaming applications.
> When processing an input, it feeds that record to the source node, from
> which it is passed synchronously (processing is always synchronous within a
> task) through the subtopology via Context#forward calls. Ultimately, the
> outputs for that input are forwarded into the RecordCollector, which converts
> them into Producer.send calls. In TopologyTestDriver, this Producer is a
> special one that just captures the records instead of sending them.
> Some output topics of one subtopology are inputs to another subtopology,
> for example, repartition topics. Immediately after the synchronous
> subtopology process() invocation, TopologyTestDriver iterates over the
> outputs collected by the special Producer. If they are purely output
> records, it just enqueues them for later retrieval by testing code. If they
> are records for internal topics, though, TopologyTestDriver immediately
> processes them as inputs for the relevant subtopology.
> The problem, which is very subtle, is that TopologyTestDriver handles these
> internal records recursively, which for some (apparently rare) programs can
> cause the output to be observed in an invalid order.
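The ordering problem can be reproduced with a hypothetical toy topology. This is not Kafka code. The topics `subscription`, `response`, and `out`, and the `process` function, are assumptions loosely modeled on the FK-join example that follows.

- The input emits an internal subscription record, followed by an `out` record computed from stale join state.
- The subscription produces a response.
- The response produces a fresh `out` record.

Handling internal records recursively (depth-first) finishes the whole subscription chain before looking at the stale record that was sent after it.

```java
import java.util.ArrayList;
import java.util.List;

public class RecursiveDrainDemo {

    // Hypothetical record type: topic, key, value (not a Kafka class).
    static final class Rec {
        final String topic;
        final String key;
        final String value;

        Rec(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return topic + ":" + key + "=" + value;
        }
    }

    // Hypothetical toy topology: returns the records "sent" while processing r.
    static List<Rec> process(Rec r) {
        List<Rec> sent = new ArrayList<>();
        switch (r.topic) {
            case "input":
                // Like the FK subscription b1: {a1}, then an OUT record built from stale state.
                sent.add(new Rec("subscription", "b1", r.key));
                sent.add(new Rec("out", r.key, "stale"));
                break;
            case "subscription":
                // The subscription produces a response keyed by the original key (a1).
                sent.add(new Rec("response", r.value, "ok"));
                break;
            case "response":
                // The response finally yields the up-to-date OUT record.
                sent.add(new Rec("out", r.key, "fresh"));
                break;
            default:
                throw new IllegalArgumentException("unknown topic " + r.topic);
        }
        return sent;
    }

    static boolean isInternal(String topic) {
        return topic.equals("subscription") || topic.equals("response");
    }

    // Depth-first handling: an internal record, and everything it transitively
    // produces, is fully handled before later records of the same batch.
    static void handle(List<Rec> batch, List<Rec> outputs) {
        for (Rec r : batch) {
            if (isInternal(r.topic)) {
                handle(process(r), outputs);
            } else {
                outputs.add(r);
            }
        }
    }

    static List<Rec> pipeInput(Rec input) {
        List<Rec> outputs = new ArrayList<>();
        handle(process(input), outputs);
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(pipeInput(new Rec("input", "a1", "Z")));
    }
}
```

Running `main` prints `[out:a1=fresh, out:a1=stale]`, so the last `out` value a test would observe for `a1` is the stale one.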
> One such program is the one I wrote to test the fix for KAFKA-9487. It
> involves a foreign-key join whose result is joined back to one of its inputs.
> {noformat}
> Here's a simplified version:
> // foreign key join
> J = A.join(B, (extractor) a -> a.b, (joiner) (a,b) -> new Pair(a, b))
> // equi-join
> OUT = A.join(J, (joiner) (a, j) -> new Pair(a, j))
> Let's say we have the following initial condition:
> A:
> a1 = {v: X, b: b1}
> B:
> b1 = {v: Y}
> J:
> a1 = Pair({v: X, b: b1}, {v: Y})
> OUT:
> a1 = Pair({v: X, b: b1}, Pair({v: X, b: b1}, {v: Y}))
> Now, piping an update:
> a1: {v: Z, b: b1}
> results immediately in two buffered results in the Producer:
> (FK join subscription): b1: {a1}
> (OUT): a1 = Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
> Note that the FK join result isn't updated synchronously, since the FK join
> is an asynchronous operation, so the RHS lookup is temporarily stale. That
> yields the nonsense intermediate result above, where the outer pair has the
> updated value for a1, but the inner pair (the FK join result) still has the
> old value for a1.
> However! We don't buffer that output record for consumption by testing code
> yet; we leave it in the internal Producer while we process the first
> intermediate record (the FK subscription).
> Processing that internal record means that we have a new internal record to 
> process:
> (FK join subscription response): a1: {b1: {v: Y}}
> So right now, our internal-records-to-process stack looks like:
> (FK join subscription response): a1: {b1: {v: Y}}
> (OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
> Again, we start by processing the first thing, the FK join response, which 
> results in an updated FK join result:
> (J) a1: Pair({v: Z, b: b1}, {v: Y})
> and output:
> (OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
> And we still haven't handled the earlier output, so now our
> internal-records-to-process stack looks like:
> (J) a1: Pair({v: Z, b: b1}, {v: Y})
> (OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
> (OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
> At this point, there's nothing else to process in internal topics, so we just 
> copy the records one by one to the "output" collection for later handling by 
> testing code, but this yields the wrong final state of:
> (OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
> That was an incorrect intermediate result, but because we're processing
> internal records recursively (as a stack), it ends up being emitted at the
> end instead of in the middle.
> If we change the processing model from a stack to a queue, the correct order 
> is preserved, and the final state is:
> (OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
> {noformat}
> This is what I did in https://github.com/apache/kafka/pull/8015
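Below is a minimal sketch of the stack-to-queue idea on a self-contained toy topology. All names are illustrative assumptions, and this is not the code from the pull request. The hypothetical topology works as follows:

- An `input` record emits an internal `subscription` record, then an `out` record computed from stale state.
- The subscription emits an internal `response`.
- The response emits a fresh `out` record.

Internal records are handled first-in, first-out. Anything they produce goes to the back of the queue, so a record sent earlier is always handled before records produced later.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class QueueDrainDemo {

    // Hypothetical record type: topic, key, value (not a Kafka class).
    static final class Rec {
        final String topic;
        final String key;
        final String value;

        Rec(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return topic + ":" + key + "=" + value;
        }
    }

    // Hypothetical toy topology: returns the records "sent" while processing r.
    static List<Rec> process(Rec r) {
        List<Rec> sent = new ArrayList<>();
        switch (r.topic) {
            case "input":
                sent.add(new Rec("subscription", "b1", r.key));
                sent.add(new Rec("out", r.key, "stale"));
                break;
            case "subscription":
                sent.add(new Rec("response", r.value, "ok"));
                break;
            case "response":
                sent.add(new Rec("out", r.key, "fresh"));
                break;
            default:
                throw new IllegalArgumentException("unknown topic " + r.topic);
        }
        return sent;
    }

    static boolean isInternal(String topic) {
        return topic.equals("subscription") || topic.equals("response");
    }

    // FIFO handling: records are taken in send order; records produced while
    // processing an internal record are appended to the tail of the queue.
    static List<Rec> pipeInput(Rec input) {
        List<Rec> outputs = new ArrayList<>();
        Deque<Rec> queue = new ArrayDeque<>(process(input));
        while (!queue.isEmpty()) {
            Rec r = queue.pollFirst();
            if (isInternal(r.topic)) {
                queue.addAll(process(r)); // addAll appends at the tail
            } else {
                outputs.add(r);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(pipeInput(new Rec("input", "a1", "Z")));
    }
}
```

Here `main` prints `[out:a1=stale, out:a1=fresh]`. The stale intermediate result is still emitted, but in the middle, and the final `out` value for `a1` is the fresh one.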



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
