Hello Chad,

Here are a few thoughts off the top of my head: for left joins, we keep
those records received from the left side that have NOT yet found a match
on the right side in a separate temporary store (this was only recently
improved, but since you're already on 3.2.1 it is indeed the case). When,
e.g., a right-hand-side record later arrives and finds a match in the
temporary left-hand-side "no-matching-yet" store, we delete the matched
record from that store and emit the join result. But if no match is found
by the time the join window elapses, we still emit those records from the
"no-matching-yet" store, with the join result as (left, null).

In your case, I think the arrival of the second record advances the
inferred stream time, and once it has advanced, the first record, still
sitting in the "no-matching-yet" store, is doomed never to find a match
because its join window has already expired. So we emit that record, but
as I said, when that happens the join code should execute with the right
side as "null". So my question is: when you see the join function executed
with the first record as the left side, is the right side "null"? If yes,
I think that reflects what I'm describing here.
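
To make the mechanism above concrete, here is a rough sketch in plain
Java. It is a toy model, not the Kafka Streams implementation: the class
LeftJoinBufferSketch, its methods, and the expiry rule
(timestamp + window < stream time) are all made up for illustration, and a
left record is only tracked until its first match.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of a left join's "no-matching-yet" buffer. NOT Kafka Streams code. */
class LeftJoinBufferSketch {
    /** Hypothetical holder for a buffered record. */
    static final class Pending {
        final String key; final String value; final long timestamp;
        Pending(String key, String value, long timestamp) {
            this.key = key; this.value = value; this.timestamp = timestamp;
        }
    }

    private final long windowMs;
    private long streamTime = Long.MIN_VALUE;                      // highest timestamp seen so far
    private final List<Pending> unmatchedLeft = new ArrayList<>(); // the "no-matching-yet" store
    private final List<Pending> rightSeen = new ArrayList<>();     // right records (never purged here)
    private final List<String> output = new ArrayList<>();         // emitted join results

    LeftJoinBufferSketch(long windowMs) { this.windowMs = windowMs; }

    List<String> output() { return output; }

    void onLeft(String key, String value, long ts) {
        advanceStreamTime(ts);
        boolean matched = false;
        for (Pending r : rightSeen) {
            if (r.key.equals(key) && Math.abs(r.timestamp - ts) <= windowMs) {
                output.add("(" + value + ", " + r.value + ")");
                matched = true;
            }
        }
        // No match yet: buffer the record instead of emitting (left, null) right away.
        if (!matched) unmatchedLeft.add(new Pending(key, value, ts));
    }

    void onRight(String key, String value, long ts) {
        advanceStreamTime(ts);
        rightSeen.add(new Pending(key, value, ts));
        for (Iterator<Pending> it = unmatchedLeft.iterator(); it.hasNext();) {
            Pending l = it.next();
            if (l.key.equals(key) && Math.abs(l.timestamp - ts) <= windowMs) {
                it.remove();                                   // delete from the buffer
                output.add("(" + l.value + ", " + value + ")"); // and emit the join result
            }
        }
    }

    /** Stream time only moves when a record arrives; expired left records are then emitted with null. */
    private void advanceStreamTime(long ts) {
        streamTime = Math.max(streamTime, ts);
        for (Iterator<Pending> it = unmatchedLeft.iterator(); it.hasNext();) {
            Pending l = it.next();
            if (l.timestamp + windowMs < streamTime) {
                it.remove();
                output.add("(" + l.value + ", null)");
            }
        }
    }

    public static void main(String[] args) {
        LeftJoinBufferSketch join = new LeftJoinBufferSketch(500);
        join.onLeft("k1", "party-1", 1_000);  // buffered, nothing emitted yet
        System.out.println(join.output());     // []
        join.onLeft("k2", "party-2", 2_000);  // stream time moves past 1000 + 500
        System.out.println(join.output());     // [(party-1, null)]
    }
}
```

Note that in this model "party-2" stays in the buffer until some later
record moves the stream time forward again, which would look like a left
record that is never processed.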


Guozhang




On Thu, Aug 4, 2022 at 9:29 AM Chad Preisler <chad.preis...@gmail.com>
wrote:

> Hello,
>
> I'm doing a stream to stream leftjoin. Here is what I am seeing when I test
> the code.
>
> - I write a record to the left side topic. The stream app reads the
> message and the deserializer gets triggered. However, the join is not
> triggered at this time.
>
> - I write another record to the left side topic (different key) and I see
> the deserializer get called for the topic. I see the deserializer gets
> called a second time for a store-changelog topic and it deserializes the
> first record. The leftjoin code is executed for the first record submitted.
> This behavior isn't even consistent. Some records on the left never get
> processed.
>
> Why are all the records not processed right away or at all? My join window
> is just 500ms.
>
> I'm using the Kafka 3.2.1 client.
>
> Here is a code snippet of the leftjoin.
>
>         KStream<String, Party> partyStream =
> streamsBuilder.stream(PARTY_TOPIC,
>                 Consumed.with(Serdes.String(), partySerde));
>
>         KStream<String, ItemList> itemListStream =
> streamsBuilder.stream(TODO_ITEMS_LIST_TOPIC, Consumed.with(Serdes.String(),
> itemListSerde));
>
>
>         KStream<String, Party> updatedPartyStream =
> partyStream.leftJoin(itemListStream, (party, itemList) -> {
>             if (itemList != null) {
>                 party.setToDoItems(itemList.getToDoItems());
>             }
>             return party;
>         }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(500)),
>                 StreamJoined.with(Serdes.String(), partySerde,
> itemListSerde));
>
> Thanks,
> Chad
>


-- 
-- Guozhang
