If I understood correctly you're saying that in this case I'd need to
reprocess all messages from scratch (unless I retain my checkpoints..),
Could it be a good strategy to schedule savepoints periodically to avoid
such situations? Is there any smarter solution to this?

On Fri, Oct 11, 2019 at 4:45 AM Yun Tang <myas...@live.com> wrote:

> Any checkpoint could only completed if your job not failed. Since
> checkpoint barrier is injected with messages together, if the problematic
> message would cause your job to fail. You cannot complete any checkpoint
> after that problematic message processed. In other words, you could always
> resume your job from kafka offset before that problematic message.
> Best
> Yun Tang
> ------------------------------
> *From:* Flavio Pompermaier <pomperma...@okkam.it>
> *Sent:* Friday, October 11, 2019 5:50
> *To:* Yun Tang <myas...@live.com>
> *Cc:* Congxian Qiu <qcx978132...@gmail.com>;
> theo.diefent...@scoop-software.de <theo.diefent...@scoop-software.de>;
> user <user@flink.apache.org>
> *Subject:* Re: Flink restoring a job from a checkpoint
> Sorry for the dumb question but let's suppose to not use retained
> checkpoint and my job processed billions of messages from Kafka. Then a
> problematic message causes my job to fail..am I able to complete a
> savepoint to fic the job and restart from the problematic message (i.e.
> last "valid" kafka offset)?
> Il Gio 10 Ott 2019, 20:01 Yun Tang <myas...@live.com> ha scritto:
> Hi Vishwas
> Image this scenario, if your last committed offset is A with a savepoint-A
> and then you just stop this job and try a new program logical such as print
> your output instead of writing to previous sink to do some experiments. The
> new experimental job might commit offset-B to kafka. Once verified, and
> then you still need to resume from kafka offset-A to ensure all data has
> been written to target sink. This would be easier If you just restore the
> job from savepoint-A.
> In other words, Flink has already provided a more strong and flexible
> mechanism to resume kafka offsets, why not use this?
> Best
> Yun Tang
> ------------------------------
> *From:* Congxian Qiu <qcx978132...@gmail.com>
> *Sent:* Thursday, October 10, 2019 11:52
> *To:* theo.diefent...@scoop-software.de <theo.diefent...@scoop-software.de
> >
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Flink restoring a job from a checkpoint
> Hi Vishwas
> Sorry for the confusing, what Theo said previous is the meaning I want to
> say.  Previously, what I said is from Flink's side, if we do not restore
> from checkpoint/savepoint, all the TMs will have no state, so the Job
> starts from scratch.
> Best,
> Congxian
> theo.diefent...@scoop-software.de <theo.diefent...@scoop-software.de>
> 于2019年10月10日周四 上午1:15写道:
> Hi Vishaws,
> With "from scratch", Congxian means that Flink won't load any state
> automatically and starts as if there was no state. Of course if the kafka
> consumer group already exists and you have configured Flink to start from
> group offsets if there is no state yet, it will start from the group
> offsets.
> I think your approach is totally fine. Ignoring savepoints and don't
> retaining checkpoints saves overhead and configuration burdens and works
> nicely as long as you don't have any state in your pipeline.
> You should however be certain that nobody in your team will add something
> with state later on and forgets to think about the missing state...
> Best regards
> Theo
> -------- Ursprüngliche Nachricht --------
> Betreff: Re: Flink restoring a job from a checkpoint
> Von: Vishwas Siravara
> An: Congxian Qiu
> Cc: Yun Tang ,user
> Hi Congxian,
> Thanks for getting back. Why would the streaming start from scratch if my
> consumer group does not change ? I start from the group offsets :
> env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka
> source")
> So when I restart the job it should consume from the last committed offset
> to kafka isn't it ? Let me know what you think .
> Best,
> Vishwas
> On Tue, Oct 8, 2019 at 9:06 PM Congxian Qiu <qcx978132...@gmail.com>
> wrote:
> Hi Vishwas
> Currently, Flink can only restore retained checkpoint or savepoint with
> parameter `-s`[1][2], otherwise, it will start from scratch.
> ```
> checkpoint    --->     bin/flink run -s :checkpointMetaDataPath [:runArgs]
> savepoint --> bin/flink run -s :savepointPath [:runArgs]
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#resuming-from-savepoints
> Best,
> Congxian
> Vishwas Siravara <vsirav...@gmail.com> 于2019年10月9日周三 上午5:07写道:
> Hi Yun,
> Thanks for your reply. I do start from GROUP_OFFSET . Here is the code
> snippet :
> env.addSource(consumer.setStartFromGroupOffsets()).name(source + "- kafka 
> source")
> I have also enabled and externalized checkpointing to S3 .
> Why is it not recommended to just restart the job once I cancel it, as
> long as the topology does not change? What is the advantage of
> explicitly restoring from last checkpoint by passing the -s option to the
> flink command line if it does the same thing? For instance if 
> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
> is my last successful checkpoint, what is the difference between 1 and 2.
> 1. /usr/mware/flink/bin/flink run -d -C 
> file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main 
> flink-job-assembly.jar flink druid -p 8 -cp qa_streaming
> 2. /usr/mware/flink/bin/flink run -s 
> s3://featuretoolkit.checkpoints/qa_streaming/c17f2cb6da5e6cbc897410fe49676edd/chk-1350/
>  -d -C file:///usr/mware/flink/externalconfig/ -c com.visa.flink.cli.Main 
> flink-job-assembly.jar flink druid -p 4 -cp qa_streaming
> Thanks,
> Vishwas
> On Tue, Oct 8, 2019 at 1:51 PM Yun Tang <myas...@live.com> wrote:
> Hi Vishwas
> If you did not configure your
> org.apache.flink.streaming.connectors.kafka.config.StartupMode, it is
> GROUP_OFFSET by default, which means "Start from committed offsets in ZK /
> Kafka brokers of a specific consumer group". And you need  to enable
> checkpoint so that kafka offsets are committed when checkpoint completes.
> In other words, even if you don't resume from checkpoint, just enable
> checkpoint in previous jobs and set startupMode as GROUP_OFFSET, you could
> restore from last committed offset if previous checkpoint completed [1][2].
> However, this is not really recommended, better to resume from last
> checkpoint [3]
> [1]
> https://www.slideshare.net/robertmetzger1/clickthrough-example-for-flinks-kafkaconsumer-checkpointing
> [2] https://www.ververica.com/blog/kafka-flink-a-practical-how-to
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint
> Best
> Yun Tang
> ------------------------------
> *From:* Vishwas Siravara <vsirav...@gmail.com>
> *Sent:* Wednesday, October 9, 2019 0:54
> *To:* user <user@flink.apache.org>
> *Subject:* Flink restoring a job from a checkpoint
> Hi guys,
> I have a flink streaming job which streams from a kafka source. There is
> no state in the job, just a simple filter , map and write to a kafka sink.
> Suppose I stop my job and then submit the job again to the cluster with the
> same consumer group, will the job restore automatically from the last
> successful checkpoint , since this is what is the last committed offset to
> kafka ?
> Thanks,
> Vishwas

Reply via email to