To restate the issue: when checkpointing is disabled, the Flink Kafka Consumer relies on the periodic offsets that the internal Kafka client commits to the broker. Such a job would, upon restart, continue from the committed offsets. However, if the job is restored from a savepoint, the offsets within the savepoint supersede the broker-based offsets.
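To make the scenario concrete, here is a minimal sketch of such a job, assuming the Flink 1.4 FlinkKafkaConsumer011 API (broker address, group id, and topic name are hypothetical placeholders):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class KafkaWithoutCheckpointing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is deliberately NOT enabled here.

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "example-group");       // placeholder
        // With checkpointing disabled, the internal Kafka client commits
        // offsets periodically, governed by the standard Kafka properties:
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("example-topic",
                        new SimpleStringSchema(), props);
        // Default behavior: resume from the group offsets committed to the
        // broker. A restore from a savepoint overrides this -- the offsets
        // stored in the savepoint take precedence.
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();
        env.execute("kafka-without-checkpointing");
    }
}
```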
It seems a bit unusual to use the savepoint feature on a job that doesn't have checkpointing enabled. It makes me wonder whether `StreamExecutionEnvironment::enableCheckpointing` is best understood as enabling *periodic* checkpointing. The docs say that the periodic offset commit feature is not intended for fault tolerance, which implies to me that you should use Flink's checkpointing feature. A great reason to use Flink checkpointing is that it captures the intermediate state of the job, such as window state, in addition to the consumer offsets (see the sketch after the quoted message below).

I hope this helps,
Eron

On Thu, Jan 18, 2018 at 3:26 PM, jelmer <jkupe...@gmail.com> wrote:

> I ran into a rather annoying issue today while upgrading a Flink job
> from Flink 1.3.2 to 1.4.0.
>
> This particular job uses neither checkpointing nor state.
>
> I followed the instructions at
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/upgrading.html
>
> I first created a savepoint, upgraded the cluster, then restarted the job
> from the savepoint.
>
> This all went well until, a few hours later, one of our Kafka nodes
> died. This triggered an exception in the job, which was subsequently
> restarted.
>
> However, instead of picking up where it left off based on the offsets
> committed to Kafka (which is what should happen according to
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html),
> the Kafka offsets were reset to the point when I made the savepoint
> 3 hours earlier, and so it started reprocessing millions of messages.
>
> Needless to say, creating a savepoint for a job without state or
> checkpoints does not make much sense. But I would not expect a restart
> from a savepoint to completely break a job in the case of failure.
>
> I created a repository that reproduces the scenario I encountered:
>
> https://github.com/jelmerk/flink-cancel-restart-job-without-checkpointing
>
> Am I misunderstanding anything, or should I file a bug for this?
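For reference, here is a minimal sketch of the approach suggested above, under the same assumptions as the earlier sketch (Flink 1.4 APIs, placeholder names): enable periodic checkpointing so the consumer offsets are snapshotted together with operator state, and commit offsets back to Kafka only on completed checkpoints.

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class KafkaWithCheckpointing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable *periodic* checkpointing: consumer offsets and operator
        // state (e.g. window contents) are snapshotted every 60 seconds.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "example-group");       // placeholder

        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>("example-topic",
                        new SimpleStringSchema(), props);
        // With checkpointing enabled, offsets are committed back to Kafka
        // only on completed checkpoints (true is the default; shown here
        // for emphasis). The offsets in the checkpoint remain the source
        // of truth on restore.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("kafka-with-checkpointing");
    }
}
```

With this in place, a restart after a failure resumes from the last completed checkpoint rather than from whatever offsets happen to be on the broker or in an old savepoint.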