Hello again, I just performed a test using OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). I did a few tests in the following order, and I noticed a few weird things. Note that our job uses Processing Time windows, so watermarks are irrelevant.
1. After the job had been running for a while, we manually moved the consumer group's offset to 12 hours in the past [1] (without restarting the job). - After this, the consumer simply stopped reading messages - the consumer lag in Kafka stayed at around 150k (no new data arrived) 2. We restarted the job with a checkpoint. - The consumer lag in Kafka dropped down to 0, but no data was emitted from the windows. 3. We stopped the job, moved the offset again, and restarted Without any checkpoint/savepoint. - This time the consumer correctly processed the backlog and emitted events from the windows. This was done with Flink 1.15.0. Is this expected? In other words, if there's a mismatch between Flink's state's offset and Kafka's offset, will the job be unable to run? [1] The command to move the offset was: kafka-consumer-groups.sh \ --bootstrap-server ... \ --topic our-topic \ --group our-group \ --command-config kafka-preprod.properties \ --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \ --execute Regards, Alexis. Am Do., 14. Juli 2022 um 22:56 Uhr schrieb Alexis Sarda-Espinosa < sarda.espin...@gmail.com>: > Hi Yaroslav, > > The test I did was just using earliest, I'll test with committed offset > again, thanks. > > Regards, > Alexis. > > On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko, <yaros...@goldsky.io> > wrote: > >> Hi Alexis, >> >> Do you use OffsetsInitializer.committedOffsets() to specify your Kafka >> consumer offsets? In this case, it should get the offsets from Kafka and >> not the state. >> >> On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa < >> sarda.espin...@gmail.com> wrote: >> >>> Hello, >>> >>> Regarding the new Kafka source (configure with a consumer group), I >>> found out that if I manually change the group's offset with Kafka's admin >>> API independently of Flink (while the job is running), the Flink source >>> will ignore that and reset it to whatever it stored internally. Is there >>> any way to prevent this? >>> >>> Regards, >>> Alexis. >>> >>>