Daniel Laszlo Magyar created FLINK-16306:
--------------------------------------------

             Summary: Validate YARN session state before job submission
                 Key: FLINK-16306
                 URL: https://issues.apache.org/jira/browse/FLINK-16306
             Project: Flink
          Issue Type: Task
          Components: Client / Job Submission
    Affects Versions: 1.10.0
            Reporter: Daniel Laszlo Magyar


To better handle YARN sessions that were not stopped properly, the state of the session should be validated before job submission.
Currently, if {{execution.target: yarn-session}} is set in {{conf/flink-conf.yaml}} and the hidden YARN property file {{/tmp/.yarn-properties-root}} is present, FlinkYarnSessionCli tries to submit the job regardless of the session’s state.
The property file apparently cannot be cleaned up automatically when the session is killed, e.g. via {{yarn app -kill <appID>}}, and yarn-session.sh points this out in its logs. However, the state of the application recorded in the file could be checked before submitting to it. The current behaviour is inconsistent with the case where the YARN property file does get cleaned up, e.g. by deleting it manually: then a per-job cluster is spun up and the job is submitted to it.
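A minimal sketch of the proposed validation, under stated assumptions: the {{AppState}} enum mirrors the values of Hadoop's {{YarnApplicationState}} (a real client would presumably obtain the state via {{YarnClient#getApplicationReport}}), and {{SessionTargetCheck}}, {{Target}} and {{chooseTarget}} are hypothetical names, not existing Flink APIs. Terminal states are treated exactly like a missing property file, i.e. a per-job cluster is deployed.

{code:java}
import java.util.EnumSet;
import java.util.Set;

// Hypothetical helper, not part of Flink.
class SessionTargetCheck {

    // Mirrors the values of Hadoop's YarnApplicationState (assumption: a real
    // client would read the state from YarnClient#getApplicationReport).
    enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    // Hypothetical outcome of the validation step.
    enum Target { SESSION, PER_JOB, NOT_READY }

    // Terminal states: the recorded session can never accept jobs again.
    private static final Set<AppState> TERMINAL =
            EnumSet.of(AppState.FINISHED, AppState.FAILED, AppState.KILLED);

    /**
     * Decides where to submit, given the state of the application recorded in
     * the YARN property file (null means YARN does not know the application).
     */
    static Target chooseTarget(AppState recordedState) {
        if (recordedState == null || TERMINAL.contains(recordedState)) {
            // Stale property file: behave as if it did not exist.
            return Target.PER_JOB;
        }
        if (recordedState == AppState.RUNNING) {
            return Target.SESSION;
        }
        // NEW .. ACCEPTED: the session is still starting up.
        return Target.NOT_READY;
    }

    public static void main(String[] args) {
        for (AppState s : AppState.values()) {
            System.out.println(s + " -> " + chooseTarget(s));
        }
    }
}
{code}

Whether a session that is still in {{ACCEPTED}} should be waited for or rejected is a design choice; the sketch simply reports it as not ready.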
 
Steps to reproduce:
 • Start a Flink YARN session via {{./bin/yarn-session.sh -d}}; this writes the application ID to {{/tmp/.yarn-properties-root}}
 • Set {{execution.target: yarn-session}} in {{/etc/flink/conf/flink-conf.yaml}}
 • Kill the session via {{yarn app -kill <appID>}}
 • Try to submit a job, e.g. {{flink run -d -p 2 examples/streaming/WordCount.jar}}

The logs clearly show that FlinkYarnSessionCli tries to submit the job to the killed application:
{code:java}
20/02/26 13:34:26 ERROR yarn.YarnClusterDescriptor: The application application_1582646904843_0021 doesn't run anymore. It has previously completed with final status: KILLED
...
20/02/26 13:34:26 ERROR cli.CliFrontend: Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Couldn't retrieve Yarn cluster
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:709)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:258)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:940)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1014)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1014)
Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve Yarn cluster
        at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:365)
        at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:122)
        at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:63)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1750)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1637)
        at org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:96)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
        ... 11 more
Caused by: java.lang.RuntimeException: The Yarn application application_1582646904843_0021 doesn't run anymore.
        at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:352)
        ... 23 more
{code}
If the property file is then deleted, e.g. by running {{rm -f /tmp/.yarn-properties-root}}, and the job is resubmitted, a per-job cluster is spun up. The same behaviour should be achievable without having to delete the outdated property file.
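A sketch of how the outdated file could be ignored instead of deleted, assuming the application ID is stored under the key {{applicationID}} (an assumption about the file layout) and using a hypothetical {{isRunning}} predicate in place of the actual YARN state lookup. {{StalePropertiesFallback}} and {{sessionToUse}} are illustrative names only, not Flink code.

{code:java}
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Predicate;

// Hypothetical helper, not part of Flink.
class StalePropertiesFallback {

    // Assumption: the key under which the session's application ID is stored.
    static final String APPLICATION_ID_KEY = "applicationID";

    /**
     * Returns the application ID to submit to, or empty if a per-job cluster
     * should be deployed instead. isRunning is a hypothetical hook standing in
     * for a YARN lookup of the application's state.
     */
    static Optional<String> sessionToUse(Reader propertyFile, Predicate<String> isRunning) {
        Properties props = new Properties();
        try {
            props.load(propertyFile);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String appId = props.getProperty(APPLICATION_ID_KEY);
        if (appId == null || !isRunning.test(appId)) {
            // No recorded session, or a stale one: same outcome as a deleted file.
            return Optional.empty();
        }
        return Optional.of(appId);
    }

    public static void main(String[] args) {
        String content = "applicationID=application_1582646904843_0021\n";
        // Simulate the killed session: the predicate reports it as not running.
        Optional<String> target = sessionToUse(new StringReader(content), id -> false);
        System.out.println(target.map(id -> "submit to " + id).orElse("deploy per-job cluster"));
    }
}
{code}

In practice the {{Reader}} would be opened on {{/tmp/.yarn-properties-root}}; the {{main}} method uses an in-memory string and a predicate that always reports the application as gone, so it prints {{deploy per-job cluster}}.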

CC: [~gyfora]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
