It's basically as you have said.

If you resume from a checkpoint/savepoint (./bin/flink run -s <path>),
Flink will always use the offsets stored inside it.
If you don't resume from a checkpoint, it depends on how you have
configured the consumer. If you have supplied a group.id and left the other
settings at their defaults, it starts from the offsets committed for that group.
How recent those offsets are depends on your commit configuration (committed
on each completed checkpoint, or auto-committed periodically by the Kafka client).
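To make the precedence explicit, here is a simplified Python model of the rule
described above. It is not Flink's code: ConsumerConfig, resolve_start_offsets
and reset_fallback are hypothetical names, and details such as newly discovered
partitions are ignored.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple

# Hypothetical model: partition number -> offset to start reading from.
Offsets = Dict[int, int]


@dataclass
class ConsumerConfig:
    # Simplified stand-ins for the real consumer settings (hypothetical names).
    group_id: Optional[str] = None
    start_mode: str = "group-offsets"  # the consumer's default start position
    reset_fallback: str = "latest"     # stands in for Kafka's auto.offset.reset


def resolve_start_offsets(
    bounds: Dict[int, Tuple[int, int]],  # partition -> (earliest, latest) offset
    committed: Offsets,                  # offsets committed to Kafka for group_id
    config: ConsumerConfig,
    restored: Optional[Offsets] = None,  # offsets from a checkpoint/savepoint
) -> Offsets:
    """Simplified model of how the starting offsets are chosen."""
    # Started with `flink run -s <path>`: the snapshot's offsets win,
    # regardless of start mode or what the group has committed.
    if restored is not None:
        return dict(restored)

    result: Offsets = {}
    for partition, (earliest, latest) in bounds.items():
        if config.start_mode == "earliest":
            result[partition] = earliest
        elif config.start_mode == "latest":
            result[partition] = latest
        elif config.group_id is not None and partition in committed:
            # Default mode: continue from what the group last committed.
            result[partition] = committed[partition]
        else:
            # No committed offset for this partition: use the reset fallback.
            result[partition] = earliest if config.reset_fallback == "earliest" else latest
    return result


if __name__ == "__main__":
    bounds = {0: (0, 500), 1: (0, 300)}
    committed = {0: 420, 1: 250}
    cfg = ConsumerConfig(group_id="my-group")

    # Plain `flink run ...` (no -s): continue from the group's commits.
    print(resolve_start_offsets(bounds, committed, cfg))  # {0: 420, 1: 250}

    # `flink run -s <path>`: the checkpointed offsets take precedence.
    print(resolve_start_offsets(bounds, committed, cfg, restored={0: 400, 1: 240}))  # {0: 400, 1: 240}
```

In this model, how fresh `committed` is reflects the commit mode: with
commit-on-checkpoint it matches the last completed checkpoint, while with the
Kafka client's periodic auto-commit it trails the consumer by roughly the
auto-commit interval.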

On Mon, Aug 23, 2021 at 10:30 AM Pranjul Ahuja <ahuja0...@gmail.com> wrote:

> I use FlinkKafkaConsumer to consume from Kafka, with checkpointing enabled.
> Now I'm a little confused about the offset management and checkpoint mechanism.
>
> What is the behavior if I stop the application by executing yarn
> application -kill appId and then run the start command ./bin/flink run ...?
> Will Flink take the offsets from a checkpoint, or from the offsets Kafka
> stores for the group.id?
>
> Right now, I am writing checkpoints to HDFS but not starting Flink from the
> checkpoint directory, so I am assuming that the Kafka consumer is
> picking up the offsets from Kafka for the particular group.id. I have also
> verified with the kafka-consumer-groups.sh tool that once Flink is
> restarted, it picks up the offsets last committed to Kafka.
