[ https://issues.apache.org/jira/browse/FLINK-16279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenlong Lyu updated FLINK-16279:
--------------------------------
    Description:
I ran a job in YARN per-job mode using {{env.executeAsync}}. The job failed, but the YARN cluster was not destroyed.

After some research on the code, I found that when running in attached mode ({{ExecutionMode.NORMAL}}), MiniDispatcher never completes {{shutDownFuture}} until it has received a request for the job result from the job client:

{code}
if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
    // terminate the MiniDispatcher once we served the first JobResult successfully
    jobResultFuture.thenAccept((JobResult result) -> {
        ApplicationStatus status = result.getSerializedThrowable().isPresent() ?
                ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED;

        LOG.debug("Shutting down per-job cluster because someone retrieved the job result.");
        shutDownFuture.complete(status);
    });
}
{code}

However, when the job is submitted asynchronously with {{env.executeAsync}}, the job client may never send such a request: once a user sees through the job client that the job has failed, they may never request the result again. In that case {{shutDownFuture}} is never completed and the per-job YARN application keeps running.

> Per job Yarn application leak in normal execution mode.
> -------------------------------------------------------
>
>                 Key: FLINK-16279
>                 URL: https://issues.apache.org/jira/browse/FLINK-16279
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: Wenlong Lyu
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
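The shutdown dependency described in this issue can be reproduced in isolation with plain {{CompletableFuture}}s. The following is a minimal sketch, not Flink code: only {{shutDownFuture}}, {{ExecutionMode.NORMAL}} and {{ApplicationStatus}} come from the quoted excerpt. The class {{PerJobShutdownModel}}, the methods {{jobReachedTerminalState}}, {{requestJobResult()}} (without arguments) and {{getShutDownFuture}} are hypothetical simplifications, and a {{Boolean}} "did the job fail?" stands in for Flink's {{JobResult}}.

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical, simplified model of the per-job shutdown path described in
 * FLINK-16279. This is NOT Flink code; only the names shutDownFuture,
 * ExecutionMode.NORMAL and ApplicationStatus come from the quoted excerpt.
 */
public class PerJobShutdownModel {

    public enum ExecutionMode { NORMAL, DETACHED } // DETACHED is not modeled here

    public enum ApplicationStatus { SUCCEEDED, FAILED }

    private final ExecutionMode executionMode;

    // Completed with "did the job fail?" once the job reaches a terminal state.
    private final CompletableFuture<Boolean> jobResultFuture = new CompletableFuture<>();

    // Completing this future is what allows the per-job cluster to shut down.
    private final CompletableFuture<ApplicationStatus> shutDownFuture = new CompletableFuture<>();

    public PerJobShutdownModel(ExecutionMode executionMode) {
        this.executionMode = executionMode;
    }

    /** Hypothetical hook: the job finished, either successfully or with a failure. */
    public void jobReachedTerminalState(boolean failed) {
        jobResultFuture.complete(failed);
    }

    /** Hypothetical stand-in for a job client asking for the result. */
    public CompletableFuture<Boolean> requestJobResult() {
        if (executionMode == ExecutionMode.NORMAL) {
            // As in the excerpt: shutdown is only scheduled from this request path.
            jobResultFuture.thenAccept(failed -> shutDownFuture.complete(
                    failed ? ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED));
        }
        return jobResultFuture;
    }

    public CompletableFuture<ApplicationStatus> getShutDownFuture() {
        return shutDownFuture;
    }

    public static void main(String[] args) {
        // Scenario from the issue: the job fails and nobody asks for the result.
        PerJobShutdownModel leaked = new PerJobShutdownModel(ExecutionMode.NORMAL);
        leaked.jobReachedTerminalState(true);
        System.out.println("shut down without request: " + leaked.getShutDownFuture().isDone());

        // Same failure, but the client retrieves the result once.
        PerJobShutdownModel served = new PerJobShutdownModel(ExecutionMode.NORMAL);
        served.jobReachedTerminalState(true);
        served.requestJobResult();
        System.out.println("shut down after request: " + served.getShutDownFuture().join());
    }
}
```

Running {{main}} prints {{shut down without request: false}} followed by {{shut down after request: FAILED}}: the job's terminal state alone never completes {{shutDownFuture}} in this model, which corresponds to the leaked per-job application described above.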