Hi, I'm using Flink 1.2.0 and trying to get "exactly once" data transfer from Kafka to Elasticsearch, but I cannot make it work. (Scala 2.11, Kafka 0.10, without YARN)
There are two Flink TaskManager nodes, and while processing with parallelism 2 I shut one of them down to simulate a node failure. Using flink-connector-kafka, I wrote the following code:

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(1000L); // checkpoint every second
    env.setParallelism(2);

    Properties kafkaProp = new Properties();
    kafkaProp.setProperty("bootstrap.servers", "192.168.97.42:9092");
    // zookeeper.connect is only required by the Kafka 0.8 consumer; the 0.10 consumer ignores it
    kafkaProp.setProperty("zookeeper.connect", "192.168.97.42:2181");
    kafkaProp.setProperty("group.id", "id");

    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
            "topic", new SimpleStringSchema(), kafkaProp));

After the failure I see duplicate records in my map function: data from the checkpoint taken before the node failure appears to be reprocessed. Is there any way to achieve "exactly once" on failure?
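In case it helps, here is a minimal sketch of the rest of the job (the map plus the Elasticsearch sink). The real map logic is omitted, the cluster name, host, index, and type names are placeholders, and it assumes the Elasticsearch 2.x connector (flink-connector-elasticsearch2) that ships with Flink 1.2:

    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink;
    import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch2.RequestIndexer;
    import org.elasticsearch.client.Requests;

    // Stand-in for the real transformation; this is where I observe the duplicates.
    DataStream<String> mapped = stream.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws Exception {
            return value;
        }
    });

    Map<String, String> config = new HashMap<>();
    config.put("cluster.name", "my-cluster");   // placeholder
    config.put("bulk.flush.max.actions", "1");  // flush after every record

    List<InetSocketAddress> transports = new ArrayList<>();
    transports.add(new InetSocketAddress(InetAddress.getByName("192.168.97.42"), 9300));

    mapped.addSink(new ElasticsearchSink<>(config, transports,
            new ElasticsearchSinkFunction<String>() {
                @Override
                public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                    Map<String, String> json = new HashMap<>();
                    json.put("data", element);
                    // No explicit document ID, so Elasticsearch auto-generates one per request.
                    indexer.add(Requests.indexRequest()
                            .index("my-index")  // placeholder
                            .type("my-type")    // placeholder
                            .source(json));
                }
            }));

    env.execute("kafka-to-elasticsearch");

Since the sink writes with auto-generated document IDs, every record replayed after recovery becomes a new document, which matches the duplication I'm seeing.

Thanks.
Yuichiro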