Hello Bill,

Thanks a lot for the reply. I will implement your recommendation about *KafkaStreams#setGlobalStateRestoreListener*.
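A minimal sketch of what such a listener could record, kept in plain Java so that it runs without the Kafka jars. The class name `RestoreTracker`, its String return values and the separate topic/partition arguments are hypothetical; the three methods only mirror the callbacks of `StateRestoreListener` (`onRestoreStart`, `onBatchRestored`, `onRestoreEnd`). In the real app, a class implementing `StateRestoreListener` would delegate to something like this and be registered via `KafkaStreams#setGlobalStateRestoreListener` before the streams instance is started.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of Kafka: it builds the messages a real
// StateRestoreListener implementation could log from its three callbacks.
class RestoreTracker {
    // Records restored so far, keyed by "<store>@<topic>-<partition>".
    private final Map<String, Long> restoredSoFar = new ConcurrentHashMap<>();

    private static String key(String topic, int partition, String storeName) {
        return storeName + "@" + topic + "-" + partition;
    }

    // Mirrors onRestoreStart(topicPartition, storeName, startingOffset, endingOffset).
    // A startingOffset of 0 means the changelog is replayed from the beginning.
    String onRestoreStart(String topic, int partition, String storeName,
                          long startingOffset, long endingOffset) {
        restoredSoFar.put(key(topic, partition, storeName), 0L);
        return "restore start: store=" + storeName
                + " changelog=" + topic + "-" + partition
                + " from=" + startingOffset + " to=" + endingOffset;
    }

    // Mirrors onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored).
    String onBatchRestored(String topic, int partition, String storeName,
                           long batchEndOffset, long numRestored) {
        long total = restoredSoFar.merge(
                key(topic, partition, storeName), numRestored, Long::sum);
        return "restore batch: store=" + storeName
                + " changelog=" + topic + "-" + partition
                + " batchEnd=" + batchEndOffset + " restoredSoFar=" + total;
    }

    // Mirrors onRestoreEnd(topicPartition, storeName, totalRestored).
    String onRestoreEnd(String topic, int partition, String storeName,
                        long totalRestored) {
        restoredSoFar.remove(key(topic, partition, storeName));
        return "restore end: store=" + storeName
                + " changelog=" + topic + "-" + partition
                + " totalRestored=" + totalRestored;
    }
}
```

Logging the "from" and "to" offsets at restore start should show directly whether the suppress store's changelog is being replayed from offset 0 after each restart.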
About your question:

*When you say you have used both "exactly once" and "at least once" for the "at least once" case did you run for a while in that mode then restart?*

Yes, I have done that, among other combinations, but I see the same behaviour.

This is what I see in the logs after restart:

INFO [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3] internals.StoreChangelogReader (StoreChangelogReader.java:215) - stream-thread [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3] No checkpoint found for task 1_8 state store KTABLE-SUPPRESS-STATE-STORE-0000000011 changelog *APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-8* with EOS turned on. Reinitializing the task and restore its state from the beginning.

INFO [*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3] internals.Fetcher (Fetcher.java:583) - [Consumer clientId=*APP-ID*-51df00e9-8b2e-42e5-8d62-6fbf506035d2-StreamThread-3-restore-consumer, groupId=] Resetting offset for partition *APP-ID-KTABLE-SUPPRESS-STATE-STORE-0000000011-changelog-8 to offset 0*.

Before I restart, I always check the LAG for the consumer group (*APP-ID*) reading from the output topic 'outPutTopicNameOfGroupedData' to verify that it is 1. Immediately after I restart and see the logs above, the LAG for that consumer group (*APP-ID*) reading from the output topic 'outPutTopicNameOfGroupedData' goes up, increasing so much that the App reading from the 'outPutTopicNameOfGroupedData' topic is re-processing the data again.

I hope someone can give me a clue; I would really appreciate it.

Cheers!
--
Jonathan

On Mon, Jan 14, 2019 at 4:12 PM Bill Bejeck <b...@confluent.io> wrote:

> Hi Jonathan,
>
> With EOS enabled, Kafka Streams does not use checkpoint files for restoring
> state stores; it will replay the data contained in the changelog topic.
> But this should not affect where the input source topic(s) after a restart
> also the changelog topics are only consumed from during a restore (or for
> keeping standby tasks up to date).
>
> When you say you have used both "exactly once" and "at least once" for the
> "at least once" case did you run for a while in that mode then restart? You
> can confirm how much data and from which offset the streams is restoring a
> state store by using a custom implementation of the StateRestoreListener
> class and set it via the KafkaStreams#setGlobalStateRestoreListener.
>
> -Bill
>
>
> On Mon, Jan 14, 2019 at 7:32 AM Jonathan Santilli <
> jonathansanti...@gmail.com> wrote:
>
> > I have a Kafka Streams application for which, whenever I restart it, the
> > offsets for the topic partitions (*KTABLE-SUPPRESS-STATE-STORE*) it is
> > consuming get reset to 0. Hence, for all partitions, the lags increase and
> > the app needs to reprocess all the data.
> >
> > I have ensured the lag is 1 for every partition before the restart. All
> > consumers that belong to that consumer-group-id (app-id) are active. The
> > restart is immediate, it takes around 30 secs.
> >
> > The app is using exactly once as processing guarantee.
> >
> > I have read this answer: How does an offset expire for an Apache Kafka
> > consumer group?
> > <https://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group>.
> >
> > I have tried with *auto.offset.reset = latest* and *auto.offset.reset =
> > earliest*.
> >
> > I assume that after the restart the app should pick up from the latest
> > committed offset for that consumer group.
> >
> > Is it possible to know why the offsets are getting reset to 0?
> >
> > I would really appreciate any clue about this.
> >
> > This is the code the App executes:
> >
> > final StreamsBuilder builder = new StreamsBuilder();
> > final KStream<..., ...> events = builder
> >     .stream(inputTopicNames, Consumed.with(..., ...)
> >         .withTimestampExtractor(...));
> >
> > events
> >     .filter((k, v) -> ...)
> >     .flatMapValues(v -> ...)
> >     .flatMapValues(v -> ...)
> >     .selectKey((k, v) -> v)
> >     .groupByKey(Grouped.with(..., ...))
> >     .windowedBy(
> >         TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
> >             .advanceBy(Duration.ofSeconds(windowSizeInSecs))
> >             .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
> >     .reduce((agg, newValue) -> {
> >         ...
> >         return agg;
> >     })
> >     .suppress(Suppressed.untilWindowCloses(
> >         Suppressed.BufferConfig.unbounded()))
> >     .toStream()
> >     .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
> >
> > The offset reset happens only, and always, after a restart, with the
> > *KTABLE-SUPPRESS-STATE-STORE* internal topic created by the Kafka Streams
> > API.
> >
> > I have tried with the processing guarantee *exactly once* and *at least
> > once*.
> >
> > Once again, I would really appreciate any clue about this.
> >
> > P.S: I have also posted the question in *SO*:
> >
> > https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
> >
> > Cheers!
> > --
> > Santilli Jonathan
>

--
Santilli Jonathan