Hi John,

Thank you for your swift response and the advice. Your answers confirm our expectations.
As you suggested, we'll keep monitoring the application to see whether a similar issue occurs. For now, every analysis points towards the behaviour being an effect of circumstance rather than a problem in the Kafka Streams library, so it may have been a one-time thing. If it does resurface, we will hopefully have the evidence we need to pinpoint the source.

Thanks again for your assistance!

Jeffrey

On Sat, 21 Nov 2020 at 06:02, John Roesler <vvcep...@apache.org> wrote:
> Hello Jeffrey,
>
> I’m sorry for the trouble. I appreciate your diligence in tracking this
> down. In reading your description, nothing jumps out to me as problematic.
> I’m a bit at a loss as to what may have been the problem.
>
> > - Is there a realistic scenario (e.g. crash, rebalance) which you can
> > think of where the offset has been committed, but the repartition was not?
> I don’t think so. We wait for acks before committing, so it should be safe.
> Maybe double-check that the producer is set to acks=all?
>
> > - Similarly, is there a realistic scenario (e.g. crash, rebalance) which
> > you can think of where the offset has been committed, but the changelog
> > was not?
> Changelogs ought to be exactly the same mechanism as repartition topics.
>
> > - When caching the output of a KTable, are the corresponding offsets
> > committed when the cache is flushed, or are they eligible to be committed
> > as soon as the record is added to the cache?
> The order of operations is to flush the caches, wait for acks, and then
> commit.
>
> > - Could it be a race condition on the update of the state store? I.e.
> > given 2 back-to-back messages for the same key, could an aggregation
> > function handle both based on the old value?
> This really shouldn’t be possible. Each task runs single-threaded.
>
> Of course, that is all the intent of the code. There may be a bug that
> invalidates one of those responses. But I don’t think we know of other
> occasions of this happening.
>
> It sounds like you don’t need general advice, but I feel compelled to
> offer it. The best I can think of is to keep a really close eye on the app.
> If you can catch the problem right away, you might be able to correlate it
> with the application logs, look in the topic partitions, etc.
>
> I hope this helps,
> John
>
> On Fri, Nov 20, 2020, at 14:39, Jeffrey Goderie wrote:
> > Hi all,
> >
> > We recently started using Kafka Streams and we encountered an unexpected
> > issue with our Streams application. Using the following topology we ran
> > into data loss:
> >
> > Topologies:
> >    Sub-topology: 0
> >     Source: meteringpoints-source (topics: [serving.meteringpoints.mad-meteringpoint])
> >       --> meteringpoints
> >     Processor: meteringpoints (stores: [meteringpoints])
> >       --> KTABLE-SELECT-0000000002
> >       <-- meteringpoints-source
> >     Processor: KTABLE-SELECT-0000000002 (stores: [])
> >       --> KSTREAM-SINK-0000000003
> >       <-- meteringpoints
> >     Sink: KSTREAM-SINK-0000000003 (topic: meteringpointsbyzipcode-repartition)
> >       <-- KTABLE-SELECT-0000000002
> >
> >   Sub-topology: 1
> >     Source: KSTREAM-SOURCE-0000000004 (topics: [meteringpointsbyzipcode-repartition])
> >       --> KTABLE-AGGREGATE-0000000005
> >     Processor: KTABLE-AGGREGATE-0000000005 (stores: [meteringpointsbyzipcode])
> >       --> none
> >       <-- KSTREAM-SOURCE-0000000004
> >
> > The topology is optimized, so the 'meteringpoints' store does not have an
> > additional changelog. The 'meteringpointsbyzipcode' store does have one.
> >
> > During the aggregation (KTABLE-AGGREGATE-0000000005) we build up a set of
> > objects that we have encountered for that specific key. Upon inspection we
> > noticed that some of our keys did not have all of the expected objects in
> > their associated value.
> >
> > Configuration-wise: our cluster consists of 3 brokers, and the topics
> > (regular and internal) are replicated over all of them. We don't have EOS
> > enabled, as our aggregation is idempotent. Our consumers' isolation level
> > is 'read_uncommitted', which we thought was irrelevant as 'at_least_once'
> > delivery doesn't seem to use Kafka transactions. The number of consumers
> > is equal to the number of partitions in each topic, so each consumer deals
> > with a single partition for each topic.
> >
> > Unfortunately, both the repartition topic and the changelog topic were
> > wiped before we were able to investigate what caused the issue. Because of
> > this we are unable to determine whether the problem originated in the
> > changelog or the repartition topic. Resetting the application (and its
> > offsets) caused all data to be reprocessed, after which the issue was gone.
> > We tried reproducing the erroneous scenario, but have not yet succeeded,
> > mostly because we don't know what events caused it in the first place.
> >
> > Since we are seemingly unable to reproduce the behaviour, and didn't find
> > any recent accounts of similar problems, we decided to look into the Kafka
> > Streams source code to determine how things function. While insightful, it
> > didn't help in determining the cause. As such, we were wondering whether
> > you could aid us by answering some of the following questions:
> >
> > - Is there a realistic scenario (e.g. crash, rebalance) which you can
> > think of where the offset has been committed, but the repartition was not?
> > - Similarly, is there a realistic scenario (e.g. crash, rebalance) which
> > you can think of where the offset has been committed, but the changelog
> > was not?
> > - When caching the output of a KTable, are the corresponding offsets
> > committed when the cache is flushed, or are they eligible to be committed
> > as soon as the record is added to the cache?
> > - Could it be a race condition on the update of the state store? I.e.
> > given 2 back-to-back messages for the same key, could an aggregation
> > function handle both based on the old value?
> >
> > Kind regards,
> > Jeffrey G
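
For readers who want to experiment with the scenario discussed above, the following is a minimal, hypothetical sketch (assuming a recent Kafka Streams version) of an application with the same shape as the quoted topology: a source KTable re-keyed by zip code and aggregated into a set per key, plus the configuration points mentioned in the thread (topology optimization, at-least-once processing, and John's suggestion to set the producer to acks=all). The class name, value layout, toy serde, and bootstrap address are illustrative assumptions, not the actual application code.

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Properties;
    import java.util.Set;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class MeteringPointsByZipCodeSketch {

        public static void main(final String[] args) {
            // Toy serde for the aggregated set: comma-joined ids. The real
            // application presumably serializes its own value objects.
            final Serde<Set<String>> setSerde = Serdes.serdeFrom(
                (topic, set) -> set == null
                    ? null
                    : String.join(",", set).getBytes(StandardCharsets.UTF_8),
                (topic, bytes) -> bytes == null || bytes.length == 0
                    ? new HashSet<>()
                    : new HashSet<>(Arrays.asList(
                        new String(bytes, StandardCharsets.UTF_8).split(","))));

            final StreamsBuilder builder = new StreamsBuilder();

            // Sub-topology 0: the source KTable ("meteringpoints" store). With
            // topology optimization enabled, the source topic itself serves as
            // the changelog, matching the note that this store has no extra one.
            final KTable<String, String> meteringPoints = builder.table(
                "serving.meteringpoints.mad-meteringpoint",
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("meteringpoints"));

            // Re-key by zip code (this writes the "-repartition" topic) and
            // aggregate the ids seen per zip code into a set (the
            // "meteringpointsbyzipcode" store, backed by a changelog). In this
            // toy example the record value is taken to be the zip code itself.
            meteringPoints
                .groupBy((id, zipCode) -> KeyValue.pair(zipCode, id),
                         Grouped.with(Serdes.String(), Serdes.String()))
                .aggregate(
                    HashSet::new,
                    (zipCode, id, agg) -> { agg.add(id); return agg; },    // adder
                    (zipCode, id, agg) -> { agg.remove(id); return agg; }, // subtractor
                    Materialized.<String, Set<String>, KeyValueStore<Bytes, byte[]>>as(
                        "meteringpointsbyzipcode").withValueSerde(setSerde));

            // Configuration points discussed in the thread.
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "meteringpoints-by-zipcode");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
            // John's suggestion: have the internal producer wait for all
            // in-sync replicas before a write is considered acknowledged.
            props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

            final KafkaStreams streams = new KafkaStreams(builder.build(props), props);
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            streams.start();
        }
    }

With at-least-once processing, duplicates are possible after a failure, which is why the aggregation in the sketch is idempotent (adding the same id to a set twice has no effect); data loss of the kind described in the thread is not expected under this configuration.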