[ 
https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guido Josquin updated KAFKA-14173:
----------------------------------
    Description: 
I am trying to test a stream-stream join with `TopologyTestDriver`. My goal is 
to confirm, without running external services, that my topology performs the 
following left join correctly.
{code:java}
bills
  .leftJoin(payments)(
    {
      case (billValue, null) => billValue
      case (billValue, paymentValue) => (billValue.toInt - paymentValue.toInt).toString
    },
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
  )
  .to("debt")
{code}
 

In other words, if we see a `bill` and a `payment` within 100ms, the payment 
should be subtracted from the bill. If we do not see a payment, the debt is 
simply the bill.
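
To make the intended semantics concrete, here is a minimal sketch of the joiner as a standalone function that does not depend on Kafka. The name `DebtJoiner.debt` is hypothetical and only for illustration. It assumes, as in the snippet above, that the right-hand value is `null` when no payment matched the bill.

```scala
object DebtJoiner {
  // Hypothetical helper mirroring the joiner passed to leftJoin above.
  // Assumption: paymentValue is null when no payment matched the bill.
  def debt(billValue: String, paymentValue: String): String =
    Option(paymentValue) match {
      // No payment seen: the debt is simply the bill.
      case None => billValue
      // Payment seen: subtract it from the bill.
      case Some(payment) => (billValue.toInt - payment.toInt).toString
    }
}
```

With the inputs used in the test, `DebtJoiner.debt("100", "95")` gives `"5"` and `DebtJoiner.debt("20", null)` gives `"20"`, which are the two records the test expects on the `debt` topic.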

Here is the test code.
{code:java}
val simpleLeftJoinTopology = new SimpleLeftJoinTopology
val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
val serde = Serdes.stringSerde

val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer)
val payments = driver.createInputTopic("payments", serde.serializer, serde.serializer)
val debt = driver.createOutputTopic("debt", serde.deserializer, serde.deserializer)

bills.pipeInput("fred", "100")
bills.pipeInput("george", "20")
payments.pipeInput("fred", "95")

// When in doubt, sleep twice
driver.advanceWallClockTime(Duration.ofMillis(500))
Thread.sleep(500)

val keyValues = debt.readKeyValuesToList()
keyValues should contain theSameElementsAs Seq(
  // This record is present
  new KeyValue[String, String]("fred", "5"),
  // This record is missing
  new KeyValue[String, String]("george", "20")
)
{code}
Full code available at [https://github.com/Oduig/kstreams-left-join-example]

It seems that advancing the wall clock time, or sleeping for that matter, never 
causes the left join to emit a result when data arrives only on the left side.

  was:
I am trying to test a stream-stream join with `TopologyTestDriver`. My goal is 
to confirm, without running external services, that my topology performs the 
following left join correctly.

```
bills
  .leftJoin(payments)(
    {
      case (billValue, null) => billValue
      case (billValue, paymentValue) => (billValue.toInt - 
paymentValue.toInt).toString
    },
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
  )
  .to("debt")
```

In other words, if we see a `bill` and a `payment` within 100ms, the payment 
should be subtracted from the bill. If we do not see a payment, the debt is 
simply the bill.

Here is the test code.

```
val simpleLeftJoinTopology = new SimpleLeftJoinTopology
val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
val serde = Serdes.stringSerde

val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer)
val payments = driver.createInputTopic("payments", serde.serializer, 
serde.serializer)
val debt = driver.createOutputTopic("debt", serde.deserializer, 
serde.deserializer)

bills.pipeInput("fred", "100")
bills.pipeInput("george", "20")
payments.pipeInput("fred", "95")

// When in doubt, sleep twice
driver.advanceWallClockTime(Duration.ofMillis(500))
Thread.sleep(500)

val keyValues = debt.readKeyValuesToList()
keyValues should contain theSameElementsAs Seq(
  // This record is present
  new KeyValue[String, String]("fred", "5"),
  // This record is missing
  new KeyValue[String, String]("george", "20")
)
```

Full code available at https://github.com/Oduig/kstreams-left-join-example

It seems that advancing the wall clock time, or sleeping for that matter, never 
triggers the join condition when data only arrives on the left side.


> TopologyTestDriver does not perform left join on two streams when right side 
> is missing
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14173
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14173
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams-test-utils
>    Affects Versions: 2.3.1
>            Reporter: Guido Josquin
>            Priority: Minor
>
> I am trying to test a stream-stream join with `TopologyTestDriver`. My goal 
> is to confirm, without running external services, that my topology performs 
> the following left join correctly.
> {code:java}
> bills
>   .leftJoin(payments)(
>     {
>       case (billValue, null) => billValue
>       case (billValue, paymentValue) => (billValue.toInt - 
> paymentValue.toInt).toString
>     },
>     JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
>   )
>   .to("debt")
> {code}
>  
> In other words, if we see a `bill` and a `payment` within 100ms, the payment 
> should be subtracted from the bill. If we do not see a payment, the debt is 
> simply the bill.
> Here is the test code.
> {code:java}
> val simpleLeftJoinTopology = new SimpleLeftJoinTopology
> val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
> val serde = Serdes.stringSerde
> val bills = driver.createInputTopic("bills", serde.serializer, 
> serde.serializer)
> val payments = driver.createInputTopic("payments", serde.serializer, 
> serde.serializer)
> val debt = driver.createOutputTopic("debt", serde.deserializer, 
> serde.deserializer)
> bills.pipeInput("fred", "100")
> bills.pipeInput("george", "20")
> payments.pipeInput("fred", "95")
> // When in doubt, sleep twice
> driver.advanceWallClockTime(Duration.ofMillis(500))
> Thread.sleep(500)
> val keyValues = debt.readKeyValuesToList()
> keyValues should contain theSameElementsAs Seq(
>   // This record is present
>   new KeyValue[String, String]("fred", "5"),
>   // This record is missing
>   new KeyValue[String, String]("george", "20")
> )
> {code}
> Full code available at [https://github.com/Oduig/kstreams-left-join-example]
> It seems that advancing the wall clock time, or sleeping for that matter, 
> never causes the left join to emit a result when data arrives only on the 
> left side.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
