What did change was the default starting position when not starting from a
checkpoint. With FlinkKafkaConsumer, it starts from the committed offsets
by default. With KafkaSource, it starts from the earliest offset.
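
If you want the previous default back with the new connector, a minimal
sketch (broker address, topic, and group ID below are placeholders) is to
start from the committed group offsets and fall back to the earliest offset
only when no committed offset exists:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // placeholder broker address
        .setTopics("input-topic")             // placeholder topic
        .setGroupId("my-group")               // placeholder group ID
        // committed group offsets first; EARLIEST only if none were committed
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();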

David

On Fri, Jul 15, 2022 at 5:57 AM Chesnay Schepler <ches...@apache.org> wrote:

> I'm not sure about the previous behavior, but at the very least according
> to the documentation the behavior is identical.
>
> 1.12:
> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>
> setStartFromEarliest() / setStartFromLatest(): Start from the
> earliest / latest record. Under these modes, committed offsets in Kafka
> will be ignored and not used as starting positions.
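>
> For comparison, a minimal sketch of the old connector (broker address,
> topic, and group ID are placeholders); the default setStartFromGroupOffsets()
> reads the committed offsets, while setStartFromEarliest() ignores them:
>
> import java.util.Properties;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
>
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "broker:9092");  // placeholder broker
> props.setProperty("group.id", "my-group");              // placeholder group ID
>
> FlinkKafkaConsumer<String> consumer =
>         new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
> consumer.setStartFromGroupOffsets();  // old default: committed offsets, else auto.offset.reset
> // consumer.setStartFromEarliest();   // would ignore committed offsets entirely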
>
> On 13/07/2022 18:53, Alexis Sarda-Espinosa wrote:
>
> Hello,
>
> I have a job running with Flink 1.15.0 that consumes from Kafka with the
> new KafkaSource API, setting a group ID explicitly and specifying
> OffsetsInitializer.earliest() as a starting offset. Today I restarted the
> job ignoring both savepoint and checkpoint, and the consumer started
> reading from the first available message in the broker (from 24 hours ago),
> i.e. it completely ignored the offsets that were committed to Kafka. If I
> use OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
> instead, the problem seems to go away.
>
> With the previous FlinkKafkaConsumer, using earliest didn't cause any such
> issues. Was this changed in the aforementioned way on purpose?
>
> Regards,
> Alexis.
>
>
>
