Hi all,

We recently started using Kafka Streams and encountered an unexpected
issue with our Streams application. With the following topology we ran
into data loss:

Topologies:
   Sub-topology: 0
    Source: meteringpoints-source (topics: [serving.meteringpoints.mad-meteringpoint])
      --> meteringpoints
    Processor: meteringpoints (stores: [meteringpoints])
      --> KTABLE-SELECT-0000000002
      <-- meteringpoints-source
    Processor: KTABLE-SELECT-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- meteringpoints
    Sink: KSTREAM-SINK-0000000003 (topic: meteringpointsbyzipcode-repartition)
      <-- KTABLE-SELECT-0000000002

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000004 (topics: [meteringpointsbyzipcode-repartition])
      --> KTABLE-AGGREGATE-0000000005
    Processor: KTABLE-AGGREGATE-0000000005 (stores: [meteringpointsbyzipcode])
      --> none
      <-- KSTREAM-SOURCE-0000000004

The topology is optimized, so the 'meteringpoints' store does not have an
additional changelog. The 'meteringpointsbyzipcode' store does have one.
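For completeness, the optimization is enabled through the standard config,
which, as far as we understand, is what makes Streams reuse the source topic
instead of creating a changelog for 'meteringpoints'. Roughly (the exact
constant name differs between Kafka versions, so the plain string key is
shown):

    Properties props = new Properties();
    // "topology.optimization" = StreamsConfig.TOPOLOGY_OPTIMIZATION(_CONFIG), depending on version
    props.put("topology.optimization", "all");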

During the aggregation (KTABLE-AGGREGATE-0000000005) we build up a set of
objects that we encountered for that specific key. Upon inspection we
noticed that some of our keys did not have all of the expected objects in
their associated value.
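For reference, the aggregation looks roughly like the following. This is a
simplified sketch, not our actual code; MeteringPoint, the zip-code
extractor, and the serdes (meteringPointSerde, setOfMeteringPointsSerde)
are placeholders:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    StreamsBuilder builder = new StreamsBuilder();

    // Source KTable, materialized as the 'meteringpoints' store.
    KTable<String, MeteringPoint> meteringPoints = builder.table(
        "serving.meteringpoints.mad-meteringpoint",
        Materialized.as("meteringpoints"));

    // Re-key by zip code and collect the metering points per zip code into a set.
    KTable<String, Set<MeteringPoint>> byZipCode = meteringPoints
        .groupBy(
            (key, value) -> KeyValue.pair(value.getZipCode(), value),
            Grouped.with(Serdes.String(), meteringPointSerde))
        .aggregate(
            HashSet::new,
            (zip, value, agg) -> { agg.add(value); return agg; },          // adder
            (zip, oldValue, agg) -> { agg.remove(oldValue); return agg; }, // subtractor
            Materialized.<String, Set<MeteringPoint>, KeyValueStore<Bytes, byte[]>>as("meteringpointsbyzipcode")
                .withValueSerde(setOfMeteringPointsSerde));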

Configuration-wise: our cluster consists of 3 brokers, and the topics
(regular and internal) are replicated across all of them. We don't have EOS
enabled, as our aggregation is idempotent. Our consumers' isolation level
is 'read_uncommitted', which we assumed was irrelevant since 'at_least_once'
delivery doesn't seem to use Kafka transactions. The number of consumers
equals the number of partitions in each topic, so each consumer handles a
single partition per topic.
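In Streams configuration terms that corresponds roughly to the following
(illustrative values, not copied verbatim from our setup):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "meteringpoints-aggregator");   // illustrative
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);                          // internal topics on all 3 brokers
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE); // no EOS
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ISOLATION_LEVEL_CONFIG), "read_uncommitted");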

Unfortunately, both the repartition topic and the changelog topic were
wiped before we were able to investigate the cause, so we cannot determine
whether the problem originated in the changelog or in the repartition
topic. Resetting the application (and its offsets) caused all data to be
reprocessed, after which the issue was gone. We have tried to reproduce the
erroneous scenario, but have not yet succeeded, mostly because we don't
know which events caused it in the first place.

Since we are seemingly unable to reproduce the behaviour, and didn't find
any recent accounts of similar problems, we looked into the Kafka Streams
source code to understand how things work. While insightful, this didn't
help us determine the cause. We were therefore wondering whether you could
help us by answering some of the following questions:

   - Is there a realistic scenario (e.g. a crash or rebalance) in which the
   input offsets are committed, but the corresponding records were never
   written to the repartition topic?
   - Similarly, is there a realistic scenario (e.g. a crash or rebalance) in
   which the input offsets are committed, but the corresponding updates never
   reached the changelog topic?
   - When the output of a KTable is cached, are the corresponding offsets
   only committed once the cache is flushed, or are they eligible to be
   committed as soon as the record is added to the cache? (If caching plays a
   role, we could disable it to test this; see the sketch below.)
   - Could it be a race condition on the update of the state store? I.e.,
   given two back-to-back messages for the same key, could the aggregation
   function process both based on the same old value?
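To rule out the caching path, we could re-run the aggregation with caching
disabled, roughly like this (sketch only, reusing the placeholder names
from the aggregation sketch above):

    // Disable the record cache for the aggregation store only ...
    Materialized.<String, Set<MeteringPoint>, KeyValueStore<Bytes, byte[]>>as("meteringpointsbyzipcode")
        .withCachingDisabled();

    // ... or globally, for the whole application:
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);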

Kind regards,
Jeffrey G
