I am running Flink 1.5.3 in high-availability mode on Kubernetes, with two job
managers, two task managers, HDFS and ZooKeeper.
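
For reference, the HA-related part of my flink-conf.yaml looks roughly like this
(the quorum address and HDFS paths are placeholders for my actual values; the
ZooKeeper paths in the logs below correspond to path.root /flink and cluster-id
default):

high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /default
high-availability.storageDir: hdfs:///flink/ha/
state.checkpoints.dir: hdfs:///flink/checkpoints/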

My problem occurs after the following sequence of actions (a rough sketch of the
commands is below):
- Upload a .jar file to jobmanager-1
- Run a streaming job from that jar on jobmanager-1
- Wait for one or two checkpoints to succeed
- Kill the pod of jobmanager-1; after a short delay, jobmanager-2 takes 
leadership, correctly restores the job and continues running it
- Stop the job from jobmanager-2
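
Roughly, the commands involved are the following (I actually submit and stop the
job through the web UI; the CLI lines are just the equivalents, and the host,
port, pod and jar names are placeholders from my deployment):

# submit the streaming job to jobmanager-1 (detached)
flink run -d -m jobmanager-1:8081 my-streaming-job.jar

# wait for one or two successful checkpoints, then kill the pod
kubectl delete pod flink-jobmanager-1

# after jobmanager-2 has taken leadership and restored the job,
# stop/cancel it from jobmanager-2
flink list -m jobmanager-2:8081
flink cancel -m jobmanager-2:8081 <jobID>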

At this point everything seems fine, but the problem is that jobmanager-2 does 
not clean up anything that was left behind by jobmanager-1. Job graphs remain 
both in HDFS and in ZooKeeper, and they later block both job managers: after 
any restart they try to restore a job that no longer exists and fail over and 
over again.
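
This is also what I see when I look into ZooKeeper and HDFS by hand (zkCli.sh is
run inside the ZooKeeper pod; the pod name and the storage directory are
placeholders, the job ID is the one from the logs below):

kubectl exec -it zookeeper-0 -- zkCli.sh -server localhost:2181

# the job graph node of the already-stopped job is still there,
# together with a child node apparently left behind by jobmanager-1
ls /flink/default/jobgraphs
ls /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15

# the checkpoint counter for that job is also still present
ls /flink/default/checkpoint-counter

# and the files under high-availability.storageDir are still there as well
hdfs dfs -ls -R /flink/ha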

I am quite certain that jobmanager-2 does not know about any of jobmanager-1’s 
files, since the ZooKeeper logs show that it tries to create job folders that 
already exist:

2018-08-27 13:11:00,038 [myid:] - INFO  [ProcessThread(sid:0 
cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when 
processing sessionid:0x1657aa15e480033 type:create cxid:0x46 zxid:0x1ab 
txntype:-1 reqpath:n/a Error 
Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
 Error:KeeperErrorCode = NodeExists for 
/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77

2018-08-27 13:11:02,296 [myid:] - INFO  [ProcessThread(sid:0 
cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when 
processing sessionid:0x1657aa15e480033 type:create cxid:0x5c zxid:0x1ac 
txntype:-1 reqpath:n/a Error 
Path:/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15 
Error:KeeperErrorCode = NodeExists for 
/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15

jobmanager-2 also attempts to delete the job graph node in ZooKeeper when the 
job is stopped, but fails because there are leftover child nodes in it from 
jobmanager-1:

2018-08-27 13:12:13,406 [myid:] - INFO  [ProcessThread(sid:0 
cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException when 
processing sessionid:0x1657aa15e480033 type:delete cxid:0xa8 zxid:0x1bd 
txntype:-1 reqpath:n/a Error 
Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15 
Error:KeeperErrorCode = Directory not empty for 
/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
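
Just to confirm my reading of that error, I get the same behaviour when I try it
by hand in zkCli.sh: a plain delete fails as long as the leftover child node is
there, and only removing the whole subtree works, which is obviously just a
manual workaround:

# fails with the same "not empty" error while the child node exists
delete /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15

# recursive delete works (rmr on my ZooKeeper version; deleteall on newer ones)
rmr /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15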

I’ve also noticed that when restoring the job, jobmanager-2 seems to get nothing 
more than the job ID, while it perhaps needs some additional metadata as well. 
Here is the log line that looks suspicious to me:

2018-08-27 13:09:18,113 INFO  
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
Recovered SubmittedJobGraph(83bfa359ca59ce1d4635e18e16651e15, null).

All other logs from jobmanager-2 look fine; it does not seem to be aware that it 
is overwriting anything or failing to delete properly.

My question is: what is the intended way for the job managers to exchange this 
metadata correctly in HA mode, and why is it not working in my setup?

Thanks in advance!
