I think there is an error in the code snippet for the ProcessFunction timeout example: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
The snippet currently reads:

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified) {
            // emit the state
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }

If, as stated in the example, the CountWithTimeoutFunction should emit a key/count pair when no further update has occurred in the minute since the last update, the test should be:

    if (timestamp == result.lastModified + 60000) {
        // emit the state on timeout
        out.collect(new Tuple2<String, Long>(result.key, result.count));
    }

As stated in the Javadoc of ProcessFunction, the timestamp argument of the onTimer method is the timestamp of the firing timer, not the time of the last update.
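To make the difference between the two checks concrete, here is a minimal sketch in plain Java with no Flink dependency. It assumes each update registers a timer for lastModified + 60000, which is what the "minute elapsed since last update" wording implies. The class and method names (TimerCheckSketch, firesAsDocumented, firesWithOffset) are hypothetical and exist only for this illustration.

```java
// Hypothetical stand-alone illustration; not part of Flink or its documentation.
class TimerCheckSketch {

    // Assumed timeout: one minute in milliseconds, matching the 60000 in the proposed fix.
    static final long TIMEOUT_MS = 60_000L;

    // The comparison as it appears in the current documentation snippet.
    static boolean firesAsDocumented(long timerTimestamp, long lastModified) {
        return timerTimestamp == lastModified;
    }

    // The proposed comparison: the firing timer must match the last update plus the timeout.
    static boolean firesWithOffset(long timerTimestamp, long lastModified) {
        return timerTimestamp == lastModified + TIMEOUT_MS;
    }

    public static void main(String[] args) {
        // Two updates for the same key, at t=1000 and t=30000 (assumed sample values).
        long firstUpdate = 1_000L;
        long secondUpdate = 30_000L;

        // Assumption: each update registers a timer for update time + TIMEOUT_MS.
        long firstTimer = firstUpdate + TIMEOUT_MS;
        long secondTimer = secondUpdate + TIMEOUT_MS;

        // When the timers fire, the state holds the time of the latest update.
        long lastModified = secondUpdate;

        System.out.println("documented check, first timer:  "
                + firesAsDocumented(firstTimer, lastModified));
        System.out.println("documented check, second timer: "
                + firesAsDocumented(secondTimer, lastModified));
        System.out.println("proposed check, first timer:    "
                + firesWithOffset(firstTimer, lastModified));
        System.out.println("proposed check, second timer:   "
                + firesWithOffset(secondTimer, lastModified));
    }
}
```

Under these assumptions the documented check matches neither timer, so nothing is ever emitted. The proposed check skips the outdated first timer and matches only the second, which corresponds to a full minute without updates.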