Youjun Yuan created FLINK-10381:
-----------------------------------

             Summary: concurrent submit job get ProgramAbortException
                 Key: FLINK-10381
                 URL: https://issues.apache.org/jira/browse/FLINK-10381
             Project: Flink
          Issue Type: Bug
          Components: JobManager
    Affects Versions: 1.6.0, 1.5.1, 1.4.0
         Environment: Flink 1.4.0, standalone cluster.
            Reporter: Youjun Yuan
             Fix For: 1.7.0
         Attachments: image-2018-09-20-22-40-31-846.png

If multiple jobs are submitted concurrently, some of them are likely to fail 
and return the following exception: 

_java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar._
_at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)_
_at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler$$Lambda$47/713642705.get(Unknown Source)_
_at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1582)_
_at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)_
_at java.util.concurrent.FutureTask.run(FutureTask.java:266)_
_at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)_
_at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)_
_at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)_
_at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)_
_at java.lang.Thread.run(Thread.java:745)_
_Caused by: org.apache.flink.util.FlinkException: Could not run the jar. ... 10 more_

_Caused by: org.apache.flink.client.program.ProgramInvocationException: The program caused an error:_
_at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)_
_at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)_
_at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76)_
_at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
... 9 more_

_Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException_
_at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:72)_
_..._
_at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)_
_at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)_
_at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)_
_at java.lang.reflect.Method.invoke(Method.java:497)_
_at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)_
_at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)_
_at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83_

 
h2. Possible Cause:

In OptimizerPlanEnvironment.getOptimizedPlan(), setAsContext() sets a static 
variable named contextEnvironmentFactory in ExecutionEnvironment. As a result, 
ExecutionEnvironment.getExecutionEnvironment() returns the current 
OptimizerPlanEnvironment instance, which captures the optimizerPlan and saves 
it to an instance variable of OptimizerPlanEnvironment.

However, if multiple jobs are submitted at the same time, the static variable 
contextEnvironmentFactory in ExecutionEnvironment is overwritten by a later 
job, so ExecutionEnvironment.getExecutionEnvironment() returns a different, 
newer instance of OptimizerPlanEnvironment. The first instance of 
OptimizerPlanEnvironment therefore never captures the optimizerPlan and throws 
a ProgramInvocationException (caused by the ProgramAbortException shown in the 
stack trace above). The relevant code is copied below for your convenience:

setAsContext();
try {
    prog.invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
    throw e;
}
catch (Throwable t) {
    // the invocation gets aborted with the preview plan
    if (optimizerPlan != null) {
        return optimizerPlan;
    } else {
        throw new ProgramInvocationException("The program caused an error: ", t);
    }
}
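
The suspected interleaving can be replayed on a single thread with a small, simplified model. This is only a sketch of the mechanism described above, not Flink's actual code: ContextRaceSketch, PlanEnv, contextFactory, userMain and replayInterleaving are hypothetical stand-ins for the static contextEnvironmentFactory field, OptimizerPlanEnvironment and the user program's main method.

```java
import java.util.function.Supplier;

/** Simplified, hypothetical model of the static context factory; not Flink's classes. */
public class ContextRaceSketch {

    /** Stand-in for the static contextEnvironmentFactory field (shared by all submissions). */
    static Supplier<PlanEnv> contextFactory;

    /** Stand-in for OptimizerPlanEnvironment: stores the captured plan in an instance field. */
    static class PlanEnv {
        String capturedPlan; // stays null if execute() is never called on this instance

        /** Registers this instance in the shared static factory. */
        void setAsContext() {
            contextFactory = () -> this;
        }

        /** Captures the plan on whichever instance the user program obtained. */
        void execute(String jobName) {
            capturedPlan = "plan of " + jobName;
        }
    }

    /** Stand-in for the user program's main(): it asks the static factory for its environment. */
    static void userMain(String jobName) {
        contextFactory.get().execute(jobName);
    }

    /** Replays the problematic ordering of two submissions, step by step. */
    static String[] replayInterleaving() {
        PlanEnv first = new PlanEnv();
        PlanEnv second = new PlanEnv();

        first.setAsContext();   // submission 1 registers itself
        second.setAsContext();  // submission 2 overwrites the static factory
        userMain("job-1");      // job 1 now executes against the second environment

        return new String[] { first.capturedPlan, second.capturedPlan };
    }

    public static void main(String[] args) {
        String[] plans = replayInterleaving();
        System.out.println("first env captured:  " + plans[0]);
        System.out.println("second env captured: " + plans[1]);
    }
}
```

In this model the first environment ends up with no captured plan, which corresponds to the optimizerPlan == null branch in the snippet above. If the analysis is right, any approach that stops concurrent submissions from sharing the static factory (for example, serializing the plan-generation step) would avoid this ordering; that is an assumption rather than a verified fix.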



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
