Hello Chad,

Here are a few thoughts off the top of my head: for left joins, we keep records received from the left side that have NOT yet found a match on the right side in a separate temporary store (this was improved only recently, but since you're already on 3.2.1 it does apply to you). When, e.g., a right-hand-side record arrives later and finds a match in that temporary left-hand-side "no-matching-yet" store, we delete the matched record from the store and emit the join result. But if no match is found by the time the join window has elapsed, we still emit those records from the "no-matching-yet" store, with the join result as (left, null).
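To make the buffering concrete, here is a toy model in plain Java. It is only a sketch and not the actual Kafka Streams code: the class LeftJoinBufferSketch, its methods, the record values, and the timestamps are all made up for illustration, and it assumes stream time is simply the largest record timestamp seen so far.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of the left-join buffering described above.
// Hypothetical names throughout; NOT the Kafka Streams implementation.
class LeftJoinBufferSketch {
    static final class Rec {
        final String key;
        final String value;
        final long ts;
        boolean matched; // set once any right-side record has joined with it
        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    private final long windowMs;
    private long streamTime = Long.MIN_VALUE;              // assumption: max timestamp seen
    private final List<Rec> leftSeen = new ArrayList<>();  // left records whose window is still open
    private final List<Rec> rightSeen = new ArrayList<>(); // never pruned in this toy model
    final List<String> emitted = new ArrayList<>();        // join results, in emission order

    LeftJoinBufferSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Advance stream time, then flush left records whose window has closed.
    // Records that never matched are emitted as (left, null).
    private void advance(long ts) {
        streamTime = Math.max(streamTime, ts);
        Iterator<Rec> it = leftSeen.iterator();
        while (it.hasNext()) {
            Rec l = it.next();
            if (l.ts + windowMs < streamTime) {
                it.remove();
                if (!l.matched) {
                    emitted.add("(" + l.value + ", null)");
                }
            }
        }
    }

    void onLeft(String key, String value, long ts) {
        advance(ts);
        Rec left = new Rec(key, value, ts);
        for (Rec r : rightSeen) {
            if (r.key.equals(key) && Math.abs(r.ts - ts) <= windowMs) {
                left.matched = true;
                emitted.add("(" + value + ", " + r.value + ")");
            }
        }
        leftSeen.add(left); // stays buffered until its window closes
    }

    void onRight(String key, String value, long ts) {
        advance(ts);
        rightSeen.add(new Rec(key, value, ts));
        for (Rec l : leftSeen) {
            if (l.key.equals(key) && Math.abs(l.ts - ts) <= windowMs) {
                l.matched = true;
                emitted.add("(" + l.value + ", " + value + ")");
            }
        }
    }

    public static void main(String[] args) {
        LeftJoinBufferSketch join = new LeftJoinBufferSketch(500);
        join.onLeft("a", "party-1", 1000);
        System.out.println(join.emitted); // prints []
        join.onLeft("b", "party-2", 2000); // stream time moves past 1000 + 500
        System.out.println(join.emitted); // prints [(party-1, null)]
        join.onRight("b", "items-2", 2100);
        System.out.println(join.emitted); // prints [(party-1, null), (party-2, items-2)]
    }
}
```

In this model nothing is emitted for an unmatched left record until some later record, on either side, pushes stream time past the end of its window. The last left record written therefore stays buffered until more data arrives.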
In your case, I think the arrival of the second record advances the inferred stream time. Once stream time has advanced, the first record, which was sitting in the "no-matching-yet" store, can no longer find a match because its join window has already expired, so we emit that record. But as I said, when that happens the join code should execute with the right side as "null". So my question is: when you see the join function executed with the first record as the left side, is the right side "null"? If yes, I think that reflects what I'm describing here.

Guozhang

On Thu, Aug 4, 2022 at 9:29 AM Chad Preisler <chad.preis...@gmail.com> wrote:

> Hello,
>
> I'm doing a stream to stream leftjoin. Here is what I am seeing when I test
> the code.
>
> - I write a record to the left side topic. The stream app reads the
> message and the deserializer gets triggered. However, the join is not
> triggered at this time.
>
> - I write another record to the left side topic (different key) and I see
> the deserializer get called for the topic. I see the deserializer gets
> called a second time for a store-changelog topic and it deserializes the
> first record. The leftjoin code is executed for the first record submitted.
> This behavior isn't even consistent. Some records on the left never get
> processed.
>
> Why are all the records not processed right away or at all? My join window
> is just 500ms.
>
> I'm using the Kafka 3.2.1 client.
>
> Here is a code snippet of the leftjoin.
>
> KStream<String, Party> partyStream =
>     streamsBuilder.stream(PARTY_TOPIC,
>         Consumed.with(Serdes.String(), partySerde));
>
> KStream<String, ItemList> itemListStream =
>     streamsBuilder.stream(TODO_ITEMS_LIST_TOPIC,
>         Consumed.with(Serdes.String(), itemListSerde));
>
> KStream<String, Party> updatedPartyStream =
>     partyStream.leftJoin(itemListStream, (party, itemList) -> {
>         if (itemList != null) {
>             party.setToDoItems(itemList.getToDoItems());
>         }
>         return party;
>     }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(500)),
>     StreamJoined.with(Serdes.String(), partySerde, itemListSerde));
>
> Thanks,
> Chad
>

--
-- Guozhang