Cross-linking the dev ML thread [1]. Let us continue the discussion there.

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/HA-lock-nodes-Checkpoints-and-JobGraphs-after-failure-td28432.html
Cheers,
Till

On Tue, Apr 23, 2019 at 9:52 AM dyana.rose <dyana.r...@salecycle.com> wrote:

> Originally posted to the dev group, but it's easy for things to get
> buried there, and this may concern other HA users.
>
> Flink v1.7.1
>
> After a Flink reboot we've been seeing some unexpected issues: excess
> retained checkpoints cannot be removed from ZooKeeper after a new
> checkpoint is created.
>
> I believe I've got my head around the role of ZK and lockNodes in
> checkpointing after going through the code. Could you check my logic on
> this and add any insight, especially if I've got it wrong?
>
> The situation:
>
> 1) Say we run JM1 and JM2, retain 10 checkpoints, and run in HA with S3
> as the backing store.
>
> 2) JM1 and JM2 start up, and each instance of ZooKeeperStateHandleStore
> has its own lockNode UUID. JM1 is elected leader.
>
> 3) We submit a job; that JobGraph is added to ZK using JM1's JobGraph
> lockNode.
>
> 4) Checkpoints start rolling in; the latest 10 are retained in ZK using
> JM1's checkpoint lockNode. We continue running, and checkpoints are
> successfully created and excess checkpoints removed.
>
> 5) Both JM1 and JM2 are now rebooted.
>
> 6) The JobGraph is recovered by the leader, and the job restarts from the
> latest checkpoint.
>
> Now, after every new checkpoint we see in the ZooKeeper logs:
>
> INFO [ProcessThread(sid:3 cport:-1)::PrepRequestProcessor@653] - Got
> user-level KeeperException when processing sessionid:0x10000047715000d
> type:delete cxid:0x210 zxid:0x700001091 txntype:-1 reqpath:n/a Error
> Path:/flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/0000000000000057813
> Error:KeeperErrorCode = Directory not empty for
> /flink/job-name/checkpoints/2fa0d694e245f5ec1f709630c7c7bf69/000000000000005781
>
> with an increasing checkpoint id on each subsequent call.
>
> When JM1 and JM2 were rebooted, the lockNode UUIDs would have rolled,
> right? As the old checkpoints were created under the old UUID, the new
> JMs will never be able to remove the old retained checkpoints from
> ZooKeeper.
>
> Is that correct?
>
> If so, would this also happen with JobGraphs in the following situation
> (we saw this just recently, where we had a JobGraph for a cancelled job
> still in ZK)?
>
> Steps 1 through 3 above, then:
>
> 4) JM1 fails over to JM2; the job keeps running uninterrupted. JM1
> restarts.
>
> 5) Some time later, while JM2 is still leader, we hard cancel the job and
> restart the JMs.
>
> In this case JM2 would successfully remove the job from S3, but because
> its lockNode is different from JM1's, it cannot delete the lock file in
> the jobgraph folder and so can't remove the JobGraph. Then Flink restarts
> and tries to process the JobGraph it has found, but the S3 files have
> been deleted.
>
> Possibly related closed issues (fixes went in v1.7.0):
> https://issues.apache.org/jira/browse/FLINK-10184 and
> https://issues.apache.org/jira/browse/FLINK-10255
>
> Thanks for any insight,
> Dyana
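
For readers following along, below is a minimal sketch of the lockNode
mechanism Dyana describes, written against the plain ZooKeeper Java client.
This is an illustration only, not Flink's actual ZooKeeperStateHandleStore
code; the class name, method names, and paths are made up for the example.
It shows why deleting a checkpoint znode fails with "Directory not empty"
when a lock child written by a previous JobManager incarnation (i.e. a
different UUID) is still present.

    import java.util.List;
    import java.util.UUID;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    public class LockNodeSketch {

        // Each process gets a fresh UUID on startup, so a rebooted JM can
        // never own the lock children written by the previous process.
        private static final String LOCK_NODE = UUID.randomUUID().toString();

        static void addCheckpoint(ZooKeeper zk, String checkpointPath, byte[] stateHandle)
                throws KeeperException, InterruptedException {
            // Create the checkpoint znode plus a lock child named after
            // this instance's UUID.
            zk.create(checkpointPath, stateHandle,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk.create(checkpointPath + "/" + LOCK_NODE, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        static void removeCheckpoint(ZooKeeper zk, String checkpointPath)
                throws KeeperException, InterruptedException {
            // Release *our* lock child; lock children written under an
            // earlier UUID stay behind.
            try {
                zk.delete(checkpointPath + "/" + LOCK_NODE, -1);
            } catch (KeeperException.NoNodeException ignored) {
                // We never locked this checkpoint (e.g. it predates our reboot).
            }
            try {
                // Fails with NotEmptyException -- the "Directory not empty"
                // message in the ZooKeeper log above -- as long as a stale
                // lock child from the old UUID is still present.
                zk.delete(checkpointPath, -1);
            } catch (KeeperException.NotEmptyException e) {
                List<String> leftoverLocks = zk.getChildren(checkpointPath, false);
                System.err.println("Cannot remove " + checkpointPath
                        + ", leftover lock nodes: " + leftoverLocks);
            }
        }
    }

In the reboot scenario from the thread, the retained checkpoints were
locked under the pre-reboot UUIDs, so every cleanup attempt by the new
JobManagers ends up in the NotEmptyException branch, which matches the
repeating "Directory not empty" lines with increasing checkpoint ids.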