John Roesler created KAFKA-9503:
-----------------------------------

             Summary: TopologyTestDriver processes intermediate results in the wrong order
                 Key: KAFKA-9503
                 URL: https://issues.apache.org/jira/browse/KAFKA-9503
             Project: Kafka
          Issue Type: Bug
            Reporter: John Roesler
            Assignee: John Roesler


A key feature of TopologyTestDriver is that it processes each input 
synchronously, which resolves one of the most significant challenges in 
verifying the correctness of streaming applications.

When processing an input, it feeds that record to the source node, from which 
the record is passed synchronously (processing is always synchronous within a 
task) through the subtopology via Context#forward calls. Ultimately, the outputs 
derived from that input are forwarded into the RecordCollector, which converts 
them into Producer.send calls. In TopologyTestDriver, this Producer is a special 
one that just captures the records instead of sending them.
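As a rough illustration of that capture step, the sketch below models a Producer stand-in that records each send instead of transmitting it. `CapturingProducer`, `Sent`, and `drain()` are hypothetical names, not driver internals; Kafka's real `MockProducer` class (whose `history()` method lists the records sent so far) fills a similar role in tests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the driver's capturing Producer: send() never
// touches a broker, it only remembers what was sent. Plain strings are used
// for topic/key/value to keep the sketch free of any Kafka dependency.
class CapturingProducer {
    static final class Sent {
        final String topic;
        final String key;
        final String value;

        Sent(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return topic + " " + key + "=" + value;
        }
    }

    private final List<Sent> captured = new ArrayList<>();

    // Called where a real RecordCollector would call Producer.send().
    void send(String topic, String key, String value) {
        captured.add(new Sent(topic, key, value));
    }

    // Return everything captured since the last drain and reset, so the
    // driver can inspect the outputs of one synchronous process() call.
    List<Sent> drain() {
        List<Sent> out = new ArrayList<>(captured);
        captured.clear();
        return out;
    }

    public static void main(String[] args) {
        CapturingProducer producer = new CapturingProducer();
        producer.send("output-topic", "k1", "v1");
        System.out.println(producer.drain()); // [output-topic k1=v1]
        System.out.println(producer.drain()); // []
    }
}
```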

Some output topics of one subtopology are inputs to another subtopology, for 
example repartition topics. Immediately after the synchronous process() 
invocation on a subtopology, TopologyTestDriver iterates over the outputs 
collected by the special Producer. Records for plain output topics are simply 
enqueued for later retrieval by the test code. Records for internal topics, 
though, are immediately processed by TopologyTestDriver as inputs to the 
relevant subtopology.
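The capture-and-route step just described can be sketched as a recursive function. This is a simplified model under stated assumptions, not the driver's actual code: records are `topic:payload` strings, and `captureOutputs` and the `subtopology` function are hypothetical names. Note the depth-first shape: everything an internal record emits is handled before the loop moves on to that record's later siblings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical model of the step that runs right after a subtopology's
// process() call. Records are "topic:payload" strings; `subtopology` maps an
// internal record to the records its downstream subtopology emits.
class RecursiveCaptureSketch {
    static void captureOutputs(List<String> produced,
                               Set<String> internalTopics,
                               Function<String, List<String>> subtopology,
                               List<String> testOutput) {
        for (String rec : produced) {
            String topic = rec.substring(0, rec.indexOf(':'));
            if (internalTopics.contains(topic)) {
                // Internal record (e.g. repartition): process it now and handle
                // everything it emits before moving on to the next sibling.
                captureOutputs(subtopology.apply(rec), internalTopics, subtopology, testOutput);
            } else {
                // Plain output record: buffer it for the test to read later.
                testOutput.add(rec);
            }
        }
    }

    public static void main(String[] args) {
        List<String> testOutput = new ArrayList<>();
        captureOutputs(
            List.of("repartition:x", "out:y"),
            Set.of("repartition"),
            rec -> List.of("out:" + rec.substring(rec.indexOf(':') + 1) + "-joined"),
            testOutput);
        System.out.println(testOutput); // [out:x-joined, out:y]
    }
}
```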

The problem, and this is very subtle, is that TopologyTestDriver processes these 
internal records recursively, which, for some (apparently rare) programs, can 
cause the output to be observed in an invalid order.

One such program is the one I wrote to test the fix for KAFKA-9487. It 
involves a foreign-key join whose result is joined back to one of its inputs.

Here's a simplified version:
// foreign-key join: a -> a.b is the FK extractor, (a, b) -> new Pair(a, b) the joiner
J = A.join(B, a -> a.b, (a, b) -> new Pair(a, b))
// equi-join of A with the FK join result (on A's primary key)
OUT = A.join(J, (a, j) -> new Pair(a, j))

Let's say we have the following initial condition:
A:
a1 = {v: X, b: b1}
B:
b1 = {v: Y}
J:
a1 = Pair({v: X, b: b1}, {v: Y})
OUT:
a1 = Pair({v: X, b: b1}, Pair({v: X, b: b1}, {v: Y}))

Now, piping the update
a1: {v: Z, b: b1}
immediately results in two records buffered in the Producer:
(FK join subscription): b1: {a1}
(OUT): a1 = Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
Note that the FK join result isn't updated synchronously: the FK join is an 
asynchronous operation that round-trips through internal subscription and 
response topics. The RHS lookup is therefore temporarily stale, yielding a 
nonsense intermediate result in which the outer pair has the updated value for 
a1 but the inner (FK join result) pair still has the old value for a1.
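To see why that intermediate value is nonsense, a plain-Java sketch can apply the two joiners by hand. It models values as strings in the issue's notation; `StaleLookupSketch` and `pair` are hypothetical helpers, not Kafka Streams API, and the sketch assumes only that each joiner builds a pair as in the simplified program.

```java
// Hypothetical plain-Java rendering of the two joiners in the simplified
// program, using strings in the issue's notation rather than Kafka types.
// Both joiners just build a Pair, so one helper serves for both.
class StaleLookupSketch {
    static String pair(String left, String right) {
        return "Pair(" + left + ", " + right + ")";
    }

    public static void main(String[] args) {
        String b1 = "{v: Y}";
        String a1Old = "{v: X, b: b1}";
        String a1New = "{v: Z, b: b1}";

        String jOld = pair(a1Old, b1); // J before the update
        String jNew = pair(a1New, b1); // J once the FK join catches up

        // The equi-join sees the new a1 at once, but J still holds the old value.
        System.out.println(pair(a1New, jOld));
        // Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
        System.out.println(pair(a1New, jNew));
        // Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
    }
}
```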

However! We don't buffer that stale OUT record for consumption by the test code 
yet; we leave it in the internal Producer while we process the first 
intermediate record (the FK subscription).
Processing that internal record produces a new internal record to process:
(FK join subscription response): a1: {b1: {v: Y}}

So right now, our internal-records-to-process stack looks like:
(FK join subscription response): a1: {b1: {v: Y}}
(OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))

Again, we start by processing the record on top, the FK join response, which 
results in an updated FK join result:
(J) a1: Pair({v: Z, b: b1}, {v: Y})
and output:
(OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
and we still haven't handled the earlier output, so now our 
internal-records-to-process stack looks like:

(J) a1: Pair({v: Z, b: b1}, {v: Y})
(OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
(OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))

At this point, there are no more internal records to process, so we just copy 
the records one by one to the "output" collection for later handling by the 
test code, but this yields the wrong final state:
(OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))

That was an incorrect intermediate result, but because we're processing 
internal records recursively (as a stack), it winds up being emitted last 
instead of in the middle.
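The walkthrough above can be replayed with an explicit stack. This sketch hard-codes the records from the example as strings; the `sub`/`resp` prefixes stand in for the FK join's internal subscription and response topics (an assumption, not real topic names), and `StackOrderSketch` and its helpers are hypothetical. Pushing each batch in reverse keeps the first emitted record on top, which reproduces the order described.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical replay of the scenario with internal records handled as a
// stack (LIFO). Record strings follow the issue's notation; this is a model,
// not TopologyTestDriver code.
class StackOrderSketch {
    // What each internal record makes its subtopology emit.
    static final Map<String, List<String>> EMITS = Map.of(
        "sub b1: {a1}", List.of("resp a1: {b1: {v: Y}}"),
        "resp a1: {b1: {v: Y}}", List.of(
            "J a1: Pair({v: Z, b: b1}, {v: Y})",
            "OUT a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))"));

    // Records emitted directly by piping a1: {v: Z, b: b1}.
    static final List<String> FROM_INPUT = List.of(
        "sub b1: {a1}",
        "OUT a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))");

    static boolean isInternal(String rec) {
        return rec.startsWith("sub ") || rec.startsWith("resp ");
    }

    // Push in reverse so the first emitted record ends up on top.
    static void pushAll(Deque<String> stack, List<String> recs) {
        for (int i = recs.size() - 1; i >= 0; i--) {
            stack.push(recs.get(i));
        }
    }

    static List<String> run() {
        Deque<String> stack = new ArrayDeque<>();
        pushAll(stack, FROM_INPUT);
        List<String> testOutput = new ArrayList<>();
        while (!stack.isEmpty()) {
            String rec = stack.pop();
            if (isInternal(rec)) {
                pushAll(stack, EMITS.get(rec)); // children jump ahead of older records
            } else {
                testOutput.add(rec);
            }
        }
        return testOutput;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Here `run()` returns the J record, then the corrected OUT, then the stale OUT, so the last OUT value the test sees is the stale one.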

If we change the processing model from a stack to a queue, the correct order is 
preserved, and the final state is:
(OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))

This is what I did in https://github.com/apache/kafka/pull/8015
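The stack-to-queue change can be modeled as a hypothetical replay of the example with a FIFO queue, using hard-coded record strings (the `sub`/`resp` prefixes are stand-ins for the FK join's internal topics). `QueueOrderSketch` and `finalOut` are illustrative names; this is not the code from the pull request. Newly emitted internal work is appended behind records that were already waiting, so the stale OUT record is delivered first and the corrected one last.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

// Hypothetical replay of the scenario with internal records handled as a
// FIFO queue. Record strings follow the issue's notation.
class QueueOrderSketch {
    static final Map<String, List<String>> EMITS = Map.of(
        "sub b1: {a1}", List.of("resp a1: {b1: {v: Y}}"),
        "resp a1: {b1: {v: Y}}", List.of(
            "J a1: Pair({v: Z, b: b1}, {v: Y})",
            "OUT a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))"));

    // Records emitted directly by piping a1: {v: Z, b: b1}.
    static final List<String> FROM_INPUT = List.of(
        "sub b1: {a1}",
        "OUT a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))");

    static boolean isInternal(String rec) {
        return rec.startsWith("sub ") || rec.startsWith("resp ");
    }

    static List<String> run() {
        Deque<String> queue = new ArrayDeque<>(FROM_INPUT);
        List<String> testOutput = new ArrayList<>();
        while (!queue.isEmpty()) {
            String rec = queue.pollFirst();
            if (isInternal(rec)) {
                queue.addAll(EMITS.get(rec)); // new work waits behind older records
            } else {
                testOutput.add(rec);
            }
        }
        return testOutput;
    }

    // The final state of OUT is the last OUT record the test observes.
    static String finalOut(List<String> outputs) {
        String last = null;
        for (String rec : outputs) {
            if (rec.startsWith("OUT ")) {
                last = rec;
            }
        }
        return last;
    }

    public static void main(String[] args) {
        System.out.println(finalOut(run()));
        // OUT a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
    }
}
```

The test now sees the stale OUT record, then J, then the corrected OUT, so the final state is the correct one.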



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
