Xinyang Gao created FLINK-8770:
----------------------------------

             Summary: CompletedCheckpoints stored in ZooKeeper are not 
up-to-date; when the JobManager is restarted it fails to recover the job due 
to a checkpoint FileNotFoundException
                 Key: FLINK-8770
                 URL: https://issues.apache.org/jira/browse/FLINK-8770
             Project: Flink
          Issue Type: Bug
          Components: Local Runtime
    Affects Versions: 1.4.0
            Reporter: Xinyang Gao


Hi, I am running a Flink cluster (1 JobManager + 6 TaskManagers) in HA mode 
on OpenShift, with Chaos Monkey enabled so that either the JobManager or one of 
the TaskManagers is killed every 5 minutes; the ZooKeeper quorum is stable and 
not targeted by Chaos Monkey. Flink reads data from one Kafka topic and writes 
data into another Kafka topic. Checkpointing is enabled with a 1000 ms interval, 
and state.checkpoints.num-retained is set to 10. I am using a PVC for the state 
backend (checkpoints, recovery, etc.), so the checkpoints and state are persistent.
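
For reference, here is a minimal sketch of the checkpointing setup described above, using the Flink 1.4 DataStream API. The placeholder pipeline and the checkpoint directory are illustrative; only the 1000 ms interval and the 10 retained checkpoints come from this setup.

{code:java}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJobSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 1000 ms, as in the setup described above.
        env.enableCheckpointing(1000);

        // Keep checkpoint data on the PVC so it survives container restarts.
        // The directory is hypothetical; only the recovery dir /mnt/flink-test/recovery
        // is visible in the listings below.
        env.setStateBackend(new FsStateBackend("file:///mnt/flink-test/checkpoints"));

        // state.checkpoints.num-retained: 10 is a flink-conf.yaml setting, not a code-level one.

        // Placeholder pipeline; the real job reads from one Kafka topic and writes to another.
        env.fromElements("a", "b", "c").print();

        env.execute("checkpointing-sketch");
    }
}
{code}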

The restart strategy for the Flink JobManager DeploymentConfig is recreate, 
which means the old JobManager container is killed before the new one is started.

I first ran the chaos test for one day, and saw the following exception:

org.apache.flink.util.FlinkException: Could not retrieve checkpoint *** from 
state handle under /***. This indicates that the retrieved state handle is 
broken. Try cleaning the state handle store.

The root cause is a checkpoint FileNotFoundException.

The Flink job then kept restarting for a few hours and, because of the above 
error, could not be recovered successfully.

After further investigation, I found the following in my PVC:

 

-rw-r--r--. 1 flink root 11379 Feb 23 02:10 completedCheckpoint0ee95157de00
-rw-r--r--. 1 flink root 11379 Feb 23 01:51 completedCheckpoint498d0952cf00
-rw-r--r--. 1 flink root 11379 Feb 23 02:10 completedCheckpoint650fe5b021fe
-rw-r--r--. 1 flink root 11379 Feb 23 02:10 completedCheckpoint66634149683e
-rw-r--r--. 1 flink root 11379 Feb 23 02:11 completedCheckpoint67f24c3b018e
-rw-r--r--. 1 flink root 11379 Feb 23 02:10 completedCheckpoint6f64ebf0ae64
-rw-r--r--. 1 flink root 11379 Feb 23 02:11 completedCheckpoint906ebe1fb337
-rw-r--r--. 1 flink root 11379 Feb 23 02:11 completedCheckpoint98b79ea14b09
-rw-r--r--. 1 flink root 11379 Feb 23 02:10 completedCheckpointa0d1070e0b6c
-rw-r--r--. 1 flink root 11379 Feb 23 02:11 completedCheckpointbd3a9ba50322
-rw-r--r--. 1 flink root 11355 Feb 22 17:31 completedCheckpointd433b5e108be
-rw-r--r--. 1 flink root 11379 Feb 22 22:56 completedCheckpointdd0183ed092b
-rw-r--r--. 1 flink root 11379 Feb 22 00:00 completedCheckpointe0a5146c3d81
-rw-r--r--. 1 flink root 11331 Feb 22 17:06 completedCheckpointec82f3ebc2ad
-rw-r--r--. 1 flink root 11379 Feb 23 02:11 completedCheckpointf86e460f6720

 

The latest 10 checkpoints were created at about 02:10, ignoring the older 
checkpoints that were not deleted successfully (which I do not care much about).

 

However, when checking ZooKeeper, I see the following under the 
flink/checkpoints path (I list only one node, but the other 9 are similar):

(node data: a Java-serialized org.apache.flink.runtime.state.RetrievableStreamStateHandle 
wrapping an org.apache.flink.runtime.state.filesystem.FileStateHandle whose filePath is 
file:/mnt/flink-test/recovery/completedCheckpointd004a3753870)

cZxid = 0x160001ff5d
ctime = Fri Feb 23 02:08:18 UTC 2018
mZxid = 0x160001ff5d
mtime = Fri Feb 23 02:08:18 UTC 2018
pZxid = 0x1d00000c6d
cversion = 31
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 492


So the latest completedCheckpoint state stored in ZooKeeper is from about 
02:08, which implies that the completed checkpoints from 02:10 somehow were not 
successfully written to ZooKeeper. When the JobManager then tries to recover the 
Flink job, ZooKeeper points it at older checkpoint files that have since been 
removed from the PVC, so it cannot find the latest checkpoint and the recovery 
fails.
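
A quick way to confirm this from outside the job is to read one of those checkpoint nodes, deserialize the pointer it stores, and try to resolve it the way the JobManager does on recovery. Below is a rough diagnostic sketch; it assumes the node data is the plain Java-serialized RetrievableStateHandle shown in the dump above, and the ZooKeeper address and node path are placeholders.

{code:java}
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.util.InstantiationUtil;
import org.apache.zookeeper.ZooKeeper;

public class CheckCheckpointNode {

    public static void main(String[] args) throws Exception {
        // Placeholder quorum address and node path; use the real quorum and one of the
        // children of the flink/checkpoints path.
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});
        byte[] data = zk.getData("/flink/checkpoints/0000000000000001234", false, null);

        @SuppressWarnings("unchecked")
        RetrievableStateHandle<CompletedCheckpoint> handle =
            (RetrievableStateHandle<CompletedCheckpoint>)
                InstantiationUtil.deserializeObject(data, CheckCheckpointNode.class.getClassLoader());

        try {
            // This is the step that fails during recovery: the handle points at a
            // completedCheckpoint file on the PVC, and if that file is gone the read
            // ends in the FileNotFoundException wrapped by the FlinkException above.
            CompletedCheckpoint checkpoint = handle.retrieveState();
            System.out.println("Checkpoint still resolvable: " + checkpoint);
        } catch (Exception e) {
            System.out.println("Could not resolve state handle: " + e);
        }

        zk.close();
    }
}
{code}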

I am very surprised by this, since writing the checkpoint to ZooKeeper appears 
to be [synchronous|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L222],
 so I am not sure why this happens. Can anyone help look into this?
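
To make the timing question concrete, here is a stand-alone re-enactment of that write path as I read the linked code: the checkpoint metadata is first persisted as a file on the recovery directory, and only then is the pointer node created in ZooKeeper, both synchronously in the caller's thread. This is not Flink code, just the shape of it; the paths, node name and ZooKeeper address are placeholders.

{code:java}
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class TwoStepCheckpointWrite {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> {});

        // Step 1: persist the checkpoint metadata as a file on the recovery directory
        // (the PVC in this setup); this corresponds to the completedCheckpointXXXX files.
        Path file = Paths.get("/tmp/recovery", "completedCheckpoint-demo");
        Files.createDirectories(file.getParent());
        Files.write(file, "checkpoint metadata".getBytes(StandardCharsets.UTF_8));

        // Step 2: only after the file exists, create the ZooKeeper node that points at it.
        // Flink stores a serialized FileStateHandle; a plain path string is enough here.
        zk.create("/demo-checkpoint-0000000001",
            file.toString().getBytes(StandardCharsets.UTF_8),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Both steps complete before the add-checkpoint call returns, which is why
        // checkpoint files from 02:10 with no matching ZooKeeper entries are surprising.
        zk.close();
    }
}
{code}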



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
