mjsax commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614538992
########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ########## @@ -486,6 +486,327 @@ public void testJoin() { } } + @Test + public void testImprovedLeftJoin() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<Integer, String> stream1; + final KStream<Integer, String> stream2; + final KStream<Integer, String> joined; + final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>(); + + stream1 = builder.stream(topic1, consumed); + stream2 = builder.stream(topic2, consumed); + joined = stream1.leftJoin( + stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.of(ofMillis(100)).grace(ofMillis(0)), + StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); + joined.process(supplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<Integer, String> inputTopic1 = + driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final TestInputTopic<Integer, String> inputTopic2 = + driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); + + /* + * Verifies left non-joined records are emitted by a record processed in the left topic + */ + + long windowStart = 0; + + // No joins detected; No null-joins emitted + inputTopic1.pipeInput(0, "A0", windowStart + 1); + inputTopic1.pipeInput(1, "A1", windowStart + 2); + inputTopic1.pipeInput(0, "A0-0", windowStart + 3); + processor.checkAndClearProcessResult(); + + // Join detected; No null-joins emitted + inputTopic2.pipeInput(1, "a1", windowStart + 3); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3)); + + // Dummy record in left topic will emit expired non-joined records from the 
left topic + inputTopic1.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", windowStart + 1), + new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3)); + + // Flush internal non-joined state store by joining the dummy record + inputTopic2.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401)); + + /* + * Verifies left non-joined records are emitted by a record processed in the right topic + */ + + windowStart = windowStart + 301; + + // No joins detected; No null-joins emitted + inputTopic1.pipeInput(0, "A0", windowStart + 1); + inputTopic1.pipeInput(1, "A1", windowStart + 2); + inputTopic1.pipeInput(0, "A0-0", windowStart + 3); + processor.checkAndClearProcessResult(); + + // Join detected; No null-joins emitted + inputTopic2.pipeInput(1, "a1", windowStart + 3); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3)); + + // Dummy record in right topic will emit expired non-joined records from the left topic + inputTopic2.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", windowStart + 1), + new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3)); + + // Flush internal non-joined state store by joining the dummy record + inputTopic1.pipeInput(2, "dummy", windowStart + 402); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402)); + + /* + * Verifies right non-joined records are not emitted by record processed in the left topic + */ + + windowStart = windowStart + 401; + + // No joins detected; No null-joins emitted + inputTopic2.pipeInput(0, "A0", windowStart + 1); + inputTopic2.pipeInput(1, "A1", windowStart + 2); + inputTopic2.pipeInput(0, "A0-0", windowStart + 3); + processor.checkAndClearProcessResult(); + + // Join detected; No null-joins 
emitted + inputTopic1.pipeInput(1, "a1", windowStart + 3); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3)); + + // Dummy record in left topic will not emit records + inputTopic1.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult(); + + // Process the dummy joined record + inputTopic2.pipeInput(2, "dummy", windowStart + 402); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402)); + + /* + * Verifies right non-joined records are not emitted by record processed in the right topic + */ + + windowStart = windowStart + 401; + + // No joins detected; No null-joins emitted + inputTopic2.pipeInput(0, "A0", windowStart + 1); + inputTopic2.pipeInput(1, "A1", windowStart + 2); + inputTopic2.pipeInput(0, "A0-0", windowStart + 3); + processor.checkAndClearProcessResult(); + + // Join detected; No null-joins emitted + inputTopic1.pipeInput(1, "a1", windowStart + 3); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3)); + + // Dummy record in right topic will not emit records + inputTopic2.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult(); + + // Process the dummy joined record + inputTopic1.pipeInput(2, "dummy", windowStart + 402); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402)); + } + } + + @Test + public void testImprovedFullOuterJoin() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<Integer, String> stream1; + final KStream<Integer, String> stream2; + final KStream<Integer, String> joined; + final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>(); + + stream1 = builder.stream(topic1, consumed); + stream2 = builder.stream(topic2, consumed); + joined = stream1.outerJoin( + stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.of(ofMillis(100)).grace(ofMillis(0)), 
+ StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); + joined.process(supplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<Integer, String> inputTopic1 = + driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final TestInputTopic<Integer, String> inputTopic2 = + driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); + + /* + * Verifies left non-joined records are emitted by a record processed in the left topic + */ + + long windowStart = 0; + + // No joins detected; No null-joins emitted + inputTopic1.pipeInput(0, "A0", windowStart + 1); + inputTopic1.pipeInput(1, "A1", windowStart + 2); + inputTopic1.pipeInput(0, "A0-0", windowStart + 3); + processor.checkAndClearProcessResult(); + + // Join detected; No null-joins emitted + inputTopic2.pipeInput(1, "a1", windowStart + 3); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3)); + + // Dummy record in left topic will emit expired non-joined records from the left topic + inputTopic1.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", windowStart + 1), + new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3)); + + // Flush internal non-joined state store by joining the dummy record + inputTopic2.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401)); + + /* + * Verifies left non-joined records are emitted by a record processed in the right topic + */ + + windowStart = windowStart + 401; + + // No joins detected; No null-joins emitted + inputTopic1.pipeInput(0, "A0", windowStart + 1); + 
inputTopic1.pipeInput(1, "A1", windowStart + 2); + inputTopic1.pipeInput(0, "A0-0", windowStart + 3); + processor.checkAndClearProcessResult(); + + // Join detected; No null-joins emitted + inputTopic2.pipeInput(1, "a1", windowStart + 3); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3)); + + // Dummy record in right topic will emit expired non-joined records from the left topic + inputTopic2.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", windowStart + 1), + new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3)); + + // Flush internal non-joined state store by joining the dummy record + inputTopic1.pipeInput(2, "dummy", windowStart + 402); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402)); + + /* + * Verifies right non-joined records are emitted by record processed in the left topic + */ + + windowStart = windowStart + 401; + + // No joins detected; No null-joins emitted + inputTopic2.pipeInput(0, "A0", windowStart + 1); + inputTopic2.pipeInput(1, "A1", windowStart + 2); + inputTopic2.pipeInput(0, "A0-0", windowStart + 3); + processor.checkAndClearProcessResult(); + + // Join detected; No null-joins emitted + inputTopic1.pipeInput(1, "a1", windowStart + 3); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3)); + + // Dummy record in left topic will emit expired non-joined records from the right topic + inputTopic1.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "null+A0", windowStart + 1), + new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3)); + + // Process the dummy joined record + inputTopic2.pipeInput(2, "dummy", windowStart + 402); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402)); + + /* + * Verifies right non-joined 
records are emitted by record processed in the right topic + */ + + windowStart = windowStart + 301; + + // No joins detected; No null-joins emitted + inputTopic2.pipeInput(0, "A0", windowStart + 1); + inputTopic2.pipeInput(1, "A1", windowStart + 2); + inputTopic2.pipeInput(0, "A0-0", windowStart + 3); + processor.checkAndClearProcessResult(); + + // Join detected; No null-joins emitted + inputTopic1.pipeInput(1, "a1", windowStart + 3); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3)); + + // Dummy record in right topic will emit expired non-joined records from the right topic + inputTopic2.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "null+A0", windowStart + 1), + new KeyValueTimestamp<>(0, "null+A0-0", windowStart + 3)); + + // Process the dummy joined record + inputTopic1.pipeInput(2, "dummy", windowStart + 402); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402)); + } + } + + @Test + public void testOuterJoinEmitsNonJoinedRecordsAfterWindowCloses() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<Integer, String> stream1; + final KStream<Integer, String> stream2; + final KStream<Integer, String> joined; + final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>(); + + stream1 = builder.stream(topic1, consumed); + stream2 = builder.stream(topic2, consumed); + joined = stream1.outerJoin( + stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.of(ofMillis(100)).grace(ofMillis(0)), + StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); + joined.process(supplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<Integer, String> inputTopic1 = + driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO); + final TestInputTopic<Integer, String> inputTopic2 = + driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); + + inputTopic1.pipeInput(0, "A0", 1); + inputTopic1.pipeInput(1, "A1", 2); + + // the window hasn't ended, so no record would be processed yet + processor.checkAndClearProcessResult(); + + inputTopic2.pipeInput(1, "a1", 3); + + // after a join is found, only the joined records are processed + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "A1+a1", 3)); + + // time to process the non-joined records + // a dummy record is sent to advance the stream time and cause them to be emitted + inputTopic1.pipeInput(2, "A2", 400); + + // all non-joined records of the previous window are emitted + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", 1)); + } + } + @Test public void testOuterJoin() { Review comment: We should move this test into the `KStreamKStreamOuterJoinTest` class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org