Regarding the implementation: Flink stores the JobGraph (the job's description and metadata) in ZooKeeper as the basis for job recovery. However, the previous implementation may fail to clean this information up properly, because it is deleted asynchronously by a background thread.
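One way asynchronous cleanup can leave state behind is easy to see in a toy model. The Python sketch below is not Flink's code, and every class and method name in it is hypothetical. A dict stands in for the persistent ZooKeeper state, and `remove_async` only queues the deletion in process memory for a "background" step to perform later. If the process dies before that step runs, the queued removal is lost, and the next leader recovers a job that should be gone.

```python
from collections import deque


class LeaderProcessModel:
    """Hypothetical model of a job manager whose deletions run in the background."""

    def __init__(self, persisted):
        self.persisted = persisted  # survives the process (stands in for ZooKeeper)
        self.pending = deque()      # lives only in this process's memory

    def submit(self, job_id, graph):
        self.persisted[job_id] = graph

    def remove_async(self, job_id):
        # Returns immediately; the deletion itself happens later, if at all.
        self.pending.append(job_id)

    def run_background_cleanup(self):
        # What the background thread would do if it got the chance to run.
        while self.pending:
            self.persisted.pop(self.pending.popleft(), None)

    def recover(self):
        # A newly elected leader recovers whatever is still persisted.
        return sorted(self.persisted)


# Scenario 1: the process is killed before the background cleanup runs.
zk_state = {}
leader1 = LeaderProcessModel(zk_state)
leader1.submit("job-a", "graph-a")
leader1.remove_async("job-a")
del leader1  # "crash": the queued removal is lost with the process
leader2 = LeaderProcessModel(zk_state)
print(leader2.recover())  # ['job-a'], a stale job the new leader will try to restore

# Scenario 2: the cleanup gets to run, so nothing stale is left.
clean_state = {}
leader = LeaderProcessModel(clean_state)
leader.submit("job-a", "graph-a")
leader.remove_async("job-a")
leader.run_background_cleanup()
print(LeaderProcessModel(clean_state).recover())  # []
```

The model deliberately ignores leader election and ZooKeeper sessions. It only shows why "deleted asynchronously" and "reliably deleted" are not the same guarantee.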
Thanks, vino.

vino yang <yanghua1...@gmail.com> wrote on Tue, Aug 28, 2018 at 9:49 AM:

> Hi Encho,
>
> This is a problem already known to the Flink community. You can track its
> progress through FLINK-10011 [1]; Till is currently fixing this issue.
>
> [1]: https://issues.apache.org/jira/browse/FLINK-10011
>
> Thanks, vino.
>
> Encho Mishinev <encho.mishi...@gmail.com> wrote on Mon, Aug 27, 2018 at 10:13 PM:
>
>> I am running Flink 1.5.3 with two job managers and two task managers in
>> Kubernetes, along with HDFS and ZooKeeper, in high-availability mode.
>>
>> My problem occurs after the following actions:
>> - Upload a .jar file to jobmanager-1
>> - Run a streaming job from the jar on jobmanager-1
>> - Wait for 1 or 2 checkpoints to succeed
>> - Kill the pod of jobmanager-1
>>   (after a short delay, jobmanager-2 takes leadership, correctly restores
>>   the job, and continues it)
>> - Stop the job from jobmanager-2
>>
>> At this point all seems well, but the problem is that jobmanager-2 does
>> not clean up anything that was left over from jobmanager-1. As a result,
>> job graphs remain in both HDFS and ZooKeeper, and later they obstruct any
>> work by both managers: after any reset, they unsuccessfully try to restore
>> a non-existent job and fail over and over again.
>>
>> I am quite certain that jobmanager-2 does not know about any of
>> jobmanager-1’s files, since the ZooKeeper logs reveal that it tries to
>> duplicate job folders:
>>
>> 2018-08-27 13:11:00,038 [myid:] - INFO [ProcessThread(sid:0
>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>> when processing sessionid:0x1657aa15e480033 type:create cxid:0x46
>> zxid:0x1ab txntype:-1 reqpath:n/a Error
>> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
>> Error:KeeperErrorCode = NodeExists for
>> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
>>
>> 2018-08-27 13:11:02,296 [myid:] - INFO [ProcessThread(sid:0
>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>> when processing sessionid:0x1657aa15e480033 type:create cxid:0x5c
>> zxid:0x1ac txntype:-1 reqpath:n/a Error
>> Path:/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
>> Error:KeeperErrorCode = NodeExists for
>> /flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
>>
>> Also, when the job is stopped, jobmanager-2 attempts to delete the job's
>> folder under jobgraphs in ZooKeeper, but fails because it still contains
>> leftover files from jobmanager-1:
>>
>> 2018-08-27 13:12:13,406 [myid:] - INFO [ProcessThread(sid:0
>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>> when processing sessionid:0x1657aa15e480033 type:delete cxid:0xa8
>> zxid:0x1bd txntype:-1 reqpath:n/a Error
>> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
>> Error:KeeperErrorCode = Directory not empty for
>> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
>>
>> I’ve noticed that when restoring the job, jobmanager-2 seems to get
>> nothing more than the job ID, although it perhaps needs some metadata too?
>> Here is the log that seems suspicious to me:
>>
>> 2018-08-27 13:09:18,113 INFO
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore -
>> Recovered SubmittedJobGraph(83bfa359ca59ce1d4635e18e16651e15, null).
>>
>> All other logs in jobmanager-2 seem fine; it does not seem to be aware
>> that it is overwriting anything or failing to delete properly.
>>
>> My question is: what is the intended way for the job managers to
>> correctly exchange metadata in HA mode, and why is it not working for me?
>>
>> Thanks in advance!
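The two kinds of KeeperException in the quoted ZooKeeper logs follow from two basic znode rules. First, `create` fails with NodeExists if the path already exists. Second, `delete` is not recursive and fails with NotEmpty ("Directory not empty") while the node still has children. The Python sketch below models only those two rules with a hypothetical in-memory class. It is not the ZooKeeper client API and not Flink's code. It replays the paths from the logs, assuming (as the report suggests) that the `bbb259fd-…` child and the checkpoint counter already exist when jobmanager-2 acts on them. The model does not say which process created them.

```python
class NodeExistsError(Exception):
    """Stand-in for ZooKeeper's NodeExists error code."""


class NotEmptyError(Exception):
    """Stand-in for ZooKeeper's NotEmpty error code ("Directory not empty")."""


class NoNodeError(Exception):
    """Stand-in for ZooKeeper's NoNode error code."""


class ZNodeTreeModel:
    """Hypothetical in-memory model of znode create/delete rules."""

    def __init__(self):
        self.nodes = {"/"}

    @staticmethod
    def _parent(path):
        return path.rsplit("/", 1)[0] or "/"

    def children(self, path):
        prefix = path.rstrip("/") + "/"
        return sorted(
            p for p in self.nodes
            if p != path and p.startswith(prefix) and "/" not in p[len(prefix):]
        )

    def create(self, path, make_parents=False):
        if path in self.nodes:
            raise NodeExistsError(f"NodeExists for {path}")
        parent = self._parent(path)
        if parent not in self.nodes:
            if not make_parents:
                raise NoNodeError(f"NoNode for {parent}")
            self.create(parent, make_parents=True)
        self.nodes.add(path)

    def delete(self, path):
        if path not in self.nodes:
            raise NoNodeError(f"NoNode for {path}")
        if self.children(path):
            # Like ZooKeeper, refuse to delete a node that still has children.
            raise NotEmptyError(f"Directory not empty for {path}")
        self.nodes.remove(path)


JOB = "83bfa359ca59ce1d4635e18e16651e15"
JOB_NODE = f"/flink/default/jobgraphs/{JOB}"
LEFTOVER = f"{JOB_NODE}/bbb259fd-7826-4950-bc7c-c2be23346c77"
COUNTER = f"/flink/default/checkpoint-counter/{JOB}"

zk = ZNodeTreeModel()
# Assumed pre-existing state (see the reporter's description above).
zk.create(LEFTOVER, make_parents=True)
zk.create(COUNTER, make_parents=True)

# Replay the three operations that failed in the logs.
errors = []
for op, path in [("create", LEFTOVER), ("create", COUNTER), ("delete", JOB_NODE)]:
    try:
        getattr(zk, op)(path)
    except (NodeExistsError, NotEmptyError) as exc:
        errors.append(f"{op}: {exc}")

for line in errors:
    print(line)
```

Because deletion is not recursive, removing the job's node only succeeds after every child under it is gone. As long as a leftover child remains, the "Directory not empty" error will keep recurring.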