I would suggest updating the documentation to include that statement. I imagine dynamic partition discovery has no effect on this?
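To make the statement quoted below concrete, here is a minimal sketch in plain Java of how the starting offset for one partition seems to be chosen with OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). It is only a toy model of the behaviour described in this thread, not Flink's actual code: the class StartingOffsetSketch, the Reset enum and the method resolveStartingOffset are hypothetical names invented for illustration, and the offsets are made-up numbers.

```java
import java.util.Optional;

public class StartingOffsetSketch {

    /** Hypothetical stand-in for the configured reset strategy. */
    enum Reset { EARLIEST, LATEST }

    /**
     * Toy model of the rule described in the thread:
     * an offset restored from a checkpoint/savepoint always wins;
     * the group's committed offset in Kafka is consulted only on a clean start;
     * the reset strategy applies only if the group has no committed offset.
     */
    static long resolveStartingOffset(
            Optional<Long> restoredFromState,
            Optional<Long> committedInKafka,
            Reset reset,
            long earliestOffset,
            long latestOffset) {
        if (restoredFromState.isPresent()) {
            // Restored job: Kafka's committed offset is not consulted.
            return restoredFromState.get();
        }
        long fallback = (reset == Reset.EARLIEST) ? earliestOffset : latestOffset;
        // Clean start: use the committed offset, else the reset strategy.
        return committedInKafka.orElse(fallback);
    }

    public static void main(String[] args) {
        // Restart from a checkpoint after the group offset was moved back to 5.
        System.out.println(resolveStartingOffset(
                Optional.of(100L), Optional.of(5L), Reset.LATEST, 0L, 200L));
        // Restart without checkpoint/savepoint: the moved group offset is used.
        System.out.println(resolveStartingOffset(
                Optional.empty(), Optional.of(5L), Reset.LATEST, 0L, 200L));
        // Clean start and no committed offset: fall back to LATEST.
        System.out.println(resolveStartingOffset(
                Optional.empty(), Optional.empty(), Reset.LATEST, 0L, 200L));
    }
}
```

In this model, moving the consumer group's offset only has an effect when the job starts without any restored state, which is the case from step 3 of my test.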

Regards,
Alexis.

On Thu, 21 July 2022 at 10:03, Chesnay Schepler <ches...@apache.org> wrote:

> Flink only reads the offsets from Kafka when the job is initially started
> from a clean slate.
> Once checkpoints are involved, it relies only on the offsets stored in the
> state.
>
> On 20/07/2022 14:51, Alexis Sarda-Espinosa wrote:
>
> Hello again,
>
> I just performed a test using
> OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST). I did a
> few tests in the following order, and I noticed a few weird things. Note
> that our job uses Processing Time windows, so watermarks are irrelevant.
>
> 1. After the job had been running for a while, we manually moved the
>    consumer group's offset to 12 hours in the past [1] (without restarting
>    the job).
>    - After this, the consumer simply stopped reading messages - the
>      consumer lag in Kafka stayed at around 150k (no new data arrived)
>
> 2. We restarted the job with a checkpoint.
>    - The consumer lag in Kafka dropped down to 0, but no data was
>      emitted from the windows.
>
> 3. We stopped the job, moved the offset again, and restarted without any
>    checkpoint/savepoint.
>    - This time the consumer correctly processed the backlog and emitted
>      events from the windows.
>
> This was done with Flink 1.15.0.
>
> Is this expected? In other words, if there's a mismatch between the offset
> in Flink's state and Kafka's offset, will the job be unable to run?
>
> [1] The command to move the offset was:
>
> kafka-consumer-groups.sh \
>   --bootstrap-server ... \
>   --topic our-topic \
>   --group our-group \
>   --command-config kafka-preprod.properties \
>   --reset-offsets --to-datetime '2022-07-20T00:01:00.000' \
>   --execute
>
> Regards,
> Alexis.
>
> On Thu, 14 July 2022 at 22:56, Alexis Sarda-Espinosa
> <sarda.espin...@gmail.com> wrote:
>
>> Hi Yaroslav,
>>
>> The test I did was just using earliest, I'll test with committed offset
>> again, thanks.
>>
>> Regards,
>> Alexis.
>>
>> On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko, <yaros...@goldsky.io>
>> wrote:
>>
>>> Hi Alexis,
>>>
>>> Do you use OffsetsInitializer.committedOffsets() to specify your Kafka
>>> consumer offsets? In this case, it should get the offsets from Kafka and
>>> not the state.
>>>
>>> On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <
>>> sarda.espin...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Regarding the new Kafka source (configured with a consumer group), I
>>>> found out that if I manually change the group's offset with Kafka's admin
>>>> API independently of Flink (while the job is running), the Flink source
>>>> will ignore that and reset it to whatever it stored internally. Is there
>>>> any way to prevent this?
>>>>
>>>> Regards,
>>>> Alexis.