First time I'm trying to get this to work so bear with me. I'm trying to
learn checkpointing with Kafka and handling "bad" messages, restarting
without losing state.

Use Case:
Use checkpointing.
Read a stream of integers from Kafka, keep a running sum.
If a "bad" Kafka message is read, restart the app, skip the "bad" message, and
keep state.
My stream would look something like this:

set1,5
set1,7
set1,foobar
set1,6

I want my app to keep a running sum of the integers it has seen, and restart
if it crashes without losing state. So my running sum would be:
5,
12,
app crashes and restarts
18

However, I'm finding that when my app restarts, it keeps re-reading the bad
"foobar" message and doesn't get past it. Source code is below. The mapper
throws when I try to parse "foobar" as an Integer.
How can I modify the app to get past the "poison" message?

// Checkpoint every second with exactly-once guarantees.
env.enableCheckpointing(1000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
env.getCheckpointConfig().setCheckpointTimeout(10000);
// Store checkpoints on HDFS.
env.setStateBackend(new FsStateBackend("hdfs://mymachine:9000/flink/checkpoints"));

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BROKERS);
properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
properties.setProperty("group.id", "consumerGroup1");

FlinkKafkaConsumer08<String> kafkaConsumer =
    new FlinkKafkaConsumer08<>(topicName, new SimpleStringSchema(), properties);
DataStream<String> messageStream = env.addSource(kafkaConsumer);

DataStream<Tuple2<String,Integer>> sums = messageStream
  .map(new NumberMapper())
  .keyBy(0)
  .sum(1);
sums.print();


private static class NumberMapper implements MapFunction<String,Tuple2<String,Integer>> {
    public Tuple2<String,Integer> map(String input) throws Exception {
        return parseData(input);
    }

    private Tuple2<String,Integer> parseData(String record) {
        String[] tokens = record.toLowerCase().split(",");

        // Get key
        String key = tokens[0];

        // Get integer value (this is where "foobar" throws NumberFormatException)
        String integerValue = tokens[1];
        System.out.println("Trying to Parse=" + integerValue);
        Integer value = Integer.parseInt(integerValue);

        // Build tuple
        return new Tuple2<String,Integer>(key, value);
    }
}
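
For comparison, here is a minimal plain-Java sketch (no Flink or Kafka involved) of the behavior I am after: a parser that reports a malformed record instead of throwing, and a per-key running sum that skips such records. The names PoisonTolerantSum, tryParse, and runningSums are made up for illustration, and silently dropping bad records is an assumption on my part. I assume the same idea could be expressed in the job with a FlatMapFunction that emits nothing for a bad record, but I have not verified that this resolves the restart loop.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class PoisonTolerantSum {

    // Hypothetical helper: parses "key,int" and returns Optional.empty()
    // for a malformed record instead of throwing.
    public static Optional<Map.Entry<String, Integer>> tryParse(String record) {
        String[] tokens = record.toLowerCase().split(",");
        if (tokens.length != 2) {
            return Optional.empty();
        }
        try {
            int value = Integer.parseInt(tokens[1].trim());
            return Optional.of(new AbstractMap.SimpleImmutableEntry<>(tokens[0], value));
        } catch (NumberFormatException e) {
            // Assumption: a bad record is dropped rather than failing the job.
            return Optional.empty();
        }
    }

    // Returns the running sum emitted after each successfully parsed record.
    public static List<Integer> runningSums(List<String> records) {
        Map<String, Integer> sumsByKey = new HashMap<>();
        List<Integer> emitted = new ArrayList<>();
        for (String record : records) {
            Optional<Map.Entry<String, Integer>> parsed = tryParse(record);
            if (!parsed.isPresent()) {
                continue; // skip the "poison" record, keep the accumulated state
            }
            String key = parsed.get().getKey();
            int sum = sumsByKey.merge(key, parsed.get().getValue(), Integer::sum);
            emitted.add(sum);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("set1,5", "set1,7", "set1,foobar", "set1,6");
        System.out.println(runningSums(stream)); // prints [5, 12, 18]
    }
}
```

For the sample stream above, this yields 5, 12, 18, which is the output I would like the Flink job to produce across a restart.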




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
