Hi Cristian,

Thanks for reporting this issue. It does indeed look like a very critical
problem.

The problem seems to be that the ApplicationDispatcherBootstrap class
produces an exception (saying that the requested job can no longer be found
because of a lost ZooKeeper connection), which is interpreted as a job
failure. Because of this interpretation, the cluster is shut down with a
terminal state of FAILED, which causes the HA data to be cleaned up. The
exception originates in JobStatusPollingUtils.getJobResult(), which is
called by ApplicationDispatcherBootstrap.getJobResult().

I think there are two problems here. First, not every exception bubbling up
in the future returned by
ApplicationDispatcherBootstrap.fixJobIdAndRunApplicationAsync() indicates a
job failure. Some of them can indicate a framework failure, which should
not lead to the cleanup of HA data. Second, the polling logic cannot
properly handle a temporary loss of the ZooKeeper connection, which is a
normal situation.
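
To make the two problems concrete, here is a rough sketch of the behavior I
would expect. This is not the actual Flink code: JobFailedException,
TransientFrameworkException, JobResultPoll, pollJobResult() and
shouldCleanUpHaData() are all made-up names for illustration, and the retry
count and backoff are arbitrary. The idea is only that a transient framework
error is retried, and that HA data is cleaned up only when the job itself
reached a terminal state.

```java
public class JobResultPollingSketch {

    /** Hypothetical: the job itself failed (job-level error). */
    static class JobFailedException extends Exception {
        JobFailedException(String message) { super(message); }
    }

    /** Hypothetical: the framework cannot answer right now, e.g. ZooKeeper connection lost. */
    static class TransientFrameworkException extends Exception {
        TransientFrameworkException(String message) { super(message); }
    }

    enum Outcome { JOB_SUCCEEDED, JOB_FAILED, FRAMEWORK_ERROR }

    /** Hypothetical stand-in for "ask the dispatcher for the job result". */
    @FunctionalInterface
    interface JobResultPoll {
        void awaitResult() throws Exception;
    }

    /**
     * Polls for the job result. Transient framework errors are retried up to
     * maxAttempts times; a job-level failure ends the polling immediately.
     */
    static Outcome pollJobResult(JobResultPoll poll, int maxAttempts, long backoffMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                poll.awaitResult();
                return Outcome.JOB_SUCCEEDED;
            } catch (JobFailedException e) {
                // The job really failed: this is a terminal job state.
                return Outcome.JOB_FAILED;
            } catch (TransientFrameworkException e) {
                // Temporary connection loss is normal: wait and poll again.
                Thread.sleep(backoffMillis);
            } catch (Exception e) {
                // Anything else is a framework problem, not a job failure.
                return Outcome.FRAMEWORK_ERROR;
            }
        }
        // Retries exhausted: still a framework problem, not a job failure.
        return Outcome.FRAMEWORK_ERROR;
    }

    /** HA data is only safe to remove when the job reached a terminal state. */
    static boolean shouldCleanUpHaData(Outcome outcome) {
        return outcome != Outcome.FRAMEWORK_ERROR;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Simulated poll: the first two calls hit a lost connection, the third succeeds.
        Outcome outcome = pollJobResult(() -> {
            if (calls[0]++ < 2) {
                throw new TransientFrameworkException("lost ZooKeeper connection");
            }
        }, 5, 10L);
        System.out.println(outcome + ", clean up HA data: " + shouldCleanUpHaData(outcome));
    }
}
```

With a split like this, a lost ZooKeeper connection would at worst end in
FRAMEWORK_ERROR, and the checkpoint metadata would stay in place for recovery.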

I am pulling in Aljoscha and Klou who worked on this feature and might be
able to propose a solution for these problems. I've also updated the JIRA
issue FLINK-19154.

Cheers,
Till

On Wed, Sep 9, 2020 at 9:00 AM Yang Wang <danrtsey...@gmail.com> wrote:

> > The job sub directory will be cleaned up when the job
> finished/canceled/failed.
> Since multiple jobs can be submitted to a Flink session, what I mean is
> that when a job reaches a terminal state, its sub node (e.g.
> /flink/application_xxxx/running_job_registry/4d255397c7aeb5327adb567238c983c1)
> in ZooKeeper will be cleaned up, but the root directory
> (/flink/application_xxxx/) still exists.
>
>
> Your current case is different (per-job cluster). I think we need to
> figure out why the only running job reached a terminal state, for example
> because the restart attempts were exhausted. In that case you would find
> the following in your JobManager log:
>
> "org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy"
>
>
> Best,
> Yang
>
>
>
>
> On Wed, Sep 9, 2020 at 11:26 AM, Cristian <k...@fastmail.fm> wrote:
>
>> > The job sub directory will be cleaned up when the job
>> finished/canceled/failed.
>>
>> What does this mean?
>>
>> Also, to clarify: I'm a very sloppy developer. My jobs crash ALL the
>> time... and yet, the jobs would ALWAYS resume from the last checkpoint.
>>
>> The only cases where I expect Flink to clean up the checkpoint data from
>> ZK is when I explicitly stop or cancel the job (in those cases the job
>> manager takes a savepoint before cleaning up zk and finishing the cluster).
>>
>> Which is not the case here. Flink was on autopilot here and decided to
>> wipe my poor, good checkpoint metadata as the logs show.
>>
>> On Tue, Sep 8, 2020, at 7:59 PM, Yang Wang wrote:
>>
>> AFAIK, the HA data, including the ZooKeeper metadata and the actual data
>> on DFS, will only be cleaned up when the Flink cluster reaches a terminal
>> state.
>>
>> So if you are using a session cluster, the root cluster node in ZooKeeper
>> will be cleaned up after you manually
>> stop the session cluster. The job sub directory will be cleaned up when
>> the job finished/canceled/failed.
>>
>> If you are using a job/application cluster, all the HA data will be
>> cleaned up once the only running job finishes or fails. I think you need
>> to check the job restart strategy you have set. For example, the following
>> configuration will make the Flink cluster terminate after 10 restart
>> attempts.
>>
>> restart-strategy: fixed-delay
>> restart-strategy.fixed-delay.attempts: 10
>>
>>
>> Best,
>> Yang
>>
>> On Wed, Sep 9, 2020 at 12:28 AM, Cristian <k...@fastmail.fm> wrote:
>>
>>
>> I'm using the standalone script to start the cluster.
>>
>> As far as I can tell, it's not easy to reproduce. We found that zookeeper
>> lost a node around the time this happened, but all of our other 75 Flink
>> jobs which use the same setup, version and zookeeper, didn't have any
>> issues. They didn't even restart.
>>
>> So unfortunately I don't know how to reproduce this. All I know is I
>> can't sleep. I have nightmares where my precious state is deleted. I wake
>> up crying and quickly start manually savepointing all jobs just in case,
>> because I feel the day of reckoning is near. Flinkpocalypse!
>>
>> On Tue, Sep 8, 2020, at 5:54 AM, Robert Metzger wrote:
>>
>> Thanks a lot for reporting this problem here Cristian!
>>
>> I am not super familiar with the involved components, but the behavior
>> you are describing doesn't sound right to me.
>> Which entrypoint are you using? This is logged at the beginning, like
>> this: "2020-09-08 14:45:32,807 INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -
>>  Starting StandaloneSessionClusterEntrypoint (Version: 1.11.1, Scala: 2.12,
>> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)"
>>
>> Do you know by chance if this problem is reproducible? With
>> the StandaloneSessionClusterEntrypoint I was not able to reproduce the
>> problem.
>>
>>
>>
>>
>> On Tue, Sep 8, 2020 at 4:00 AM Husky Zeng <568793...@qq.com> wrote:
>>
>> Hi Cristian,
>>
>>
>> I don't know if it was designed to be like this deliberately.
>>
>> So I have already submitted an issue and am waiting for somebody to respond.
>>
>> https://issues.apache.org/jira/browse/FLINK-19154
>>
>>
>>
>>
>>
>>
