Hi Jonathan,

With EOS enabled, Kafka Streams does not use checkpoint files for restoring state stores; it replays the data contained in the changelog topic. But this should not affect where the input source topic(s) are consumed from after a restart; the changelog topics are only consumed from during a restore (or for keeping standby tasks up to date).
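For reference, exactly-once is controlled by the processing.guarantee config. A minimal sketch of enabling it (the application id and bootstrap servers below are placeholders, not from your setup):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosConfig {
    // Builds Streams properties with exactly-once processing enabled.
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");          // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Switches from the default at_least_once to exactly_once.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}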
When you say you have used both "exactly once" and "at least once": for the "at least once" case, did you run for a while in that mode and then restart?

You can confirm how much data, and from which offset, Streams is restoring a state store by providing a custom implementation of the StateRestoreListener interface and setting it via KafkaStreams#setGlobalStateRestoreListener (a minimal sketch of such a listener is at the end of this message).

-Bill

On Mon, Jan 14, 2019 at 7:32 AM Jonathan Santilli <jonathansanti...@gmail.com> wrote:

> I have a Kafka Streams application for which, whenever I restart it, the
> offsets for the topic partitions (*KTABLE-SUPPRESS-STATE-STORE*) it is
> consuming get reset to 0. Hence, for all partitions, the lag increases and
> the app needs to reprocess all the data.
>
> I have ensured the lag is 1 for every partition before the restart. All
> consumers that belong to that consumer-group-id (app-id) are active. The
> restart is immediate; it takes around 30 secs.
>
> The app is using exactly-once as the processing guarantee.
>
> I have read this answer: How does an offset expire for an Apache Kafka
> consumer group?
> <https://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group>
>
> I have tried with *auto.offset.reset = latest* and *auto.offset.reset =
> earliest*.
>
> I assume that after the restart the app should pick up from the latest
> committed offset for that consumer group.
>
> Is it possible to know why the offsets are getting reset to 0?
>
> I would really appreciate any clue about this.
>
> This is the code the app executes:
>
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<..., ...> events = builder
>         .stream(inputTopicNames, Consumed.with(..., ...)
>                 .withTimestampExtractor(...));
>
> events
>         .filter((k, v) -> ...)
>         .flatMapValues(v -> ...)
>         .flatMapValues(v -> ...)
>         .selectKey((k, v) -> v)
>         .groupByKey(Grouped.with(..., ...))
>         .windowedBy(TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
>                 .advanceBy(Duration.ofSeconds(windowSizeInSecs))
>                 .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
>         .reduce((agg, newValue) -> {
>             ...
>             return agg;
>         })
>         .suppress(Suppressed.untilWindowCloses(
>                 Suppressed.BufferConfig.unbounded()))
>         .toStream()
>         .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));
>
> The offset reset always happens (after restarting) with the
> *KTABLE-SUPPRESS-STATE-STORE* internal topic created by the Kafka Streams
> API.
>
> I have tried with processing guarantee *exactly once* and *at least
> once*.
>
> Once again, I would really appreciate any clue about this.
>
> P.S.: I have also posted the question on *SO*:
> https://stackoverflow.com/questions/54145281/why-do-the-offsets-of-the-consumer-group-app-id-of-my-kafka-streams-applicatio
>
> Cheers!
> --
> Santilli Jonathan
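P.S. As promised above, a minimal sketch of such a restore listener (the class name and log format are illustrative, not from any Kafka example):

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Logs the offset range and progress of every state-store restoration,
// which shows exactly where a restore starts and how much it replays.
public class LoggingStateRestoreListener implements StateRestoreListener {

    @Override
    public void onRestoreStart(final TopicPartition topicPartition,
                               final String storeName,
                               final long startingOffset,
                               final long endingOffset) {
        System.out.printf("Restore of %s (%s) starting at offset %d, ending at %d%n",
                storeName, topicPartition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(final TopicPartition topicPartition,
                                final String storeName,
                                final long batchEndOffset,
                                final long numRestored) {
        System.out.printf("Restored a batch of %d records for %s, up to offset %d%n",
                numRestored, storeName, batchEndOffset);
    }

    @Override
    public void onRestoreEnd(final TopicPartition topicPartition,
                             final String storeName,
                             final long totalRestored) {
        System.out.printf("Restore of %s finished, %d records restored in total%n",
                storeName, totalRestored);
    }
}

Register it before calling start():

final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.setGlobalStateRestoreListener(new LoggingStateRestoreListener());
streams.start();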