mjsax commented on a change in pull request #10462:
URL: https://github.com/apache/kafka/pull/10462#discussion_r614538992



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -486,6 +486,327 @@ public void testJoin() {
         }
     }
 
+    @Test
+    public void testImprovedLeftJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new 
MockProcessorSupplier<>();
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        joined = stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String()));
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
+
+            /*
+             * Verifies left non-joined records are emitted by a record 
processed in the left topic
+             */
+
+            long windowStart = 0;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records 
from the left topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy 
record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+            /*
+             * Verifies left non-joined records are emitted by a record 
processed in the right topic
+             */
+
+            windowStart = windowStart + 301;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined 
records from the left topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy 
record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are not emitted by record 
processed in the left topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in left topic will not emit records
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult();
+
+            // Process the dummy joined record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are not emitted by record 
processed in the right topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in right topic will not emit records
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult();
+
+            // Process the dummy joined record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+        }
+    }
+
+    @Test
+    public void testImprovedFullOuterJoin() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new 
MockProcessorSupplier<>();
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String()));
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
+
+            /*
+             * Verifies left non-joined records are emitted by a record 
processed in the left topic
+             */
+
+            long windowStart = 0;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records 
from the left topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy 
record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401));
+
+            /*
+             * Verifies left non-joined records are emitted by a record 
processed in the right topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic1.pipeInput(0, "A0", windowStart + 1);
+            inputTopic1.pipeInput(1, "A1", windowStart + 2);
+            inputTopic1.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic2.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined 
records from the left topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", windowStart + 1),
+                new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3));
+
+            // Flush internal non-joined state store by joining the dummy 
record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are emitted by record 
processed in the left topic
+             */
+
+            windowStart = windowStart + 401;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in left topic will emit expired non-joined records 
from the right topic
+            inputTopic1.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "null+A0", windowStart + 1),
+                new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3));
+
+            // Process the dummy joined record
+            inputTopic2.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+
+            /*
+             * Verifies right non-joined records are emitted by record 
processed in the right topic
+             */
+
+            windowStart = windowStart + 301;
+
+            // No joins detected; No null-joins emitted
+            inputTopic2.pipeInput(0, "A0", windowStart + 1);
+            inputTopic2.pipeInput(1, "A1", windowStart + 2);
+            inputTopic2.pipeInput(0, "A0-0", windowStart + 3);
+            processor.checkAndClearProcessResult();
+
+            // Join detected; No null-joins emitted
+            inputTopic1.pipeInput(1, "a1", windowStart + 3);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3));
+
+            // Dummy record in right topic will emit expired non-joined 
records from the right topic
+            inputTopic2.pipeInput(2, "dummy", windowStart + 401);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "null+A0", windowStart + 1),
+                new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3));
+
+            // Process the dummy joined record
+            inputTopic1.pipeInput(2, "dummy", windowStart + 402);
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402));
+        }
+    }
+
+    @Test
+    public void testOuterJoinEmitsNonJoinedRecordsAfterWindowCloses() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> supplier = new 
MockProcessorSupplier<>();
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
+        joined = stream1.outerJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            JoinWindows.of(ofMillis(100)).grace(ofMillis(0)),
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String()));
+        joined.process(supplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<Integer, String> inputTopic1 =
+                driver.createInputTopic(topic1, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final TestInputTopic<Integer, String> inputTopic2 =
+                driver.createInputTopic(topic2, new IntegerSerializer(), new 
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+            final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
+
+            inputTopic1.pipeInput(0, "A0", 1);
+            inputTopic1.pipeInput(1, "A1", 2);
+
+            // the window hasn't ended, so no record would be processed yet
+            processor.checkAndClearProcessResult();
+
+            inputTopic2.pipeInput(1, "a1", 3);
+
+            // after a join is found, only the joined records are processed
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(1, "A1+a1", 3));
+
+            // time to process the non-joined records
+            // a dummy record is sent to advance the stream time and cause them 
to be emitted
+            inputTopic1.pipeInput(2, "A2", 400);
+
+            // all non-joined of the previous window are emitted
+            processor.checkAndClearProcessResult(
+                new KeyValueTimestamp<>(0, "A0+null", 1));
+        }
+    }
+
     @Test
     public void testOuterJoin() {

Review comment:
       We should move this test into the `KStreamKStreamOuterJoinTest` class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to