I am running Flink 1.5.3 with two job managers and two task managers on Kubernetes, along with HDFS and ZooKeeper, in high-availability mode.
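For reference, the HA-related part of my flink-conf.yaml looks roughly like the sketch below. The hostnames and HDFS paths are simplified placeholders rather than my exact values; the ZooKeeper root and cluster id match the `/flink/default/...` paths that show up in the logs further down.

```yaml
# ZooKeeper-based high availability (hostnames/paths are placeholders)
high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /default
# Job graphs and other HA metadata are persisted here
high-availability.storageDir: hdfs:///flink/ha/

# Checkpointing to HDFS
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
```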
My problem occurs after the following sequence of actions:

- Upload a .jar file to jobmanager-1
- Run a streaming job from the jar on jobmanager-1
- Wait for 1 or 2 checkpoints to succeed
- Kill the pod of jobmanager-1; after a short delay, jobmanager-2 takes leadership, correctly restores the job, and continues running it
- Stop the job from jobmanager-2

At this point everything seems fine, but the problem is that jobmanager-2 does not clean up anything that was left behind by jobmanager-1. As a result, job graphs remain in both HDFS and ZooKeeper, and they later obstruct both job managers: after any restart, they repeatedly try to restore a job that no longer exists and fail over and over again.

I am fairly certain that jobmanager-2 does not know about any of jobmanager-1's files, since the ZooKeeper logs show that it tries to create job folders that already exist:

```
2018-08-27 13:11:00,038 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when processing sessionid:0x1657aa15e480033 type:create cxid:0x46 zxid:0x1ab txntype:-1 reqpath:n/a Error Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77 Error:KeeperErrorCode = NodeExists for /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
2018-08-27 13:11:02,296 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when processing sessionid:0x1657aa15e480033 type:create cxid:0x5c zxid:0x1ac txntype:-1 reqpath:n/a Error Path:/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15 Error:KeeperErrorCode = NodeExists for /flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
```

jobmanager-2 also attempts to delete the jobgraphs folder in ZooKeeper when the job is stopped, but the deletion fails because the folder still contains leftover files from jobmanager-1:

```
2018-08-27 13:12:13,406 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when processing sessionid:0x1657aa15e480033 type:delete cxid:0xa8 zxid:0x1bd txntype:-1 reqpath:n/a Error Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15 Error:KeeperErrorCode = Directory not empty for /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
```

I have also noticed that when restoring the job, jobmanager-2 seems to recover nothing more than the job ID, although it presumably needs some metadata as well. This is the log line that looks suspicious to me:

```
2018-08-27 13:09:18,113 INFO org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - Recovered SubmittedJobGraph(83bfa359ca59ce1d4635e18e16651e15, null).
```

All other logs from jobmanager-2 look fine; it does not appear to be aware that it is overwriting anything or failing to delete properly.

My question is: what is the intended way for the job managers to exchange metadata correctly in HA mode, and why is it not working for me?

Thanks in advance!
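Edit: for completeness, this is roughly how I inspect the leftover state after reproducing the problem. The job ID is the one from the logs above; the ZooKeeper address and the HA storage directory are the placeholder values from the configuration snippet, so they would need adjusting for a different setup.

```sh
# List the job graph znodes left under the cluster's HA root in ZooKeeper
bin/zkCli.sh -server zookeeper:2181 ls /flink/default/jobgraphs

# Show the leftover child node(s) inside the stale job graph folder
bin/zkCli.sh -server zookeeper:2181 ls /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15

# List the job's files that remain in the HA storage directory on HDFS
hdfs dfs -ls /flink/ha/
```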