[ https://issues.apache.org/jira/browse/KAFKA-18713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17925758#comment-17925758 ]
tuna b edited comment on KAFKA-18713 at 2/10/25 10:14 PM: ---------------------------------------------------------- "If I don't create 10 partition, will the issue still occur? I tried to reproduce this issue, with just one partition, everything works fine." The issue only happens with multiple partitions and you can reproduce it consistently. It doesn't have to be 10 partitions. Btw, i also saw that this issue is not only happening when setting the FK back to a previous value, but also when you assign new FK's. Its not happening immediately, but after changing the FK like 3 or 4 times. If you keep changing the FK, you will see in a random fashion that the join is failing sometimes. was (Author: JIRAUSER308574): "If I don't create 10 partition, will the issue still occur? I tried to reproduce this issue, with just one partition, everything works fine." The issue only happens with multiple partitions and you can reproduce it consistently. Btw, i also saw that this issue is not only happening when setting the FK back to a previous value, but also when you assign new FK's. Its not happening immediately, but after changing the FK like 3 or 4 times. If you keep changing the FK, you will see in a random fashion that the join is failing sometimes. > Kafka Streams Left-Join not always emitting the last value > ---------------------------------------------------------- > > Key: KAFKA-18713 > URL: https://issues.apache.org/jira/browse/KAFKA-18713 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.8.0 > Reporter: tuna b > Assignee: Nil Madhab > Priority: Major > Attachments: Screenshot 2025-02-10 at 12.06.38.png > > > There seems to be an issue when performing a left-join, the latest value with > of the join does not contain the value of right side. > {code:java} > var builder = new StreamsBuilder(); > KTable<String, Department> departments = builder > .table("departments", > Materialized.<String, > Department>as(Stores.persistentKeyValueStore("departments")) > .withKeySerde(Serdes.String()) > .withValueSerde(CustomJsonSerde.json(Department.class))); > KTable<String, Person> persons = builder > .table("persons", > Materialized.<String, > Person>as(Stores.persistentKeyValueStore("persons")) > .withKeySerde(Serdes.String()) > .withValueSerde(CustomJsonSerde.json(Person.class))); > KTable<String, Person> joined = persons > .leftJoin(departments, Person::getDepartmentId, (person, department) -> > person.toBuilder() > .department(department) > .build(), > TableJoined.as("my-joiner"), > Materialized.<String, > Person>as(Stores.persistentKeyValueStore("joined-results")) > .withKeySerde(Serdes.String()) > .withValueSerde(CustomJsonSerde.json(Person.class))); > joined > .toStream() > .to("joined-results", Produced.with(Serdes.String(), > CustomJsonSerde.json(Person.class))); {code} > How to reproduce: > Create two topics persons and departments, each with 10 partitions. > Pre-populate the departments topic with 2 departments. > > Observation: * When i initially produce a Person {{p-1}} with a FK > {{{}dep-1{}}}, the join works . > * > ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}} > * When i change the FK to {{{}dep-2{}}}, the join updates . > ** output is an EnrichedResult with person {{p-1 }}and department {{dep-2}} > * When i change the FK back to {{{}dep-1{}}}, the join fails . > ** output is an EnrichedResult with person {{p-1}} *but no department* > * However, if I reproduce the same event ({{{}p-1{}}} with {{{}dep-1{}}}), > the join works again . > ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}} > Also, even when you are not setting back to a previous FK, there can still be > an issue with the left join. Changing an FK means insert + delete operations, > but sometimes the result of the delete is emitted after the result of the > insert. > How to reproduce: > Create a departments topic and pre-populate it with 5 departments (dep-1 to > dep-5). Create a persons topic and create person p-1 with FK dep-1. Send an > update to the persons topic by changing the FK to dep-2 and repeat this step > until dep-5. Now you will see that the latest emitted value of the person > does not contain a department. -- This message was sent by Atlassian Jira (v8.20.10#820010)