[ https://issues.apache.org/jira/browse/KAFKA-16700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17933764#comment-17933764 ]
Karsten Stöckmann commented on KAFKA-16700:
-------------------------------------------

[~mjsax] Awesome, thanks for the hint. I'll update our dependencies as soon as 3.9.1 is released and get back here.

> Kafka Streams: possible message loss on KTable-KTable FK Left Join
> ------------------------------------------------------------------
>
>                 Key: KAFKA-16700
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16700
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>         Environment: Apache Kafka 3.7.0 cluster in KRaft mode, 3 brokers and 3 controllers, on managed Kubernetes 1.28.7, operated by Strimzi Kafka Operators
>            Reporter: Karsten Stöckmann
>            Priority: Major
>              Labels: dsl, joins, streams
>
> We are experiencing significant, yet intermittent / non-deterministic / so far unexplained message loss on a Kafka Streams topology while performing a *KTable-KTable FK Left Join*.
>
> Consider the following snippet:
> {code:java}
> streamsBuilder
>     .table(
>         folderTopicName,
>         Consumed.with(
>             folderKeySerde,
>             folderSerde))
>     .leftJoin(
>         agencies, // KTable<AgencyId, AggregateAgency>
>         Folder::agencyIdValue,
>         AggregateFolder::new,
>         TableJoined.as("folder-to-agency"),
>         Materialized
>             .as("folder-to-agency-materialized")
>             .withKeySerde(folderKeySerde)
>             .withValueSerde(aggregateFolderSerde))
>     .leftJoin(
>         documents,
> {code}
> The setup is as follows: a Debezium Connector for PostgreSQL streams database changes into various Kafka topics. A series of Quarkus Kafka Streams applications then performs aggregation operations on those topics to create index documents that are later sent into an OpenSearch system.
>
> When firing up the Kafka Streams infrastructure to work on initially populated Kafka topics (i.e. a snapshot of all relevant table data has already been streamed to Kafka), the KTable-KTable FK Left Join shown above seems to produce message loss on the first of a series of FK Left Joins; the right-hand {{KTable<AgencyId, AggregateAgency>}} is consumed from an aggregated topic fed by another Kafka Streams topology / application.
>
> On a (heavily reduced) test data set of 6828 messages in the {{folderTopicName}} topic, we observe the following results:
> * {{folder-to-agency-subscription-registration}}: *6967* messages
> * {{folder-to-agency-subscription-response}}: *3048* messages
> * {{folder-to-agency-subscription-store-changelog}}: *6359* messages
> * {{folder-to-agency-materialized-changelog}}: *4644* messages
>
> Given the nature of a (FK) Left Join, I'd expect every message from the left-hand topic to produce an aggregate, even if no matching message is found in the right-hand topic.
>
> The message loss varies unpredictably across test runs and does not seem to be bound to specific keys or messages.
>
> It seems this can only be observed when initially firing up the Streams infrastructure to process the message 'backlog' that had been snapshotted by Debezium. A manual snapshot triggered later (i.e. with the Streams applications already running) does not seem to show this behaviour. Additionally, we have so far observed this kind of message loss only when running multiple replicas of the affected application; when carrying out the tests with only one replica, everything seems to work as expected. We've tried to leverage {{group.initial.rebalance.delay.ms}} in order to rule out possible rebalancing issues, but to no avail.
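>
> For reference, a heavily reduced, self-contained sketch of the kind of join in question. Everything here (the topic names, the {{FolderToAgencyJoinSketch}} class, plain String serdes, and the assumed "<agencyId>|<payload>" value layout) is a placeholder for illustration, not our production code:
> {code:java}
> import java.util.function.Function;
>
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.utils.Bytes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.streams.kstream.Materialized;
> import org.apache.kafka.streams.kstream.Produced;
> import org.apache.kafka.streams.kstream.TableJoined;
> import org.apache.kafka.streams.kstream.ValueJoiner;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> public class FolderToAgencyJoinSketch {
>
>     // Left-hand values are assumed to look like "<agencyId>|<payload>";
>     // the extracted agencyId is the foreign key into the agency table.
>     static final Function<String, String> AGENCY_ID_EXTRACTOR =
>             value -> value.substring(0, value.indexOf('|'));
>
>     // For a left join, the right-hand value is null when no matching agency exists.
>     static final ValueJoiner<String, String, String> JOINER =
>             (folder, agency) -> folder + " -> " + agency;
>
>     static Topology build() {
>         StreamsBuilder builder = new StreamsBuilder();
>
>         // Right-hand side: aggregated agencies, keyed by agencyId.
>         KTable<String, String> agencies = builder.table(
>                 "agency-aggregate-topic",
>                 Consumed.with(Serdes.String(), Serdes.String()));
>
>         // Left-hand side: folders, FK-left-joined against the agencies table.
>         builder.table(
>                         "folder-topic",
>                         Consumed.with(Serdes.String(), Serdes.String()))
>                 .leftJoin(
>                         agencies,
>                         AGENCY_ID_EXTRACTOR,
>                         JOINER,
>                         TableJoined.as("folder-to-agency"),
>                         Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(
>                                         "folder-to-agency-materialized")
>                                 .withKeySerde(Serdes.String())
>                                 .withValueSerde(Serdes.String()))
>                 .toStream()
>                 .to("folder-to-agency-output",
>                         Produced.with(Serdes.String(), Serdes.String()));
>
>         return builder.build();
>     }
> }
> {code}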
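>
> To illustrate the expected left-join semantics (and as a possible starting point for a local test case), here is a {{TopologyTestDriver}} sketch against the hypothetical {{FolderToAgencyJoinSketch}} topology from the accompanying sketch. It assumes the {{kafka-streams-test-utils}} dependency; a left-hand record without a matching agency is still expected to appear in the output, with a null right-hand value:
> {code:java}
> import java.util.Properties;
>
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.apache.kafka.common.serialization.StringSerializer;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.TestInputTopic;
> import org.apache.kafka.streams.TestOutputTopic;
> import org.apache.kafka.streams.TopologyTestDriver;
>
> public class FkLeftJoinSemanticsCheck {
>
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fk-left-join-semantics-check");
>         // The test driver never contacts a broker; the bootstrap address is a dummy.
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "unused:9092");
>
>         try (TopologyTestDriver driver =
>                 new TopologyTestDriver(FolderToAgencyJoinSketch.build(), props)) {
>
>             TestInputTopic<String, String> folders = driver.createInputTopic(
>                     "folder-topic", new StringSerializer(), new StringSerializer());
>             TestOutputTopic<String, String> joined = driver.createOutputTopic(
>                     "folder-to-agency-output", new StringDeserializer(), new StringDeserializer());
>
>             // The right-hand (agency) table is empty, so there is no match for this folder.
>             folders.pipeInput("folder-1", "agency-42|some folder payload");
>
>             // A left join should still emit one result per left-hand record,
>             // with a null right-hand value, e.g.:
>             // KeyValue(folder-1, agency-42|some folder payload -> null)
>             joined.readKeyValuesToList().forEach(System.out::println);
>         }
>     }
> }
> {code}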
>
> Our Kafka configuration:
> {code:yaml}
> offsets.topic.replication.factor: 3
> transaction.state.log.replication.factor: 3
> transaction.state.log.min.isr: 2
> default.replication.factor: 3
> min.insync.replicas: 2
> message.max.bytes: "20971520"
> {code}
> Our Kafka Streams application configuration:
> {code:yaml}
> kafka-streams.num.stream.threads: 5
> kafka-streams.num.standby.replicas: 1
> kafka-streams.auto.offset.reset: earliest
> kafka-streams.cache.max.bytes.buffering: "20971520"
> kafka-streams.commit.interval.ms: 100
> kafka-streams.fetch.max.bytes: "10485760"
> kafka-streams.max.request.size: "10485760"
> kafka-streams.max.partition.fetch.bytes: "10485760"
> kafka-streams.metadata.max.age.ms: 300000
> kafka-streams.statestore.cache.max.bytes: "20971520"
> kafka-streams.topology.optimization: all
> kafka-streams.processing.guarantee: exactly_once_v2
> # Kafka Streams Intermediate Topics
> kafka-streams.topic.compression.type: lz4
> kafka-streams.topic.segment.ms: "43200000" # 12h
> kafka-streams.topic.max.compaction.lag.ms: "86400000" # 24h
> kafka-streams.topic.delete.retention.ms: "86400000" # 24h
> kafka-streams.producer.max.request.size: "20971520" # 20MiB
> kafka-streams.producer.transaction.timeout.ms: 100 # Should match commit.interval.ms, set close to 100ms for exactly_once_v2
> kafka-streams.consumer.group.instance.id: ${HOSTNAME}
> kafka-streams.consumer.heartbeat.interval.ms: 100
> kafka-streams.consumer.session.timeout.ms: 45000
> {code}
> All input (and aggregate) topics feature 15 partitions and share this configuration:
> {code:yaml}
> cleanup.policy: compact
> compression.type: lz4
> segment.ms: "43200000" # 12h
> max.compaction.lag.ms: "86400000" # 24h
> delete.retention.ms: "86400000" # 24h
> {code}
> Logs show no indication of where or why this happens.
>
> The issue was discussed on the Kafka [mailing list|https://lists.apache.org/thread/l50pwmwhobt73db97n0r5v36mydo15rs] as well as on [StackOverflow|https://stackoverflow.com/questions/78210993/kafka-streams-topology-initially-dropping-messages-to-intermediate-topics], but both threads led to no further explanation; in the end it was suggested that I file a bug in the Kafka JIRA. I can't rule out that this is ultimately caused by some setting in our Kafka environment, but there are other [indications|https://stackoverflow.com/questions/75886729/missing-records-in-kafka-streams-foreign-key-join] of similar message loss on FK Join operations. For the time being, I'd consider this a bug, perhaps one that only emerges under certain conditions.
>
> At the moment I have no test case that reproduces the issue locally. Should any additional information be needed, I'd be happy to provide it.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)