Hello,
I'm looking at the following page of the documentation https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html particularly at this piece of code: val stream: DataStream[(String, Int)] = ... val counts: DataStream[(String, Int)] = stream .keyBy(_._1) .mapWithState((in: (String, Int), count: Option[Int]) => count match { case Some(c) => ( (in._1, c), Some(c + in._2) ) case None => ( (in._1, 0), Some(in._2) ) }) How is the state clear/purge in this case for keys that no longer appear? Thank you, Juan