Hello,

I'm doing a stream to stream leftjoin. Here is what I am seeing when I test
the code.

- I write a record to the left side topic. The stream app reads  the
message and the deserializer gets triggered. However, the join is not
triggered at this time.

- I write another record to the left side topic (different key) and I see
the deserializer get called for the topic. I see the deserializer gets
called a second time for a store-changelog topic and it deserializes the
first record. The leftjoin code is executed for the first record submitted.
This behavior isn't even consistent. Some records on the left never get
processed.

Why are all the records not processed right away or at all? My join window
is just 500ms.

I'm using the Kafka 3.2.1 client.

Here is a code snippet of the leftjoin.

        KStream<String, Party> partyStream =
streamsBuilder.stream(PARTY_TOPIC,
                Consumed.with(Serdes.String(), partySerde));

        KStream<String, ItemList> itemListStream =
streamsBuilder.stream(TODO_ITEMS_LIST_TOPIC, Consumed.with(Serdes.String(),
itemListSerde));


        KStream<String, Party> updatedPartyStream =
partyStream.leftJoin(itemListStream, (party, itemList) -> {
            if (itemList != null) {
                party.setToDoItems(itemList.getToDoItems());
            }
            return party;
        }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(500)),
                StreamJoined.with(Serdes.String(), partySerde,
itemListSerde));

Thanks,
Chad

Reply via email to