Hi Folks,
I'm trying to restart my program with state restored from a checkpoint after
a program failure (the configured restart strategy was tried and exhausted),
but the restored state is not being picked up. What am I doing wrong here?

*Summary*
I'm using a very simple app on 1 node just to learn checkpointing.
The app reads from a socket stream (netcat, nc, as the source), and I
deliberately send in some "bad" data to throw an exception. The app uses a
simple file URL as the checkpoint backend.

*Checkpoint Backend*
// specified in program:
env.setStateBackend((StateBackend) new FsStateBackend("file:///home/hadoop/flink/checkpoints/"));

For the restart strategy, I specify 3 attempts with a 5 second delay between
attempts:
// specified in program:
int restartAttempts = 3;
int restartDelaySeconds = 5;
long delayBetweenRestarts = restartDelaySeconds * 1000;
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenRestarts));

*App Logic:*
All the application does is parse each line as a (key, value) pair, with the
value parsed as a Long, and output the accumulated sum to stdout (see the
source below).
If I start up nc -l 9999 and type values like this, it works fine:
key1,5
key1,3
key1,4
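
For reference, since keyBy(0).sum(1) emits an updated running total for each
record, the input above prints roughly the following to stdout (possibly
prefixed by a subtask index):
(key1,5)
(key1,8)
(key1,12)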

However, if I type in "junk", the program throws an exception while trying
to parse 'junk' as a Long:
key1,junk

When the application fails, nc also stops. If I restart nc before all 3
restart attempts have been exhausted, everything is fine: the program
restarts and picks up state where it left off.

So after all the restarts have been tried and failed, I want to restart my
program manually and pick up where I left off. Since I am specifying the
checkpoint backend in the program, I thought it would just pick it up from
there. I then tried passing the checkpoint path in with the -s parameter,
but that does not work either:

flink -c <class> <jar> -s c:\home\hadoop\flink\checkpoints 
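
If I understand the CLI docs correctly, -s belongs to "flink run" and has to
come before the jar, pointing at one specific retained checkpoint (the
<job-id> and chk-<n> parts below are placeholders for whatever Flink
actually wrote under the checkpoint directory), but I'm not sure this is
right either:

flink run -s file:///home/hadoop/flink/checkpoints/<job-id>/chk-<n> -c <class> <jar>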




*App Source:*
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ComputeSumFaultTolerant {

    public static void main(String[] args) throws Exception {

        // Execution Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parms = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(parms);
        String host = "localhost";
        int port = 9999;

        System.out.println("ComputeSumFaultTolerant BEGIN");

        // Setup Checkpoint and Retry
        String checkpointBackendURL = "file:///home/hadoop/flink/checkpoints/";
        Utils.configureCheckpoint(env, checkpointBackendURL);
        Utils.configureRestartFixedDelay(env);

        // Get Our Raw Data Stream
        DataStream<Tuple2<String, Long>> eventStream = env
                .socketTextStream(host, port)
                .map(new MessageParser())
                .keyBy(0)
                .sum(1);
        eventStream.print();

        // Execute
        env.execute("ComputeSumFaultTolerant");
    }

    private static class MessageParser implements MapFunction<String, Tuple2<String, Long>> {
        public Tuple2<String, Long> map(String input) throws Exception {
            String[] tokens = input.toLowerCase().split(",");
            String key = tokens[0];
            Long value = Long.valueOf(tokens[1]);
            return new Tuple2<String, Long>(key, value);
        }
    }
}

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Utils {

    public static void configureCheckpoint(StreamExecutionEnvironment env,
            String checkpointBackend) throws Exception {
        // Set Up Checkpoints
        env.enableCheckpointing(5000L);

        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // make sure 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);

        // checkpoints have to complete within 10 seconds, or they are discarded
        env.getCheckpointConfig().setCheckpointTimeout(10000);

        // Checkpoint Back-end
        env.setStateBackend((StateBackend) new FsStateBackend(checkpointBackend));

        // Retain checkpoints externally so they survive cancellation/failure
        System.out.println("CHECKPOINT IS EXTERNALIZED");
        env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        System.out.println("External enabled=" +
                env.getCheckpointConfig().isExternalizedCheckpointsEnabled());
    }

    public static void configureRestartFixedDelay(StreamExecutionEnvironment env) throws Exception {

        // Restart Strategy
        // Fixed Delay
        int restartAttempts = 3;
        int restartDelaySeconds = 5;
        long delayBetweenRestarts = restartDelaySeconds * 1000;
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, delayBetweenRestarts));

        // Failure Rate Restart (not used)
        int failureRate = 3;
        Time failureInterval = Time.of(5, TimeUnit.MINUTES);
        Time delayInterval = Time.of(5, TimeUnit.SECONDS);
        // env.setRestartStrategy(RestartStrategies.failureRateRestart(failureRate, failureInterval, delayInterval));

        // No Restart
        // env.setRestartStrategy(RestartStrategies.noRestart());
    }
}








