Hi!

This is a valid case. The starting offset is the offset the Kafka source
reads from when the job starts *without a checkpoint*. That is to say, if
your job has been running for a while, completed several checkpoints and
was then restarted, the Kafka source won't read from the starting offset,
but from the offset recorded in the checkpoint.
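
For reference, here is a minimal sketch of how the starting offset is set
on the KafkaSource builder (the broker, topic, and group names below are
placeholders, not taken from your job):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092") // placeholder broker address
            .setTopics("input-topic")              // placeholder topic name
            .setGroupId("my-group")                // placeholder consumer group
            // Used only when the job starts fresh, i.e. without a
            // checkpoint or savepoint; on restore, the offsets stored
            // in the checkpoint take precedence over this initializer.
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();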

As you're only processing 2 records (which I guess takes less than a
second), no checkpoint has been completed (if you didn't change any
configuration, the default checkpointing interval is 10 minutes), so the
next time you start the same job it will still read from the starting
offset, which is the latest offset per your setting.
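
If you want a short-lived job like this to have a chance to complete a
checkpoint, you can enable checkpointing with a shorter interval. A
sketch (10 seconds is just an illustrative value):

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Checkpoint every 10 seconds so even a short-running job can
    // complete at least one checkpoint before it finishes or fails.
    env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);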

If you would like to reprocess the second record, you can set the
starting offset to earliest (which is the default setting). The first
record will also be reprocessed, but this is still valid because it just
updates the result for the first record (to the same value as in your
previous execution).
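
Concretely, that is a one-line change to the builder sketch above:

    // In the KafkaSource builder above, start from the earliest offset
    // instead of the latest one; a fresh start (no checkpoint) then
    // re-reads the topic from the beginning.
    .setStartingOffsets(OffsetsInitializer.earliest())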

On Wed, Jan 5, 2022 at 02:56, Sharon Xie <sharon.xie...@gmail.com> wrote:

> Can someone help me understand how Flink deals with the following scenario?
>
> I have a job that reads from a source Kafka (starting-offset: latest) and
> writes to a sink Kafka with exactly-once execution. Let's say that I have 2
> records in the source. The 1st one is processed without issue and the job
> fails when the 2nd record is processed due to a parsing error. I want to
> update the job with a fix for the 2nd record and resume processing from the
> offset of the 2nd record.
>
> However, I can't find a way to stop the job with a savepoint because the
> job is in a failed state. If I just cancel the job without a savepoint, the
> job will start from the new "latest" offset next time I start it.
>
> Is this a valid case? If so, how to handle this case so that I can resume
> processing from the 2nd record's offset after I update the job?
>
>
> Thanks,
> Sharon