We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
state. After restoring state from a checkpoint, it seems that a timer was
restored, but the data that should exist in the related MapState whenever
such a timer is registered was not.

As I see it, this is a bug, and it is one of the following:
- The writing of timers & map states to Flink state is not synchronized (or
maybe there are no such guarantees by design?)
- Flink may restore a checkpoint that is actually corrupted/incomplete

Our code (simplified):

    private MapState<String, String> mapState;

    public void processElement(..) {
        mapState.put("lastUpdated", ctx.timestamp().toString());
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
    }

    public void onTimer(long timestamp, OnTimerContext ctx, ..) {
        long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
        if (timestamp >= lastUpdated + stateRetentionMillis) {
            mapState.clear();
        }
    }

Normally this "just works". As you can see, it shouldn't be possible for
"lastUpdated" to be missing from state if a timer was registered and onTimer
gets called.

However, after restoring state from a checkpoint, the job kept failing with
this error:

Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:552)
at java.lang.Long.parseLong(Long.java:631)
at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
..

So apparently onTimer was called but lastUpdated wasn't found in the
MapState.
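
As a stopgap we could guard against the missing entry in onTimer. Below is a
minimal sketch of that guard, with a plain java.util.Map standing in for
MapState so that it runs without Flink. The class name TimerGuardSketch and
the retention value are made up for illustration. This would only hide the
symptom; it doesn't explain why the entry is missing.

```java
import java.util.HashMap;
import java.util.Map;

public class TimerGuardSketch {
    // Hypothetical retention period; stands in for stateRetentionMillis.
    static final long STATE_RETENTION_MILLIS = 1000L;

    /**
     * Mirrors the onTimer logic above, with a plain Map standing in for
     * MapState. Returns true if the state was cleared.
     */
    static boolean onTimer(long timestamp, Map<String, String> mapState) {
        String raw = mapState.get("lastUpdated");
        if (raw == null) {
            // Timer fired without a matching entry: skip instead of
            // letting Long.parseLong(null) throw NumberFormatException.
            return false;
        }
        long lastUpdated = Long.parseLong(raw);
        if (timestamp >= lastUpdated + STATE_RETENTION_MILLIS) {
            mapState.clear();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        // No "lastUpdated" entry: the guard returns false, no exception.
        System.out.println(onTimer(5000L, state));
        state.put("lastUpdated", "1000");
        // Timer earlier than lastUpdated + retention: state is kept.
        System.out.println(onTimer(1500L, state));
        // Timer at lastUpdated + retention: state is cleared.
        System.out.println(onTimer(2000L, state));
    }
}
```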

The circumstances of this restore were not entirely clean. After the job had
been running for ~11 days, we hit an OS-level "Too many open files" error.
To fix that, we replaced the cluster with a new one and launched the Flink
job again. State was successfully restored from the latest checkpoint that
had been created by the "problematic execution". Now, I'm assuming that if
the checkpoint hadn't been written successfully, restoring wouldn't have
succeeded either; is that correct? I ask just to rule out that the state
issue was caused by checkpoint files that were somehow corrupted due to the
"Too many open files" problem.

Thank you all for your continued support!

P.S. I would be very much interested to hear if there's some cleaner way to
achieve this kind of TTL for keyed state in Flink.
