Hi,

I think you need to specify the directory of a concrete checkpoint, rather than
the root checkpoint directory, to restore the state. The directory name
should look like chk-${id}.

The job ID changes every time you re-submit the job, so the JobManager cannot
recognize the retained checkpoint of the previous submission, even though you
are using the same checkpoint root directory.
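
To illustrate, here is a minimal sketch of how one might locate the newest
completed checkpoint directory for a previous run. It assumes the layout
<checkpoint-root>/<job-id>/chk-<n>/ and that a completed checkpoint contains a
file named _metadata; please verify both against your Flink version. The class
LatestCheckpointFinder and its methods are hypothetical helpers, not part of
Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

/** Hypothetical helper (not part of Flink) that picks the newest chk-<n> directory. */
class LatestCheckpointFinder {

    // Parses the numeric id out of a directory name such as "chk-42".
    static long checkpointId(Path chkDir) {
        return Long.parseLong(chkDir.getFileName().toString().substring("chk-".length()));
    }

    // Looks only at direct children of <checkpoint-root>/<job-id>.
    // Assumption: a completed checkpoint directory contains a "_metadata" file.
    static Optional<Path> findLatest(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(p -> p.getFileName().toString().matches("chk-\\d+"))
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    .max(Comparator.comparingLong(LatestCheckpointFinder::checkpointId));
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: LatestCheckpointFinder <checkpoint-root>/<job-id>");
            return;
        }
        Optional<Path> latest = findLatest(Paths.get(args[0]));
        System.out.println(latest.map(p -> "Restore from: " + p.toUri())
                .orElse("No completed checkpoint found"));
    }
}
```

The printed path is what would be passed to -s, for example
flink run -s file:///home/hadoop/flink/checkpoints/<job-id>/chk-<n> -c <class> <jar>.
Note that the flink CLI reads -s only when it appears before the jar file;
anything after the jar is handed to the program as its own arguments.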

Best,
Paul Lam

> On Oct 18, 2018, at 09:51, chrisr123 <chris.rueg...@gmail.com> wrote:
> 
> Hi Folks,
> I'm trying to restart my program with restored state from a checkpoint after
> a program failure (restart strategies tried but exhausted), but I'm not
> picking up the restored state. What am I doing wrong here?  
> 
> *Summary*
> I'm using a very simple app on 1 node just to learn checkpointing.
> The app reads from a socket stream, with netcat (nc) as the source, and I
> deliberately send in some "bad" data to make it throw an Exception. The app
> uses a simple file URL as the checkpoint backend.
> 
> *Checkpoint Backend*
> // specified in program:
> env.setStateBackend((StateBackend) new FsStateBackend("file:///home/hadoop/flink/checkpoints/"));
> 
> For restart strategy, I specify 3 attempts with 5 second delay between
> attempts
> // specified in program:
> int restartAttempts = 3;
> int restartDelaySeconds = 5;
> long delayBetweenRestarts = restartDelaySeconds*1000;
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenRestarts));
> 
> *App Logic:*
> All the application does is parse each line as a key,Integer pair and output
> the accumulated sum to stdout. (See below)
> If I start up nc -l 9999 and type values like this it works fine:
> key1,5
> key1,3
> key1,4
> 
> However if I type in "junk" the program throws Exception trying to parse
> 'junk' as an Integer 
> key1,junk
> 
> When the application fails, nc also stops. If I start nc before all 3
> restart attempts have been tried, everything is fine and the program
> restarts, picking up state where it left off.
> 
> So after all the restarts have been tried and failed, I want to restart my
> program manually and pick up where I left off. Since I am specifying the
> checkpoint backend in the program, I thought it would just pick it up from
> there. Then I tried passing in the backend using the -s parameter to my
> program, but that does not work either:
> 
> flink -c <class> <jar> -s c:\home\hadoop\flink\checkpoints 
> 
> *App Source:*
> public class ComputeSumFaultTolerant {
> 
>       public static void main(String[] args) throws Exception {               
> 
>               // Execution Environment
>               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>               ParameterTool parms = ParameterTool.fromArgs(args);
>               env.getConfig().setGlobalJobParameters(parms);
>               String host = "localhost";
>               int port = 9999;
>               
>               System.out.println("ComputeSumFaultTolerant BEGIN");
>       
>               // Setup Checkpoint and Retry
>               String checkpointBackendURL = "file:///home/hadoop/flink/checkpoints/";
>               Utils.configureCheckpoint(env,checkpointBackendURL);
>               Utils.configureRestart(env);
> 
>               // Get Our Raw Data Stream
>               DataStream<Tuple2<String,Long>> eventStream = env
>                               .socketTextStream(host, port)
>                               .map(new MessageParser())
>                               .keyBy(0)
>                               .sum(1);
>               eventStream.print();
>               
>               // Execute
>               env.execute("ComputeSumFaultTolerant");
>       }
>       
>       private static class MessageParser implements MapFunction<String,Tuple2<String,Long>> {
>               public Tuple2<String,Long> map(String input) throws Exception {
>                       String[] tokens = input.toLowerCase().split(",");
>                       String key = tokens[0];
>                       Long value = Long.valueOf(tokens[1]);
>                       return new Tuple2<String,Long>(key,value);
>               }
>       }
>       
>       
> }
> 
> public class Utils {
> 
>       public static void configureCheckpoint(StreamExecutionEnvironment env, String checkpointBackend) throws Exception {
>               // Set Up Checkpoints
>               env.enableCheckpointing(5000L);
>               
>               // set mode to exactly-once (this is the default)
>               env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>               
>               // allow only one checkpoint to be in progress at the same time
>               env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> 
>               // make sure 500 ms of progress happen between checkpoints
>               env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
> 
>               // checkpoints have to complete within 10 seconds (10000 ms), or are discarded
>               env.getCheckpointConfig().setCheckpointTimeout(10000);
> 
>               // Checkpoint Back-end
>               env.setStateBackend((StateBackend) new FsStateBackend(checkpointBackend));
>               
>               System.out.println("CHECKPOINT IS EXTERNALIZED");
>               env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>               
>               System.out.println("External enabled=" + env.getCheckpointConfig().isExternalizedCheckpointsEnabled());
>               
>       }
>       
>       public static void configureRestart(StreamExecutionEnvironment env) throws Exception {
>               
>               // Restart Strategy
>               // Fixed Delay
>               int restartAttempts = 3;
>               int restartDelaySeconds = 5;
>               long delayBetweenRestarts = restartDelaySeconds*1000;
>               env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenRestarts));
>               
>               // Failure Rate Restart
>               int failureRate = 3;   
>               Time failureInterval = Time.of(5, TimeUnit.MINUTES);
>               Time delayInterval = Time.of(5, TimeUnit.SECONDS);
>               // env.setRestartStrategy(RestartStrategies.failureRateRestart(failureRate, failureInterval, delayInterval));
>               
>               // No Restart
>               // env.setRestartStrategy(RestartStrategies.noRestart());
>       }
> 
> }     
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
