Hi Folks, I'm trying to restart my program with restored state from a checkpoint after a program failure (restart strategies tried but exhausted), but I'm not picking up the restored state. What am I doing wrong here?
*Summary*
I'm using a very simple app on 1 node just to learn checkpointing. The app reads from a socket stream, with netcat (nc) as the source, and I deliberately send in some "bad" data to make it throw an Exception. The app uses a simple file URL as the checkpoint backend.

*Checkpoint Backend*
// specified in program:
env.setStateBackend((StateBackend)new FsStateBackend("file:///home/hadoop/flink/checkpoints/"));

For the restart strategy, I specify 3 attempts with a 5 second delay between attempts:
// specified in program:
int restartAttempts = 3;
int restartDelaySeconds = 5;
long delayBetweenRestarts = restartDelaySeconds*1000;
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenRestarts));

*App Logic:*
All the application does is parse each line as a key,Integer pair and output the accumulated sum to stdout (see the source below). If I start up nc -l 9999 and type values like this, it works fine:
key1,5
key1,3
key1,4

However, if I type in "junk", the program throws an Exception trying to parse 'junk' as an Integer:
key1,junk

When the application fails, nc also stops. If I restart nc before all 3 restart attempts have been used up, everything is fine and the program restarts, picking up state where it left off.

Once all the restart attempts have been tried and failed, I want to restart my program manually and pick up where I left off. Since I am specifying the checkpoint backend in the program, I thought it would just pick it up from there.
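For reference, the parse-and-sum behaviour described above can be reproduced without Flink. This is only a minimal plain-Java sketch: the class name RunningSumSketch and the in-memory HashMap standing in for keyed state are assumptions for illustration and are not part of the job. It shows valid lines accumulating per key, and a non-numeric value making Long.valueOf throw a NumberFormatException, which is the failure being triggered on purpose.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the map -> keyBy -> sum pipeline (no Flink involved).
final class RunningSumSketch {
    // Hypothetical replacement for Flink's keyed state: one running sum per key.
    private final Map<String, Long> sums = new HashMap<>();

    // Parses "key,value" and returns the updated running sum for that key.
    // Long.valueOf throws NumberFormatException for input such as "key1,junk".
    long accept(String line) {
        String[] tokens = line.toLowerCase().split(",");
        String key = tokens[0];
        long value = Long.valueOf(tokens[1]);
        return sums.merge(key, value, Long::sum);
    }

    public static void main(String[] args) {
        RunningSumSketch sketch = new RunningSumSketch();
        String[] lines = {"key1,5", "key1,3", "key1,4", "key1,junk"};
        for (String line : lines) {
            try {
                System.out.println(line + " -> " + sketch.accept(line));
            } catch (NumberFormatException e) {
                // In the real job this exception fails the task and triggers a restart.
                System.out.println(line + " -> failed: " + e.getMessage());
            }
        }
    }
}
```

The first three lines give running sums of 5, 8 and 12 for key1; the fourth fails in the parser before any state is touched.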
Then I tried passing in the backend using the -s parameter to my program, but that does not work either:
flink -c <class> <jar> -s c:\home\hadoop\flink\checkpoints

*App Source:*
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ComputeSumFaultTolerant {
    public static void main(String[] args) throws Exception {
        // Execution Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parms = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(parms);
        String host = "localhost";
        int port = 9999;
        System.out.println("ComputeSumFaultTolerant BEGIN");

        // Setup Checkpoint and Retry
        String checkpointBackendURL = "file:///home/hadoop/flink/checkpoints/";
        Utils.configureCheckpoint(env,checkpointBackendURL);
        Utils.configureRestart(env);

        // Get Our Raw Data Stream
        DataStream<Tuple2<String,Long>> eventStream = env
            .socketTextStream(host, port)
            .map(new MessageParser())
            .keyBy(0)
            .sum(1);
        eventStream.print();

        // Execute
        env.execute("ComputeSumFaultTolerant");
    }

    private static class MessageParser implements MapFunction<String,Tuple2<String,Long>> {
        public Tuple2<String,Long> map(String input) throws Exception {
            String[] tokens = input.toLowerCase().split(",");
            String key = tokens[0];
            Long value = Long.valueOf(tokens[1]);
            return new Tuple2<String,Long>(key,value);
        }
    }
}

public class Utils {
    public static void configureCheckpoint(StreamExecutionEnvironment env, String checkpointBackend) throws Exception {
        // Set Up Checkpoints
        env.enableCheckpointing(5000L);
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // make sure 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
        // checkpoints have to complete within 10 seconds, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        // Checkpoint Back-end
        env.setStateBackend((StateBackend)new FsStateBackend(checkpointBackend));
        System.out.println("CHECKPOINT IS EXTERNALIZED");
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        System.out.println("External enabled=" + env.getCheckpointConfig().isExternalizedCheckpointsEnabled());
    }

    public static void configureRestart(StreamExecutionEnvironment env) throws Exception {
        // Restart Strategy
        // Fixed Delay
        int restartAttempts = 3;
        int restartDelaySeconds = 5;
        long delayBetweenRestarts = restartDelaySeconds*1000;
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenRestarts));

        // Failure Rate Restart
        int failureRate = 3;
        Time failureInterval = Time.of(5, TimeUnit.MINUTES);
        Time delayInterval = Time.of(5, TimeUnit.SECONDS);
        // env.setRestartStrategy(RestartStrategies.failureRateRestart(failureRate, failureInterval, delayInterval));

        // No Restart
        // env.setRestartStrategy(RestartStrategies.noRestart());
    }
}

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
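A note on the manual restart attempted above. As far as I understand it, flink run -s expects the path of one specific retained checkpoint (or savepoint), not the checkpoint base directory. The Flink CLI also treats everything after the jar as program arguments, so -s would have to come before the jar. The sketch below is a hedged helper for finding such a path. It assumes the layout documented for recent Flink versions, <checkpoint-dir>/<job-id>/chk-<n>/_metadata; older versions may keep the metadata elsewhere. The class LatestCheckpointFinder and its method names are hypothetical, and the job directory (with the job ID taken from the logs or web UI) has to be supplied by hand.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper: finds the newest completed checkpoint of one job.
final class LatestCheckpointFinder {

    // Returns the chk-<n> directory with the highest n that contains a _metadata file.
    // Assumes jobCheckpointDir is <checkpoint-dir>/<job-id>.
    static Optional<Path> findLatest(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(p -> p.getFileName().toString().matches("chk-\\d+"))
                    // A checkpoint directory without _metadata is treated as incomplete.
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    .max(Comparator.comparingLong(LatestCheckpointFinder::checkpointNumber));
        }
    }

    // Extracts n from a directory named chk-<n>.
    private static long checkpointNumber(Path chkDir) {
        String name = chkDir.getFileName().toString();
        return Long.parseLong(name.substring("chk-".length()));
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: LatestCheckpointFinder <checkpoint-dir>/<job-id>");
            return;
        }
        Optional<Path> latest = findLatest(Paths.get(args[0]));
        System.out.println(latest.map(Path::toString)
                .orElse("no completed checkpoint found under " + args[0]));
    }
}
```

The printed path is what I would then try passing to flink run -s, placed before -c <class> <jar>.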