What did change is the default starting position when the job is not restored from a checkpoint or savepoint: FlinkKafkaConsumer starts from the committed offsets by default, while KafkaSource starts from the earliest offset.
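As a minimal sketch (not from the original thread), the configuration below shows how KafkaSource can be told to reproduce the old FlinkKafkaConsumer default, i.e. start from the group's committed offsets and fall back to earliest when none exist, using the OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST) call Alexis mentions. The broker address, topic, group ID, and String deserializer are placeholders:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    public class KafkaSourceStartingOffsets {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("kafka:9092")   // placeholder broker address
                    .setTopics("my-topic")               // placeholder topic
                    .setGroupId("my-group")              // placeholder consumer group
                    // Start from the committed offsets of the group; if the group
                    // has no committed offsets yet, fall back to the earliest offset.
                    .setStartingOffsets(
                            OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
            env.execute();
        }
    }

With the default OffsetsInitializer.earliest(), committed offsets are ignored entirely, which matches the behavior Alexis observed after restarting without a checkpoint or savepoint.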
David

On Fri, Jul 15, 2022 at 5:57 AM Chesnay Schepler <ches...@apache.org> wrote:

> I'm not sure about the previous behavior, but at the very least according
> to the documentation the behavior is identical.
>
> 1.12:
> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> *setStartFromEarliest() / setStartFromLatest()*: Start from the
> earliest / latest record. Under these modes, committed offsets in Kafka
> will be ignored and not used as starting positions.
>
> On 13/07/2022 18:53, Alexis Sarda-Espinosa wrote:
>
> > Hello,
> >
> > I have a job running with Flink 1.15.0 that consumes from Kafka with the
> > new KafkaSource API, setting a group ID explicitly and specifying
> > OffsetsInitializer.earliest() as a starting offset. Today I restarted the
> > job ignoring both savepoint and checkpoint, and the consumer started
> > reading from the first available message in the broker (from 24 hours ago),
> > i.e. it completely ignored the offsets that were committed to Kafka. If I
> > use OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
> > instead, the problem seems to go away.
> >
> > With the previous FlinkKafkaConsumer, using earliest didn't cause any such
> > issues. Was this changed in the aforementioned way on purpose?
> >
> > Regards,
> > Alexis.