Sorry for the dumb question, but suppose I don't use retained checkpoints and my job has processed billions of messages from Kafka. Then a problematic message causes my job to fail. Am I able to take a savepoint, fix the job, and restart from the problematic message (i.e. the last "valid" Kafka offset)?
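Concretely, something like the following is what I have in mind (just a sketch; the job id and paths here are invented):

```
# take a savepoint while the job is still running
bin/flink savepoint a5e7f3c1b2d4 s3://my-bucket/savepoints/

# fix the job, rebuild the jar, then resume from the savepoint
bin/flink run -s s3://my-bucket/savepoints/savepoint-a5e7f3-1a2b3c4d5e6f -d fixed-job.jar
```

But if the job has already failed, can I still do this, or do I have to rely on the offsets committed at the last completed checkpoint?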
On Thu, 10 Oct 2019, 20:01 Yun Tang <myas...@live.com> wrote:

> Hi Vishwas
>
> Imagine this scenario: your last committed offset is A, with a
> savepoint-A, and you then stop the job and try new program logic, such
> as printing your output instead of writing to the previous sink, to run
> some experiments. The new experimental job might commit offset-B to
> Kafka. Once you have verified things, you still need to resume from
> Kafka offset-A to ensure all data has been written to the target sink.
> This is easier if you just restore the job from savepoint-A.
>
> In other words, Flink already provides a stronger and more flexible
> mechanism to resume Kafka offsets, so why not use it?
>
> Best
> Yun Tang
> ------------------------------
> *From:* Congxian Qiu <qcx978132...@gmail.com>
> *Sent:* Thursday, October 10, 2019 11:52
> *To:* theo.diefent...@scoop-software.de <theo.diefent...@scoop-software.de>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Flink restoring a job from a checkpoint
>
> Hi Vishwas
>
> Sorry for the confusion; what Theo said is what I meant. What I said
> earlier is from Flink's side: if we do not restore from a
> checkpoint/savepoint, all the TMs will have no state, so the job starts
> from scratch.
>
> Best,
> Congxian
>
>
> On Thu, Oct 10, 2019 at 1:15 AM theo.diefent...@scoop-software.de
> <theo.diefent...@scoop-software.de> wrote:
>
> Hi Vishwas,
>
> By "from scratch", Congxian means that Flink won't load any state
> automatically and starts as if there were no state. Of course, if the
> Kafka consumer group already exists and you have configured Flink to
> start from group offsets when there is no state yet, it will start from
> the group offsets.
>
> I think your approach is totally fine. Ignoring savepoints and not
> retaining checkpoints saves overhead and configuration burden, and works
> nicely as long as you don't have any state in your pipeline.
>
> You should, however, be certain that nobody on your team will add
> something with state later on and forget to think about the missing
> state...
>
> Best regards
> Theo
>
> -------- Original message --------
> Subject: Re: Flink restoring a job from a checkpoint
> From: Vishwas Siravara
> To: Congxian Qiu
> Cc: Yun Tang, user
>
> Hi Congxian,
> Thanks for getting back. Why would the streaming start from scratch if
> my consumer group does not change? I start from the group offsets:
>
> env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")
>
> So when I restart the job, it should consume from the last offset
> committed to Kafka, shouldn't it? Let me know what you think.
>
> Best,
> Vishwas
>
> On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
>
> Hi Vishwas
>
> Currently, Flink can only restore a retained checkpoint or a savepoint
> with the parameter `-s` [1][2]; otherwise, it will start from scratch.
>
> ```
> checkpoint ---> bin/flink run -s :checkpointMetaDataPath [:runArgs]
> savepoint  ---> bin/flink run -s :savepointPath [:runArgs]
> ```
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
> [2] https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints
>
> Best,
> Congxian
>
>
> On Wed, Oct 9, 2019 at 5:07 AM Vishwas Siravara <vsirav...@gmail.com> wrote:
>
> Hi Yun,
> Thanks for your reply. I do start from GROUP_OFFSET. Here is the code snippet:
>
> env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka source")
>
> I have also enabled and externalized checkpointing to S3.
> Why is it not recommended to just restart the job once I cancel it, as
> long as the topology does not change? What is the advantage of
> explicitly restoring from the last checkpoint by passing the -s option
> on the Flink command line if it does the same thing? For instance, if
> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
> is my last successful checkpoint, what is the difference between 1 and 2?
>
> 1. /usr/mware/flink/bin/flink run -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 8 -cp qa_streaming
> 2. /usr/mware/flink/bin/flink run -s s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/ -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main flink-job-assembly.jar flink druid -p 4 -cp qa_streaming
>
> Thanks,
> Vishwas
>
> On Tue, Oct 8, 2019 at 1:51 PM Yun Tang <myas...@live.com> wrote:
>
> Hi Vishwas
>
> If you did not configure your
> org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is
> GROUP_OFFSET by default, which means "start from committed offsets in
> ZK / Kafka brokers of a specific consumer group". And you need to enable
> checkpointing so that Kafka offsets are committed when a checkpoint
> completes.
>
> In other words, even if you don't resume from a checkpoint, as long as
> you enabled checkpointing in the previous job and set the startup mode
> to GROUP_OFFSET, you can restore from the last committed offset if a
> previous checkpoint completed [1][2]. However, this is not really
> recommended; it is better to resume from the last checkpoint [3].
>
> [1] https://www.slideshare.net/robertmetzger1/clickthrough-example-for-flinks-kafkaconsumer-checkpointing
> [2] https://www.ververica.com/blog/kafka-flink-a-practical-how-to
> [3] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
>
> Best
> Yun Tang
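To make Yun's point concrete, here is a minimal sketch of a job that enables checkpointing and starts from group offsets (assuming the Flink 1.9 Java API; the broker address, topic, and group id are made up):

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GroupOffsetExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Offsets are committed back to Kafka only when a checkpoint completes.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // made-up address
        props.setProperty("group.id", "qa_streaming");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets();          // GROUP_OFFSET, the default startup mode
        consumer.setCommitOffsetsOnCheckpoints(true); // the default once checkpointing is enabled

        env.addSource(consumer).name("my-topic - kafka source")
           .print(); // stand-in for the real filter/map/sink

        env.execute("group-offset example");
    }
}
```

With this configuration, a plain restart under the same group id resumes from the last offset committed to Kafka, while `bin/flink run -s` additionally restores operator state from the checkpoint, which is why the latter is recommended once the job carries state.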
> ------------------------------
> *From:* Vishwas Siravara <vsirav...@gmail.com>
> *Sent:* Wednesday, October 9, 2019 0:54
> *To:* user <user@flink.apache.org>
> *Subject:* Flink restoring a job from a checkpoint
>
> Hi guys,
> I have a Flink streaming job which streams from a Kafka source. There is
> no state in the job, just a simple filter, map, and write to a Kafka
> sink. Suppose I stop my job and then submit it again to the cluster with
> the same consumer group. Will the job restore automatically from the
> last successful checkpoint, since that is the last offset committed to
> Kafka?
>
> Thanks,
> Vishwas
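Since much of the thread turns on retained checkpoints, here is a minimal sketch of what retaining them looks like (assuming the Flink 1.9 Java API; the checkpoint directory itself is configured separately, e.g. via `state.checkpoints.dir` in flink-conf.yaml):

```
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // Keep completed checkpoints when the job is cancelled, so they can be
        // resumed later with `bin/flink run -s :checkpointMetaDataPath`.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.fromElements(1, 2, 3).print(); // trivial placeholder pipeline
        env.execute("retained-checkpoint example");
    }
}
```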