Flink v1.7.1

After a Flink reboot we've been seeing some unexpected issues with excess 
retained checkpoints not being able to be removed from ZooKeeper after a new 
checkpoint is created.

I believe I've got my head around the role of ZK and lockNodes in Checkpointing 
after going through the code. Could you check my logic on this and add any 
insight, especially if I've got it wrong?

The situation:
1) Say we run JM1 and JM2 and retain 10 checkpoints and are running in HA with 
S3 as the backing store.

2) JM1 and JM2 start up and each instance of ZooKeeperStateHandleStore has its 
own lockNode UUID. JM1 is elected leader.

3) We submit a job, that JobGraph lockNode is added to ZK using JM1's JobGraph 
lockNode.

4) Checkpoints start rolling in, latest 10 are retained in ZK using JM1's 
checkpoint lockNode. We continue running, and checkpoints are successfully 
being created and excess checkpoints removed.

5) Both JM1 and JM2 now are rebooted.

6) The JobGraph is recovered by the leader, the job restarts from the latest 
checkpoint.

Now after every new checkpoint we see in the ZooKeeper logs:
INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@653] - Got user-level 
KeeperException when processing sessionid:0x10000047715000d type:delete 
cxid:0x210 zxid:0x700001091 txntype:-1 reqpath:n/a Error 
Path:/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/0000000000000057813
 Error:KeeperErrorCode = Directory not empty for 
/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/000000000000005781
with an increasing checkpoint id on each subsequent call.

When JM1 and JM2 were rebooted the lockNode UUIDs would have rolled, right? As 
the old checkpoints were created under the old UUID, the new JMs will never be 
able to remove the old retained checkpoints from ZooKeeper.

Is that correct?

If so, would this also happen with JobGraphs in the following situation (we saw 
this just recently where we had a JobGraph for a cancelled job still in ZK):

Steps 1 through 3 above, then:
4) JM1 fails over to JM2, the job keeps running uninterrupted. JM1 restarts.

5) some time later while JM2 is still leader we hard cancel the job and restart 
the JMs

In this case JM2 would successfully remove the job from s3, but because its 
lockNode is different from JM1 it cannot delete the lock file in the jobgraph 
folder and so can’t remove the jobgraph. Then Flink restarts and tries to 
process the JobGraph it has found, but the S3 files have been deleted.

Possible related closed issues (fixes went in v1.7.0): 
https://issues.apache.org/jira/browse/FLINK-10184 and 
https://issues.apache.org/jira/browse/FLINK-10255

Thanks for any insight,
Dyana

Reply via email to