After giving it a second thought, this problem could be a side effect of the 
issue fixed in https://issues.apache.org/jira/browse/FLINK-6964. If you want, 
you can check whether your problem is fixed on the latest master. The fix will 
also go into the 1.3.2 release branch today.

> On 17.07.2017, at 14:37, Stefan Richter <s.rich...@data-artisans.com> wrote:
> 
> Hi,
> 
> You assumed correctly that savepoints are always self-contained. Are you 
> using externalized checkpoints? There is a known problem with those that was 
> fixed in the latest master and will go into 1.3.2, but this might be a 
> different problem.
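> 
> For reference, externalized checkpoints are enabled through the 
> CheckpointConfig. A minimal sketch (the checkpoint interval and retention 
> policy below are only illustrative, and state.checkpoints.dir must also be 
> set in flink-conf.yaml):
> 
> import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class ExternalizedCheckpointsSketch {
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // take a checkpoint every 60 seconds (interval is illustrative)
>         env.enableCheckpointing(60_000);
>         // keep the checkpoint data when the job is cancelled, so that the
>         // job can later be restored from it
>         env.getCheckpointConfig().enableExternalizedCheckpoints(
>                 ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     }
> }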
> 
> You are also correct that incremental checkpoints can reference files from 
> previous checkpoints. Do you ever manually delete any checkpoint directories? 
> They might still be referenced by other checkpoints.
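> 
> For comparison, incremental checkpointing is switched on through the RocksDB 
> backend constructor. A minimal sketch, with a placeholder wasbs:// URI:
> 
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> 
> public class IncrementalCheckpointsSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // the second constructor argument enables incremental checkpoints
>         // (new in Flink 1.3); the URI is a placeholder for your container
>         env.setStateBackend(new RocksDBStateBackend(
>                 "wasbs://<container>@<account>.blob.core.windows.net/checkpoints", true));
>     }
> }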
> 
> I would assume that the missing file was written completely, because 
> otherwise the checkpoint would already have failed. However, I am unsure 
> about the exact guarantees (e.g. about visibility) in Azure blobs. Can you 
> check whether this file was ever created, and when it disappeared? Does the 
> directory of checkpoint 37 still exist in the file system?
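> 
> One way to probe this is a small program against the Hadoop FileSystem API, 
> assuming hadoop-azure and your Azure credentials are on the classpath and in 
> the configuration. The container and account in the URI are placeholders, 
> since they are masked in your stack trace:
> 
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> 
> public class CheckStateFileSketch {
>     public static void main(String[] args) throws Exception {
>         // placeholder URI; fill in the masked container and account
>         Path file = new Path("wasbs://<container>@<account>.blob.core.windows.net/"
>                 + "5065c840d4d4ba8c7cd91f793012bab1/chk-37/f52d633b-9d3f-47c1-bf23-58dcc54572e3");
>         FileSystem fs = FileSystem.get(file.toUri(), new Configuration());
>         // does the state file from the stack trace exist?
>         System.out.println("file exists: " + fs.exists(file));
>         // does the chk-37 directory itself still exist?
>         System.out.println("chk-37 exists: " + fs.exists(file.getParent()));
>     }
> }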
> 
> Best,
> Stefan
> 
>> On 17.07.2017, at 13:12, Shai Kaplan <shai.kap...@microsoft.com> wrote:
>> 
>> Hi.
>> I'm running Flink 1.3.1 with checkpoints stored in Azure blobs. The 
>> incremental checkpointing feature is on.
>> The job is trying to restore a checkpoint and consistently gets:
>>  
>> java.lang.IllegalStateException: Could not initialize keyed state backend.
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.FileNotFoundException: wasbs://***/5065c840d4d4ba8c7cd91f793012bab1/chk-37/f52d633b-9d3f-47c1-bf23-58dcc54572e3
>>         at org.apache.hadoop.fs.azure.NativeAzureFileSystem.open(NativeAzureFileSystem.java:1905)
>>         at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:767)
>>         at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:404)
>>         at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:48)
>>         at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
>>         at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69)
>>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readStateData(RocksDBKeyedStateBackend.java:1276)
>>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.readAllStateData(RocksDBKeyedStateBackend.java:1458)
>>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:1319)
>>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:1493)
>>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:965)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>>  
>> The name of the missing file varies, but it is always a file in checkpoint 
>> 37. The last successful checkpoint was number 41, so I'm guessing that's the 
>> checkpoint it's trying to restore; because of incremental checkpointing it 
>> also needs files from previous checkpoints, which are apparently missing. 
>> Could this be a problem in the interface with Azure? If some files failed 
>> to write, why didn't the checkpoint fail?
>>  
>> When I realized nothing was going to change, I canceled the job and started 
>> it from a savepoint, which was checkpoint number 40. I actually expected it 
>> to fail, and that I would have to restore from a savepoint prior to the 
>> apparently corrupted checkpoint 37, but it didn't. Should I infer that 
>> savepoints are self-contained and not incremental?
> 
