Hi Vishwas,

Currently, Flink can only restore from a retained checkpoint or a savepoint via the `-s` parameter [1][2]; otherwise, it will start from scratch.
```
checkpoint ---> bin/flink run -s :checkpointMetaDataPath [:runArgs]
savepoint  ---> bin/flink run -s :savepointPath [:runArgs]
```

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[2] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints

Best,
Congxian

Vishwas Siravara <vsirav...@gmail.com> wrote on Wednesday, October 9, 2019 at 5:07 AM:

> Hi Yun,
> Thanks for your reply. I do start from GROUP_OFFSET. Here is the code
> snippet:
>
> env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")
>
> I have also enabled and externalized checkpointing to S3.
> Why is it not recommended to just restart the job once I cancel it, as
> long as the topology does not change? What is the advantage of
> explicitly restoring from the last checkpoint by passing the -s option to
> the flink command line if it does the same thing? For instance, if
> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
> is my last successful checkpoint, what is the difference between 1 and 2?
>
> 1. /usr/mware/flink/bin/flink run -d -C
> file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main
> flink-job-assembly.jar flink druid -p 8 -cp qa_streaming
> 2. /usr/mware/flink/bin/flink run -s
> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
> -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main
> flink-job-assembly.jar flink druid -p 4 -cp qa_streaming
>
> Thanks,
> Vishwas
>
> On Tue, Oct 8, 2019 at 1:51 PM Yun Tang <myas...@live.com> wrote:
>
>> Hi Vishwas
>>
>> If you did not configure your
>> org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is
>> GROUP_OFFSET by default, which means "start from committed offsets in
>> ZK / Kafka brokers of a specific consumer group". You also need to
>> enable checkpointing so that Kafka offsets are committed when a
>> checkpoint completes.
>>
>> In other words, even if you don't resume from a checkpoint, as long as
>> you enabled checkpointing in the previous job and left startupMode as
>> GROUP_OFFSET, you can restore from the last committed offset if the
>> previous checkpoint completed [1][2]. However, this is not really
>> recommended; it is better to resume from the last checkpoint [3].
>>
>> [1] https://www.slideshare.net/robertmetzger1/clickthrough-example-for-flinks-kafkaconsumer-checkpointing
>> [2] https://www.ververica.com/blog/kafka-flink-a-practical-how-to
>> [3] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>>
>> Best,
>> Yun Tang
>>
>> ------------------------------
>> *From:* Vishwas Siravara <vsirav...@gmail.com>
>> *Sent:* Wednesday, October 9, 2019 0:54
>> *To:* user <user@flink.apache.org>
>> *Subject:* Flink restoring a job from a checkpoint
>>
>> Hi guys,
>> I have a Flink streaming job which streams from a Kafka source. There is
>> no state in the job, just a simple filter, map, and write to a Kafka sink.
>> Suppose I stop my job and then submit it again to the cluster with the
>> same consumer group. Will the job restore automatically from the last
>> successful checkpoint, since that is the last committed offset in Kafka?
>>
>> Thanks,
>> Vishwas
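To make the pieces of this thread concrete, below is a minimal sketch (Flink 1.9-era APIs) of a job that enables checkpointing, retains the checkpoint on cancellation so it can later be passed to `flink run -s`, and starts the Kafka consumer from committed group offsets. The topic name, broker address, and group id are illustrative assumptions, not values from the thread:

```java
// Sketch only: how checkpointing, externalized retention, and GROUP_OFFSET
// startup fit together. Topic/broker/group values are illustrative.
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CheckpointedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka offsets are committed back to the broker/ZK only when a
        // checkpoint completes, so checkpointing must be enabled for a
        // plain GROUP_OFFSET restart to pick up where the job left off.
        env.enableCheckpointing(60_000L);

        // Keep the checkpoint around after cancel so it can be used with
        // `flink run -s <checkpointMetaDataPath>`.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative
        props.setProperty("group.id", "qa_streaming");            // illustrative

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets(); // the default startup mode

        env.addSource(consumer).name("kafka source").print();
        env.execute("checkpointed-kafka-job");
    }
}
```

The practical difference between the two restart commands in the thread: without `-s`, the new job only inherits the Kafka offsets committed at the last completed checkpoint; with `-s`, Flink restores the full job state (offsets and any operator state) from the checkpoint itself, which is why resuming from the checkpoint is the recommended path even for a currently stateless topology.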