Hi,

I am trying to write a simple streaming program that counts values from a
Kafka topic in a fault-tolerant manner, like this
<https://gist.github.com/victorpoluceno/8690df8459bf3afd60477f83ec78f7a8>:

<code>
import java.util.Properties

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Use the filesystem state backend so checkpoints are written to disk.
val config: Configuration = new Configuration()
config.setString(ConfigConstants.STATE_BACKEND, "filesystem")
config.setString("state.backend.fs.checkpointdir", "file:///tmp/flink")

val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// Checkpoint every 10 ms.
env.enableCheckpointing(10)

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

// Count occurrences of each string, keeping the running count in keyed state.
val stream = env
    .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
    .map((_, 1))
    .keyBy(_._1)
    .mapWithState((in: (String, Int), count: Option[Int]) => {
      val newCount = in._2 + count.getOrElse(0)
      ((in._1, newCount), Some(newCount))
    }).print

env.execute("Job")
</code>
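
To exercise the recovery path within a single run, one can splice a mapper
that fails once into the pipeline, e.g. between the mapWithState step and the
print. This is only a minimal sketch for a local, single-JVM setup; the class
name, the threshold, and the JVM-wide flag are just for testing and not part
of the program above:

<code>
import org.apache.flink.api.common.functions.MapFunction

// JVM-wide flag: the failure fires only once per JVM, so after the
// automatic restart (same JVM in a local environment) the job keeps running.
object FailOnce {
  @volatile var failed = false
}

// Throws after `failAfter` records on the first run. With checkpointing
// enabled, Flink's default restart strategy restarts the job, and the
// keyed state should be restored from the latest checkpoint.
class FailOnceMapper(failAfter: Int) extends MapFunction[(String, Int), (String, Int)] {
  @transient private var seen = 0
  override def map(value: (String, Int)): (String, Int) = {
    seen += 1
    if (!FailOnce.failed && seen >= failAfter) {
      FailOnce.failed = true
      throw new RuntimeException("simulated failure")
    }
    value
  }
}
</code>

Adding `.map(new FailOnceMapper(100))` to the chain should then produce
exactly one crash-and-recover cycle per run.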

The idea is to use the filesystem state backend to persist the computation
state (the count) and to restore it after a failure or restart. I have a
separate program that keeps injecting the same key into Kafka (sketched
below). But I am unable to make Flink work correctly: every time Flink
restarts, the state comes back empty, so the count starts from zero again.
What am I missing here?
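
The injector is essentially a loop like the following rough sketch; the
concrete value "foo" and the one-second interval are arbitrary:

<code>
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Injector extends App {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "localhost:9092")
  props.setProperty("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")
  props.setProperty("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)
  // Produce the same string once a second; the Flink job keys on this
  // value, so its count should keep growing across restarts.
  while (true) {
    producer.send(new ProducerRecord[String, String]("test", "foo"))
    Thread.sleep(1000)
  }
}
</code>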

I am running this in a local environment (sbt run) with Flink 1.3.1 and Java
1.8.0_131, on Ubuntu 16.04.


--
Victor Godoy Poluceno
