mjsax commented on a change in pull request #10462: URL: https://github.com/apache/kafka/pull/10462#discussion_r614538486
########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ########## @@ -486,6 +486,327 @@ public void testJoin() { } } + @Test + public void testImprovedLeftJoin() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<Integer, String> stream1; + final KStream<Integer, String> stream2; + final KStream<Integer, String> joined; + final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>(); + + stream1 = builder.stream(topic1, consumed); + stream2 = builder.stream(topic2, consumed); + joined = stream1.leftJoin( + stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.of(ofMillis(100)).grace(ofMillis(0)), + StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())); + joined.process(supplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + final TestInputTopic<Integer, String> inputTopic1 = + driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final TestInputTopic<Integer, String> inputTopic2 = + driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); + final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); + + /* + * Verifies left non-joined records are emitted by a record processed in the left topic + */ + + long windowStart = 0; + + // No joins detected; No null-joins emitted + inputTopic1.pipeInput(0, "A0", windowStart + 1); + inputTopic1.pipeInput(1, "A1", windowStart + 2); + inputTopic1.pipeInput(0, "A0-0", windowStart + 3); + processor.checkAndClearProcessResult(); + + // Join detected; No null-joins emitted + inputTopic2.pipeInput(1, "a1", windowStart + 3); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3)); + + // Dummy record in left topic will emit expired non-joined records from the 
left topic + inputTopic1.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", windowStart + 1), + new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3)); + + // Flush internal non-joined state store by joining the dummy record + inputTopic2.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 401)); + + /* + * Verifies left non-joined records are emitted by a record processed in the right topic + */ + + windowStart = windowStart + 301; + + // No joins detected; No null-joins emitted + inputTopic1.pipeInput(0, "A0", windowStart + 1); + inputTopic1.pipeInput(1, "A1", windowStart + 2); + inputTopic1.pipeInput(0, "A0-0", windowStart + 3); + processor.checkAndClearProcessResult(); + + // Join detected; No null-joins emitted + inputTopic2.pipeInput(1, "a1", windowStart + 3); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "A1+a1", windowStart + 3)); + + // Dummy record in right topic will emit expired non-joined records from the left topic + inputTopic2.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "A0+null", windowStart + 1), + new KeyValueTimestamp<>(0, "A0-0+null", windowStart + 3)); + + // Flush internal non-joined state store by joining the dummy record + inputTopic1.pipeInput(2, "dummy", windowStart + 402); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402)); + + /* + * Verifies right non-joined records are not emitted by record processed in the left topic + */ + + windowStart = windowStart + 401; + + // No joins detected; No null-joins emitted + inputTopic2.pipeInput(0, "A0", windowStart + 1); + inputTopic2.pipeInput(1, "A1", windowStart + 2); + inputTopic2.pipeInput(0, "A0-0", windowStart + 3); + processor.checkAndClearProcessResult(); + + // Join detected; No null-joins 
emitted + inputTopic1.pipeInput(1, "a1", windowStart + 3); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3)); + + // Dummy record in left topic will not emit records + inputTopic1.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult(); + + // Process the dummy joined record + inputTopic2.pipeInput(2, "dummy", windowStart + 402); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402)); + + /* + * Verifies right non-joined records are not emitted by record processed in the right topic + */ + + windowStart = windowStart + 401; + + // No joins detected; No null-joins emitted + inputTopic2.pipeInput(0, "A0", windowStart + 1); + inputTopic2.pipeInput(1, "A1", windowStart + 2); + inputTopic2.pipeInput(0, "A0-0", windowStart + 3); + processor.checkAndClearProcessResult(); + + // Join detected; No null-joins emitted + inputTopic1.pipeInput(1, "a1", windowStart + 3); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(1, "a1+A1", windowStart + 3)); + + // Dummy record in right topic will not emit records + inputTopic2.pipeInput(2, "dummy", windowStart + 401); + processor.checkAndClearProcessResult(); + + // Process the dummy joined record + inputTopic1.pipeInput(2, "dummy", windowStart + 402); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(2, "dummy+dummy", windowStart + 402)); + } + } + + @Test + public void testImprovedFullOuterJoin() { Review comment: I am just realizing that we don't have a `KStreamKStreamOuterJoinTest` class -- we should add one and move this test there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org