Hi Congxian,
Thanks for getting back. Why would the streaming start from scratch if my
consumer group does not change ? I start from the group offsets :
env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka
source")
So when I restart the job it should consume from the last committed offset
to kafka isn't it ? Let me know what you think .

Best,
Vishwas
On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu <qcx978132...@gmail.com> wrote:

> Hi Vishwas
>
> Currently, Flink can only restore retained checkpoint or savepoint with
> parameter `-s`[1][2], otherwise, it will start from scratch.
>
> ```
> checkpoint    --->     bin/flink run -s :checkpointMetaDataPath [:runArgs]
> savepoint --> bin/flink run -s :savepointPath [:runArgs]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints
>
> Best,
> Congxian
>
>
> Vishwas Siravara <vsirav...@gmail.com> 于2019年10月9日周三 上午5:07写道:
>
>> Hi Yun,
>> Thanks for your reply. I do start from GROUP_OFFSET . Here is the code
>> snippet :
>>
>> env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka 
>> source")
>>
>> I have also enabled and externalized checkpointing to S3 .
>> Why is it not recommended to just restart the job once I cancel it, as
>> long as the topology does not change? What is the advantage of
>> explicitly restoring from last checkpoint by passing the -s option to the
>> flink command line if it does the same thing? For instance if 
>> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
>> is my last successful checkpoint, what is the difference between 1 and 2.
>>
>> 1. /usr/mware/flink/bin/flink run -d -C 
>> file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main 
>> flink-job-assembly.jar flink druid -p 8 -cp qa_streaming
>> 2. /usr/mware/flink/bin/flink run -s 
>> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
>>  -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main 
>> flink-job-assembly.jar flink druid -p 4 -cp qa_streaming
>>
>> Thanks,
>> Vishwas
>>
>> On Tue, Oct 8, 2019 at 1:51 PM Yun Tang <myas...@live.com> wrote:
>>
>>> Hi Vishwas
>>>
>>> If you did not configure your
>>> org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is
>>> GROUP_OFFSET by default, which means "Start from committed offsets in ZK /
>>> Kafka brokers of a specific consumer group". And you need  to enable
>>> checkpoint so that kafka offsets are committed when checkpoint completes.
>>>
>>> In other words, even if you don't resume from checkpoint, just enable
>>> checkpoint in previous jobs and set startupMode as GROUP_OFFSET, you could
>>> restore from last committed offset if previous checkpoint completed [1][2].
>>> However, this is not really recommended, better to resume from last
>>> checkpoint [3]
>>>
>>> [1]
>>> https://www.slideshare.net/robertmetzger1/clickthrough-example-for-flinks-kafkaconsumer-checkpointing
>>> [2] https://www.ververica.com/blog/kafka-flink-a-practical-how-to
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>>>
>>>
>>> Best
>>> Yun Tang
>>>
>>>
>>> ------------------------------
>>> *From:* Vishwas Siravara <vsirav...@gmail.com>
>>> *Sent:* Wednesday, October 9, 2019 0:54
>>> *To:* user <user@flink.apache.org>
>>> *Subject:* Flink restoring a job from a checkpoint
>>>
>>> Hi guys,
>>> I have a flink streaming job which streams from a kafka source. There is
>>> no state in the job, just a simple filter , map and write to a kafka sink.
>>> Suppose I stop my job and then submit the job again to the cluster with the
>>> same consumer group, will the job restore automatically from the last
>>> successful checkpoint , since this is what is the last committed offset to
>>> kafka ?
>>>
>>> Thanks,
>>> Vishwas
>>>
>>

Reply via email to