tuna b created KAFKA-18713:
------------------------------

             Summary: Kafka Streams Left-Join not always emitting the last value
                 Key: KAFKA-18713
                 URL: https://issues.apache.org/jira/browse/KAFKA-18713
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.8.0
            Reporter: tuna b

There seems to be an issue when performing a left join: the latest value of the join is not always emitted.
{code:java}
var builder = new StreamsBuilder();

// Right-hand table, looked up through the person's foreign key
KTable<String, Department> departments = builder
    .table("departments",
        Materialized.<String, Department>as(Stores.persistentKeyValueStore("departments"))
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomJsonSerde.json(Department.class)));

// Left-hand table, keyed by person id
KTable<String, Person> persons = builder
    .table("persons",
        Materialized.<String, Person>as(Stores.persistentKeyValueStore("persons"))
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomJsonSerde.json(Person.class)));

// Foreign-key left join: attach the department referenced by Person::getDepartmentId
KTable<String, Person> joined = persons
    .leftJoin(departments,
        Person::getDepartmentId,
        (person, department) -> person.toBuilder()
            .department(department)
            .build(),
        TableJoined.as("my-joiner"),
        Materialized.<String, Person>as(Stores.persistentKeyValueStore("joined-results"))
            .withKeySerde(Serdes.String())
            .withValueSerde(CustomJsonSerde.json(Person.class)));

joined
    .toStream()
    .to("joined-results", Produced.with(Serdes.String(), CustomJsonSerde.json(Person.class)));
{code}
How to reproduce:

Create two topics, persons and departments, each with 10 partitions. Pre-populate the departments topic with 2 departments.

Observation:
 * When I initially produce a Person {{p-1}} with an FK {{dep-1}}, the join works.
 ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}}
 * When I change the FK to {{dep-2}}, the join updates.
 ** output is an EnrichedResult with person {{p-1}} and department {{dep-2}}
 * When I change the FK back to {{dep-1}}, the join fails.
 ** output is an EnrichedResult with person {{p-1}} *but no department*
 * However, if I produce the same event again ({{p-1}} with {{dep-1}}), the join works again.
 ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}}

Also, even when you are not setting the FK back to a previous value, there can still be an issue with the left join.
Changing an FK results in an insert and a delete operation, but sometimes the result of the delete is emitted after the result of the insert.

How to reproduce:

Create a departments topic and pre-populate it with 5 departments (dep-1 to dep-5). Create a persons topic and create person p-1 with FK dep-1. Send an update to the persons topic that changes the FK to dep-2, then repeat this step for each following department up to dep-5. Now you will see that the latest emitted value of the person does not contain a department.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
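
To illustrate the ordering problem described in this report, here is a minimal plain-Java sketch. It is a toy model only: it does not use Kafka and does not reflect how Kafka Streams implements foreign-key joins. The class name {{FkJoinOrderingSketch}} and its methods are hypothetical, and the model simply assumes, as the report describes, that an FK change yields one "delete" result (person without department) and one "insert" result (person with the new department), and that whichever result arrives last is what a consumer sees as the latest value.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Toy model of the reported symptom; NOT Kafka Streams internals. */
final class FkJoinOrderingSketch {

    /** "Delete" half of an FK change: the person is no longer joined to the old department. */
    static Optional<String> deleteResult() {
        return Optional.empty();
    }

    /** "Insert" half of an FK change: the person is joined to the department behind the new FK. */
    static Optional<String> insertResult(String newFk, Map<String, String> departments) {
        return Optional.ofNullable(departments.get(newFk));
    }

    /** The last result to arrive wins, as for a consumer reading updates keyed by person id. */
    static Optional<String> lastEmitted(List<Optional<String>> arrivals) {
        Optional<String> last = Optional.empty();
        for (Optional<String> result : arrivals) {
            last = result;
        }
        return last;
    }

    public static void main(String[] args) {
        // Hypothetical department names, used only for this illustration.
        Map<String, String> departments = Map.of(
                "dep-1", "Department 1",
                "dep-2", "Department 2");

        Optional<String> delete = deleteResult();                     // p-1 leaves dep-1
        Optional<String> insert = insertResult("dep-2", departments); // p-1 joins dep-2

        // Expected order: delete first, insert last, so the department is present.
        System.out.println("in order:     " + lastEmitted(List.of(delete, insert)));
        // Reported symptom: delete arrives last, so the department is missing.
        System.out.println("out of order: " + lastEmitted(List.of(insert, delete)));
    }
}
```

In this model the final value depends only on arrival order, which matches the symptom reported here: when the delete result lands after the insert result, the last emitted value for p-1 has no department even though the person's current FK points at an existing one.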