Hi Vishwas

This because Flink's checkpoint mechanism could offer you more ability. You 
could resume from offset within specific checkpoint instead of last committed 
offset not to mention you could benefit from restoring from last timer state, 
operator state and keyed state.

Best
Yun Tang


________________________________
From: Congxian Qiu <qcx978132...@gmail.com>
Sent: Wednesday, October 9, 2019 10:06:12 AM
To: Vishwas Siravara <vsirav...@gmail.com>
Cc: Yun Tang <myas...@live.com>; user <user@flink.apache.org>
Subject: Re: Flink restoring a job from a checkpoint

Hi Vishwas

Currently, Flink can only restore retained checkpoint or savepoint with 
parameter `-s`[1][2], otherwise, it will start from scratch.

```
checkpoint    --->     bin/flink run -s :checkpointMetaDataPath [:runArgs]
savepoint --> bin/flink run -s :savepointPath [:runArgs]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints

Best,
Congxian


Vishwas Siravara <vsirav...@gmail.com<mailto:vsirav...@gmail.com>> 
于2019年10月9日周三 上午5:07写道:
Hi Yun,
Thanks for your reply. I do start from GROUP_OFFSET . Here is the code snippet :

env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka 
source")

I have also enabled and externalized checkpointing to S3 .
Why is it not recommended to just restart the job once I cancel it, as long as 
the topology does not change? What is the advantage of explicitly restoring 
from last checkpoint by passing the -s option to the flink command line if it 
does the same thing? For instance if 
s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
 is my last successful checkpoint, what is the difference between 1 and 2.

1. /usr/mware/flink/bin/flink run -d -C file:///usr/mware/flink/externalconfig/ 
-c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 8 -cp 
qa_streaming
2. /usr/mware/flink/bin/flink run -s 
s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
 -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main 
flink-job-assembly.jar flink druid -p 4 -cp qa_streaming

Thanks,
Vishwas

On Tue, Oct 8, 2019 at 1:51 PM Yun Tang 
<myas...@live.com<mailto:myas...@live.com>> wrote:
Hi Vishwas

If you did not configure your 
org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is 
GROUP_OFFSET by default, which means "Start from committed offsets in ZK / 
Kafka brokers of a specific consumer group". And you need  to enable checkpoint 
so that kafka offsets are committed when checkpoint completes.

In other words, even if you don't resume from checkpoint, just enable 
checkpoint in previous jobs and set startupMode as GROUP_OFFSET, you could 
restore from last committed offset if previous checkpoint completed [1][2]. 
However, this is not really recommended, better to resume from last checkpoint 
[3]

[1] 
https://www.slideshare.net/robertmetzger1/clickthrough-example-for-flinks-kafkaconsumer-checkpointing
[2] https://www.ververica.com/blog/kafka-flink-a-practical-how-to
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint


Best
Yun Tang


________________________________
From: Vishwas Siravara <vsirav...@gmail.com<mailto:vsirav...@gmail.com>>
Sent: Wednesday, October 9, 2019 0:54
To: user <user@flink.apache.org<mailto:user@flink.apache.org>>
Subject: Flink restoring a job from a checkpoint

Hi guys,
I have a flink streaming job which streams from a kafka source. There is no 
state in the job, just a simple filter , map and write to a kafka sink. Suppose 
I stop my job and then submit the job again to the cluster with the same 
consumer group, will the job restore automatically from the last successful 
checkpoint , since this is what is the last committed offset to kafka ?

Thanks,
Vishwas

Reply via email to