Hello Bill, thanks a lot for the reply,
I will implement your recommendation about the
*KafkaStreams#setGlobalStateRestoreListener.*
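
A minimal sketch of such a listener, assuming the standard
StateRestoreListener interface from Kafka Streams. The class name
LoggingStateRestoreListener and the restoredSoFar helper are hypothetical
and only for illustration; they do not come from the application discussed
in this thread. The listener logs the offset range of each restore, which
should show whether a changelog partition is being replayed from offset 0.

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not from the thread): logs where each state store
// restore starts and ends, and counts how many records were replayed.
class LoggingStateRestoreListener implements StateRestoreListener {

    // Records restored so far, keyed by changelog partition.
    private final Map<TopicPartition, Long> restored = new ConcurrentHashMap<>();

    @Override
    public void onRestoreStart(TopicPartition partition, String storeName,
                               long startingOffset, long endingOffset) {
        restored.put(partition, 0L);
        System.out.printf("Restore START store=%s partition=%s offsets %d..%d%n",
                storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName,
                                long batchEndOffset, long numRestored) {
        // Accumulate the size of each restored batch.
        restored.merge(partition, numRestored, Long::sum);
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName,
                             long totalRestored) {
        System.out.printf("Restore END store=%s partition=%s totalRestored=%d%n",
                storeName, partition, totalRestored);
    }

    // Number of records restored so far for the given changelog partition.
    long restoredSoFar(TopicPartition partition) {
        return restored.getOrDefault(partition, 0L);
    }
}
```

The listener would be registered with
streams.setGlobalStateRestoreListener(new LoggingStateRestoreListener())
before calling streams.start(), since it can only be set while the
KafkaStreams instance has not yet been started.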

About your question:

*When you say you have used both "exactly once" and "at least once" for the*
*"at least once" case did you run for a while in that mode then restart?*

Yes, I have done that, among other combinations, but I see the same behaviour.

This is what I see in the logs after restart:

INFO  [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3]
internals.StoreChangelogReader (StoreChangelogReader.java:215) -
stream-thread [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3]
No checkpoint found for task 1_8 state store
KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog
*APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-8* with EOS turned
on. Reinitializing the task and restore its state from the beginning.

INFO  [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3]
internals.Fetcher (Fetcher.java:583) - [Consumer
clientId=*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3-restore-consumer,
groupId=] Resetting offset for partition
*APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-8
to offset 0*.


Before I restart, I always check the LAG for the consumer group (*APP-ID*)
reading from the output topic 'outPutTopicNameOfGroupedData' to verify that
it is 1. Immediately after the restart, once I see the logs above, the LAG
for that consumer group (*APP-ID*) reading from the output topic
'outPutTopicNameOfGroupedData' goes up. It increases so much that the App
reading from the 'outPutTopicNameOfGroupedData' topic re-processes the
data again.

I hope someone can give me a clue; I would really appreciate it.


Cheers!
--
Jonathan


On Mon, Jan 14, 2019 at 4:12 PM Bill Bejeck <b...@confluent.io> wrote:

> Hi Jonathan,
>
> With EOS enabled, Kafka Streams does not use checkpoint files for restoring
> state stores; it will replay the data contained in the changelog topic.
> But this should not affect where the input source topic(s) resume from
> after a restart. Also, the changelog topics are only consumed from during a
> restore (or for keeping standby tasks up to date).
>
> When you say you have used both "exactly once" and "at least once" for the
> "at least once" case did you run for a while in that mode then restart? You
> can confirm how much data and from which offset the streams is restoring a
> state store by using a custom implementation of the StateRestoreListener
> class and set it via the KafkaStreams#setGlobalStateRestoreListener.
>
> -Bill
>
>
> On Mon, Jan 14, 2019 at 7:32 AM Jonathan Santilli <
> jonathansanti...@gmail.com> wrote:
>
> > I have a Kafka Streams application for which, whenever I restart it, the
> > offsets for the topic partitions (*KTABLE-SUPPRESS-STATE-STORE*) it is
> > consuming get reset to 0. Hence, for all partitions, the lags increase and
> > the app needs to reprocess all the data.
> >
> > I have ensured the lag is 1 for every partition before the restart. All
> > consumers that belong to that consumer-group-id (app-id) are active. The
> > restart is immediate, it takes around 30 secs.
> >
> > The app is using exactly once as processing guarantee.
> >
> > I have read this answer: How does an offset expire for an Apache Kafka
> > consumer group?
> > <https://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group>.
> >
> > I have tried with *auto.offset.reset = latest* and *auto.offset.reset =
> > earliest*.
> >
> > I assume that after the restart the app should pick-up from the latest
> > committed offset for that consumer group.
> >
> > Is it possible to know why the offsets are getting reset to 0?
> >
> > I would really appreciate any clue about this.
> >
> > This is the code the App executes:
> >
> > final StreamsBuilder builder = new StreamsBuilder();
> > final KStream<..., ...> events = builder
> >         .stream(inputTopicNames, Consumed.with(..., ...)
> >                 .withTimestampExtractor(...));
> >
> > events
> >     .filter((k, v) -> ...)
> >     .flatMapValues(v -> ...)
> >     .flatMapValues(v -> ...)
> >     .selectKey((k, v) -> v)
> >     .groupByKey(Grouped.with(..., ...))
> >     .windowedBy(
> >         TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
> >             .advanceBy(Duration.ofSeconds(windowSizeInSecs))
> >             .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
> >     .reduce((agg, next) -> {
> >         ...
> >         return agg;
> >     })
> >     .suppress(Suppressed.untilWindowCloses(
> >                   Suppressed.BufferConfig.unbounded()))
> >     .toStream()
> >     .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
> >
> > The offset reset happens only, and always (after restarting), with the
> > *KTABLE-SUPPRESS-STATE-STORE* internal topic created by the Kafka Streams
> > API.
> >
> > I have tried with the Processing guarantee *exactly once* and *at least
> > once*.
> >
> > Once again, I will really appreciate any clue about this.
> > P.S: I have also posted the question in *SO*:
> >
> > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> >
> >
> > Cheers!
> > --
> > Santilli Jonathan
> >
>


-- 
Santilli Jonathan
