Hello,
I'm confused about when outerJoin produces records. Here is the
topology:
Topology buildTopology() {
    var builder = new StreamsBuilder();
    var leftStream = builder.stream("leftSecondsTopic",
            Consumed.with(Serdes.String(), Serdes.String()));
    var rightStream = builder.stream("rightSecondsTopic",
            Consumed.with(Serdes.String(), Serdes.String()));
    leftStream.outerJoin(rightStream,
            (left, right) -> left + ", " + right,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
        .to("outputTopicSeconds");
    return builder.build();
}
Here is the test driver.
@Test
public void testSecondsJoinDoesNotWork() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.StringSerde.class);
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
            Serdes.StringSerde.class);
    var app = new KafkaStreamJoinTest();
    var serializer = new StringSerializer();
    try (var testDriver = new TopologyTestDriver(app.buildTopology(), props)) {
        var leftTopic = testDriver.createInputTopic("leftSecondsTopic",
                serializer, serializer, Instant.ofEpochMilli(0L), Duration.ZERO);
        leftTopic.pipeInput("1", "test string 1", 0L);
        leftTopic.pipeInput("1", "test string 2", 2001L);
        var outputTopic = testDriver.createOutputTopic("outputTopicSeconds",
                new StringDeserializer(), new StringDeserializer());
        assertFalse(outputTopic.isEmpty());
        System.out.println("First join result:");
        outputTopic.readKeyValuesToList()
                .forEach(keyValue -> System.out.println(
                        "Key: " + keyValue.key + " Value: " + keyValue.value));
        assertTrue(outputTopic.isEmpty());
        leftTopic.pipeInput("1", "test string 3", 4002L);
        leftTopic.pipeInput("1", "test string 4", 6004L);
        System.out.println("Second join result:");
        outputTopic.readKeyValuesToList()
                .forEach(keyValue -> System.out.println(
                        "Key: " + keyValue.key + " Value: " + keyValue.value));
    }
}
Here is the output:
First join result:
Key: 1 Value: test string 1, null
Second join result:
I would have expected "test string 2" and "test string 3" to be emitted
in the second batch as well, each with a null right value, since later
records arrive more than 2000 ms after each of them. Why didn't that
happen?
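
To make my expectation concrete, here is a minimal sketch in plain Java
(no Kafka dependencies) of the timing I have in mind. The class name,
the method name, and the rule "an unmatched left record is emitted once
stream time is strictly greater than its timestamp + window + grace" are
my own assumptions about how window closing works. This is not code
taken from Kafka Streams, and the real implementation may apply
additional conditions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of when I expect unmatched left records to be emitted.
class OuterJoinTimingSketch {

    // For each input record (in arrival order), returns the timestamps of
    // earlier unmatched records whose window has closed at that point.
    // Assumed rule: closed when timestamp + windowMs + graceMs < streamTime.
    public static List<List<Long>> emissionsPerRecord(
            List<Long> timestamps, long windowMs, long graceMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> pending = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            // Stream time only moves forward, driven by record timestamps.
            streamTime = Math.max(streamTime, ts);
            pending.add(ts);
            List<Long> emitted = new ArrayList<>();
            Iterator<Long> it = pending.iterator();
            while (it.hasNext()) {
                long candidate = it.next();
                if (candidate + windowMs + graceMs < streamTime) {
                    emitted.add(candidate);
                    it.remove();
                }
            }
            result.add(emitted);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same timestamps and window as in my test: 2000 ms, no grace.
        List<Long> timestamps = List.of(0L, 2001L, 4002L, 6004L);
        System.out.println(emissionsPerRecord(timestamps, 2000L, 0L));
        // prints [[], [0], [2001], [4002]]
    }
}
```

Under this model the record at 2001 would be emitted when the record at
4002 arrives, and the record at 4002 when the record at 6004 arrives.
The test above only shows the first of the three expected emissions.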