state.checkpoints.dir is a configuration parameter that you set in the Flink configuration itself (see [1]). It is used for the checkpoint metadata only (for the RocksDB and Fs state backends), while the checkpoint data itself is stored in the directory given to the state backend.
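
As a minimal sketch, assuming the configuration lives in conf/flink-conf.yaml and that a separate local directory is wanted for the metadata (the path below is only a hypothetical example, not taken from the job), the entry could look roughly like this:

```yaml
# conf/flink-conf.yaml
# Directory for the metadata of externalized checkpoints.
# The path is a hypothetical example; any file system URI that Flink can reach should work.
state.checkpoints.dir: file:///Users/aftabansari/flink-checkpoints/
```

The directory passed to FsStateBackend in the job code stays as it is; this key only adds the location for the metadata.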

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#directory-structure

On Tuesday, 11 July 2017 10:32:25 CEST Aftab Ansari wrote:
> Thanks nico,
> I am trying to go for externalized checkpoint. But the below codes throws
> error: "Caused by: java.lang.IllegalStateException: CheckpointConfig says
> to persist periodic checkpoints, but no checkpoint directory has been
> configured. You can configure configure one via key 'state.checkpoints.dir"
>
> StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
> //specify backend
> //env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/"), true));
> env.setStateBackend(new FsStateBackend("file:///Users/aftabansari/flink-state/", true));
> //enable checkpoint
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> after stateBackend, do I need to configure another dir for
> checkpoints? How can I set this configuration in main method like I
> did for stateBackend ?
>
> BR,
>
> On 10 July 2017 at 17:06, Nico Kruber <n...@data-artisans.com> wrote:
> > Hi Aftab,
> > looks like what you want is either an externalized checkpoint with
> > RETAIN_ON_CANCELLATION mode [1] or a savepoint [2].
> >
> > Ordinary checkpoints are deleted when the job is cancelled and only serve
> > as a fault tolerance layer in case something goes wrong, i.e. machines
> > fail, so that the job can be restarted automatically based on the restart
> > policy.
> >
> > Nico
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html
> > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
> >
> > On Monday, 10 July 2017 14:20:53 CEST Aftab Ansari wrote:
> > > Hi,
> > > I am new to flink. I am facing issue implementing checkpoint.
> > >
> > > checkpoint related code:
> > >
> > > long checkpointInterval = 5000;
> > >
> > > StreamExecutionEnvironment env = StreamUtils.getEnvironment(params);
> > > //specify backend
> > > //env.setStateBackend(new FsStateBackend("s3n://xxx/flink-state/"), true));
> > > env.setStateBackend(new FsStateBackend("file:///Users/aftabansari/flink-state/", true));
> > >
> > > //enable checkpoint
> > > env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE);
> > > env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
> > >
> > > When I run the code, I can see flink-state being written in my local
> > > machine. but when I stop the job , wait for a few minutes and restart
> > > the job, it does not pick up from the time it left but it starts from
> > > when I started the job.
> > >
> > > Could you point out what i am doing wrong. I am testing it locally from
> > > ideaIntellij. below is what i see from localhost. Any help would be
> > > appreciated. Thanks
> > > [image: Inline images 1]
> > > Br,