Hi Encho,

This is a known problem in the Flink community. You can track its progress through FLINK-10011 [1]; Till is currently working on a fix. In the meantime, if you need to unblock your cluster, you can manually remove the stale entries from ZooKeeper, as sketched below.
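Something like the following Curator snippet can be used to inspect and, if necessary, delete the leftover znodes. This is only a rough sketch: the connect string "zookeeper:2181" is a placeholder for your own quorum, the paths are taken from your logs below (root /flink, cluster-id "default"), and you should only delete entries for jobs you are sure are dead. The same approach applies to the checkpoint-counter path from your logs.

import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CleanStaleJobGraphs {
    public static void main(String[] args) throws Exception {
        // "zookeeper:2181" is a placeholder; point this at your own quorum.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zookeeper:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Root (/flink) and cluster-id (default) as they appear in your logs.
        String jobGraphs = "/flink/default/jobgraphs";
        List<String> stale = client.getChildren().forPath(jobGraphs);
        System.out.println("Leftover job graphs: " + stale);

        // Danger zone: only delete entries of jobs you know are gone for good.
        for (String jobId : stale) {
            client.delete().deletingChildrenIfNeeded()
                    .forPath(jobGraphs + "/" + jobId);
        }
        client.close();
    }
}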
[1]: https://issues.apache.org/jira/browse/FLINK-10011

Thanks,
vino.

Encho Mishinev <encho.mishi...@gmail.com> wrote on Mon, Aug 27, 2018 at 10:13 PM:

> I am running Flink 1.5.3 with two job managers and two task managers in
> Kubernetes, along with HDFS and ZooKeeper, in high-availability mode.
>
> The problem occurs after the following actions:
> - Upload a .jar file to jobmanager-1
> - Run a streaming job from the jar on jobmanager-1
> - Wait for 1 or 2 checkpoints to succeed
> - Kill the pod of jobmanager-1; after a short delay, jobmanager-2 takes
>   leadership and correctly restores and continues the job
> - Stop the job from jobmanager-2
>
> At this point all seems well, but the problem is that jobmanager-2 does
> not clean up anything left behind by jobmanager-1. Job graphs remain in
> both HDFS and ZooKeeper, and they later obstruct both managers: after
> any restart, the managers repeatedly try to restore a job that no
> longer exists and fail over and over again.
>
> I am quite certain that jobmanager-2 does not know about any of
> jobmanager-1's files, since the ZooKeeper logs reveal that it tries to
> duplicate job folders:
>
> 2018-08-27 13:11:00,038 [myid:] - INFO [ProcessThread(sid:0
> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
> when processing sessionid:0x1657aa15e480033 type:create cxid:0x46
> zxid:0x1ab txntype:-1 reqpath:n/a Error
> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
> Error:KeeperErrorCode = NodeExists for
> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
>
> 2018-08-27 13:11:02,296 [myid:] - INFO [ProcessThread(sid:0
> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
> when processing sessionid:0x1657aa15e480033 type:create cxid:0x5c
> zxid:0x1ac txntype:-1 reqpath:n/a Error
> Path:/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
> Error:KeeperErrorCode = NodeExists for
> /flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
>
> jobmanager-2 also attempts to delete the jobgraphs folder in ZooKeeper
> when the job is stopped, but fails because the folder still contains
> files left over from jobmanager-1:
>
> 2018-08-27 13:12:13,406 [myid:] - INFO [ProcessThread(sid:0
> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
> when processing sessionid:0x1657aa15e480033 type:delete cxid:0xa8
> zxid:0x1bd txntype:-1 reqpath:n/a Error
> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
> Error:KeeperErrorCode = Directory not empty for
> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
>
> I have also noticed that when restoring the job, jobmanager-2 seems to
> receive nothing more than the job ID, while it perhaps needs some
> metadata as well. Here is the log line that looks suspicious to me:
>
> 2018-08-27 13:09:18,113 INFO
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore -
> Recovered SubmittedJobGraph(83bfa359ca59ce1d4635e18e16651e15, null).
>
> All other logs from jobmanager-2 look fine; it does not seem to be
> aware that it is overwriting anything or failing to delete properly.
>
> My question is: what is the intended way for the job managers to
> correctly exchange metadata in HA mode, and why is it not working for
> me?
>
> Thanks in advance!
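P.S. On the HDFS side: the leftover files live under your configured high-availability.storageDir. A minimal sketch for listing them with the Hadoop FileSystem API, so you can spot state belonging to already-stopped jobs; hdfs:///flink/ha is only an assumed value here, substitute whatever you configured:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListHaLeftovers {
    public static void main(String[] args) throws Exception {
        // Assumed HA storage directory; substitute your configured
        // high-availability.storageDir value.
        Path haDir = new Path("hdfs:///flink/ha");
        FileSystem fs = FileSystem.get(haDir.toUri(), new Configuration());

        // Recursively print every file still stored under the HA directory.
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(haDir, true);
        while (it.hasNext()) {
            System.out.println(it.next().getPath());
        }
    }
}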