What I would like to see from the logs is (also depending a bit on your log level):
- all exceptions.
- in which context exactly the „too many open files“ problem occurred, because
  I think for checkpoint consistency it should not matter, as a checkpoint with
  such a problem should never succeed.
- files that are written for checkpoints/savepoints.
- completed checkpoint/savepoint ids.
- the restored checkpoint/savepoint id.
- files that are loaded on restore.

> On 15.05.2018 at 10:02, Juho Autio <juho.au...@rovio.com> wrote:
>
> Thanks all. I'll have to see about sharing the logs & configuration.
>
> Is there something special that you'd like to see from the logs? It may be
> easier for me to get specific lines and obfuscate sensitive information
> instead of trying to do that for the full logs.
>
> We basically have: RocksDBStateBackend with
> enableIncrementalCheckpointing=true and an external state path on S3.
>
> The code that we use is:
>
> env.setStateBackend(getStateBackend(statePath,
>     new RocksDBStateBackend(statePath, true)));
>
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
>     params.getLong("checkpoint.minPause", 60 * 1000));
>
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(
>     params.getInt("checkpoint.maxConcurrent", 1));
>
> env.getCheckpointConfig().setCheckpointTimeout(
>     params.getLong("checkpoint.timeout", 10 * 60 * 1000));
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(
>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
> The problematic state that we tried to use was a checkpoint created with
> this conf.
>
> > Are you using the local recovery feature?
>
> Yes, and in this particular case the job was constantly failing/restarting
> because of "Too many open files". So we terminated the cluster entirely,
> created a new one, and launched a new job by specifying the latest
> checkpoint path to restore state from.
>
> This is the only time I have seen this error happen with timer state. I
> still have that bad checkpoint data on S3, so I might be able to try to
> restore it again if needed to debug it. But that would require some
> tweaking, because I don't want to tangle with the same Kafka consumer group
> offsets or send old data again to the production endpoint.
>
> Please keep in mind that there was that "Too many open files" issue on the
> cluster that created the problematic checkpoint, if you think that's
> relevant.
>
> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter
> <s.rich...@data-artisans.com> wrote:
> Hi,
>
> I agree, this looks like a bug. Can you tell us your exact configuration of
> the state backend, e.g. whether you are using incremental checkpoints or
> not? Are you using the local recovery feature? Are you restarting the job
> from a checkpoint or a savepoint? Can you provide logs for both the job
> that failed and the restarted job?
>
> Best,
> Stefan
>
>
>> On 14.05.2018 at 13:00, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
>> state. After restoring state from a checkpoint, it seems like a timer had
>> been restored, but not the data that was expected to be in the related
>> MapState if such a timer had been added.
>>
>> The way I see it, there's a bug, either of these:
>> - The writing of timers & map states to Flink state is not synchronized
>>   (or maybe there are no such guarantees by design?)
>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>>
>> Our code (simplified):
>>
>> private MapState<String, String> mapState;
>>
>> public void processElement(..) {
>>     mapState.put("lastUpdated", ctx.timestamp().toString());
>>     ctx.timerService().registerEventTimeTimer(
>>         ctx.timestamp() + stateRetentionMillis);
>> }
>>
>> public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>     long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>     if (timestamp >= lastUpdated + stateRetentionMillis) {
>>         mapState.clear();
>>     }
>> }
>>
>> Normally this "just works". As you can see, it shouldn't be possible that
>> "lastUpdated" doesn't exist in state if a timer was registered and onTimer
>> gets called.
>>
>> However, after restoring state from a checkpoint, the job kept failing
>> with this error:
>>
>> Caused by: java.lang.NumberFormatException: null
>>     at java.lang.Long.parseLong(Long.java:552)
>>     at java.lang.Long.parseLong(Long.java:631)
>>     at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
>>     ..
>>
>> So apparently onTimer was called, but "lastUpdated" wasn't found in the
>> MapState.
>>
>> The background for restoring state in this case is not entirely clean.
>> There was an OS-level "Too many open files" issue after the job had been
>> running for ~11 days. To fix that, we replaced the cluster with a new one
>> and launched the Flink job again. State was successfully restored from the
>> latest checkpoint that had been created by the "problematic execution".
>> Now, I'm assuming that if the state hadn't been created successfully,
>> restoring wouldn't succeed either – correct? This is just to rule out that
>> the state issue happened because the checkpoint files were somehow
>> corrupted by the "Too many open files" problem.
>>
>> Thank you all for your continued support!
>>
>> P.S. I would be very much interested to hear if there's a cleaner way to
>> achieve this kind of TTL for keyed state in Flink.
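
For reference, below is a minimal, self-contained sketch of the timer-plus-MapState
TTL pattern discussed in this thread, written against the public KeyedProcessFunction
API. The class name TtlCleanupFunction, the "lastUpdatedState" descriptor name, and
the use of String keys and values are illustrative assumptions, not the original
job's code. The only behavioral difference from the snippet quoted above is a null
check in onTimer, so that a missing "lastUpdated" entry clears state instead of
throwing the NumberFormatException shown in the stack trace.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical sketch only; names and types are assumptions, not the code
// from the job discussed in this thread.
public class TtlCleanupFunction extends KeyedProcessFunction<String, String, String> {

    private final long stateRetentionMillis;
    private transient MapState<String, String> mapState;

    public TtlCleanupFunction(long stateRetentionMillis) {
        this.stateRetentionMillis = stateRetentionMillis;
    }

    @Override
    public void open(Configuration parameters) {
        // Keyed MapState as in the thread; the descriptor name is an assumption.
        mapState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("lastUpdatedState", String.class, String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Record when this key was last updated and schedule a cleanup timer
        // stateRetentionMillis later, mirroring the pattern described above.
        mapState.put("lastUpdated", ctx.timestamp().toString());
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Defensive difference from the thread's snippet: treat a missing
        // "lastUpdated" entry as already-expired state instead of letting
        // Long.parseLong(null) throw a NumberFormatException.
        String lastUpdated = mapState.get("lastUpdated");
        if (lastUpdated == null || timestamp >= Long.parseLong(lastUpdated) + stateRetentionMillis) {
            mapState.clear();
        }
    }
}

Note that such a guard only masks the symptom: if a restored checkpoint really is
missing MapState entries for registered timers, that inconsistency still needs to be
investigated along the lines described earlier in the thread.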