Hello, according to https://issues.apache.org/jira/browse/FLINK-4618, "FlinkKafkaConsumer09 should start from the next record on startup from offsets in Kafka". Is the same behavior expected of FlinkKafkaConsumer010? A document in Kafka is failing my job, and on restart of the job (via the restart strategy, or after stopping and running the job again) I want processing to continue from the next document in the partition. Checkpoints are enabled:
env.enableCheckpointing(1000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

Relevant log output:

INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoints to filesystem "file:/tmp/checkpoints")
taskmanager_4 | 2017-11-21 17:31:42,873 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Setting restore state in the FlinkKafkaConsumer.
taskmanager_4 | 2017-11-21 17:31:42,875 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 4 will commit offsets back to Kafka on completed checkpoints

Also, if a TM (other than the one that fails) has managed to successfully read and process a record from Kafka, then after the job is cancelled and restarted, the already-completed record is retrieved and processed again, together with the failing one.

flink-1.3.2
kafka_2.12-0.11.0.1

Thanks!
- Robert
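For context, here is a minimal sketch of how I understand the consumer's start position can be configured in Flink 1.3.x. This is not code from my job; the broker address, group id, and topic name are placeholder assumptions, and the comments reflect my reading of the documentation (offsets restored from a checkpoint/savepoint take precedence over the configured start position):

```java
// Sketch only: placeholder broker address, group id, and topic name.
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaStartPositionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
        props.setProperty("group.id", "my-consumer-group");       // assumed group id

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);

        // Applies only when there is NO restored state: start from the offsets
        // committed to Kafka for this group, i.e. the record AFTER the last
        // committed one. When the job restores from a checkpoint/savepoint,
        // the offsets stored in Flink's state are used instead.
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();
        env.execute("kafka-start-position-sketch");
    }
}
```

My understanding is that this explains the reprocessing I observe: on failure the job rolls back to the last completed checkpoint, so records processed after that checkpoint (including the one already handled by the healthy TM) are read again. Please correct me if that is wrong.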