To restate the issue:
When checkpointing is disabled, the Flink Kafka Consumer relies on the
periodic offset commits made to the broker by the internal Kafka
client.  Such a job would, upon restart, continue from the committed
offsets.  However, if the job is restored from a savepoint, the offsets
within the savepoint supersede the broker-based offsets.
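
For illustration, a minimal sketch of that first setup (the topic name,
group id, and broker address are placeholders, and I'm assuming the 0.11
connector here; adjust for your connector version):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
    props.setProperty("group.id", "my-group");                // placeholder
    // With Flink checkpointing disabled, these Kafka client settings
    // drive the periodic offset commits to the broker:
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "5000");

    FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
        "my-topic", new SimpleStringSchema(), props);
    // The default start position, stated explicitly: resume from the
    // group's committed offsets in the broker:
    consumer.setStartFromGroupOffsets();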

It seems a bit unusual to use the savepoint feature on a job that doesn't
have checkpointing enabled.  It makes me wonder whether
`StreamExecutionEnvironment::enableCheckpointing` is best understood as
enabling *periodic* checkpointing.
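
To make that concrete (the 60-second interval is arbitrary):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    // Despite the general-sounding name, this enables *periodic*
    // checkpointing: a checkpoint is triggered every 60 seconds.
    env.enableCheckpointing(60_000L);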

The docs say that the periodic offset-commit feature is not intended for
fault tolerance, which implies to me that you should use Flink's
checkpointing feature instead.  A great reason to use Flink checkpointing
is that it captures the intermediate state of the job, such as window
state, in addition to the consumer offsets.
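
As a sketch of what I mean, reusing the consumer and environment from the
snippets above (the window size and aggregation are placeholders):

    import org.apache.flink.streaming.api.windowing.time.Time;

    // With checkpointing enabled, each checkpoint captures the Kafka
    // offsets *and* the contents of any in-flight windows, so a restore
    // is consistent:
    env.enableCheckpointing(60_000L);
    env.addSource(consumer)
        .timeWindowAll(Time.minutes(5))      // illustrative window
        .reduce((a, b) -> a + "\n" + b)      // illustrative aggregation
        .print();
    env.execute("kafka-window-example");     // hypothetical job name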

I hope this helps,
Eron

On Thu, Jan 18, 2018 at 3:26 PM, jelmer <jkupe...@gmail.com> wrote:

> I ran into a rather annoying issue today while upgrading a Flink job
> from Flink 1.3.2 to 1.4.0.
>
> This particular job uses neither checkpointing nor state.
>
> I followed the instructions at
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html
>
> First I created a savepoint, upgraded the cluster, then restarted the
> job from the savepoint.
>
> This all went well until, a few hours later, one of our Kafka nodes
> died.  This triggered an exception in the job, which was subsequently
> restarted.
>
> However, instead of picking up where it left off based on the offsets
> committed to Kafka (which is what should happen according to
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html),
> the Kafka offsets were reset to the point when I made the savepoint 3
> hours earlier, and so it started reprocessing millions of messages.
>
> Needless to say, creating a savepoint for a job without state or
> checkpoints does not make much sense.  But I would not expect a restart
> from a savepoint to completely break a job in the case of failure.
>
> I created a repository that reproduces the scenario I encountered
>
> https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing
>
> Am I misunderstanding anything, or should I file a bug for this?
