So I believe (I didn't test it) the solution for this case is to keep the
original exception thrown from `env.execute()` and rethrow it out of the
main method.
It's a bit tricky; maybe we could have a better design for this scenario.
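
To make the idea concrete, here is a minimal sketch in plain Java. It is not
Flink code: `AbortSignal` is a hypothetical stand-in for
`ProgramAbortException`, `execute()` stands in for `env.execute()`, and
`classify` only imitates the `instanceof Error` check in `callMainMethod`.
It assumes the abort signal is an `Error`, as the handling code suggests.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-ins only; none of these names are Flink classes.
class RethrowSketch {

    // Plays the role of ProgramAbortException: an Error used as a control-flow signal.
    static class AbortSignal extends Error {}

    // Plays the role of env.execute() under a plan-only environment.
    static void execute() {
        throw new AbortSignal();
    }

    // A main method that boxes the signal into a checked exception.
    public static void boxingMain(String[] args) throws Exception {
        try {
            execute();
        } catch (Throwable t) {
            throw new ExecutionException("Boxed Error", t);
        }
    }

    // A main method that may do its own bookkeeping but rethrows the original throwable.
    public static void rethrowingMain(String[] args) {
        try {
            execute();
        } catch (Throwable t) {
            // bookkeeping (for example completing a promise) would go here
            throw t;
        }
    }

    // Invokes a main method reflectively and reports which branch the caller would take.
    static String classify(String mainName) throws Exception {
        Method main = RethrowSketch.class.getMethod(mainName, String[].class);
        try {
            main.invoke(null, (Object) new String[0]);
            return "returned normally";
        } catch (InvocationTargetException e) {
            Throwable target = e.getTargetException();
            return (target instanceof Error) ? "Error branch" : "fallback branch";
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("boxingMain     -> " + classify("boxingMain"));
        System.out.println("rethrowingMain -> " + classify("rethrowingMain"));
    }
}
```

In this sketch the boxed variant falls through to the fallback branch, while
the rethrowing variant reaches the `Error` branch, which is the path the
abort signal is supposed to take.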

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 18:55, Biao Liu <mmyy1...@gmail.com> wrote:

> The key point of this case is in `PackagedProgram#callMainMethod`.
> A `ProgramAbortException` is expected when executing the main method
> here. The thrown `ProgramAbortException` is wrapped in an
> `InvocationTargetException` by the Java reflection layer [1]. There is a
> piece of code handling `InvocationTargetException`:
>
> try {
>   mainMethod.invoke(null, (Object) args);
> }
> catch (...
> catch (InvocationTargetException e) {
>   Throwable exceptionInMethod = e.getTargetException();
>   if (exceptionInMethod instanceof Error) {
>     // ------> `ProgramAbortException` is expected to be handled here.
>     throw (Error) exceptionInMethod;
>   } else if (exceptionInMethod instanceof ProgramParametrizationException) {
>     throw (ProgramParametrizationException) exceptionInMethod;
>   } else if (exceptionInMethod instanceof ProgramInvocationException) {
>     throw (ProgramInvocationException) exceptionInMethod;
>   } else {
>     // ------> If I'm right, the wrapping (Boxed Error or something else)
>     // changes the exception, so it is caught here.
>     throw new ProgramInvocationException("The main method caused an error: "
>         + exceptionInMethod.getMessage(), exceptionInMethod);
>   }
> }
>
> The `ProgramInvocationException` is handled specially in
> `OptimizerPlanEnvironment`.
>
> try {
>   prog.invokeInteractiveModeForExecution();
> }
> catch (ProgramInvocationException e) {
>   throw e;       // ------> The submission fails here in this case
> }
> catch (Throwable t) {
>   // the invocation gets aborted with the preview plan
>   if (optimizerPlan != null) {
>     return optimizerPlan;       // ------> Normally it should end up here
>   } else {
>     throw new ProgramInvocationException("The program caused an error: ", t);
>   } ...
>
> 1.
> https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Well, I think I got the solution though I am not yet sure of the problem
>> .. The original code looked like this ..
>>
>> Try {
>>   // from a parent class called Runner which runs a streamlet
>>   // run returns an abstraction which completes a Promise depending on
>>   // whether the Job was successful or not
>>   val streamletExecution =
>>     loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>
>>   // the runner waits for the execution to complete
>>   // In normal circumstances it will run forever for streaming data
>>   // source unless being stopped forcibly or any of the queries faces
>>   // an exception
>>   Await.result(streamletExecution.completed, Duration.Inf)
>> } match { //..
>>
>> and then the streamlet.run(..) in turn finally invoked the following ..
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> // creates datastreams and read from / writes to Kafka
>> // I pasted the body of this earlier in the thread
>> buildExecutionGraph()
>>
>> env.execute(..)
>>
>> This DID NOT run and failed with the exception I reported earlier. But
>> when I change the code to get the run statement out of the Try block,
>> things run fine .. like this ..
>>
>> // from a parent class called Runner which runs a streamlet
>> // run returns an abstraction which completes a Promise depending on
>> // whether the Job was successful or not
>> val streamletExecution =
>>   loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>
>> Try {
>>   // the runner waits for the execution to complete
>>   // In normal circumstances it will run forever for streaming data
>>   // source unless being stopped forcibly or any of the queries faces
>>   // an exception
>>   Await.result(streamletExecution.completed, Duration.Inf)
>> } match { //..
>>
>> Apparently the exception that I was facing earlier leaked out of the
>> Flink engine, Try caught it, and it got logged. Moving the run call
>> out of Try now lets Flink catch it again and follow the course that
>> it should. But I am not sure if this is a cogent explanation, and I am
>> looking forward to a more accurate one from the experts. Note there is no
>> asynchrony or concurrency going on here - the Runner code may look a bit
>> over-engineered but there is a context to this. The Runner code handles not
>> only Flink but other types of streaming engines as well, like Spark and Akka
>> Streams.
>>
>> regards.
>>
>>
>> On Tue, Sep 24, 2019 at 10:17 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>>> Hi Zili,
>>>
>>> Thanks for pointing that out.
>>> I didn't realize that it's a REST API based case. Debasish's case has
>>> been discussed not only in this thread...
>>>
>>> It's really hard to analyze the case without the full picture.
>>>
>>> I think the reason why `ProgramAbortException` is not caught is that
>>> he did something outside `env.execute`, like executing this piece of code
>>> inside a Scala future.
>>>
>>> I guess the scenario is that he is submitting the job through the REST API,
>>> but in the main method he wraps `env.execute` in a Scala future instead of
>>> executing it directly.
>>> The reason the env has been set to `StreamPlanEnvironment` is that
>>> `JarHandlerUtils` retrieves the job graph through it.
>>> And the `ProgramAbortException` is not thrown out, because the Scala
>>> future handles this exception.
>>> So retrieving the job graph fails due to an unrecognized exception (Boxed
>>> Error).
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Tue, 24 Sep 2019 at 10:44, Zili Chen <wander4...@gmail.com> wrote:
>>>
>>>> Hi Biao,
>>>>
>>>> The log below already implies that the job was submitted via the REST API,
>>>> and I don't think it matters.
>>>>
>>>> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>
>>>> What I don't understand is that Flink DOES catch the exception at the
>>>> point it is reported thrown...
>>>>
>>>> Best,
>>>> tison.
>>>>
>>>>
>>>> On Tue, 24 Sep 2019 at 10:34, Biao Liu <mmyy1...@gmail.com> wrote:
>>>>
>>>>>
>>>>> > We submit the code through Kubernetes Flink Operator which uses the
>>>>> REST API to submit the job to the Job Manager
>>>>>
>>>>> So you are submitting the job through the REST API, not the Flink
>>>>> client? Could you explain more about this?
>>>>>
>>>>> Thanks,
>>>>> Biao /'bɪ.aʊ/
>>>>>
>>>>>
>>>>>
>>>>> On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh <ghosh.debas...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Dian -
>>>>>>
>>>>>> We submit one job through the operator. We just use the following to
>>>>>> complete a promise when the job completes ..
>>>>>>
>>>>>>       Try {
>>>>>>         createLogic.executeStreamingQueries(ctx.env)
>>>>>>       }.fold(
>>>>>>         th ⇒ completionPromise.tryFailure(th),
>>>>>>         _ ⇒ completionPromise.trySuccess(Dun)
>>>>>>       )
>>>>>>
>>>>>> If we totally do away with the promise and future stuff then we don't
>>>>>> get the boxed error - only the exception reported in Caused By.
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Mon, Sep 23, 2019 at 10:20 PM Dian Fu <dian0511...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Debasish,
>>>>>>>
>>>>>>> In which case does the exception occur? Does it occur when you
>>>>>>> submit one job at a time or when multiple jobs are submitted at the same
>>>>>>> time? I'm asking this because I noticed that you used a Future to execute
>>>>>>> the job without blocking. I guess ThreadLocal doesn't work well in this case.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dian
>>>>>>>
>>>>>>> On 23 Sep 2019, at 11:57 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi tison -
>>>>>>>
>>>>>>> Please find my response below in >>.
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <wander4...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Debasish,
>>>>>>>>
>>>>>>>> The OptimizerPlanEnvironment.ProgramAbortException should be
>>>>>>>> caught at OptimizerPlanEnvironment#getOptimizedPlan
>>>>>>>> in its catch (Throwable t) branch.
>>>>>>>>
>>>>>>>
>>>>>>> >> true but what I get is a StreamPlanEnvironment. From my code I am
>>>>>>> only doing val env =
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment.
>>>>>>>
>>>>>>>>
>>>>>>>> It should always throw a ProgramInvocationException instead of
>>>>>>>> OptimizerPlanEnvironment.ProgramAbortException if any
>>>>>>>> exception is thrown in the main method of your code.
>>>>>>>>
>>>>>>>> Another important question is how the code is executed (setting the
>>>>>>>> context environment should be another Flink-internal operation),
>>>>>>>> but given that you submit the job via the Flink k8s operator, it might
>>>>>>>> require time to take a look at the k8s operator implementation.
>>>>>>>>
>>>>>>>
>>>>>>> >> We submit the code through Kubernetes Flink Operator which uses
>>>>>>> the REST API to submit the job to the Job Manager
>>>>>>>
>>>>>>>>
>>>>>>>> However, given that we catch Throwable at the place where this
>>>>>>>> exception is thrown, I strongly doubt that it is being executed by an
>>>>>>>> official Flink release.
>>>>>>>>
>>>>>>>
>>>>>>> >> It is an official Flink release 1.9.0
>>>>>>>
>>>>>>>>
>>>>>>>> A complete version of the code and the submission process would be
>>>>>>>> helpful. Besides, what is the return type of buildExecutionGraph?
>>>>>>>> I think it is not ExecutionGraph in Flink...
>>>>>>>>
>>>>>>>
>>>>>>> >> buildExecutionGraph is our function which returns a Unit. It's
>>>>>>> not ExecutionGraph. It builds the DataStreams by reading from Kafka and
>>>>>>> then finally writes to Kafka.
>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> tison.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 23 Sep 2019 at 8:21 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> This is the complete stack trace which we get from execution on
>>>>>>>>> Kubernetes using the Flink Kubernetes operator .. The boxed error comes
>>>>>>>>> from the fact that we complete a Promise with Success when it returns a
>>>>>>>>> JobExecutionResult and with Failure when we get an exception. And here we
>>>>>>>>> are getting an exception. So the real stack trace we have is the one below
>>>>>>>>> in Caused By.
>>>>>>>>>
>>>>>>>>> java.util.concurrent.ExecutionException: Boxed Error
>>>>>>>>> at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>>>>>>>> at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>>>>>>>> at scala.concurrent.Promise.tryFailure(Promise.scala:112)
>>>>>>>>> at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
>>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
>>>>>>>>> at scala.util.Failure.fold(Try.scala:240)
>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
>>>>>>>>> at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
>>>>>>>>> at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>>>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>>>>> at pipelines.runner.Runner$.run(Runner.scala:43)
>>>>>>>>> at pipelines.runner.Runner$.main(Runner.scala:30)
>>>>>>>>> at pipelines.runner.Runner.main(Runner.scala)
>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>>>>>>> at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>>>>>>> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>>>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>>>>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>> at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
>>>>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
>>>>>>>>> ... 20 more
>>>>>>>>>
>>>>>>>>> regards.
>>>>>>>>>
>>>>>>>>> On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Regarding the code you pasted, personally I think nothing is
>>>>>>>>>> wrong. The problem is how it's executed. As you can see from the
>>>>>>>>>> implementation of StreamExecutionEnvironment.getExecutionEnvironment, it
>>>>>>>>>> may create different StreamExecutionEnvironment implementations under
>>>>>>>>>> different scenarios. Could you paste the full exception stack if it exists?
>>>>>>>>>> It's difficult to figure out what's wrong with the current stack trace.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Dian
>>>>>>>>>>
>>>>>>>>>> On 23 Sep 2019, at 6:55 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Can it be the case that the threadLocal stuff in
>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
>>>>>>>>>> does not behave deterministically when we submit a job through a
>>>>>>>>>> Kubernetes Flink operator? Utils also selects the factory to create the
>>>>>>>>>> context based on either thread-local storage or a static mutable variable.
>>>>>>>>>>
>>>>>>>>>> Can these be the source of problems in our case?
>>>>>>>>>>
>>>>>>>>>> regards.
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <
>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> ah .. Ok .. I get the Throwable part. I am using
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>
>>>>>>>>>>> How can this lead to a wrong StreamExecutionEnvironment ? Any
>>>>>>>>>>> suggestion ?
>>>>>>>>>>>
>>>>>>>>>>> regards.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>>
>>>>>>>>>>>> As I said before, the exception is caught in [1]. It catches
>>>>>>>>>>>> Throwable, so it can also catch
>>>>>>>>>>>> "OptimizerPlanEnvironment.ProgramAbortException". Regarding
>>>>>>>>>>>> the cause of this exception, I have the same feeling as Tison, and I
>>>>>>>>>>>> also think that the wrong StreamExecutionEnvironment is used.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Dian
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>
>>>>>>>>>>>> On 23 Sep 2019, at 6:08 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Tison -
>>>>>>>>>>>>
>>>>>>>>>>>> This is the code that builds the computation graph. readStream reads
>>>>>>>>>>>> from Kafka and writeStream writes to Kafka.
>>>>>>>>>>>>
>>>>>>>>>>>>     override def buildExecutionGraph = {
>>>>>>>>>>>>       val rides: DataStream[TaxiRide] =
>>>>>>>>>>>>         readStream(inTaxiRide)
>>>>>>>>>>>>           .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>>>>>           .keyBy("rideId")
>>>>>>>>>>>>
>>>>>>>>>>>>       val fares: DataStream[TaxiFare] =
>>>>>>>>>>>>         readStream(inTaxiFare)
>>>>>>>>>>>>           .keyBy("rideId")
>>>>>>>>>>>>
>>>>>>>>>>>>       val processed: DataStream[TaxiRideFare] =
>>>>>>>>>>>>         rides
>>>>>>>>>>>>           .connect(fares)
>>>>>>>>>>>>           .flatMap(new EnrichmentFunction)
>>>>>>>>>>>>
>>>>>>>>>>>>       writeStream(out, processed)
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>> I also checked that my code enters this function
>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>> and then the exception is thrown. I tried to do a grep on the Flink
>>>>>>>>>>>> code base to see where this exception is caught. If I take off the
>>>>>>>>>>>> tests, I don't see any catch of this exception ..
>>>>>>>>>>>>
>>>>>>>>>>>> $ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
>>>>>>>>>>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
>>>>>>>>>>>>
>>>>>>>>>>>> What am I missing here ?
>>>>>>>>>>>>
>>>>>>>>>>>> regards.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As mentioned by Dian, it is an internal exception that should
>>>>>>>>>>>>> always be caught by
>>>>>>>>>>>>> Flink internally. I would suggest you share the
>>>>>>>>>>>>> job (in abstract form). Generally it is because
>>>>>>>>>>>>> you use StreamPlanEnvironment/OptimizerPlanEnvironment
>>>>>>>>>>>>> directly.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> tison.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, 23 Sep 2019 at 5:09 AM, Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Have you reached out to the FlinkK8sOperator team on Slack?
>>>>>>>>>>>>>> They’re usually pretty active on there.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here’s the link:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Austin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <
>>>>>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The problem is I am submitting Flink jobs to a Kubernetes
>>>>>>>>>>>>>>> cluster using a Flink Operator. Hence it's difficult to debug in the
>>>>>>>>>>>>>>> traditional sense of the term. And all I get is the exception that I
>>>>>>>>>>>>>>> reported ..
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am thinking that this exception must be coming because of
>>>>>>>>>>>>>>> some other exceptions, which are not reported BTW. I expected a
>>>>>>>>>>>>>>> Caused By portion in the stack trace. Any clue as to which area I
>>>>>>>>>>>>>>> should look into to debug this?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <
>>>>>>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the pointer .. I will try debugging. I am
>>>>>>>>>>>>>>>> getting this exception running my application on Kubernetes using
>>>>>>>>>>>>>>>> the Flink operator from Lyft.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <
>>>>>>>>>>>>>>>> dian0511...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This exception is used internally to get the plan of a job
>>>>>>>>>>>>>>>>> before submitting it for execution. It's thrown for a special
>>>>>>>>>>>>>>>>> purpose, will be caught internally in [1], and will usually not
>>>>>>>>>>>>>>>>> be thrown to end users.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> You could check the following places to find out the cause
>>>>>>>>>>>>>>>>> of this problem:
>>>>>>>>>>>>>>>>> 1. Check the execution environment you used
>>>>>>>>>>>>>>>>> 2. If you can debug, set a breakpoint at [2] to see if the
>>>>>>>>>>>>>>>>> type of the env wrapped in StreamPlanEnvironment is
>>>>>>>>>>>>>>>>> OptimizerPlanEnvironment. Usually it should be.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 21 Sep 2019, at 4:14 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi -
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When you get an exception stack trace like this ..
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> what is the recommended approach to debugging? I mean
>>>>>>>>>>>>>>>>> what kind of errors can potentially lead to such a stack trace?
>>>>>>>>>>>>>>>>> In my case it starts from env.execute(..) but does not give any
>>>>>>>>>>>>>>>>> information as to what can go wrong.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Any help will be appreciated.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>
>>
>
