Hello
according to https://issues.apache.org/jira/browse/FLINK-4618 
"FlinkKafkaConsumer09 should start from the next record on startup from offsets 
in Kafka".
Is the same behavior expected of FlinkKafkaConsumer010?
A document in Kafka is failing my job and I want on restart of the job (via the 
restart strategy or after stop and run again the job) processing to continue 
from the next document in the partition.
Checkpoints are enabled:

            env.enableCheckpointing(1000);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
            env.getCheckpointConfig().setCheckpointTimeout(60000);
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - State 
backend is set to heap memory (checkpoints to filesystem 
"file:/tmp/checkpoints")
taskmanager_4  | 2017-11-21 17:31:42,873 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting 
restore state in the FlinkKafkaConsumer.
taskmanager_4  | 2017-11-21 17:31:42,875 INFO  
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer 
subtask 4 will commit offsets back to Kafka on completed checkpoints

Also, if a TM (other than the one that fails) has managed to successfully 
complete reading and processing a record from Kafka, after the job is cancelled 
and restarted, the already complete record is retrieved and processed again 
together with the failing one

flink-1.3.2
kafka_2.12-0.11.0.1

Thanks!
- Robert

Reply via email to