(back to list)

state.checkpoints.dir is a configuration parameter which you set in the flink 
configuration itself (see [1]). This will be used for checkpoint metadata only 
(for RocksDB and Fs) while the checkpoints themselves are stored in the given 
directory.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/
checkpoints.html#directory-structure

On Tuesday, 11 July 2017 10:32:25 CEST Aftab Ansari wrote:
> Thanks nico,
> I am trying to go for externalized checkpoint. But the below codes throws
> error: "Caused by: java.lang.IllegalStateException: CheckpointConfig says
> to persist periodic checkpoints, but no checkpoint directory has been
> configured. You can configure configure one via key 'state.checkpoints.dir"
> 
>  StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
>  //specify backend
>  //env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/"), true));
> env.setStateBackend(new
> FsStateBackend("file:///Users/aftabansari/flink-state/", true));
>  //enable checkpoint
>  env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.Ex
> ternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> 
> after stateBackend, do I need to configure another dir for
> checkpoints? How can I set this configuration in main method like I
> did for stateBackend ?
> 
> BR,
> 
> On 10 July 2017 at 17:06, Nico Kruber <n...@data-artisans.com> wrote:
> > Hi Aftab,
> > looks like what you want is either an externalized checkpoint with
> > RETAIN_ON_CANCELLATION mode [1] or a savepoint [2].
> > 
> > Ordinary checkpoints are deleted when the job is cancelled and only serve
> > as a
> > fault tolerance layer in case something goes wrong, i.e. machines fail, so
> > that the job can be restarted automatically based on the restart policy.
> > 
> > 
> > Nico
> > 
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/
> > checkpoints.html
> > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/
> > savepoints.html
> > 
> > On Monday, 10 July 2017 14:20:53 CEST Aftab Ansari wrote:
> > > Hi,
> > > I am new to flink. I am facing issue implementing checkpoint.
> > > 
> > > checkpoint related code:
> > > 
> > > long checkpointInterval = 5000;
> > > 
> > >  StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
> > >  //specify backend
> > >  //env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/"),
> > 
> > true));
> > 
> > > env.setStateBackend(new
> > > FsStateBackend("file:///Users/aftabansari/flink-state/", true));
> > > 
> > >  //enable checkpoint
> > >  env.enableCheckpointing(checkpointInterval,
> > > 
> > > CheckpointingMode.EXACTLY_ONCE);
> > > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
> > > 
> > > 
> > > When I run the code, I can see flink-state being written in my local
> > > machine. but when I stop the job , wait for a few minutes and restart
> > > the
> > > job, it does not pick up from the time it left but it starts from when I
> > > started the job.
> > > 
> > > Could you point out what i am doing wrong. I am testing it locally from
> > > ideaIntellij. below is what i see from localhost. Any help would be
> > > appreciated. Thanks
> > > [image: Inline images 1]
> > > Br,

Attachment: signature.asc
Description: This is a digitally signed message part.

Reply via email to