About some implementation mechanisms.
Flink uses Zookeeper to store JobGraph (Job's description information and
metadata) as a basis for Job recovery.
However, previous implementations may cause this information to not be
properly cleaned up because it is asynchronously deleted by a background
thread.

Thanks, vino.

vino yang <yanghua1...@gmail.com> 于2018年8月28日周二 上午9:49写道:

> Hi Encho,
>
> This is a problem already known to the Flink community, you can track its
> progress through FLINK-10011[1], and currently Till is fixing this issue.
>
> [1]: https://issues.apache.org/jira/browse/FLINK-10011
>
> Thanks, vino.
>
> Encho Mishinev <encho.mishi...@gmail.com> 于2018年8月27日周一 下午10:13写道:
>
>> I am running Flink 1.5.3 with two job managers and two task managers in
>> Kubernetes along with HDFS and Zookeeper in high-availability mode.
>>
>> My problem occurs after the following actions:
>> - Upload a .jar file to jobmanager-1
>> - Run a streaming job from the jar on jobmanager-1
>> - Wait for 1 or 2 checkpoints to succeed
>> - Kill pod of jobmanager-1
>> After a short delay, jobmanager-2 takes leadership and correctly restores
>> the job and continues it
>> - Stop job from jobmanager-2
>>
>> At this point all seems well, but the problem is that jobmanager-2 does
>> not clean up anything that was left from jobmanager-1. This means that both
>> in HDFS and in Zookeeper remain job graphs, which later on obstruct any
>> work of both managers as after any reset they unsuccessfully try to restore
>> a non-existent job and fail over and over again.
>>
>> I am quite certain that jobmanager-2 does not know about any of
>> jobmanager-1’s files since the Zookeeper logs reveal that it tries to
>> duplicate job folders:
>>
>> 2018-08-27 13:11:00,038 [myid:] - INFO  [ProcessThread(sid:0
>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>> when processing sessionid:0x1657aa15e480033 type:create cxid:0x46
>> zxid:0x1ab txntype:-1 reqpath:n/a Error
>> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
>> Error:KeeperErrorCode = NodeExists for
>> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15/bbb259fd-7826-4950-bc7c-c2be23346c77
>>
>> 2018-08-27 13:11:02,296 [myid:] - INFO  [ProcessThread(sid:0
>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>> when processing sessionid:0x1657aa15e480033 type:create cxid:0x5c
>> zxid:0x1ac txntype:-1 reqpath:n/a Error
>> Path:/flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
>> Error:KeeperErrorCode = NodeExists for
>> /flink/default/checkpoint-counter/83bfa359ca59ce1d4635e18e16651e15
>>
>> Also jobmanager-2 attempts to delete the jobgraphs folder in Zookeeper
>> when the job is stopped, but fails since there are leftover files in it
>> from jobmanager-1:
>>
>> 2018-08-27 13:12:13,406 [myid:] - INFO  [ProcessThread(sid:0
>> cport:2181)::PrepRequestProcessor@648] - Got user-level KeeperException
>> when processing sessionid:0x1657aa15e480033 type:delete cxid:0xa8
>> zxid:0x1bd txntype:-1 reqpath:n/a Error
>> Path:/flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
>> Error:KeeperErrorCode = Directory not empty for
>> /flink/default/jobgraphs/83bfa359ca59ce1d4635e18e16651e15
>>
>> I’ve noticed that when restoring the job, it seems like jobmanager-2 does
>> not get anything more than jobID, while it perhaps needs some metadata?
>> Here is the log that seems suspicious to me:
>>
>> 2018-08-27 13:09:18,113 INFO
>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>> Recovered SubmittedJobGraph(83bfa359ca59ce1d4635e18e16651e15, null).
>>
>> All other logs seem fine in jobmanager-2, it doesn’t seem to be aware
>> that it’s overwriting anything or not deleting properly.
>>
>> My question is - what is the intended way for the job managers to
>> correctly exchange metadata in HA mode and why is it not working for me?
>>
>> Thanks in advance!
>
>

Reply via email to