Hi all,

We recently started using Kafka Streams and encountered an unexpected issue with our Streams application. Using the following topology, we ran into data loss:
Topologies:
   Sub-topology: 0
    Source: meteringpoints-source (topics: [serving.meteringpoints.mad-meteringpoint])
      --> meteringpoints
    Processor: meteringpoints (stores: [meteringpoints])
      --> KTABLE-SELECT-0000000002
      <-- meteringpoints-source
    Processor: KTABLE-SELECT-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- meteringpoints
    Sink: KSTREAM-SINK-0000000003 (topic: meteringpointsbyzipcode-repartition)
      <-- KTABLE-SELECT-0000000002

   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000004 (topics: [meteringpointsbyzipcode-repartition])
      --> KTABLE-AGGREGATE-0000000005
    Processor: KTABLE-AGGREGATE-0000000005 (stores: [meteringpointsbyzipcode])
      --> none
      <-- KSTREAM-SOURCE-0000000004

The topology is optimized, so the 'meteringpoints' store does not have an additional changelog topic; the 'meteringpointsbyzipcode' store does. In the aggregation (KTABLE-AGGREGATE-0000000005) we build up, per key, a set of the objects we have encountered for that key. Upon inspection we noticed that some keys were missing some of the expected objects in their aggregated value.

Configuration-wise: our cluster consists of 3 brokers, and the topics (regular and internal) are replicated across all of them. We don't have EOS enabled, as our aggregation is idempotent. Our consumers' isolation level is 'read_uncommitted', which we assumed was irrelevant since at-least-once processing does not use Kafka transactions. The number of consumers equals the number of partitions in each topic, so each consumer handles a single partition per topic.

Unfortunately, both the repartition topic and the changelog topic were wiped before we were able to investigate what caused the issue, so we cannot determine whether the problem originated in the changelog or in the repartition topic. Resetting the application (and its offsets) caused all data to be reprocessed, after which the issue was gone. We tried to reproduce the erroneous scenario but have not yet succeeded, mostly because we don't know which events caused it in the first place.

Since we are seemingly unable to reproduce the behaviour, and didn't find any recent accounts of similar problems, we looked into the Kafka Streams source code to understand how things work. While insightful, it didn't help us determine the cause. We were therefore wondering whether you could help us by answering some of the following questions:

- Is there a realistic scenario (e.g. a crash or rebalance) in which the source offsets have been committed, but the corresponding writes to the repartition topic were not?
- Similarly, is there a realistic scenario (e.g. a crash or rebalance) in which the source offsets have been committed, but the corresponding writes to the changelog topic were not?
- When caching the output of a KTable, are the corresponding offsets only committed once the cache is flushed, or are they eligible to be committed as soon as the record is added to the cache?
- Could it be a race condition on the update of the state store? I.e. given two back-to-back messages for the same key, could the aggregation function process both based on the old value?

Kind regards,
Jeffrey G
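P.S. For reference, a rough sketch of the DSL code that produces the topology above. This is a simplified reconstruction, not our actual source: the MeteringPoint class, its accessors, the application id/bootstrap servers and the serde setup are placeholders; only the topic and store names match our application.

    import java.util.HashSet;
    import java.util.Properties;
    import java.util.Set;

    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class MeteringPointsTopology {

        public static StreamsBuilder buildTopology() {
            StreamsBuilder builder = new StreamsBuilder();

            // Source table; with optimization enabled the 'meteringpoints' store
            // is restored from the source topic instead of a dedicated changelog.
            KTable<String, MeteringPoint> meteringPoints = builder.table(
                    "serving.meteringpoints.mad-meteringpoint",
                    Materialized.as("meteringpoints"));

            meteringPoints
                    // Re-key by zip code (KTABLE-SELECT-0000000002); this writes to
                    // the meteringpointsbyzipcode-repartition topic.
                    .groupBy((id, mp) -> KeyValue.pair(mp.getZipCode(), mp))
                    // Build the set of metering points seen per zip code
                    // (KTABLE-AGGREGATE-0000000005); the store is backed by the
                    // meteringpointsbyzipcode changelog topic. Serdes omitted here.
                    .aggregate(
                            HashSet::new,
                            (zip, mp, set) -> { set.add(mp); return set; },    // adder
                            (zip, mp, set) -> { set.remove(mp); return set; }, // subtractor
                            Materialized.<String, Set<MeteringPoint>, KeyValueStore<Bytes, byte[]>>as(
                                    "meteringpointsbyzipcode"));

            return builder;
        }

        public static Properties streamsConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "meteringpoints-app"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            // Topology optimization on, at-least-once (no EOS), as described above.
            props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
            return props;
        }

        // Placeholder value type; the real record has more fields and
        // implements equals/hashCode so set membership works as expected.
        public static class MeteringPoint {
            private final String zipCode;
            public MeteringPoint(String zipCode) { this.zipCode = zipCode; }
            public String getZipCode() { return zipCode; }
        }
    }

The adder/subtractor pair is how KGroupedTable#aggregate maintains the set when an upstream row changes: the old value is subtracted and the new value added.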