What I would like to see from the logs (this also depends a bit on your log
level; see the note after the list for raising the relevant loggers):

- all exceptions.
- the exact context in which the "too many open files" problem occurred,
because I think it should not matter for checkpoint consistency: a checkpoint
that runs into such a problem should never succeed.
- files that are written for checkpoints/savepoints.
- completed checkpoints/savepoints ids.
- the restored checkpoint/savepoint id.
- files that are loaded on restore.
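
If the default INFO level does not cover all of that, one way to get more
detail is to raise the checkpoint- and state-related loggers. A sketch against
the stock conf/log4j.properties that Flink ships (the package names may need
adjusting to your setup):

    log4j.logger.org.apache.flink.runtime.checkpoint=DEBUG
    log4j.logger.org.apache.flink.runtime.state=DEBUG
    # the RocksDB state backend classes live in this package:
    log4j.logger.org.apache.flink.contrib.streaming.state=DEBUG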

> On 15.05.2018, at 10:02, Juho Autio <juho.au...@rovio.com> wrote:
> 
> Thanks all. I'll have to see about sharing the logs & configuration..
> 
> Is there something special that you'd like to see from the logs? It may be 
> easier for me to get specific lines and obfuscate sensitive information 
> instead of trying to do that for the full logs.
> 
> We basically have: RocksDBStateBackend with 
> enableIncrementalCheckpointing=true, external state path on s3.
> 
> The code that we use is:
> 
>     env.setStateBackend(getStateBackend(statePath, new RocksDBStateBackend(statePath, true)));
>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
>             params.getLong("checkpoint.minPause", 60 * 1000));
>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(
>             params.getInt("checkpoint.maxConcurrent", 1));
>     env.getCheckpointConfig().setCheckpointTimeout(
>             params.getLong("checkpoint.timeout", 10 * 60 * 1000));
>     env.getCheckpointConfig().enableExternalizedCheckpoints(
>             CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> 
> The problematic state that we tried to use was a checkpoint created with this 
> conf.
> 
> > Are you using the local recovery feature?
> 
> Yes, and in this particular case the job was constantly failing/restarting 
> because of Too Many Open Files. So we terminated the cluster entirely, 
> created a new one, and launched a new job by specifying the latest checkpoint 
> path to restore state from.
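> 
> That is, a restore from a retained (externalized) checkpoint, roughly along
> these lines (the bucket, path and jar name here are placeholders, not our
> actual ones):
> 
>     flink run -s s3://<bucket>/<checkpoint-dir>/chk-<n> my-job.jar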
> 
> This is the only time I have seen this error happen with timer state. I still 
> have that bad checkpoint data on s3, so I might be able to try to restore it 
> again if needed to debug it. But that would require some tweaking, because I 
> don't want to interfere with the same Kafka consumer group offsets or send old
> data to the production endpoint again.
> 
> Please keep in mind that there was that Too Many Open Files issue on the 
> cluster that created the problematic checkpoint, if you think that's relevant.
> 
> On Tue, May 15, 2018 at 10:39 AM, Stefan Richter
> <s.rich...@data-artisans.com> wrote:
> Hi,
> 
> I agree, this looks like a bug. Can you tell us your exact configuration of
> the state backend, e.g. whether you are using incremental checkpoints or not?
> Are you using the local recovery feature? Are you restarting the job from a
> checkpoint or a savepoint? Can you provide logs for both the job that failed
> and the restarted job?
> 
> Best,
> Stefan
> 
> 
>> On 14.05.2018, at 13:00, Juho Autio <juho.au...@rovio.com> wrote:
>> 
>> We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
>> state. After restoring state from a checkpoint, it seems that a timer had
>> been restored, but not the data that was expected to be in the related
>> MapState when that timer was added.
>> 
>> The way I see it, there's a bug, one of these two:
>> - The writing of timers & map states to Flink state is not synchronized (or 
>> maybe there are no such guarantees by design?)
>> - Flink may restore a checkpoint that is actually corrupted/incomplete
>> 
>> Our code (simplified):
>> 
>>     private MapState<String, String> mapState;
>> 
>>     public void processElement(..) {
>>         mapState.put("lastUpdated", ctx.timestamp().toString());
>>         ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
>>     }
>> 
>>     public void onTimer(long timestamp, OnTimerContext ctx, ..) {
>>         long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>         if (timestamp >= lastUpdated + stateRetentionMillis) {
>>             mapState.clear();
>>         }
>>     }
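>> 
>> For completeness, a fuller self-contained sketch of the same pattern (the
>> class and state names are illustrative, not our actual job; it assumes the
>> function is applied on a KeyedStream and that event-time timestamps are
>> assigned):
>> 
>>     import org.apache.flink.api.common.state.MapState;
>>     import org.apache.flink.api.common.state.MapStateDescriptor;
>>     import org.apache.flink.configuration.Configuration;
>>     import org.apache.flink.streaming.api.functions.ProcessFunction;
>>     import org.apache.flink.util.Collector;
>> 
>>     public class StateRetentionFunction<T> extends ProcessFunction<T, T> {
>> 
>>         private final long stateRetentionMillis;
>>         private transient MapState<String, String> mapState;
>> 
>>         public StateRetentionFunction(long stateRetentionMillis) {
>>             this.stateRetentionMillis = stateRetentionMillis;
>>         }
>> 
>>         @Override
>>         public void open(Configuration parameters) {
>>             // Keyed map state; only available when running on a KeyedStream.
>>             mapState = getRuntimeContext().getMapState(
>>                     new MapStateDescriptor<>("mapState", String.class, String.class));
>>         }
>> 
>>         @Override
>>         public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
>>             // Remember when this key was last updated and schedule a cleanup timer.
>>             mapState.put("lastUpdated", ctx.timestamp().toString());
>>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() + stateRetentionMillis);
>>             out.collect(value);
>>         }
>> 
>>         @Override
>>         public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
>>             // If timers and map state are checkpointed consistently, "lastUpdated"
>>             // must exist whenever a timer for this key fires.
>>             long lastUpdated = Long.parseLong(mapState.get("lastUpdated"));
>>             if (timestamp >= lastUpdated + stateRetentionMillis) {
>>                 // No newer element extended the retention window, so drop the state.
>>                 mapState.clear();
>>             }
>>         }
>>     }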
>> 
>> Normally this "just works". As you can see, it shouldn't be possible for
>> "lastUpdated" to be missing from the state if a timer was registered and
>> onTimer gets called.
>> 
>> However, after restoring state from a checkpoint, the job kept failing with 
>> this error:
>> 
>> Caused by: java.lang.NumberFormatException: null
>> at java.lang.Long.parseLong(Long.java:552)
>> at java.lang.Long.parseLong(Long.java:631)
>> at ..EnrichEventProcessFunction.onTimer(EnrichEventProcessFunction.java:136)
>> ..
>> 
>> So apparently onTimer was called but lastUpdated wasn't found in the 
>> MapState.
>> 
>> The background for restoring state in this case is not entirely clean. There
>> was an OS-level "Too many open files" issue after the job had been running for
>> ~11 days. To fix that, we replaced the cluster with a new one and launched the
>> Flink job again. State was successfully restored from the latest checkpoint
>> that had been created by the "problematic execution". Now, I'm assuming that
>> if the state hadn't been written successfully, restoring from it wouldn't have
>> succeeded either – correct? This is just to rule out the possibility that the
>> issue happened because the checkpoint files were somehow corrupted by the "Too
>> many open files" problem.
>> 
>> Thank you all for your continued support!
>> 
>> P.S. I would be very much interested to hear if there's some cleaner way to 
>> achieve this kind of TTL for keyed state in Flink.
> 
> 
> 
