So I believe (I didn't test it) the solution for this case is to keep the original exception thrown from `env.execute()` and let it propagate out of the main method. It's a bit tricky; maybe we could have a better design for this scenario.
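To make the branch selection concrete, here is a minimal sketch in plain Java that does not use Flink at all. `AbortSignal`, `directMain`, `boxingMain` and `classify` are hypothetical names invented for illustration; `AbortSignal` only stands in for an `Error` subclass such as `ProgramAbortException`, and the `ExecutionException("Boxed Error", ...)` only imitates what the Scala promise appears to do in the reported stack trace. The `classify` method mirrors the `instanceof Error` check quoted from `callMainMethod`; it is not the Flink implementation.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutionException;

class AbortUnwrapSketch {

    // Hypothetical stand-in for an Error subclass used as an abort signal.
    static class AbortSignal extends Error {}

    // Hypothetical "main" that lets the abort signal escape untouched.
    public static void directMain(String[] args) {
        throw new AbortSignal();
    }

    // Hypothetical "main" that catches the signal and rethrows it boxed,
    // imitating the "Boxed Error" ExecutionException seen in the stack trace.
    public static void boxingMain(String[] args) throws Exception {
        try {
            throw new AbortSignal();
        } catch (Throwable t) {
            throw new ExecutionException("Boxed Error", t);
        }
    }

    // Invokes the named method reflectively and reports which branch
    // of an "instanceof Error" check the unwrapped exception would take.
    static String classify(String methodName) {
        try {
            Method m = AbortUnwrapSketch.class.getMethod(methodName, String[].class);
            m.invoke(null, (Object) new String[0]);
            return "completed";
        } catch (InvocationTargetException e) {
            // Reflection wraps whatever the target method threw.
            Throwable target = e.getTargetException();
            if (target instanceof Error) {
                return "rethrown as Error";
            } else {
                return "wrapped as invocation failure";
            }
        } catch (ReflectiveOperationException e) {
            // Lookup or access problems are not part of the scenario.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(classify("directMain")); // rethrown as Error
        System.out.println(classify("boxingMain")); // wrapped as invocation failure
    }
}
```

In the first case the original `Error` survives the reflection layer and can be recognized by the caller. In the second case the caller only sees an ordinary exception, which is why I think the original exception has to leave the main method unchanged.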
Thanks,
Biao /'bɪ.aʊ/

On Tue, 24 Sep 2019 at 18:55, Biao Liu <mmyy1...@gmail.com> wrote:

> The key point of this case is in `PackagedProgram#callMainMethod`.
> The `ProgramAbortException` is expected when executing the main method
> here. This `ProgramAbortException` is wrapped in an
> `InvocationTargetException` by the Java reflection layer [1]. There is a
> piece of code handling `InvocationTargetException`.
>
> try {
>   mainMethod.invoke(null, (Object) args);
> }
> catch (...
> catch (InvocationTargetException e) {
>   Throwable exceptionInMethod = e.getTargetException();
>   if (exceptionInMethod instanceof Error) {
>     throw (Error) exceptionInMethod; ------> `ProgramAbortException` would be caught expectedly here.
>   } else if (exceptionInMethod instanceof ProgramParametrizationException) {
>     throw (ProgramParametrizationException) exceptionInMethod;
>   } else if (exceptionInMethod instanceof ProgramInvocationException) {
>     throw (ProgramInvocationException) exceptionInMethod;
>   } else { ------> If I'm right, the wrapped exception (Boxed Error or something else) changes the exception, so it is caught here
>     throw new ProgramInvocationException("The main method caused an error: " + exceptionInMethod.getMessage(), exceptionInMethod);
>   }
>
> The `ProgramInvocationException` is handled specially in
> `OptimizerPlanEnvironment`.
>
> try {
>   prog.invokeInteractiveModeForExecution();
> }
> catch (ProgramInvocationException e) {
>   throw e; ------> The submission fails here in this case
> }
> catch (Throwable t) {
>   // the invocation gets aborted with the preview plan
>   if (optimizerPlan != null) {
>     return optimizerPlan; ------> Normally it should be here
>   } else {
>     throw new ProgramInvocationException("The program caused an error: ", t);
>   }
> ...
>
> 1. https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception
>
> Thanks,
> Biao /'bɪ.aʊ/
>
> On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
>> Well, I think I got the solution though I am not yet sure of the problem
>> .. The original code looked like this ..
>>
>> Try {
>>   // from a parent class called Runner which runs a streamlet
>>   // run returns an abstraction which completes a Promise depending on whether
>>   // the Job was successful or not
>>   val streamletExecution = loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>
>>   // the runner waits for the execution to complete
>>   // In normal circumstances it will run forever for streaming data source unless
>>   // being stopped forcibly or any of the queries faces an exception
>>   Await.result(streamletExecution.completed, Duration.Inf)
>> } match { //..
>>
>> and then the streamlet.run(..) in turn finally invoked the following ..
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> // creates datastreams and read from / writes to Kafka
>> // I pasted the body of this earlier in the thread
>> buildExecutionGraph()
>>
>> env.execute(..)
>>
>> This DID NOT run and failed with the exception I reported earlier. But
>> when I change the code to get the run statement out of the Try block,
>> things run fine .. like this ..
>>
>> // from a parent class called Runner which runs a streamlet
>> // run returns an abstraction which completes a Promise depending on whether
>> // the Job was successful or not
>> val streamletExecution = loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>
>> Try {
>>   // the runner waits for the execution to complete
>>   // In normal circumstances it will run forever for streaming data source unless
>>   // being stopped forcibly or any of the queries faces an exception
>>   Await.result(streamletExecution.completed, Duration.Inf)
>> } match { //..
>>
>> Apparently it looks like the exception that I was facing earlier leaked
>> through the Flink engine and Try caught it and it got logged. But removing
>> it out of Try now enables Flink to catch it back and follow the course that
>> it should. But I am not sure if this is a cogent explanation and looking
>> forward to some more accurate one from the experts. Note there is no
>> asynchrony or concurrency going on here - the Runner code may look a bit
>> over-engineered but there is a context to this. The Runner code handles not
>> only Flink but other types of streaming engines as well like Spark and Akka
>> Streams.
>>
>> regards.
>>
>> On Tue, Sep 24, 2019 at 10:17 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>>> Hi Zili,
>>>
>>> Thanks for pointing that out.
>>> I didn't realize that it's a REST API based case. Debasish's case has
>>> been discussed not only in this thread...
>>>
>>> It's really hard to analyze the case without the full picture.
>>>
>>> I think the reason why `ProgramAbortException` is not caught is that
>>> he did something outside `env.execute`, like executing this piece of code
>>> inside a Scala future.
>>>
>>> I guess the scenario is that he is submitting the job through the REST API.
>>> But in the main method, he wraps `env.execute` with a Scala future, not
>>> executing it directly.
>>> The reason the env has been set to `StreamPlanEnvironment` is that
>>> `JarHandlerUtils` retrieves the job graph through it.
>>> And the `ProgramAbortException` is not thrown out, because the Scala
>>> future tackles this exception.
>>> So retrieving the job graph fails due to an unrecognized exception (Boxed
>>> Error).
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>> On Tue, 24 Sep 2019 at 10:44, Zili Chen <wander4...@gmail.com> wrote:
>>>
>>>> Hi Biao,
>>>>
>>>> The log below already implies that the job was submitted via REST API
>>>> and I don't think it matters.
>>>>
>>>> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>
>>>> What I don't understand is that Flink DOES catch the exception at the
>>>> point it is reported thrown...
>>>>
>>>> Best,
>>>> tison.
>>>>
>>>> Biao Liu <mmyy1...@gmail.com> wrote on Tue, 24 Sep 2019 at 10:34 AM:
>>>>
>>>>> > We submit the code through Kubernetes Flink Operator which uses the
>>>>> > REST API to submit the job to the Job Manager
>>>>>
>>>>> So you are submitting the job through the REST API, not the Flink client?
>>>>> Could you explain more about this?
>>>>>
>>>>> Thanks,
>>>>> Biao /'bɪ.aʊ/
>>>>>
>>>>> On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Hi Dian -
>>>>>>
>>>>>> We submit one job through the operator. We just use the following to
>>>>>> complete a promise when the job completes ..
>>>>>>
>>>>>> Try {
>>>>>>   createLogic.executeStreamingQueries(ctx.env)
>>>>>> }.fold(
>>>>>>   th ⇒ completionPromise.tryFailure(th),
>>>>>>   _ ⇒ completionPromise.trySuccess(Dun)
>>>>>> )
>>>>>>
>>>>>> If we totally do away with the promise and future stuff then we don't
>>>>>> get the boxed error - only the exception reported in Caused By.
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Mon, Sep 23, 2019 at 10:20 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Debasish,
>>>>>>>
>>>>>>> In which case will the exception occur? Does it occur when you
>>>>>>> submit one job at a time or when multiple jobs are submitted at the same
>>>>>>> time? I'm asking this because I noticed that you used Future to execute
>>>>>>> the job unblocking. I guess ThreadLocal doesn't work well in this case.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dian
>>>>>>>
>>>>>>> On 23 Sep 2019, at 11:57 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi tison -
>>>>>>>
>>>>>>> Please find my response below in >>.
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Debasish,
>>>>>>>>
>>>>>>>> The OptimizerPlanEnvironment.ProgramAbortException should be
>>>>>>>> caught at OptimizerPlanEnvironment#getOptimizedPlan
>>>>>>>> in its catch (Throwable t) branch.
>>>>>>>
>>>>>>> >> true but what I get is a StreamPlanEnvironment. From my code I am
>>>>>>> only doing val env = StreamExecutionEnvironment.getExecutionEnvironment.
>>>>>>>
>>>>>>>> It should always throw a ProgramInvocationException instead of
>>>>>>>> OptimizerPlanEnvironment.ProgramAbortException if any
>>>>>>>> exception is thrown in the main method of your code.
>>>>>>>>
>>>>>>>> Another important problem is how the code is executed (setting the
>>>>>>>> context environment should be another Flink internal operation),
>>>>>>>> but given that you submit the job via the Flink k8s operator it might
>>>>>>>> require time to take a look at the k8s operator implementation.
>>>>>>>
>>>>>>> >> We submit the code through Kubernetes Flink Operator which uses
>>>>>>> the REST API to submit the job to the Job Manager
>>>>>>>
>>>>>>>> However, given we catch Throwable in the place this exception is
>>>>>>>> thrown, I highly doubt that it is executed by an official
>>>>>>>> Flink release.
>>>>>>>
>>>>>>> >> It is an official Flink release 1.9.0
>>>>>>>
>>>>>>>> A complete version of the code and the submission process would be
>>>>>>>> helpful. Besides, what is the return type of buildExecutionGraph?
>>>>>>>> I think it is not ExecutionGraph in Flink...
>>>>>>>
>>>>>>> >> buildExecutionGraph is our function which returns a Unit. It's not ExecutionGraph.
>>>>>>> It builds the DataStreams by reading from Kafka and then finally writes to Kafka.
>>>>>>>
>>>>>>>> Best,
>>>>>>>> tison.
>>>>>>>>
>>>>>>>> Debasish Ghosh <ghosh.debas...@gmail.com> wrote on Mon, 23 Sep 2019 at 8:21 PM:
>>>>>>>>
>>>>>>>>> This is the complete stack trace which we get from execution on
>>>>>>>>> Kubernetes using the Flink Kubernetes operator .. The boxed error comes
>>>>>>>>> from the fact that we complete a Promise with Success when it returns a
>>>>>>>>> JobExecutionResult and with Failure when we get an exception. And here we
>>>>>>>>> are getting an exception. So the real stack trace we have is the one below
>>>>>>>>> in Caused By.
>>>>>>>>>
>>>>>>>>> java.util.concurrent.ExecutionException: Boxed Error
>>>>>>>>> at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>>>>>>>> at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>>>>>>>> at scala.concurrent.Promise.tryFailure(Promise.scala:112)
>>>>>>>>> at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
>>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
>>>>>>>>> at scala.util.Failure.fold(Try.scala:240)
>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
>>>>>>>>> at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
>>>>>>>>> at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>>>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>>>>> at pipelines.runner.Runner$.run(Runner.scala:43)
>>>>>>>>> at pipelines.runner.Runner$.main(Runner.scala:30)
>>>>>>>>> at pipelines.runner.Runner.main(Runner.scala)
>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>>>>>>> at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>>>>>>> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>>>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>>>>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>> at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
>>>>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
>>>>>>>>> ... 20 more
>>>>>>>>>
>>>>>>>>> regards.
>>>>>>>>>
>>>>>>>>> On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Regarding the code you pasted, personally I think nothing is
>>>>>>>>>> wrong. The problem is how it's executed. As you can see from the
>>>>>>>>>> implementation of StreamExecutionEnvironment.getExecutionEnvironment,
>>>>>>>>>> it may create different StreamExecutionEnvironment implementations
>>>>>>>>>> under different scenarios. Could you paste the full exception stack if
>>>>>>>>>> it exists? It's difficult to figure out what's wrong with the current
>>>>>>>>>> stack trace.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Dian
>>>>>>>>>>
>>>>>>>>>> On 23 Sep 2019, at 6:55 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Can it be the case that the threadLocal stuff in
>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
>>>>>>>>>> does not behave deterministically when we submit job through a
>>>>>>>>>> Kubernetes Flink operator ? Utils also selects the factory to create
>>>>>>>>>> the context based on either Thread local storage or a static mutable
>>>>>>>>>> variable.
>>>>>>>>>>
>>>>>>>>>> Can these be source of problems in our case ?
>>>>>>>>>>
>>>>>>>>>> regards.
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> ah .. Ok .. I get the Throwable part. I am using
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>
>>>>>>>>>>> How can this lead to a wrong StreamExecutionEnvironment ? Any
>>>>>>>>>>> suggestion ?
>>>>>>>>>>>
>>>>>>>>>>> regards.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>>
>>>>>>>>>>>> As I said before, the exception is caught in [1]. It catches
>>>>>>>>>>>> the Throwable and so it could also catch
>>>>>>>>>>>> "OptimizerPlanEnvironment.ProgramAbortException". Regarding the
>>>>>>>>>>>> cause of this exception, I have the same feeling as Tison and I also
>>>>>>>>>>>> think that the wrong StreamExecutionEnvironment is used.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Dian
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>
>>>>>>>>>>>> On 23 Sep 2019, at 6:08 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Tison -
>>>>>>>>>>>>
>>>>>>>>>>>> This is the code that builds the computation graph. readStream reads
>>>>>>>>>>>> from Kafka and writeStream writes to Kafka.
>>>>>>>>>>>>
>>>>>>>>>>>> override def buildExecutionGraph = {
>>>>>>>>>>>>   val rides: DataStream[TaxiRide] =
>>>>>>>>>>>>     readStream(inTaxiRide)
>>>>>>>>>>>>       .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>>>>>       .keyBy("rideId")
>>>>>>>>>>>>
>>>>>>>>>>>>   val fares: DataStream[TaxiFare] =
>>>>>>>>>>>>     readStream(inTaxiFare)
>>>>>>>>>>>>       .keyBy("rideId")
>>>>>>>>>>>>
>>>>>>>>>>>>   val processed: DataStream[TaxiRideFare] =
>>>>>>>>>>>>     rides
>>>>>>>>>>>>       .connect(fares)
>>>>>>>>>>>>       .flatMap(new EnrichmentFunction)
>>>>>>>>>>>>
>>>>>>>>>>>>   writeStream(out, processed)
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> I also checked that my code enters this function
>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>> and then the exception is thrown. I tried to do a grep on the Flink
>>>>>>>>>>>> code base to see where this exception is caught. If I take off the
>>>>>>>>>>>> tests, I don't see any catch of this exception ..
>>>>>>>>>>>>
>>>>>>>>>>>> $ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
>>>>>>>>>>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
>>>>>>>>>>>>
>>>>>>>>>>>> What am I missing here ?
>>>>>>>>>>>>
>>>>>>>>>>>> regards.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As mentioned by Dian, it is an internal exception that should
>>>>>>>>>>>>> always be caught by Flink internally. I would suggest you share the
>>>>>>>>>>>>> job (abstractly). Generally it is because you use
>>>>>>>>>>>>> StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> tison.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Austin Cawley-Edwards <austin.caw...@gmail.com> wrote on Mon, 23 Sep 2019 at 5:09 AM:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Have you reached out to the FlinkK8sOperator team on Slack?
>>>>>>>>>>>>>> They're usually pretty active on there.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's the link:
>>>>>>>>>>>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Austin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The problem is I am submitting Flink jobs to a Kubernetes
>>>>>>>>>>>>>>> cluster using a Flink Operator. Hence it's difficult to debug in the
>>>>>>>>>>>>>>> traditional sense of the term. And all I get is the exception that I
>>>>>>>>>>>>>>> reported ..
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am thinking that this exception must be coming because of
>>>>>>>>>>>>>>> some other exceptions, which are not reported BTW. I expected a
>>>>>>>>>>>>>>> Caused By portion in the stack trace. Any clue as to which area I
>>>>>>>>>>>>>>> should look into to debug this?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the pointer .. I will try debugging. I am
>>>>>>>>>>>>>>>> getting this exception running my application on Kubernetes
>>>>>>>>>>>>>>>> using the Flink operator from Lyft.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This exception is used internally to get the plan of a job
>>>>>>>>>>>>>>>>> before submitting it for execution. It's thrown for a special
>>>>>>>>>>>>>>>>> purpose, will be caught internally in [1], and will usually not be
>>>>>>>>>>>>>>>>> thrown to end users.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> You could check the following places to find out the cause
>>>>>>>>>>>>>>>>> of this problem:
>>>>>>>>>>>>>>>>> 1. Check the execution environment you used
>>>>>>>>>>>>>>>>> 2. If you can debug, set a breakpoint at [2] to see if the
>>>>>>>>>>>>>>>>> type of the env wrapped in StreamPlanEnvironment is
>>>>>>>>>>>>>>>>> OptimizerPlanEnvironment. Usually it should be.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>>>>>> [2] https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 21 Sep 2019, at 4:14 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi -
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When you get an exception stack trace like this ..
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> what is the recommended approach to debugging ? I mean
>>>>>>>>>>>>>>>>> what kind of errors can potentially lead to such a stack trace ?
>>>>>>>>>>>>>>>>> In my case it starts from env.execute(..) but does not give any
>>>>>>>>>>>>>>>>> information as to what can go wrong.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Any help will be appreciated.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>>>>>>>> Code: http://github.com/debasishg