Hi John,

Thank you for your swift response and the advice. Your answers confirm our
expectations.

As you suggested, we'll keep monitoring the application to see whether a
similar issue occurs. For now every analysis points towards the behaviour
being an effect of circumstance rather than a problem in the Kafka Streams
library, so it may have been a one-time thing. If it does resurface, we will
hopefully have the evidence we need to pinpoint the source.
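
For reference, the acks=all check you suggested could look roughly like the
sketch below. It is only an illustration and not our actual configuration:
the class name, the helper methods and the application id are hypothetical,
and it uses plain string keys instead of the Kafka client constants so that
it runs without any Kafka dependency. It also assumes that a
'producer.'-prefixed setting takes precedence over an unprefixed one in a
Streams configuration.

```java
import java.util.Properties;

class AcksCheck {
    // Builds a hypothetical Streams configuration (all values are examples).
    static Properties exampleConfig() {
        Properties props = new Properties();
        props.put("application.id", "meteringpoints-app"); // hypothetical name
        props.put("processing.guarantee", "at_least_once");
        props.put("producer.acks", "all"); // producer-prefixed override
        return props;
    }

    // Returns the configured acks value, or null when none is set.
    // Assumption: the "producer."-prefixed key wins over the bare "acks" key.
    static String configuredAcks(Properties props) {
        String prefixed = props.getProperty("producer.acks");
        return prefixed != null ? prefixed : props.getProperty("acks");
    }

    // True only when acks is explicitly "all" (or its alias "-1").
    // An unset value falls back to the client default, which this
    // sketch deliberately does not guess.
    static boolean waitsForAllReplicas(Properties props) {
        String acks = configuredAcks(props);
        return "all".equals(acks) || "-1".equals(acks);
    }
}
```

Calling AcksCheck.waitsForAllReplicas(...) on the properties we pass to the
application would tell us whether acks is set explicitly or whether we are
relying on the client default.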

Thanks again for your assistance!
Jeffrey



On Sat, 21 Nov 2020 at 06:02, John Roesler <vvcep...@apache.org> wrote:

> Hello Jeffrey,
>
> I’m sorry for the trouble. I appreciate your diligence in tracking this
> down. In reading your description, nothing jumps out to me as problematic.
> I’m a bit at a loss as to what may have been the problem.
>
>
> >    - Is there a realistic scenario (e.g. crash, rebalance) which you can
> >    think of where the offset has been committed, but the repartition was
> >    not?
> I don’t think so. We wait for acks before committing, so it should be
> safe. Maybe double-check that the producer is set to acks=all?
>
> >    - Similarly, is there a realistic scenario (e.g. crash, rebalance)
> >    which you can think of where the offset has been committed, but the
> >    changelog was not?
> Changelogs ought to be exactly the same mechanism as repartition topics.
>
> >    - When caching the output of a KTable, are the corresponding offsets
> >    committed when the cache is flushed, or are they eligible to be
> >    committed as soon as the record is added to the cache?
> The order of operations is to flush the caches, wait for acks, and then
> commit.
>
> >    - Could it be a race-condition on the update of the state store? I.e.
> >    given 2 back-to-back messages for the same key, could an aggregation
> >    function handle both based on the old value?
> This really shouldn’t be possible. Each task runs single-threaded.
>
> Of course, that is all the intent of the code. There may be a bug that
> invalidates one of those responses. But I don’t think we know of other
> occasions of this happening.
>
> It sounds like you don’t need general advice, but I feel compelled to
> offer it. The best I can think of is to keep a really close eye on the app.
> If you can catch the problem right away, you might be able to correlate it
> with the application logs, look in the topic partitions, etc.
>
> I hope this helps,
> John
>
> On Fri, Nov 20, 2020, at 14:39, Jeffrey Goderie wrote:
> > Hi all,
> >
> > We recently started using Kafka Streams and we encountered an unexpected
> > issue with our Streams application. Using the following topology we ran
> > into data loss:
> >
> > Topologies:
> >   Sub-topology: 0
> >     Source: meteringpoints-source (topics: [serving.meteringpoints.mad-meteringpoint])
> >       --> meteringpoints
> >     Processor: meteringpoints (stores: [meteringpoints])
> >       --> KTABLE-SELECT-0000000002
> >       <-- meteringpoints-source
> >     Processor: KTABLE-SELECT-0000000002 (stores: [])
> >       --> KSTREAM-SINK-0000000003
> >       <-- meteringpoints
> >     Sink: KSTREAM-SINK-0000000003 (topic: meteringpointsbyzipcode-repartition)
> >       <-- KTABLE-SELECT-0000000002
> >
> >   Sub-topology: 1
> >     Source: KSTREAM-SOURCE-0000000004 (topics: [meteringpointsbyzipcode-repartition])
> >       --> KTABLE-AGGREGATE-0000000005
> >     Processor: KTABLE-AGGREGATE-0000000005 (stores: [meteringpointsbyzipcode])
> >       --> none
> >       <-- KSTREAM-SOURCE-0000000004
> >
> > The topology is optimized, so the 'meteringpoints' store does not have an
> > additional changelog. The 'meteringpointsbyzipcode' store does have one.
> >
> > During the aggregation (KTABLE-AGGREGATE-0000000005) we build up a set of
> > objects that we encountered for that specific key. Upon inspection we
> > noticed that some of our keys did not have all of the expected objects in
> > their associated value.
> >
> > Configuration-wise: our cluster consists of 3 brokers, and the topics
> > (regular and internal) are replicated over all of them. We don't have EOS
> > enabled, as our aggregation is idempotent. Our consumers' isolation level
> > is 'read_uncommitted', which we thought was irrelevant as 'at_least_once'
> > delivery doesn't seem to use Kafka transactions. The number of consumers
> > is equal to the number of partitions in each topic, so each consumer
> > deals with a single partition for each topic.
> >
> > Unfortunately, both the repartition topic and the changelog topic were
> > wiped before we were able to investigate what caused the issue. Because
> > of this we are unable to determine whether the problem originated in the
> > changelog or the repartition topic. Resetting the application (and its
> > offsets) caused all data to be reprocessed, after which the issue was
> > gone. We tried reproducing the erroneous scenario, but have not yet
> > succeeded, mostly because we don't know what events caused it in the
> > first place.
> >
> > Since we are seemingly unable to reproduce the behaviour, and didn't find
> > any recent accounts of similar problems, we decided to look into the
> > Kafka Streams source code to determine how things function. While
> > insightful, it didn't help in determining the cause. As such, we were
> > wondering whether you could aid us by providing an answer to some of the
> > following questions?
> >
> >    - Is there a realistic scenario (e.g. crash, rebalance) which you can
> >    think of where the offset has been committed, but the repartition was
> >    not?
> >    - Similarly, is there a realistic scenario (e.g. crash, rebalance)
> >    which you can think of where the offset has been committed, but the
> >    changelog was not?
> >    - When caching the output of a KTable, are the corresponding offsets
> >    committed when the cache is flushed, or are they eligible to be
> >    committed as soon as the record is added to the cache?
> >    - Could it be a race-condition on the update of the state store? I.e.
> >    given 2 back-to-back messages for the same key, could an aggregation
> >    function handle both based on the old value?
> >
> > Kind regards,
> > Jeffrey G
> >
>
