[ https://issues.apache.org/jira/browse/FLINK-4617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthew Barlocker updated FLINK-4617:
-------------------------------------
    Affects Version/s: 1.0.1
                       1.0.2
                       1.0.3

Kafka & Flink duplicate messages on restart
-------------------------------------------

                Key: FLINK-4617
                URL: https://issues.apache.org/jira/browse/FLINK-4617
            Project: Flink
         Issue Type: Bug
         Components: Kafka Connector, State Backends, Checkpointing
   Affects Versions: 1.1.0, 1.0.1, 1.0.2, 1.0.3, 1.1.1, 1.1.2
        Environment: Ubuntu 16.04
                     Flink 1.1.*
                     Kafka 0.9.0.1
                     Scala 2.11.7
                     Java 1.8.0_91
           Reporter: Matthew Barlocker
           Priority: Critical

[StackOverflow Link|http://stackoverflow.com/questions/39459315/kafka-flink-duplicate-messages-on-restart]

Flink (the Kafka connector) re-runs the last 3-9 messages it saw before it was shut down.

*My code:*
{code}
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpoint every 500 ms, store checkpoints on the local filesystem,
    // and request exactly-once checkpointing mode.
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "testing")

    // Read strings from "testing-in" and write them unchanged to "testing-out".
    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())

    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}
{code}

*My sbt dependencies:*
{code}
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.1.2",
  "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
  "org.apache.flink" %% "flink-clients" % "1.1.2",
  "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
  "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)
{code}

*My process:*
Using 3 terminals:
{code}
TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic
{code}

*My expectations:*
When there are no errors in the system, I expect to be able to turn Flink on and off without reprocessing messages that successfully completed the stream in a prior run.

*My attempts to fix:*
I've added the call to setStateBackend, thinking that perhaps the default memory backend just didn't remember correctly. That didn't seem to help.

I've removed the call to enableCheckpointing, hoping that perhaps there was a separate mechanism to track state in Flink vs ZooKeeper. That didn't seem to help.

I've tried different sinks (RollingFileSink, print()), hoping that maybe the bug was in Kafka. That didn't seem to help.

I've rolled back Flink (and all connectors) to v1.1.0 and v1.1.1, hoping that maybe the bug was in the latest version. That didn't seem to help.

I've added the zookeeper.connect config to the properties object, hoping that the comment about it only being useful in 0.8 was wrong. That didn't seem to help.

I've explicitly set the checkpointing mode to EXACTLY_ONCE (good idea, drfloob). That didn't seem to help.
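The replay described in this report can be reasoned about with a simplified offset model. The Scala sketch here is hypothetical: `ReplayModel`, `RunState` and `replayedOffsets` are illustrative names, not Flink APIs. It assumes (this is not confirmed in the report) that the restarted job, started fresh rather than from a savepoint, resumes reading `testing-in` at the consumer group's committed offset, that this offset only advances when a checkpoint completes, and that the Kafka sink writes each record as soon as it arrives rather than transactionally. Under those assumptions, every record read after the last committed offset but before shutdown would be written to `testing-out` a second time.

```scala
// Hypothetical, simplified model of offset replay on restart.
// Assumptions: the source resumes at the last committed offset, offsets are
// committed only on completed checkpoints, and the sink is not transactional.
// This illustrates the reasoning; it is not Flink's implementation.
object ReplayModel {

  /** committedOffset: next offset the restarted source will read.
    * readUpTo: first offset NOT yet read (and emitted) when the job stopped. */
  final case class RunState(committedOffset: Long, readUpTo: Long)

  /** Offsets already emitted in the previous run that are read and emitted again. */
  def replayedOffsets(s: RunState): Seq[Long] =
    s.committedOffset until s.readUpTo // empty if nothing was read past the commit

  def main(args: Array[String]): Unit = {
    // Hypothetical numbers: the last completed checkpoint committed offset 100,
    // but records 100..105 had already reached the sink before shutdown.
    val state = RunState(committedOffset = 100L, readUpTo = 106L)
    val dup = replayedOffsets(state)
    println(s"replayed ${dup.size} records: ${dup.mkString(", ")}")
  }
}
```

With the made-up numbers in `main`, the model prints `replayed 6 records: 100, 101, 102, 103, 104, 105`. In this model a short checkpoint interval such as the 500 ms used here would be expected to keep the gap between the committed offset and the last emitted record small, so only a handful of messages repeat.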
-- This message was sent by Atlassian JIRA (v6.3.4#6332)