[ https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guido Josquin updated KAFKA-14173: ---------------------------------- Description: I am trying to test a stream-stream join with `TopologyTestDriver`. My goal is to confirm, without running external services, that my topology performs the following left join correctly. {code:java} bills .leftJoin(payments)( { case (billValue, null) => billValue case (billValue, paymentValue) => (billValue.toInt - paymentValue.toInt).toString }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)) ) .to("debt") {code} In other words, if we see a `bill` and a `payment` within 100ms, the payment should be subtracted from the bill. If we do not see a payment, the debt is simply the bill. Here is the test code. {code:java} val simpleLeftJoinTopology = new SimpleLeftJoinTopology val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology) val serde = Serdes.stringSerde val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer) val payments = driver.createInputTopic("payments", serde.serializer, serde.serializer) val debt = driver.createOutputTopic("debt", serde.deserializer, serde.deserializer) bills.pipeInput("fred", "100") bills.pipeInput("george", "20") payments.pipeInput("fred", "95") // When in doubt, sleep twice driver.advanceWallClockTime(Duration.ofMillis(500)) Thread.sleep(500) val keyValues = debt.readKeyValuesToList() keyValues should contain theSameElementsAs Seq( // This record is present new KeyValue[String, String]("fred", "5"), // This record is missing new KeyValue[String, String]("george", "20") ) {code} Full code available at [https://github.com/Oduig/kstreams-left-join-example] Is seems that advancing the wall clock time, or sleeping for that matter, never triggers the join condition when data only arrives on the left side. was: I am trying to test a stream-stream join with `TopologyTestDriver`. My goal is to confirm, without running external services, that my topology performs the following left join correctly. ``` bills .leftJoin(payments)( { case (billValue, null) => billValue case (billValue, paymentValue) => (billValue.toInt - paymentValue.toInt).toString }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)) ) .to("debt") ``` In other words, if we see a `bill` and a `payment` within 100ms, the payment should be subtracted from the bill. If we do not see a payment, the debt is simply the bill. Here is the test code. ``` val simpleLeftJoinTopology = new SimpleLeftJoinTopology val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology) val serde = Serdes.stringSerde val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer) val payments = driver.createInputTopic("payments", serde.serializer, serde.serializer) val debt = driver.createOutputTopic("debt", serde.deserializer, serde.deserializer) bills.pipeInput("fred", "100") bills.pipeInput("george", "20") payments.pipeInput("fred", "95") // When in doubt, sleep twice driver.advanceWallClockTime(Duration.ofMillis(500)) Thread.sleep(500) val keyValues = debt.readKeyValuesToList() keyValues should contain theSameElementsAs Seq( // This record is present new KeyValue[String, String]("fred", "5"), // This record is missing new KeyValue[String, String]("george", "20") ) ``` Full code available at https://github.com/Oduig/kstreams-left-join-example Is seems that advancing the wall clock time, or sleeping for that matter, never triggers the join condition when data only arrives on the left side. > TopologyTestDriver does not perform left join on two streams when right side > is missing > --------------------------------------------------------------------------------------- > > Key: KAFKA-14173 > URL: https://issues.apache.org/jira/browse/KAFKA-14173 > Project: Kafka > Issue Type: Bug > Components: streams-test-utils > Affects Versions: 2.3.1 > Reporter: Guido Josquin > Priority: Minor > > I am trying to test a stream-stream join with `TopologyTestDriver`. My goal > is to confirm, without running external services, that my topology performs > the following left join correctly. > {code:java} > bills > .leftJoin(payments)( > { > case (billValue, null) => billValue > case (billValue, paymentValue) => (billValue.toInt - > paymentValue.toInt).toString > }, > JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100)) > ) > .to("debt") > {code} > > In other words, if we see a `bill` and a `payment` within 100ms, the payment > should be subtracted from the bill. If we do not see a payment, the debt is > simply the bill. > Here is the test code. > {code:java} > val simpleLeftJoinTopology = new SimpleLeftJoinTopology > val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology) > val serde = Serdes.stringSerde > val bills = driver.createInputTopic("bills", serde.serializer, > serde.serializer) > val payments = driver.createInputTopic("payments", serde.serializer, > serde.serializer) > val debt = driver.createOutputTopic("debt", serde.deserializer, > serde.deserializer) > bills.pipeInput("fred", "100") > bills.pipeInput("george", "20") > payments.pipeInput("fred", "95") > // When in doubt, sleep twice > driver.advanceWallClockTime(Duration.ofMillis(500)) > Thread.sleep(500) > val keyValues = debt.readKeyValuesToList() > keyValues should contain theSameElementsAs Seq( > // This record is present > new KeyValue[String, String]("fred", "5"), > // This record is missing > new KeyValue[String, String]("george", "20") > ) > {code} > Full code available at [https://github.com/Oduig/kstreams-left-join-example] > Is seems that advancing the wall clock time, or sleeping for that matter, > never triggers the join condition when data only arrives on the left side. -- This message was sent by Atlassian Jira (v8.20.10#820010)