Hi, I am trying to write a simple streaming program to count values from a Kafka topic in a fault-tolerant manner, like this <https://gist.github.com/victorpoluceno/8690df8459bf3afd60477f83ec78f7a8>:
<code>
import java.util.Properties

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val config: Configuration = new Configuration()
config.setString(ConfigConstants.STATE_BACKEND, "filesystem")
config.setString("state.backend.fs.checkpointdir", "file:///tmp/flink")

val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.enableCheckpointing(10) // checkpoint every 10 ms

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val stream = env
  .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
  .map((_, 1))
  .keyBy(_._1)
  // keep a running count per key in keyed state
  .mapWithState((in: (String, Int), count: Option[Int]) => {
    val newCount = in._2 + count.getOrElse(0)
    ((in._1, newCount), Some(newCount))
  }).print

env.execute("Job")
</code>

The idea is to use the filesystem state backend to persist the computation state (the count) and to restore it after a failure or restart. I have a program that injects the same key into Kafka over and over. But I am unable to make Flink work correctly: every time Flink restarts, the state is empty, so the count starts from zero again. What am I missing here?

I am running this in a local environment (sbt run) with Flink 1.3.1, Java 1.8.0_131, and Ubuntu 16.04.

--
hooray!

--
Victor Godoy Poluceno
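PS: in case it helps, the injector is essentially a loop around the plain Kafka producer. This is a simplified sketch, not my exact code; the value "hello" and the one-second interval are stand-ins:

<code>
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Injector extends App {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "localhost:9092")
  props.setProperty("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  props.setProperty("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  // Send the same value forever; the job keys by the message value,
  // so the running count for this one key should keep growing.
  while (true) {
    producer.send(new ProducerRecord[String, String]("test", "hello"))
    Thread.sleep(1000)
  }
}
</code>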
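PS2: do I also need externalized checkpoints for the state to survive a restart of the program? I am thinking of something like the following (untested on my side, and I am not sure it even applies to a local environment started from sbt):

<code>
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup

// Retain completed checkpoints when the job is cancelled, so they can
// (presumably) be resumed from via `flink run -s <checkpoint-path>`.
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
</code>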